2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX: Extended Remote Procedure Call */
12 #include <afsconfig.h>
14 #include "afs/param.h"
16 #include <afs/param.h>
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
29 #include <net/net_globals.h>
30 #endif /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #undef RXDEBUG /* turn off debugging */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
53 #endif /* AFS_ALPHA_ENV */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
64 #include "rx_globals.h"
66 #define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS 211 /* Stop AFS process */
68 #define AFSOP_STOP_BKG 212 /* Stop BKG process */
70 extern afs_int32 afs_termState;
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
77 # include <sys/types.h>
84 # include <sys/socket.h>
85 # include <sys/file.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
/* Optional hook for naming server threads: rx_StartServer calls
 * (*registerProgram)(pid, name) with names such as "srv_1". It is null
 * until something installs a function here. */
107 int (*registerProgram)() = 0;
/* NOTE(review): swapNameProgram is not used in the visible part of this
 * file; presumably a companion naming hook - confirm with its callers. */
108 int (*swapNameProgram)() = 0;
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
116 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
118 afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119 afs_int32 rxi_start_in_error;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
124 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125 * currently allocated within rx. This number is used to allocate the
126 * memory required to return the statistics when queried.
129 static unsigned int rxi_rpc_peer_stat_cnt;
132 * rxi_rpc_process_stat_cnt counts the total number of local process stat
133 * structures currently allocated within rx. The number is used to allocate
134 * the memory required to return the statistics when queried.
137 static unsigned int rxi_rpc_process_stat_cnt;
139 #if !defined(offsetof)
140 #include <stddef.h> /* for definition of offsetof() */
143 #ifdef AFS_PTHREAD_ENV
147 * Use procedural initialization of mutexes/condition variables
/* Locks and condition variables defined in other source files (hence extern).
 * rxi_InitPthread() initializes every one of them. */
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
/* Locks private to this file; also initialized by rxi_InitPthread(). */
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
172 static void rxi_InitPthread(void) {
173 assert(pthread_mutex_init(&rx_clock_mutex,
174 (const pthread_mutexattr_t*)0)==0);
175 assert(pthread_mutex_init(&rxi_connCacheMutex,
176 (const pthread_mutexattr_t*)0)==0);
177 assert(pthread_mutex_init(&rx_init_mutex,
178 (const pthread_mutexattr_t*)0)==0);
179 assert(pthread_mutex_init(&epoch_mutex,
180 (const pthread_mutexattr_t*)0)==0);
181 assert(pthread_mutex_init(&rx_event_mutex,
182 (const pthread_mutexattr_t*)0)==0);
183 assert(pthread_mutex_init(&des_init_mutex,
184 (const pthread_mutexattr_t*)0)==0);
185 assert(pthread_mutex_init(&des_random_mutex,
186 (const pthread_mutexattr_t*)0)==0);
187 assert(pthread_mutex_init(&osi_malloc_mutex,
188 (const pthread_mutexattr_t*)0)==0);
189 assert(pthread_mutex_init(&event_handler_mutex,
190 (const pthread_mutexattr_t*)0)==0);
191 assert(pthread_mutex_init(&listener_mutex,
192 (const pthread_mutexattr_t*)0)==0);
193 assert(pthread_mutex_init(&rx_if_init_mutex,
194 (const pthread_mutexattr_t*)0)==0);
195 assert(pthread_mutex_init(&rx_if_mutex,
196 (const pthread_mutexattr_t*)0)==0);
197 assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198 (const pthread_mutexattr_t*)0)==0);
199 assert(pthread_mutex_init(&rxkad_random_mutex,
200 (const pthread_mutexattr_t*)0)==0);
201 assert(pthread_mutex_init(&rxkad_stats_mutex,
202 (const pthread_mutexattr_t*)0)==0);
203 assert(pthread_mutex_init(&rx_debug_mutex,
204 (const pthread_mutexattr_t*)0)==0);
206 assert(pthread_cond_init(&rx_event_handler_cond,
207 (const pthread_condattr_t*)0)==0);
208 assert(pthread_cond_init(&rx_listener_cond,
209 (const pthread_condattr_t*)0)==0);
210 assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;

/*
 * Run rxi_InitPthread() exactly once, before any lock it creates is used.
 * pthread_once() is called outside assert() so the initialization still
 * happens when NDEBUG is defined; only its return code is asserted.
 * The expansion keeps the trailing semicolon of the original definition,
 * because call sites may use the macro without one.
 */
#define INIT_PTHREAD_LOCKS \
    do { \
        int rx_once_code_ = pthread_once(&rx_once_init, rxi_InitPthread); \
        assert(rx_once_code_ == 0); \
        (void)rx_once_code_; \
    } while (0);
217 * The rx_stats_mutex mutex protects the following global variables:
222 * rxi_lowConnRefCount
223 * rxi_lowPeerRefCount
/* Empty fallback definition. The preprocessor line that separates it from
 * the pthread_once() version above (presumably the #else of
 * AFS_PTHREAD_ENV) is not visible here - confirm. */
232 #define INIT_PTHREAD_LOCKS
236 /* Variables for handling the minProcs implementation. availProcs gives the
237 * number of threads available in the pool at this moment (not counting dudes
238 * executing right now). totalMin gives the total number of procs required
239 * for handling all minProcs requests. minDeficit is a dynamic variable
240 * tracking the # of procs required to satisfy all of the remaining minProcs
242 * For fine grain locking to work, the quota check and the reservation of
243 * a server thread has to come while rxi_availProcs and rxi_minDeficit
244 * are locked. To this end, the code has been modified under #ifdef
245 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246 * same time. A new function, ReturnToServerPool() returns the allocation.
248 * A call can be on several queues (but only one at a time). When
249 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250 * that no one else is touching the queue. To this end, we store the address
251 * of the queue lock in the call structure (under the call lock) when we
252 * put the call on a queue, and we clear the call_queue_lock when the
253 * call is removed from a queue (once the call lock has been obtained).
254 * This allows rxi_ResetCall to safely synchronize with others wishing
255 * to manipulate the queue.
258 #ifdef RX_ENABLE_LOCKS
259 static afs_kmutex_t rx_rpc_stats;
260 void rxi_StartUnlocked();
263 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are
264 ** pretty good that the next packet coming in is from the same connection
265 ** as the last packet, since we send multiple packets in a transmit window.
267 struct rx_connection *rxLastConn = 0;
269 #ifdef RX_ENABLE_LOCKS
270 /* The locking hierarchy for rx fine grain locking is composed of these
273 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
274 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
275 * call->lock - locks call data fields.
276 * These are independent of each other:
277 * rx_freeCallQueue_lock
282 * serverQueueEntry->lock
284 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
285 * peer->lock - locks peer data fields.
286 * conn_data_lock - ensures that no more than one thread updates a conn data
287 * field at the same time.
295 * Do we need a lock to protect the peer field in the conn structure?
296 * conn->peer was previously a constant for all intents and so has no
297 * lock protecting this field. The multihomed client delta introduced
298 * a RX code change : change the peer field in the connection structure
299 * to that remote interface from which the last packet for this
300 * connection was sent out. This may become an issue if further changes
303 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
304 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
306 /* rxdb_fileID is used to identify the lock location, along with line#. */
307 static int rxdb_fileID = RXDB_FILE_RX;
308 #endif /* RX_LOCKS_DB */
/* Without RX_ENABLE_LOCKS the call-queue-lock bookkeeping macros expand
 * to nothing. */
309 #else /* RX_ENABLE_LOCKS */
310 #define SET_CALL_QUEUE_LOCK(C, L)
311 #define CLEAR_CALL_QUEUE_LOCK(C)
312 #endif /* RX_ENABLE_LOCKS */
/* Global server-queue-entry pointer, initially null; it is not otherwise
 * referenced in the visible part of this file. */
313 struct rx_serverQueueEntry *rx_waitForPacket = 0;
315 /* ------------Exported Interfaces------------- */
317 /* This function allows rxkad to set the epoch to a suitably random number
318 * which rx_NewConnection will use in the future. The principal purpose is to
319 * get rxnull connections to use the same epoch as the rxkad connections do, at
320 * least once the first rxkad connection is established. This is important now
321 * that the host/port addresses aren't used in FindConnection: the uniqueness
322 * of epoch/cid matters and the start time won't do. */
324 #ifdef AFS_PTHREAD_ENV
326 * This mutex protects the following global variables:
/* NOTE(review): the lock/unlock calls sit inside assert(). If this file is
 * built with NDEBUG they are compiled out, and epoch updates run unlocked. */
330 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
331 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
335 #endif /* AFS_PTHREAD_ENV */
/* Sets the epoch described above. Its body is not visible here; presumably
 * it stores epoch into rx_epoch (which rx_NewConnection copies into
 * conn->epoch) under LOCK_EPOCH - confirm. */
337 void rx_SetEpoch (afs_uint32 epoch)
344 /* Initialize rx. A port number may be mentioned, in which case this
345 * becomes the default port number for any service installed later.
346 * If 0 is provided for the port number, a random port will be chosen
347 * by the kernel. Whether this will ever overlap anything in
348 * /etc/services is anybody's guess... Returns 0 on success, -1 on
350 static int rxinit_status = 1;
351 #ifdef AFS_PTHREAD_ENV
353 * This mutex protects the following global variables:
357 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
358 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
361 #define UNLOCK_RX_INIT
364 int rx_Init(u_int port)
371 char *htable, *ptable;
374 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
375 __djgpp_set_quiet_socket(1);
382 if (rxinit_status == 0) {
383 tmp_status = rxinit_status;
385 return tmp_status; /* Already started; return previous error code. */
389 if (afs_winsockInit()<0)
395 * Initialize anything necessary to provide a non-premptive threading
398 rxi_InitializeThreadSupport();
401 /* Allocate and initialize a socket for client and perhaps server
404 rx_socket = rxi_GetUDPSocket((u_short)port);
405 if (rx_socket == OSI_NULLSOCKET) {
411 #ifdef RX_ENABLE_LOCKS
414 #endif /* RX_LOCKS_DB */
/* RX_ENABLE_LOCKS builds: create the global rx mutexes and condition
 * variables. */
415 MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
416 MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
417 MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
418 MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
419 MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
421 CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
422 MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
423 MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
424 MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
426 MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
428 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
430 rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
431 #endif /* KERNEL && AFS_HPUX110_ENV */
432 #else /* RX_ENABLE_LOCKS */
433 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
434 mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
435 #endif /* AFS_GLOBAL_SUNLOCK */
436 #endif /* RX_ENABLE_LOCKS */
/* Reset the global dead time, the tranquil flag and rx_stats, then allocate
 * and zero the connection and peer hash tables (rx_hashTableSize buckets
 * each). NOTE(review): no NULL check on the osi_Alloc results is visible
 * before they are PINned and cleared. */
439 rx_connDeadTime = 12;
440 rx_tranquil = 0; /* reset flag */
441 memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
443 osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
444 PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
445 memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
446 ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
447 PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
448 memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
450 /* Malloc up a bunch of packets & buffers */
452 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
453 queue_Init(&rx_freePacketQueue);
454 rxi_NeedMorePackets = FALSE;
455 rxi_MorePackets(rx_nPackets);
463 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
464 tv.tv_sec = clock_now.sec;
465 tv.tv_usec = clock_now.usec;
466 srand((unsigned int) tv.tv_usec);
473 #if defined(KERNEL) && !defined(UKERNEL)
474 /* Really, this should never happen in a real kernel */
477 struct sockaddr_in addr;
478 int addrlen = sizeof(addr);
479 if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
483 rx_port = addr.sin_port;
486 rx_stats.minRtt.sec = 9999999;
488 rx_SetEpoch (tv.tv_sec | 0x80000000);
490 rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
491 * will provide a randomer value. */
493 MUTEX_ENTER(&rx_stats_mutex);
494 rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
495 MUTEX_EXIT(&rx_stats_mutex);
496 /* *Slightly* random start time for the cid. This is just to help
497 * out with the hashing function at the peer */
/* NOTE(review): tv.tv_sec ^ tv.tv_usec is presumably a signed long. Shifting
 * a negative or large signed value left by RX_CIDSHIFT is undefined behavior
 * in C; casting to an unsigned type before the shift would avoid that. */
498 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
499 rx_connHashTable = (struct rx_connection **) htable;
500 rx_peerHashTable = (struct rx_peer **) ptable;
502 rx_lastAckDelay.sec = 0;
503 rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
504 rx_hardAckDelay.sec = 0;
505 rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
506 rx_softAckDelay.sec = 0;
507 rx_softAckDelay.usec = 100000; /* 100 milliseconds */
509 rxevent_Init(20, rxi_ReScheduleEvents);
511 /* Initialize various global queues */
512 queue_Init(&rx_idleServerQueue);
513 queue_Init(&rx_incomingCallQueue);
514 queue_Init(&rx_freeCallQueue);
516 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
517 /* Initialize our list of usable IP addresses. */
521 /* Start listener process (exact function is dependent on the
522 * implementation environment--kernel or user space) */
/* Record success, so that later rx_Init calls take the rxinit_status == 0
 * early return near the top of the function and return 0. */
527 tmp_status = rxinit_status = 0;
532 /* called with unincremented nRequestsRunning to see if it is OK to start
533 * a new thread in this service. Could be "no" for two reasons: over the
534 * max quota, or would prevent others from reaching their min quota.
536 #ifdef RX_ENABLE_LOCKS
537 /* This version of QuotaOK reserves quota if it's ok while the
538 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
540 static int QuotaOK(register struct rx_service *aservice)
542 /* check if over max quota */
543 if (aservice->nRequestsRunning >= aservice->maxProcs) {
547 /* under min quota, we're OK */
548 /* otherwise, can use only if there are enough to allow everyone
549 * to go to their min quota after this guy starts.
551 MUTEX_ENTER(&rx_stats_mutex);
552 if ((aservice->nRequestsRunning < aservice->minProcs) ||
553 (rxi_availProcs > rxi_minDeficit)) {
554 aservice->nRequestsRunning++;
555 /* just started call in minProcs pool, need fewer to maintain
557 if (aservice->nRequestsRunning <= aservice->minProcs)
560 MUTEX_EXIT(&rx_stats_mutex);
563 MUTEX_EXIT(&rx_stats_mutex);
568 static void ReturnToServerPool(register struct rx_service *aservice)
570 aservice->nRequestsRunning--;
571 MUTEX_ENTER(&rx_stats_mutex);
572 if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
574 MUTEX_EXIT(&rx_stats_mutex);
577 #else /* RX_ENABLE_LOCKS */
/* Variant without RX_ENABLE_LOCKS. Unlike the version above, it reserves
 * nothing. The visible lines only test the quota:
 *   - return 1 below minProcs;
 *   - return 0 at or above maxProcs;
 *   - otherwise set rc to 1 when spare procs exceed rxi_minDeficit.
 * rc's declaration and the final return are not visible here. */
578 static int QuotaOK(register struct rx_service *aservice)
581 /* under min quota, we're OK */
582 if (aservice->nRequestsRunning < aservice->minProcs) return 1;
584 /* check if over max quota */
585 if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
587 /* otherwise, can use only if there are enough to allow everyone
588 * to go to their min quota after this guy starts.
590 if (rxi_availProcs > rxi_minDeficit) rc = 1;
593 #endif /* RX_ENABLE_LOCKS */
596 /* Called by rx_StartServer to start up lwp's to service calls.
597 NExistingProcs gives the number of procs already existing, and which
598 therefore needn't be created. */
599 void rxi_StartServerProcs(int nExistingProcs)
601 register struct rx_service *service;
606 /* For each service, reserve N processes, where N is the "minimum"
607 number of processes that MUST be able to execute a request in parallel,
608 at any time, for that process. Also compute the maximum difference
609 between any service's maximum number of processes that can run
610 (i.e. the maximum number that ever will be run, and a guarantee
611 that this number will run if other services aren't running), and its
612 minimum number. The result is the extra number of processes that
613 we need in order to provide the latter guarantee */
614 for (i=0; i<RX_MAX_SERVICES; i++) {
616 service = rx_services[i];
617 if (service == (struct rx_service *) 0) break;
618 nProcs += service->minProcs;
619 diff = service->maxProcs - service->minProcs;
620 if (diff > maxdiff) maxdiff = diff;
622 nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
623 nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
/* nProcs is now: sum(minProcs) + max(maxProcs - minProcs) - nExistingProcs.
 * The sum and max run over the installed services; rx_services[] is
 * scanned up to its first null slot. When nProcs is zero or negative, no
 * new server procs are started. */
624 for (i = 0; i<nProcs; i++) {
625 rxi_StartServerProc(rx_ServerProc, rx_stackSize);
630 /* This routine must be called if any services are exported. If the
631 * donateMe flag is set, the calling process is donated to the server
633 void rx_StartServer(int donateMe)
635 register struct rx_service *service;
636 register int i, nProcs=0;
642 /* Start server processes, if necessary (exact function is dependent
643 * on the implementation environment--kernel or user space). DonateMe
644 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
645 * case, one less new proc will be created by rxi_StartServerProcs.
647 rxi_StartServerProcs(donateMe);
649 /* count up the # of threads in minProcs, and set the min deficit to
650 * be that value, too.
652 for (i=0; i<RX_MAX_SERVICES; i++) {
653 service = rx_services[i];
654 if (service == (struct rx_service *) 0) break;
655 MUTEX_ENTER(&rx_stats_mutex);
656 rxi_totalMin += service->minProcs;
657 /* below works even if a thread is running, since minDeficit would
658 * still have been decremented and later re-incremented.
660 rxi_minDeficit += service->minProcs;
661 MUTEX_EXIT(&rx_stats_mutex);
664 /* Turn on reaping of idle server connections */
665 rxi_ReapConnections();
/* NOTE(review): the lines below (pid, name, registerProgram, rx_ServerProc)
 * are presumably the donateMe path; the conditionals guarding them are not
 * visible here.
 * - Casting pthread_self() to pid_t assumes pthread_t is an integer type,
 *   which POSIX does not guarantee.
 * - sprintf into name is unbounded; confirm name's size. */
674 #ifdef AFS_PTHREAD_ENV
676 pid = (pid_t) pthread_self();
677 #else /* AFS_PTHREAD_ENV */
679 LWP_CurrentProcess(&pid);
680 #endif /* AFS_PTHREAD_ENV */
682 sprintf(name,"srv_%d", ++nProcs);
684 (*registerProgram)(pid, name);
686 #endif /* AFS_NT40_ENV */
687 rx_ServerProc(); /* Never returns */
692 /* Create a new client connection to the specified service, using the
693 * specified security object to implement the security model for this
695 struct rx_connection *rx_NewConnection(register afs_uint32 shost,
696 u_short sport, u_short sservice,
697 register struct rx_securityClass *securityObject, int serviceSecurityIndex)
701 register struct rx_connection *conn;
706 dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
707 shost, sport, sservice, securityObject, serviceSecurityIndex));
709 /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
710 * the case of kmem_alloc? */
711 conn = rxi_AllocConnection();
712 #ifdef RX_ENABLE_LOCKS
713 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
/* NOTE(review): the debug name passed for conn_data_lock below is
 * "conn call lock", the same as for conn_call_lock above; "conn data lock"
 * was presumably intended. */
714 MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
715 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
719 MUTEX_ENTER(&rx_connHashTable_lock);
720 cid = (rx_nextCid += RX_MAXCALLS);
721 conn->type = RX_CLIENT_CONNECTION;
723 conn->epoch = rx_epoch;
724 conn->peer = rxi_FindPeer(shost, sport, 0, 1);
725 conn->serviceId = sservice;
726 conn->securityObject = securityObject;
727 /* This doesn't work in all compilers with void (they're buggy), so fake it
729 conn->securityData = (VOID *) 0;
730 conn->securityIndex = serviceSecurityIndex;
731 rx_SetConnDeadTime(conn, rx_connDeadTime);
732 conn->ackRate = RX_FAST_ACK_RATE;
734 conn->specific = NULL;
735 conn->challengeEvent = NULL;
736 conn->delayedAbortEvent = NULL;
737 conn->abortCount = 0;
740 RXS_NewConnection(securityObject, conn);
741 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
743 conn->refCount++; /* no lock required since only this thread knows... */
/* Publish the connection at the head of its rx_connHashTable bucket while
 * rx_connHashTable_lock (entered above) is still held, and count it in
 * rx_stats under rx_stats_mutex. */
744 conn->next = rx_connHashTable[hashindex];
745 rx_connHashTable[hashindex] = conn;
746 MUTEX_ENTER(&rx_stats_mutex);
747 rx_stats.nClientConns++;
748 MUTEX_EXIT(&rx_stats_mutex);
750 MUTEX_EXIT(&rx_connHashTable_lock);
756 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
758 /* The idea is to set the dead time to a value that allows several
759 * keepalives to be dropped without timing out the connection. */
760 conn->secondsUntilDead = MAX(seconds, 6);
761 conn->secondsUntilPing = conn->secondsUntilDead/6;
/* Diagnostic counters, bumped under rx_stats_mutex when a reference count
 * turns out lower than expected:
 * - rxi_lowPeerRefCount: a peer refCount goes negative in
 *   rxi_CleanupConnection.
 * - rxi_lowConnRefCount: a connection refCount was not positive when
 *   rxi_DestroyConnectionNoLock was about to drop it (inferred from the
 *   visible branch; confirm). */
764 int rxi_lowPeerRefCount = 0;
765 int rxi_lowConnRefCount = 0;
768 * Clean up a connection that was destroyed in rxi_DestroyConnectionNoLock.
769 * NOTE: must not be called with rx_connHashTable_lock held.
/* Final teardown of a connection already destroyed by
 * rxi_DestroyConnectionNoLock. In order, it:
 * - notifies the service (server connections with a destroyConnProc) and
 *   the security object;
 * - drops the peer reference, recording the peer's idle time when it
 *   reaches zero;
 * - updates rx_stats;
 * - runs any per-connection key destructors and frees the specific[] array;
 * - destroys the connection's locks and frees the connection. */
771 void rxi_CleanupConnection(struct rx_connection *conn)
773 /* Notify the service exporter, if requested, that this connection
774 * is being destroyed */
775 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
776 (*conn->service->destroyConnProc)(conn);
778 /* Notify the security module that this connection is being destroyed */
779 RXS_DestroyConnection(conn->securityObject, conn);
781 /* If this is the last connection using the rx_peer struct, set its
782 * idle time to now. rxi_ReapConnections will reap it if it's still
783 * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
785 MUTEX_ENTER(&rx_peerHashTable_lock);
786 if (--conn->peer->refCount <= 0) {
787 conn->peer->idleWhen = clock_Sec();
788 if (conn->peer->refCount < 0) {
789 conn->peer->refCount = 0;
790 MUTEX_ENTER(&rx_stats_mutex);
791 rxi_lowPeerRefCount ++;
792 MUTEX_EXIT(&rx_stats_mutex);
795 MUTEX_EXIT(&rx_peerHashTable_lock);
797 MUTEX_ENTER(&rx_stats_mutex);
798 if (conn->type == RX_SERVER_CONNECTION)
799 rx_stats.nServerConns--;
801 rx_stats.nClientConns--;
802 MUTEX_EXIT(&rx_stats_mutex);
805 if (conn->specific) {
807 for (i = 0 ; i < conn->nSpecific ; i++) {
808 if (conn->specific[i] && rxi_keyCreate_destructor[i])
809 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
810 conn->specific[i] = NULL;
812 free(conn->specific);
814 conn->specific = NULL;
818 MUTEX_DESTROY(&conn->conn_call_lock);
819 MUTEX_DESTROY(&conn->conn_data_lock);
820 CV_DESTROY(&conn->conn_call_cv);
822 rxi_FreeConnection(conn);
825 /* Destroy the specified connection */
826 void rxi_DestroyConnection(register struct rx_connection *conn)
828 MUTEX_ENTER(&rx_connHashTable_lock);
829 rxi_DestroyConnectionNoLock(conn);
830 /* conn should be at the head of the cleanup list */
831 if (conn == rx_connCleanup_list) {
832 rx_connCleanup_list = rx_connCleanup_list->next;
833 MUTEX_EXIT(&rx_connHashTable_lock);
834 rxi_CleanupConnection(conn);
836 #ifdef RX_ENABLE_LOCKS
838 MUTEX_EXIT(&rx_connHashTable_lock);
840 #endif /* RX_ENABLE_LOCKS */
/* Tears down conn; called with rx_connHashTable_lock held (see
 * rxi_DestroyConnection). Per the visible lines:
 * - If conn is still referenced or busy, has a client rx_NewCall waiting,
 *   or still has call structures in use, it only sets RX_CONN_DESTROY_ME
 *   (and presumably returns), so destruction happens later.
 * - Otherwise it flushes any delayed abort, unlinks conn from
 *   rx_connHashTable, clears rxLastConn if it pointed here, and cancels the
 *   challenge/reach events.
 * - Finally it pushes conn onto rx_connCleanup_list for
 *   rxi_CleanupConnection. */
843 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
845 register struct rx_connection **conn_ptr;
846 register int havecalls = 0;
847 struct rx_packet *packet;
854 MUTEX_ENTER(&conn->conn_data_lock);
855 if (conn->refCount > 0)
858 MUTEX_ENTER(&rx_stats_mutex);
859 rxi_lowConnRefCount++;
860 MUTEX_EXIT(&rx_stats_mutex);
863 if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
864 /* Busy; wait till the last guy before proceeding */
865 MUTEX_EXIT(&conn->conn_data_lock);
870 /* If the client previously called rx_NewCall, but it is still
871 * waiting, treat this as a running call, and wait to destroy the
872 * connection later when the call completes. */
873 if ((conn->type == RX_CLIENT_CONNECTION) &&
874 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
875 conn->flags |= RX_CONN_DESTROY_ME;
876 MUTEX_EXIT(&conn->conn_data_lock);
880 MUTEX_EXIT(&conn->conn_data_lock);
882 /* Check for extant references to this connection */
883 for (i = 0; i<RX_MAXCALLS; i++) {
884 register struct rx_call *call = conn->call[i];
887 if (conn->type == RX_CLIENT_CONNECTION) {
888 MUTEX_ENTER(&call->lock);
889 if (call->delayedAckEvent) {
890 /* Push the final acknowledgment out now--there
891 * won't be a subsequent call to acknowledge the
892 * last reply packets */
893 rxevent_Cancel(call->delayedAckEvent, call,
894 RX_CALL_REFCOUNT_DELAY);
895 if (call->state == RX_STATE_PRECALL ||
896 call->state == RX_STATE_ACTIVE) {
897 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
899 rxi_AckAll(NULL, call, 0);
902 MUTEX_EXIT(&call->lock);
906 #ifdef RX_ENABLE_LOCKS
908 if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
909 MUTEX_EXIT(&conn->conn_data_lock);
912 /* Someone is accessing a packet right now. */
916 #endif /* RX_ENABLE_LOCKS */
919 /* Don't destroy the connection if there are any call
920 * structures still in use */
921 MUTEX_ENTER(&conn->conn_data_lock);
922 conn->flags |= RX_CONN_DESTROY_ME;
923 MUTEX_EXIT(&conn->conn_data_lock);
928 if (conn->delayedAbortEvent) {
929 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
930 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
932 MUTEX_ENTER(&conn->conn_data_lock);
933 rxi_SendConnectionAbort(conn, packet, 0, 1);
934 MUTEX_EXIT(&conn->conn_data_lock);
935 rxi_FreePacket(packet);
939 /* Remove from connection hash table before proceeding */
940 conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
941 conn->epoch, conn->type) ];
942 for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
943 if (*conn_ptr == conn) {
944 *conn_ptr = conn->next;
948 /* if the conn that we are destroying was the last connection, then we
949 * clear rxLastConn as well */
950 if ( rxLastConn == conn )
953 /* Make sure the connection is completely reset before deleting it. */
954 /* get rid of pending events that could zap us later */
955 if (conn->challengeEvent)
956 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
957 if (conn->checkReachEvent)
958 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
960 /* Add the connection to the list of destroyed connections that
961 * need to be cleaned up. This is necessary to avoid deadlocks
962 * in the routines we call to inform others that this connection is
963 * being destroyed. */
964 conn->next = rx_connCleanup_list;
965 rx_connCleanup_list = conn;
968 /* Externally available version */
/* Public wrapper around rxi_DestroyConnection(). The lines between the
 * signature and the call, if any, are not visible here. */
969 void rx_DestroyConnection(register struct rx_connection *conn)
975 rxi_DestroyConnection (conn);
980 /* Start a new rx remote procedure call, on the specified connection.
981 * If wait is set to 1, wait for a free call channel; otherwise return
982 * 0. Maxtime gives the maximum number of seconds this call may take,
983 * after rx_MakeCall returns. After this time interval, a call to any
984 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
985 * For fine grain locking, we hold the conn_call_lock in order to
986 * ensure that we don't get signalled after we found a call in an active
987 * state and before we go to sleep.
 * NOTE(review): rx_NewCall takes only conn; the wait and Maxtime wording
 * above appears to describe an older rx_MakeCall interface.
989 struct rx_call *rx_NewCall(register struct rx_connection *conn)
992 register struct rx_call *call;
993 struct clock queueTime;
997 dpf (("rx_MakeCall(conn %x)\n", conn));
1000 clock_GetTime(&queueTime);
1002 MUTEX_ENTER(&conn->conn_call_lock);
1005 * Check if there are others waiting for a new call.
1006 * If so, let them go first to avoid starving them.
1007 * This is a fairly simple scheme, and might not be
1008 * a complete solution for large numbers of waiters.
1010 if (conn->makeCallWaiters) {
1011 #ifdef RX_ENABLE_LOCKS
1012 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1019 for (i=0; i<RX_MAXCALLS; i++) {
1020 call = conn->call[i];
1022 MUTEX_ENTER(&call->lock);
1023 if (call->state == RX_STATE_DALLY) {
1024 rxi_ResetCall(call, 0);
1025 (*call->callNumber)++;
1028 MUTEX_EXIT(&call->lock);
1031 call = rxi_NewCall(conn, i);
1035 if (i < RX_MAXCALLS) {
1038 MUTEX_ENTER(&conn->conn_data_lock);
1039 conn->flags |= RX_CONN_MAKECALL_WAITING;
1040 MUTEX_EXIT(&conn->conn_data_lock);
1042 conn->makeCallWaiters++;
1043 #ifdef RX_ENABLE_LOCKS
1044 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1048 conn->makeCallWaiters--;
1051 * Wake up anyone else who might be giving us a chance to
1052 * run (see code above that avoids resource starvation).
1054 #ifdef RX_ENABLE_LOCKS
1055 CV_BROADCAST(&conn->conn_call_cv);
1060 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1062 /* Client is initially in send mode */
1063 call->state = RX_STATE_ACTIVE;
1064 call->mode = RX_MODE_SENDING;
1066 /* remember start time for call in case we have hard dead time limit */
1067 call->queueTime = queueTime;
1068 clock_GetTime(&call->startTime);
1069 hzero(call->bytesSent);
1070 hzero(call->bytesRcvd);
1072 /* Turn on busy protocol. */
1073 rxi_KeepAliveOn(call);
1075 MUTEX_EXIT(&call->lock);
1076 MUTEX_EXIT(&conn->conn_call_lock);
/* With AFS_GLOBAL_RXLOCK_KERNEL: wait while another thread has the transmit
 * queue busy (RX_CALL_TQ_BUSY), then clear the queue if RX_CALL_TQ_CLEARME
 * was set. */
1080 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1081 /* Now, if TQ wasn't cleared earlier, do it now. */
1083 MUTEX_ENTER(&call->lock);
1084 while (call->flags & RX_CALL_TQ_BUSY) {
1085 call->flags |= RX_CALL_TQ_WAIT;
1086 #ifdef RX_ENABLE_LOCKS
1087 CV_WAIT(&call->cv_tq, &call->lock);
1088 #else /* RX_ENABLE_LOCKS */
1089 osi_rxSleep(&call->tq);
1090 #endif /* RX_ENABLE_LOCKS */
1092 if (call->flags & RX_CALL_TQ_CLEARME) {
1093 rxi_ClearTransmitQueue(call, 0);
1094 queue_Init(&call->tq);
1096 MUTEX_EXIT(&call->lock);
1098 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* Scans aconn's call slots for a call in RX_STATE_ACTIVE or
 * RX_STATE_PRECALL. The return statements are not visible here; presumably
 * it returns nonzero when such a call exists and 0 otherwise - confirm. */
1103 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1106 register struct rx_call *tcall;
1110 for(i=0; i<RX_MAXCALLS; i++) {
1111 if ((tcall = aconn->call[i])) {
1112 if ((tcall->state == RX_STATE_ACTIVE)
1113 || (tcall->state == RX_STATE_PRECALL)) {
/* Copies aconn's per-channel call numbers into aint32s[0..RX_MAXCALLS-1],
 * so aint32s must have room for RX_MAXCALLS entries. A channel whose call
 * is in RX_STATE_DALLY is reported as callNumber+1; any other channel
 * reports callNumber itself (the else keyword is not visible here). */
1123 int rxi_GetCallNumberVector(register struct rx_connection *aconn,
1124 register afs_int32 *aint32s)
1127 register struct rx_call *tcall;
1131 for(i=0; i<RX_MAXCALLS; i++) {
1132 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1133 aint32s[i] = aconn->callNumber[i]+1;
1135 aint32s[i] = aconn->callNumber[i];
/* Inverse of rxi_GetCallNumberVector: loads aconn's per-channel call
 * numbers from aint32s[0..RX_MAXCALLS-1]. For a channel whose call is in
 * RX_STATE_DALLY the stored value is aint32s[i] - 1; any other channel takes
 * aint32s[i] itself (the else keyword is not visible here). */
1141 int rxi_SetCallNumberVector(register struct rx_connection *aconn,
1142 register afs_int32 *aint32s)
1145 register struct rx_call *tcall;
1149 for(i=0; i<RX_MAXCALLS; i++) {
1150 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1151 aconn->callNumber[i] = aint32s[i] - 1;
1153 aconn->callNumber[i] = aint32s[i];
1159 /* Advertise a new service. A service is named locally by a UDP port
1160 * number plus a 16-bit service id. Returns (struct rx_service *) 0
1163 char *serviceName; Name for identification purposes (e.g. the
1164 service name might be used for probing for
1166 struct rx_service *rx_NewService(u_short port, u_short serviceId,
1168 struct rx_securityClass **securityObjects,
1169 int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1171 osi_socket socket = OSI_NULLSOCKET;
1172 register struct rx_service *tservice;
1178 if (serviceId == 0) {
1179 (osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
1185 (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1192 tservice = rxi_AllocService();
1195 for (i = 0; i<RX_MAX_SERVICES; i++) {
1196 register struct rx_service *service = rx_services[i];
1198 if (port == service->servicePort) {
1199 if (service->serviceId == serviceId) {
1200 /* The identical service has already been
1201 * installed; if the caller was intending to
1202 * change the security classes used by this
1203 * service, he/she loses. */
1204 (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1207 rxi_FreeService(tservice);
1210 /* Different service, same port: re-use the socket
1211 * which is bound to the same port */
1212 socket = service->socket;
1215 if (socket == OSI_NULLSOCKET) {
1216 /* If we don't already have a socket (from another
1217 * service on same port) get a new one */
1218 socket = rxi_GetUDPSocket(port);
1219 if (socket == OSI_NULLSOCKET) {
1222 rxi_FreeService(tservice);
/* Fill in the new service. Defaults: minProcs 0, maxProcs 1,
 * idleDeadTime 60, connDeadTime taken from rx_connDeadTime, checkReach off.
 * The slot is published via rx_services[i] only after every field is set. */
1227 service->socket = socket;
1228 service->servicePort = port;
1229 service->serviceId = serviceId;
1230 service->serviceName = serviceName;
1231 service->nSecurityObjects = nSecurityObjects;
1232 service->securityObjects = securityObjects;
1233 service->minProcs = 0;
1234 service->maxProcs = 1;
1235 service->idleDeadTime = 60;
1236 service->connDeadTime = rx_connDeadTime;
1237 service->executeRequestProc = serviceProc;
1238 service->checkReach = 0;
1239 rx_services[i] = service; /* not visible until now */
/* Presumably reached when every rx_services[] slot is already in use; the
 * loop's exit path is not fully visible here. */
1247 rxi_FreeService(tservice);
1248 (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
/*
 * rxi_ServerProc: body of a server thread.  Visible behaviour:
 *  - Obtains the next call with rx_GetCall(threadID, tservice, socketp).
 *    tservice is the service of the previously executed call (NULL at
 *    first), so rx_GetCall can return that service's slot.
 *  - If *socketp was set, the thread has become a listener.
 *  - While rx_tranquil is set, each new call is failed with RX_RESTARTING
 *    and an abort is sent to the peer.
 *  - When afs_termState == AFSOP_STOP_RXCALLBACK (presumably kernel builds
 *    only), afs_termState is advanced to AFSOP_STOP_AFS and waiters on it
 *    are woken.
 *  - Otherwise the service's beforeProc hook (if any), executeRequestProc
 *    and afterProc hook (if any) run, followed by rx_EndCall(call, code)
 *    with the procedure's return code.
 * NOTE(review): the loop structure, the use of newcall, and the statement
 * executed under rx_stats_mutex are not visible in this listing.
 */
1252 /* Generic request processing loop. This routine should be called
1253 * by the implementation dependent rx_ServerProc. If socketp is
1254 * non-null, it will be set to the file descriptor that this thread
1255 * is now listening on. If socketp is null, this routine will never
1257 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1259 register struct rx_call *call;
1260 register afs_int32 code;
1261 register struct rx_service *tservice = NULL;
1268 call = rx_GetCall(threadID, tservice, socketp);
1269 if (socketp && *socketp != OSI_NULLSOCKET) {
1270 /* We are now a listener thread */
1275 /* if server is restarting( typically smooth shutdown) then do not
1276 * allow any new calls.
1279 if ( rx_tranquil && (call != NULL) ) {
1284 MUTEX_ENTER(&call->lock);
1286 rxi_CallError(call, RX_RESTARTING);
1287 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1289 MUTEX_EXIT(&call->lock);
1295 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1296 #ifdef RX_ENABLE_LOCKS
1298 #endif /* RX_ENABLE_LOCKS */
1299 afs_termState = AFSOP_STOP_AFS;
1300 afs_osi_Wakeup(&afs_termState);
1301 #ifdef RX_ENABLE_LOCKS
1303 #endif /* RX_ENABLE_LOCKS */
1308 tservice = call->conn->service;
1310 if (tservice->beforeProc) (*tservice->beforeProc)(call);
1312 code = call->conn->service->executeRequestProc(call);
1314 if (tservice->afterProc) (*tservice->afterProc)(call, code);
1316 rx_EndCall(call, code);
1317 MUTEX_ENTER(&rx_stats_mutex);
1319 MUTEX_EXIT(&rx_stats_mutex);
/*
 * rx_WakeupServerProcs: wake every server thread that may be sleeping.
 * While holding rx_serverPool_lock it signals:
 *  - rx_waitForPacket, if set;
 *  - every entry on rx_FreeSQEList, walked under freeSQEList_lock;
 *  - every entry on rx_idleServerQueue.
 * With RX_ENABLE_LOCKS each entry's condition variable is broadcast.
 * Otherwise osi_rxWakeup is used; the per-entry wakeup lines of that
 * variant are not visible in this listing.
 */
1324 void rx_WakeupServerProcs(void)
1326 struct rx_serverQueueEntry *np, *tqp;
1331 MUTEX_ENTER(&rx_serverPool_lock);
1333 #ifdef RX_ENABLE_LOCKS
1334 if (rx_waitForPacket)
1335 CV_BROADCAST(&rx_waitForPacket->cv);
1336 #else /* RX_ENABLE_LOCKS */
1337 if (rx_waitForPacket)
1338 osi_rxWakeup(rx_waitForPacket);
1339 #endif /* RX_ENABLE_LOCKS */
1340 MUTEX_ENTER(&freeSQEList_lock);
/* Free server queue entries are chained through their first word
 * (rx_GetCall pushes and pops them the same way). */
1341 for (np = rx_FreeSQEList; np; np = tqp) {
1342 tqp = *(struct rx_serverQueueEntry **)np;
1343 #ifdef RX_ENABLE_LOCKS
1344 CV_BROADCAST(&np->cv);
1345 #else /* RX_ENABLE_LOCKS */
1347 #endif /* RX_ENABLE_LOCKS */
1349 MUTEX_EXIT(&freeSQEList_lock);
1350 for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1351 #ifdef RX_ENABLE_LOCKS
1352 CV_BROADCAST(&np->cv);
1353 #else /* RX_ENABLE_LOCKS */
1355 #endif /* RX_ENABLE_LOCKS */
1357 MUTEX_EXIT(&rx_serverPool_lock);
1363 * One thing that seems to happen is that all the server threads get
1364 * tied up on some empty or slow call, and then a whole bunch of calls
1365 * arrive at once, using up the packet pool, so now there are more
1366 * empty calls. The most critical resources here are server threads
1367 * and the free packet pool. The "doreclaim" code seems to help in
1368 * general. I think that eventually we arrive in this state: there
1369 * are lots of pending calls which do have all their packets present,
1370 * so they won't be reclaimed, are multi-packet calls, so they won't
1371 * be scheduled until later, and thus are tying up most of the free
1372 * packet pool for a very long time.
1374 * 1. schedule multi-packet calls if all the packets are present.
1375 * Probably CPU-bound operation, useful to return packets to pool.
1376 * Do what if there is a full window, but the last packet isn't here?
1377 * 3. preserve one thread which *only* runs "best" calls, otherwise
1378 * it sleeps and waits for that type of call.
1379 * 4. Don't necessarily reserve a whole window for each thread. In fact,
1380 * the current dataquota business is badly broken. The quota isn't adjusted
1381 * to reflect how many packets are presently queued for a running call.
1382 * So, when we schedule a queued call with a full window of packets queued
1383 * up for it, that *should* free up a window full of packets for other second-class
1384 * calls to be able to use from the packet pool. But it doesn't.
1386 * NB. Most of the time, this code doesn't run -- since idle server threads
1387 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1388 * as a new call arrives.
1390 /* Sleep until a call arrives. Returns a pointer to the call, ready
1391 * for an rx_Read. */
/*
 * rx_GetCall, RX_ENABLE_LOCKS variant.
 *  tno         - thread number.  Thread 0 takes the first eligible call
 *                (FCFS); other threads prefer calls whose first packet is
 *                already queued.
 *  cur_service - service of the call this thread just finished, or NULL.
 *                Its slot is handed back with ReturnToServerPool.
 *  socketp     - if non-NULL, *socketp is reset to OSI_NULLSOCKET before
 *                the thread waits.  The wait also ends when another thread
 *                stores a socket there (the thread becomes a listener); in
 *                that case presumably no call is returned.
 * A server queue entry (sq) is taken from rx_FreeSQEList (chained through
 * its first word) or allocated, and is pushed back on that list afterwards.
 * A scheduled call is moved to RX_STATE_ACTIVE / RX_MODE_RECEIVING, has
 * startTime recorded, and gains an RX_CALL_REFCOUNT_BEGIN reference, which
 * rx_EndCall drops.  Returns NULL when afs_termState reaches
 * AFSOP_STOP_RXCALLBACK while waiting.
 * NOTE(review): QuotaOK(service) appears to reserve a slot in the
 * service's pool, since ReturnToServerPool(service) is called whenever the
 * candidate call is not taken -- confirm against its definition.
 */
1392 #ifdef RX_ENABLE_LOCKS
1393 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1395 struct rx_serverQueueEntry *sq;
1396 register struct rx_call *call = (struct rx_call *) 0;
1397 struct rx_service *service = NULL;
1400 MUTEX_ENTER(&freeSQEList_lock);
1402 if ((sq = rx_FreeSQEList)) {
1403 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1404 MUTEX_EXIT(&freeSQEList_lock);
1405 } else { /* otherwise allocate a new one and return that */
1406 MUTEX_EXIT(&freeSQEList_lock);
1407 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1408 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1409 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1412 MUTEX_ENTER(&rx_serverPool_lock);
1413 if (cur_service != NULL) {
1414 ReturnToServerPool(cur_service);
1417 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1418 register struct rx_call *tcall, *ncall, *choice2 = NULL;
1420 /* Scan for eligible incoming calls. A call is not eligible
1421 * if the maximum number of calls for its service type are
1422 * already executing */
1423 /* One thread will process calls FCFS (to prevent starvation),
1424 * while the other threads may run ahead looking for calls which
1425 * have all their input data available immediately. This helps
1426 * keep threads from blocking, waiting for data from the client. */
/* NOTE(review): "!tcall->queue_item_header.next" is meant to detect the
 * last call in the queue.  If rx_queue lists are circular (queue_Init
 * links the head to itself), a queued element's next pointer is never
 * NULL, so only thread 0 would ever take this branch and choice2 would
 * never be used -- confirm against rx_queue.h; queue_IsLast may be what
 * was intended. */
1427 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1428 service = tcall->conn->service;
1429 if (!QuotaOK(service)) {
1432 if (!tno || !tcall->queue_item_header.next) {
1433 /* If we're thread 0, then we'll just use
1434 * this call. If we haven't been able to find an optimal
1435 * choice, and we're at the end of the list, then use a
1436 * 2d choice if one has been identified. Otherwise... */
1437 call = (choice2 ? choice2 : tcall);
1438 service = call->conn->service;
1439 } else if (!queue_IsEmpty(&tcall->rq)) {
1440 struct rx_packet *rp;
1441 rp = queue_First(&tcall->rq, rx_packet);
1442 if (rp->header.seq == 1) {
1443 if (!meltdown_1pkt ||
1444 (rp->header.flags & RX_LAST_PACKET)) {
1446 } else if (rxi_2dchoice && !choice2 &&
1447 !(tcall->flags & RX_CALL_CLEARED) &&
1448 (tcall->rprev > rxi_HardAckRate)) {
1450 } else rxi_md2cnt++;
1456 ReturnToServerPool(service);
1463 MUTEX_EXIT(&rx_serverPool_lock);
1464 MUTEX_ENTER(&call->lock);
1466 if (call->state != RX_STATE_PRECALL || call->error) {
1467 MUTEX_EXIT(&call->lock);
1468 MUTEX_ENTER(&rx_serverPool_lock);
1469 ReturnToServerPool(service);
1474 if (queue_IsEmpty(&call->rq) ||
1475 queue_First(&call->rq, rx_packet)->header.seq != 1)
1476 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1478 CLEAR_CALL_QUEUE_LOCK(call);
1479 call->flags &= ~RX_CALL_WAIT_PROC;
1480 MUTEX_ENTER(&rx_stats_mutex);
1482 MUTEX_EXIT(&rx_stats_mutex);
1486 /* If there are no eligible incoming calls, add this process
1487 * to the idle server queue, to wait for one */
1491 *socketp = OSI_NULLSOCKET;
1493 sq->socketp = socketp;
1494 queue_Append(&rx_idleServerQueue, sq);
1495 #ifndef AFS_AIX41_ENV
1496 rx_waitForPacket = sq;
1497 #endif /* AFS_AIX41_ENV */
1499 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1501 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1502 MUTEX_EXIT(&rx_serverPool_lock);
1503 return (struct rx_call *)0;
1506 } while (!(call = sq->newcall) &&
1507 !(socketp && *socketp != OSI_NULLSOCKET));
1508 MUTEX_EXIT(&rx_serverPool_lock);
1510 MUTEX_ENTER(&call->lock);
1516 MUTEX_ENTER(&freeSQEList_lock);
1517 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1518 rx_FreeSQEList = sq;
1519 MUTEX_EXIT(&freeSQEList_lock);
1522 clock_GetTime(&call->startTime);
1523 call->state = RX_STATE_ACTIVE;
1524 call->mode = RX_MODE_RECEIVING;
1526 rxi_calltrace(RX_CALL_START, call);
1527 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1528 call->conn->service->servicePort,
1529 call->conn->service->serviceId, call));
1531 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1532 MUTEX_EXIT(&call->lock);
1534 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1539 #else /* RX_ENABLE_LOCKS */
/*
 * rx_GetCall, variant built without RX_ENABLE_LOCKS.  Same contract as the
 * locked variant (tno / cur_service / socketp, free-list handling of sq,
 * call moved to RX_STATE_ACTIVE / RX_MODE_RECEIVING).  Per-service
 * concurrency is tracked here directly:
 *  - cur_service->nRequestsRunning is decremented on entry;
 *  - the chosen service's nRequestsRunning is incremented;
 *  - both are compared against minProcs.
 * The locked variant uses ReturnToServerPool for this instead.
 * NOTE(review): unlike the locked variant, the ACK condition below also
 * sends an ACK when there is a hole (rprev != last queued seq).  The two
 * variants are inconsistent; confirm which is intended.
 * NOTE(review): no CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN) is visible here,
 * although rx_EndCall drops that reference unconditionally.  Presumably
 * CALL_HOLD/CALL_RELE are no-ops without RX_ENABLE_LOCKS -- confirm.
 */
1540 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1542 struct rx_serverQueueEntry *sq;
1543 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1544 struct rx_service *service = NULL;
1549 MUTEX_ENTER(&freeSQEList_lock);
1551 if ((sq = rx_FreeSQEList)) {
1552 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1553 MUTEX_EXIT(&freeSQEList_lock);
1554 } else { /* otherwise allocate a new one and return that */
1555 MUTEX_EXIT(&freeSQEList_lock);
1556 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1557 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1558 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1560 MUTEX_ENTER(&sq->lock);
1562 if (cur_service != NULL) {
1563 cur_service->nRequestsRunning--;
1564 if (cur_service->nRequestsRunning < cur_service->minProcs)
1568 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1569 register struct rx_call *tcall, *ncall;
1570 /* Scan for eligible incoming calls. A call is not eligible
1571 * if the maximum number of calls for its service type are
1572 * already executing */
1573 /* One thread will process calls FCFS (to prevent starvation),
1574 * while the other threads may run ahead looking for calls which
1575 * have all their input data available immediately. This helps
1576 * keep threads from blocking, waiting for data from the client. */
1577 choice2 = (struct rx_call *) 0;
/* NOTE(review): as in the locked variant, "!tcall->queue_item_header.next"
 * cannot detect the last element if rx_queue lists are circular -- confirm
 * against rx_queue.h (queue_IsLast may be what was intended). */
1578 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1579 service = tcall->conn->service;
1580 if (QuotaOK(service)) {
1581 if (!tno || !tcall->queue_item_header.next ) {
1582 /* If we're thread 0, then we'll just use
1583 * this call. If we haven't been able to find an optimal
1584 * choice, and we're at the end of the list, then use a
1585 * 2d choice if one has been identified. Otherwise... */
1586 call = (choice2 ? choice2 : tcall);
1587 service = call->conn->service;
1588 } else if (!queue_IsEmpty(&tcall->rq)) {
1589 struct rx_packet *rp;
1590 rp = queue_First(&tcall->rq, rx_packet);
1591 if (rp->header.seq == 1
1592 && (!meltdown_1pkt ||
1593 (rp->header.flags & RX_LAST_PACKET))) {
1595 } else if (rxi_2dchoice && !choice2 &&
1596 !(tcall->flags & RX_CALL_CLEARED) &&
1597 (tcall->rprev > rxi_HardAckRate)) {
1599 } else rxi_md2cnt++;
1609 /* we can't schedule a call if there's no data!!! */
1610 /* send an ack if there's no data, if we're missing the
1611 * first packet, or we're missing something between first
1612 * and last -- there's a "hole" in the incoming data. */
1613 if (queue_IsEmpty(&call->rq) ||
1614 queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1615 call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1616 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1618 call->flags &= (~RX_CALL_WAIT_PROC);
1619 service->nRequestsRunning++;
1620 /* just started call in minProcs pool, need fewer to maintain
1622 if (service->nRequestsRunning <= service->minProcs)
1626 /* MUTEX_EXIT(&call->lock); */
1629 /* If there are no eligible incoming calls, add this process
1630 * to the idle server queue, to wait for one */
1633 *socketp = OSI_NULLSOCKET;
1635 sq->socketp = socketp;
1636 queue_Append(&rx_idleServerQueue, sq);
1640 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1643 return (struct rx_call *)0;
1646 } while (!(call = sq->newcall) &&
1647 !(socketp && *socketp != OSI_NULLSOCKET));
1649 MUTEX_EXIT(&sq->lock);
1651 MUTEX_ENTER(&freeSQEList_lock);
1652 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1653 rx_FreeSQEList = sq;
1654 MUTEX_EXIT(&freeSQEList_lock);
1657 clock_GetTime(&call->startTime);
1658 call->state = RX_STATE_ACTIVE;
1659 call->mode = RX_MODE_RECEIVING;
1661 rxi_calltrace(RX_CALL_START, call);
1662 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1663 call->conn->service->servicePort,
1664 call->conn->service->serviceId, call));
1666 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1674 #endif /* RX_ENABLE_LOCKS */
/*
 * rx_SetArrivalProc: store proc, handle and arg in call->arrivalProc,
 * call->arrivalProcHandle and call->arrivalProcArg.  No locking is done
 * here.  rx_EndCall resets call->arrivalProc to 0.
 */
1678 /* Establish a procedure to be called when a packet arrives for a
1679 * call. This routine will be called at most once after each call,
1680 * and will also be called if there is an error condition on the call or
1681 * the call is complete. Used by multi rx to build a selection
1682 * function which determines which of several calls is likely to be a
1683 * good one to read from.
1684 * NOTE: the way this is currently implemented it is probably only a
1685 * good idea to (1) use it immediately after a newcall (clients only)
1686 * and (2) only use it once. Other uses currently void your warranty
1688 void rx_SetArrivalProc(register struct rx_call *call,
1689 register VOID (*proc)(register struct rx_call *call,
1690 register struct multi_handle *mh, register int index),
1691 register VOID *handle, register VOID *arg)
1693 call->arrivalProc = proc;
1694 call->arrivalProcHandle = handle;
1695 call->arrivalProcArg = arg;
/*
 * rx_EndCall: finish call with local result rc.  The final error
 * (call->error, mapped through ntoh_syserr_conv) is presumably returned.
 * Visible behaviour:
 *  - A clean finish (rc == 0 and no prior error) resets abortCode and
 *    abortCount.  The arrival procedure is always cleared.
 *  - A non-zero rc on a call with no prior error is recorded with
 *    rxi_CallError, and an abort is sent to the peer.
 *  - Server side: a reply (possibly empty) is forced out.  The call then
 *    enters RX_STATE_HOLD while sent packets remain unacknowledged.
 *    Otherwise it enters RX_STATE_DALLY, with the transmit queue cleared
 *    and the resend and keepalive events cancelled.
 *  - Client side:
 *      - pending input is drained with rxi_ReadProc when needed;
 *      - an outstanding delayed ACK is sent immediately;
 *      - rx_NewCall waiters (RX_CONN_MAKECALL_WAITING) are woken while
 *        conn_call_lock is held;
 *      - the call enters RX_STATE_DALLY.
 *  - The current packet and the iovq packets are released, and the
 *    RX_CALL_REFCOUNT_BEGIN reference taken by rx_GetCall is dropped.
 */
1698 /* Call is finished (possibly prematurely). Return rc to the peer, if
1699 * appropriate, and return the final error code from the conversation
1702 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1704 register struct rx_connection *conn = call->conn;
1705 register struct rx_service *service;
1706 register struct rx_packet *tp; /* Temporary packet pointer */
1707 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1711 dpf(("rx_EndCall(call %x)\n", call));
1715 MUTEX_ENTER(&call->lock);
1717 if (rc == 0 && call->error == 0) {
1718 call->abortCode = 0;
1719 call->abortCount = 0;
1722 call->arrivalProc = (VOID (*)()) 0;
1723 if (rc && call->error == 0) {
1724 rxi_CallError(call, rc);
1725 /* Send an abort message to the peer if this error code has
1726 * only just been set. If it was set previously, assume the
1727 * peer has already been sent the error code or will request it
1729 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1731 if (conn->type == RX_SERVER_CONNECTION) {
1732 /* Make sure reply or at least dummy reply is sent */
1733 if (call->mode == RX_MODE_RECEIVING) {
1734 rxi_WriteProc(call, 0, 0);
1736 if (call->mode == RX_MODE_SENDING) {
1737 rxi_FlushWrite(call);
1739 service = conn->service;
1740 rxi_calltrace(RX_CALL_END, call);
1741 /* Call goes to hold state until reply packets are acknowledged */
1742 if (call->tfirst + call->nSoftAcked < call->tnext) {
1743 call->state = RX_STATE_HOLD;
1745 call->state = RX_STATE_DALLY;
1746 rxi_ClearTransmitQueue(call, 0);
1747 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1748 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1751 else { /* Client connection */
1753 /* Make sure server receives input packets, in the case where
1754 * no reply arguments are expected */
1755 if ((call->mode == RX_MODE_SENDING)
1756 || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1757 (void) rxi_ReadProc(call, &dummy, 1);
1760 /* If we had an outstanding delayed ack, be nice to the server
1761 * and force-send it now.
1763 if (call->delayedAckEvent) {
1764 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1765 call->delayedAckEvent = NULL;
1766 rxi_SendDelayedAck(NULL, call, NULL);
1769 /* We need to release the call lock since it's lower than the
1770 * conn_call_lock and we don't want to hold the conn_call_lock
1771 * over the rx_ReadProc call. The conn_call_lock needs to be held
1772 * here for the case where rx_NewCall is perusing the calls on
1773 * the connection structure. We don't want to signal until
1774 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1775 * have checked this call, found it active and by the time it
1776 * goes to sleep, will have missed the signal.
1778 MUTEX_EXIT(&call->lock);
1779 MUTEX_ENTER(&conn->conn_call_lock);
1780 MUTEX_ENTER(&call->lock);
1781 MUTEX_ENTER(&conn->conn_data_lock);
1782 conn->flags |= RX_CONN_BUSY;
1783 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1784 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1785 MUTEX_EXIT(&conn->conn_data_lock);
1786 #ifdef RX_ENABLE_LOCKS
1787 CV_BROADCAST(&conn->conn_call_cv);
1792 #ifdef RX_ENABLE_LOCKS
1794 MUTEX_EXIT(&conn->conn_data_lock);
1796 #endif /* RX_ENABLE_LOCKS */
1797 call->state = RX_STATE_DALLY;
1799 error = call->error;
1801 /* currentPacket, nLeft, and nFree must be zeroed here, because
1802 * ResetCall cannot: ResetCall may be called at splnet(), in the
1803 * kernel version, and may interrupt the macros rx_Read or
1804 * rx_Write, which run at normal priority for efficiency. */
1805 if (call->currentPacket) {
1806 rxi_FreePacket(call->currentPacket);
1807 call->currentPacket = (struct rx_packet *) 0;
1808 call->nLeft = call->nFree = call->curlen = 0;
1811 call->nLeft = call->nFree = call->curlen = 0;
1813 /* Free any packets from the last call to ReadvProc/WritevProc */
1814 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1819 CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1820 MUTEX_EXIT(&call->lock);
/* NOTE(review): RX_CONN_BUSY is set above while holding conn_data_lock,
 * but below it is cleared after conn_call_lock has been released and
 * without conn_data_lock.  This read-modify-write of conn->flags can
 * therefore race with other updates of conn->flags (e.g. the
 * RX_CONN_MAKECALL_WAITING handling above).  Consider clearing it under
 * conn_data_lock before releasing conn_call_lock. */
1821 if (conn->type == RX_CLIENT_CONNECTION) {
1822 MUTEX_EXIT(&conn->conn_call_lock);
1823 conn->flags &= ~RX_CONN_BUSY;
1828 * Map errors to the local host's errno.h format.
1830 error = ntoh_syserr_conv(error);
1834 #if !defined(KERNEL)
1836 /* Call this routine when shutting down a server or client (especially
1837 * clients). This will allow Rx to gracefully garbage collect server
1838 * connections, and reduce the number of retries that a server might
1839 * make to a dead client.
1840 * This is not quite right, since some calls may still be ongoing and
1841 * we can't lock them to destroy them. */
/*
 * Visible behaviour (user-space builds only):
 *  - returns immediately when rxinit_status == 1 (already shut down);
 *  - otherwise deletes cached connections, then walks rx_connHashTable
 *    under rx_connHashTable_lock and destroys every connection of type
 *    RX_CLIENT_CONNECTION;
 *  - with RX_ENABLE_LOCKS, it then drains rx_connCleanup_list, dropping
 *    rx_connHashTable_lock around each rxi_CleanupConnection call.
 * NOTE(review): the MUTEX_EXIT matching the MUTEX_ENTER of
 * rx_connHashTable_lock is only visible inside #ifdef RX_ENABLE_LOCKS.
 * Presumably MUTEX_* are no-ops otherwise -- confirm.  The remainder of
 * the shutdown (and the update of rxinit_status) is not visible here.
 */
1842 void rx_Finalize(void)
1844 register struct rx_connection **conn_ptr, **conn_end;
1848 if (rxinit_status == 1) {
1850 return; /* Already shutdown. */
1852 rxi_DeleteCachedConnections();
1853 if (rx_connHashTable) {
1854 MUTEX_ENTER(&rx_connHashTable_lock);
1855 for (conn_ptr = &rx_connHashTable[0],
1856 conn_end = &rx_connHashTable[rx_hashTableSize];
1857 conn_ptr < conn_end; conn_ptr++) {
1858 struct rx_connection *conn, *next;
1859 for (conn = *conn_ptr; conn; conn = next) {
1861 if (conn->type == RX_CLIENT_CONNECTION) {
1862 /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1864 /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1865 #ifdef RX_ENABLE_LOCKS
1866 rxi_DestroyConnectionNoLock(conn);
1867 #else /* RX_ENABLE_LOCKS */
1868 rxi_DestroyConnection(conn);
1869 #endif /* RX_ENABLE_LOCKS */
1873 #ifdef RX_ENABLE_LOCKS
1874 while (rx_connCleanup_list) {
1875 struct rx_connection *conn;
1876 conn = rx_connCleanup_list;
1877 rx_connCleanup_list = rx_connCleanup_list->next;
1878 MUTEX_EXIT(&rx_connHashTable_lock);
1879 rxi_CleanupConnection(conn);
1880 MUTEX_ENTER(&rx_connHashTable_lock);
1882 MUTEX_EXIT(&rx_connHashTable_lock);
1883 #endif /* RX_ENABLE_LOCKS */
1892 /* if we wakeup packet waiter too often, can get in loop with two
1893 AllocSendPackets each waking each other up (from ReclaimPacket calls) */
/*
 * rxi_PacketsUnWait: wake threads waiting for free packets.
 *  - Appears to do nothing when rx_waitingForPackets is clear.
 *  - Returns without waking anyone while rxi_OverQuota(RX_PACKET_CLASS_SEND)
 *    reports the send class as still over quota.
 *  - Otherwise clears rx_waitingForPackets, then broadcasts
 *    rx_waitingForPackets_cv (RX_ENABLE_LOCKS) or calls osi_rxWakeup.
 * NOTE(review): no lock is taken here.  The caller presumably holds
 * whatever lock protects rx_waitingForPackets -- confirm.
 */
1894 void rxi_PacketsUnWait(void)
1896 if (!rx_waitingForPackets) {
1900 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1901 return; /* still over quota */
1904 rx_waitingForPackets = 0;
1905 #ifdef RX_ENABLE_LOCKS
1906 CV_BROADCAST(&rx_waitingForPackets_cv);
1908 osi_rxWakeup(&rx_waitingForPackets);
1914 /* ------------------Internal interfaces------------------------- */
1916 /* Return this process's service structure for the
1917 * specified socket and service */
/*
 * Linear scan of rx_services, stopping at the first NULL slot, for an
 * entry whose serviceId and socket both match.
 * NOTE(review): the scan relies on a NULL entry terminating rx_services.
 * rx_NewService only fills indices below RX_MAX_SERVICES, so when every
 * slot is used the array needs a spare trailing NULL element -- confirm
 * its declared size.  The return statements are not visible in this
 * listing.
 */
1918 struct rx_service *rxi_FindService(register osi_socket socket,
1919 register u_short serviceId)
1921 register struct rx_service **sp;
1922 for (sp = &rx_services[0]; *sp; sp++) {
1923 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1929 /* Allocate a call structure, for the indicated channel of the
1930 * supplied connection. The mode and state of the call must be set by
1931 * the caller. Returns the call with mutex locked. */
/*
 * Visible behaviour:
 *  - A call structure is reused from rx_freeCallQueue when possible.  With
 *    AFS_GLOBAL_RXLOCK_KERNEL, entries whose transmit queue is still busy
 *    (RX_CALL_TQ_BUSY) are skipped, and a deferred transmit-queue clear
 *    (RX_CALL_TQ_CLEARME) is performed on the reused call.
 *  - Otherwise a new call is allocated with rxi_Alloc, and its mutex,
 *    condition variables and tq/rq/iovq queues are initialized.
 *  - Either way the call is reset with rxi_ResetCall(call, 1), recorded
 *    in conn->call[channel], and its callNumber pointer is aimed at
 *    conn->callNumber[channel].  A never-used channel (number 0) starts
 *    at 1.
 *  - rx_stats.nFreeCallStructs / nCallStructs are updated under
 *    rx_stats_mutex.
 * NOTE(review): the removal of the reused call from rx_freeCallQueue and
 * the call->conn assignment are not visible in this listing.
 */
1932 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1933 register int channel)
1935 register struct rx_call *call;
1936 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1937 register struct rx_call *cp; /* Call pointer temp */
1938 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1939 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1941 /* Grab an existing call structure, or allocate a new one.
1942 * Existing call structures are assumed to have been left reset by
1944 MUTEX_ENTER(&rx_freeCallQueue_lock);
1946 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1948 * EXCEPT that the TQ might not yet be cleared out.
1949 * Skip over those with in-use TQs.
1952 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1953 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1959 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1960 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1961 call = queue_First(&rx_freeCallQueue, rx_call);
1962 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1964 MUTEX_ENTER(&rx_stats_mutex);
1965 rx_stats.nFreeCallStructs--;
1966 MUTEX_EXIT(&rx_stats_mutex);
1967 MUTEX_EXIT(&rx_freeCallQueue_lock);
1968 MUTEX_ENTER(&call->lock);
1969 CLEAR_CALL_QUEUE_LOCK(call);
1970 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1971 /* Now, if TQ wasn't cleared earlier, do it now. */
1972 if (call->flags & RX_CALL_TQ_CLEARME) {
1973 rxi_ClearTransmitQueue(call, 0);
1974 queue_Init(&call->tq);
1976 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1977 /* Bind the call to its connection structure */
1979 rxi_ResetCall(call, 1);
1982 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1984 MUTEX_EXIT(&rx_freeCallQueue_lock);
1985 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1986 MUTEX_ENTER(&call->lock);
1987 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1988 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1989 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1991 MUTEX_ENTER(&rx_stats_mutex);
1992 rx_stats.nCallStructs++;
1993 MUTEX_EXIT(&rx_stats_mutex);
1994 /* Initialize once-only items */
1995 queue_Init(&call->tq);
1996 queue_Init(&call->rq);
1997 queue_Init(&call->iovq);
1998 /* Bind the call to its connection structure (prereq for reset) */
2000 rxi_ResetCall(call, 1);
2002 call->channel = channel;
2003 call->callNumber = &conn->callNumber[channel];
2004 /* Note that the next expected call number is retained (in
2005 * conn->callNumber[i]), even if we reallocate the call structure
2007 conn->call[channel] = call;
2008 /* if the channel's never been used (== 0), we should start at 1, otherwise
2009 the call number is valid from the last time this channel was used */
2010 if (*call->callNumber == 0) *call->callNumber = 1;
/*
 * rxi_FreeCall: visible behaviour --
 *  - if the call was in RX_STATE_DALLY or RX_STATE_HOLD, advance the
 *    channel's call number;
 *  - reset the call and detach it from conn->call[channel];
 *  - queue it on rx_freeCallQueue under rx_freeCallQueue_lock.  With
 *    AFS_GLOBAL_RXLOCK_KERNEL, calls whose transmit queue is still busy
 *    are prepended and the others appended;
 *  - bump rx_stats.nFreeCallStructs;
 *  - finally, if the connection carries RX_CONN_DESTROY_ME, destroy it.
 *    With RX_ENABLE_LOCKS the choice between rxi_DestroyConnectionNoLock
 *    and rxi_DestroyConnection (presumably driven by haveCTLock) is not
 *    visible in this listing.
 * NOTE(review): conn->flags is tested before conn_data_lock is taken.
 */
2015 /* A call has been inactive long enough that we can throw away
2016 * state, including the call structure, which is placed on the call
2018 * Call is locked upon entry.
2019 * haveCTLock set if called from rxi_ReapConnections
2021 #ifdef RX_ENABLE_LOCKS
2022 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2023 #else /* RX_ENABLE_LOCKS */
2024 void rxi_FreeCall(register struct rx_call *call)
2025 #endif /* RX_ENABLE_LOCKS */
2027 register int channel = call->channel;
2028 register struct rx_connection *conn = call->conn;
2031 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2032 (*call->callNumber)++;
2033 rxi_ResetCall(call, 0);
2034 call->conn->call[channel] = (struct rx_call *) 0;
2036 MUTEX_ENTER(&rx_freeCallQueue_lock);
2037 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2038 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2039 /* A call may be free even though its transmit queue is still in use.
2040 * Since we search the call list from head to tail, put busy calls at
2041 * the head of the list, and idle calls at the tail.
2043 if (call->flags & RX_CALL_TQ_BUSY)
2044 queue_Prepend(&rx_freeCallQueue, call);
2046 queue_Append(&rx_freeCallQueue, call);
2047 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2048 queue_Append(&rx_freeCallQueue, call);
2049 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2050 MUTEX_ENTER(&rx_stats_mutex);
2051 rx_stats.nFreeCallStructs++;
2052 MUTEX_EXIT(&rx_stats_mutex);
2054 MUTEX_EXIT(&rx_freeCallQueue_lock);
2056 /* Destroy the connection if it was previously slated for
2057 * destruction, i.e. the Rx client code previously called
2058 * rx_DestroyConnection (client connections), or
2059 * rxi_ReapConnections called the same routine (server
2060 * connections). Only do this, however, if there are no
2061 * outstanding calls. Note that for fine grain locking, there appears
2062 * to be a deadlock in that rxi_FreeCall has a call locked and
2063 * DestroyConnectionNoLock locks each call in the conn. But note a
2064 * few lines up where we have removed this call from the conn.
2065 * If someone else destroys a connection, they either have no
2066 * call lock held or are going through this section of code.
2068 if (conn->flags & RX_CONN_DESTROY_ME) {
2069 MUTEX_ENTER(&conn->conn_data_lock);
2071 MUTEX_EXIT(&conn->conn_data_lock);
2072 #ifdef RX_ENABLE_LOCKS
2074 rxi_DestroyConnectionNoLock(conn);
2076 rxi_DestroyConnection(conn);
2077 #else /* RX_ENABLE_LOCKS */
2078 rxi_DestroyConnection(conn);
2079 #endif /* RX_ENABLE_LOCKS */
/* Running count and running byte total of rxi_Alloc'd memory not yet
 * released with rxi_Free.  Both are updated under rx_stats_mutex.
 * NOTE(review): rxi_Allocsize is an afs_int32 but accumulates size_t
 * values, so it can overflow or truncate for large totals. */
2083 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
/*
 * rxi_Alloc: allocate size bytes for Rx.
 *  - In kernel builds where AFS_AIX32_ENV or AFS_HPUX_ENV is defined (and
 *    AFS_HPUX100_ENV is not), requests larger than AFS_SMALLOCSIZ come
 *    from osi_AllocMediumSpace and smaller ones from osi_AllocSmall.
 *  - Elsewhere osi_Alloc is used.
 *  - A NULL result calls osi_Panic("rxi_Alloc error").  Assuming osi_Panic
 *    does not return, callers never see NULL.
 *  - AIX 4.1 kernel builds also deal with the AFS global lock around the
 *    allocation (see afs/osi.h).  That handling is not fully visible here.
 */
2084 char *rxi_Alloc(register size_t size)
2088 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2089 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2092 int glockOwner = ISAFS_GLOCK();
2096 MUTEX_ENTER(&rx_stats_mutex);
2097 rxi_Alloccnt++; rxi_Allocsize += size;
2098 MUTEX_EXIT(&rx_stats_mutex);
2099 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2100 if (size > AFS_SMALLOCSIZ) {
2101 p = (char *) osi_AllocMediumSpace(size);
2103 p = (char *) osi_AllocSmall(size, 1);
2104 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2109 p = (char *) osi_Alloc(size);
2111 if (!p) osi_Panic("rxi_Alloc error");
/*
 * rxi_Free: release memory obtained from rxi_Alloc.  size must be the
 * size originally requested: it selects the release routine and is
 * subtracted from rxi_Allocsize.  Dispatch mirrors rxi_Alloc:
 *  - osi_FreeMediumSpace / osi_FreeSmall, split at AFS_SMALLOCSIZ, in the
 *    same AIX/HP-UX kernel configurations;
 *  - osi_Free elsewhere.
 * rxi_Alloccnt and rxi_Allocsize are decremented under rx_stats_mutex.
 */
2116 void rxi_Free(void *addr, register size_t size)
2118 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2119 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2122 int glockOwner = ISAFS_GLOCK();
2126 MUTEX_ENTER(&rx_stats_mutex);
2127 rxi_Alloccnt--; rxi_Allocsize -= size;
2128 MUTEX_EXIT(&rx_stats_mutex);
2129 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2130 if (size > AFS_SMALLOCSIZ)
2131 osi_FreeMediumSpace(addr);
2133 osi_FreeSmall(addr);
2134 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2139 osi_Free(addr, size);
2143 /* Find the peer process represented by the supplied (host,port)
2144 * combination. If there is no appropriate active peer structure, a
2145 * new one will be allocated and initialized
2146 * The origPeer, if set, is a pointer to a peer structure on which the
2147 * refcount will be decremented. This is used to replace the peer
2148 * structure hanging off a connection structure */
/*
 * Visible behaviour: searches the PEER_HASH(host, port) chain of
 * rx_peerHashTable under rx_peerHashTable_lock.  A new peer:
 *  - is allocated with rxi_AllocPeer, which per its comment zeroes it;
 *  - gets its lock and queues initialized;
 *  - is pushed on the front of the chain and set up with
 *    rxi_InitPeerParams;
 *  - increments rx_stats.nPeerStructs.
 * origPeer's refCount is decremented before the table lock is released.
 * NOTE(review): the create argument is not described above.  Presumably a
 * new peer is only allocated when create is non-zero, and the found or
 * created peer's refCount is incremented; those lines are not visible
 * here -- confirm.
 */
2149 struct rx_peer *rxi_FindPeer(register afs_uint32 host,
2150 register u_short port, struct rx_peer *origPeer, int create)
2152 register struct rx_peer *pp;
2154 hashIndex = PEER_HASH(host, port);
2155 MUTEX_ENTER(&rx_peerHashTable_lock);
2156 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2157 if ((pp->host == host) && (pp->port == port)) break;
2161 pp = rxi_AllocPeer(); /* This bzero's *pp */
2162 pp->host = host; /* set here or in InitPeerParams is zero */
2164 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2165 queue_Init(&pp->congestionQueue);
2166 queue_Init(&pp->rpcStats);
2167 pp->next = rx_peerHashTable[hashIndex];
2168 rx_peerHashTable[hashIndex] = pp;
2169 rxi_InitPeerParams(pp);
2170 MUTEX_ENTER(&rx_stats_mutex);
2171 rx_stats.nPeerStructs++;
2172 MUTEX_EXIT(&rx_stats_mutex);
2179 origPeer->refCount--;
2180 MUTEX_EXIT(&rx_peerHashTable_lock);
2185 /* Find the connection at (host, port) started at epoch, and with the
2186 * given connection id. Creates the server connection if necessary.
2187 * The type specifies whether a client connection or a server
2188 * connection is desired. In both cases, (host, port) specify the
2189 * peer's (host, port) pair. Client connections are not made
2190 * automatically by this routine. The parameter socket gives the
2191 * socket descriptor on which the packet was received. This is used,
2192 * in the case of server connections, to check that *new* connections
2193 * come via a valid (port, serviceId). Finally, the securityIndex
2194 * parameter must match the existing index for the connection. If a
2195 * server connection is created, it will be created using the supplied
2196 * index, if the index is valid for this service */
/*
 * Visible behaviour, all under rx_connHashTable_lock:
 *  - The one-entry cache rxLastConn is tried first (flag 0); otherwise
 *    the CONN_HASH chain is searched (flag 1).
 *  - A candidate must match type, cid (masked with RX_CIDMASK) and epoch.
 *    A securityIndex mismatch returns NULL.
 *  - The peer must match host and port.  Client connections are also
 *    accepted on a port-only match, or when the epoch has its high bit
 *    (0x80000000) set.
 *  - No match: client lookups return NULL.  For server lookups the service
 *    must exist for (socket, serviceId) and have a security object at
 *    securityIndex; otherwise NULL is returned.  When it does, a new
 *    RX_SERVER_CONNECTION is:
 *      - pushed on the hash chain;
 *      - initialized from the service's settings;
 *      - announced to the security object (RXS_NewConnection) and to the
 *        service's newConnProc hook.
 *  - The result is remembered in rxLastConn.
 * NOTE(review): RXS_NewConnection and newConnProc run while
 * rx_connHashTable_lock is held.  The statement executed under
 * conn_data_lock near the end (presumably a refCount increment) is not
 * visible in this listing.
 */
2197 struct rx_connection *rxi_FindConnection(osi_socket socket,
2198 register afs_int32 host, register u_short port, u_short serviceId,
2199 afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2201 int hashindex, flag;
2202 register struct rx_connection *conn;
2203 hashindex = CONN_HASH(host, port, cid, epoch, type);
2204 MUTEX_ENTER(&rx_connHashTable_lock);
2205 rxLastConn ? (conn = rxLastConn, flag = 0) :
2206 (conn = rx_connHashTable[hashindex], flag = 1);
2208 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2209 && (epoch == conn->epoch)) {
2210 register struct rx_peer *pp = conn->peer;
2211 if (securityIndex != conn->securityIndex) {
2212 /* this isn't supposed to happen, but someone could forge a packet
2213 like this, and there seems to be some CM bug that makes this
2214 happen from time to time -- in which case, the fileserver
2216 MUTEX_EXIT(&rx_connHashTable_lock);
2217 return (struct rx_connection *) 0;
2219 if (pp->host == host && pp->port == port)
2221 if (type == RX_CLIENT_CONNECTION && pp->port == port)
2223 if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2228 /* the connection rxLastConn that was used the last time is not the
2229 ** one we are looking for now. Hence, start searching in the hash */
2231 conn = rx_connHashTable[hashindex];
2237 struct rx_service *service;
2238 if (type == RX_CLIENT_CONNECTION) {
2239 MUTEX_EXIT(&rx_connHashTable_lock);
2240 return (struct rx_connection *) 0;
2242 service = rxi_FindService(socket, serviceId);
2243 if (!service || (securityIndex >= service->nSecurityObjects)
2244 || (service->securityObjects[securityIndex] == 0)) {
2245 MUTEX_EXIT(&rx_connHashTable_lock);
2246 return (struct rx_connection *) 0;
2248 conn = rxi_AllocConnection(); /* This bzero's the connection */
2249 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2251 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2253 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2254 conn->next = rx_connHashTable[hashindex];
2255 rx_connHashTable[hashindex] = conn;
2256 conn->peer = rxi_FindPeer(host, port, 0, 1);
2257 conn->type = RX_SERVER_CONNECTION;
2258 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2259 conn->epoch = epoch;
2260 conn->cid = cid & RX_CIDMASK;
2261 /* conn->serial = conn->lastSerial = 0; */
2262 /* conn->timeout = 0; */
2263 conn->ackRate = RX_FAST_ACK_RATE;
2264 conn->service = service;
2265 conn->serviceId = serviceId;
2266 conn->securityIndex = securityIndex;
2267 conn->securityObject = service->securityObjects[securityIndex];
2268 conn->nSpecific = 0;
2269 conn->specific = NULL;
2270 rx_SetConnDeadTime(conn, service->connDeadTime);
2271 rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2272 /* Notify security object of the new connection */
2273 RXS_NewConnection(conn->securityObject, conn);
2274 /* XXXX Connection timeout? */
2275 if (service->newConnProc) (*service->newConnProc)(conn);
2276 MUTEX_ENTER(&rx_stats_mutex);
2277 rx_stats.nServerConns++;
2278 MUTEX_EXIT(&rx_stats_mutex);
2281 MUTEX_ENTER(&conn->conn_data_lock);
2283 MUTEX_EXIT(&conn->conn_data_lock);
2285 rxLastConn = conn; /* store this connection as the last conn used */
2286 MUTEX_EXIT(&rx_connHashTable_lock);
2290 /* There are two packet tracing routines available for testing and monitoring
2291 * Rx. One is called just after every packet is received and the other is
2292 * called just before every packet is sent. Received packets have had their
2293 * headers decoded, and packets to be sent have not yet had their headers
2294 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2295 * containing the network address. The tracer may modify both. The return value, if
2296 * non-zero, indicates that the packet should be dropped. */
/* Receive-side tracer; rxi_ReceivePacket calls it only when it is non-null. */
2298 int (*rx_justReceived)() = 0;
/* Send-side tracer, called just before a packet is sent. */
2299 int (*rx_almostSent)() = 0;
2301 /* A packet has been received off the interface. Np is the packet, socket is
2302 * the socket number it was received from (useful in determining which service
2303 * this packet corresponds to), and (host, port) reflect the host,port of the
2304 * sender. This call returns the packet to the caller if it is finished with
2305 * it, rather than de-allocating it, just as a small performance hack */
/*
 * rxi_ReceivePacket outline, as far as this listing shows it:
 *  - VERSION and DEBUG packets go straight to their own handlers.
 *  - The optional rx_justReceived tracer may drop the packet or change
 *    the (host, port) used for the lookup.
 *  - The connection type follows from RX_CLIENT_INITIATED in the header.
 *    The connection comes from rxi_FindConnection(), and conn->maxSerial
 *    tracks the highest serial seen.
 *  - callNumber == 0 marks connection-level packets (ABORT, CHALLENGE,
 *    RESPONSE, PARAMS). Anything else is routed to the call on channel
 *    (cid & RX_CHANNELMASK).
 *  - Server side: an older call number is counted as spurious. A new
 *    call is created with rxi_NewCall(), or an existing call with a
 *    different number is reset. If that call is still ACTIVE, a BUSY is
 *    sent instead. An abort with rx_BusyError is sent when
 *    rx_BusyThreshold > 0 and rx_nWaiting exceeds it.
 *  - Client side: ACKs for a call in DALLY, and packets that do not
 *    belong to the current call, are ignored. An incoming DATA packet
 *    clears the transmit queue, since it implicitly acknowledges it.
 *  - A packet whose securityIndex differs from the connection's is also
 *    ignored. The rest are dispatched on type, and the call's
 *    lastReceiveTime is refreshed.
 *
 * NOTE(review): this listing skips original source lines (see the gaps
 * in the embedded numbers, e.g. 2429 -> 2431). Several block comments
 * lose their closing line, so as written some statements (e.g. 2440-2444
 * and 2683-2691) fall inside comments. Most early-return and
 * reference-count lines are also missing. Restore the missing lines from
 * upstream before compiling or checking lock balance.
 */
2307 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
2308 osi_socket socket, afs_uint32 host, u_short port,
2309 int *tnop, struct rx_call **newcallp)
2311 register struct rx_call *call;
2312 register struct rx_connection *conn;
2314 afs_uint32 currentCallNumber;
2320 struct rx_packet *tnp;
2323 /* We don't print out the packet until now because (1) the time may not be
2324 * accurate enough until now in the lwp implementation (rx_Listener only gets
2325 * the time after the packet is read) and (2) from a protocol point of view,
2326 * this is the first time the packet has been seen */
/* NOTE(review): the dpf format below prints the packet pointer np with %x,
 * which truncates it on LP64 targets; %p would be the portable choice. */
2327 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2328 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2329 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2330 np->header.serial, packetType, host, port, np->header.serviceId,
2331 np->header.epoch, np->header.cid, np->header.callNumber,
2332 np->header.seq, np->header.flags, np));
2335 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2336 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2339 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2340 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2343 /* If an input tracer function is defined, call it with the packet and
2344 * network address. Note this function may modify its arguments. */
2345 if (rx_justReceived) {
2346 struct sockaddr_in addr;
2348 addr.sin_family = AF_INET;
2349 addr.sin_port = port;
2350 addr.sin_addr.s_addr = host;
2351 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2352 addr.sin_len = sizeof(addr);
2353 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2354 drop = (*rx_justReceived) (np, &addr);
2355 /* drop packet if return value is non-zero */
2356 if (drop) return np;
2357 port = addr.sin_port; /* in case fcn changed addr */
2358 host = addr.sin_addr.s_addr;
2362 /* If packet was not sent by the client, then *we* must be the client */
2363 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2364 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2366 /* Find the connection (or fabricate one, if we're the server & if
2367 * necessary) associated with this packet */
2368 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2369 np->header.cid, np->header.epoch, type,
2370 np->header.securityIndex);
2373 /* If no connection found or fabricated, just ignore the packet.
2374 * (An argument could be made for sending an abort packet for
2379 MUTEX_ENTER(&conn->conn_data_lock);
2380 if (conn->maxSerial < np->header.serial)
2381 conn->maxSerial = np->header.serial;
2382 MUTEX_EXIT(&conn->conn_data_lock);
2384 /* If the connection is in an error state, send an abort packet and ignore
2385 * the incoming packet */
2387 /* Don't respond to an abort packet--we don't want loops! */
2388 MUTEX_ENTER(&conn->conn_data_lock);
2389 if (np->header.type != RX_PACKET_TYPE_ABORT)
2390 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2392 MUTEX_EXIT(&conn->conn_data_lock);
2396 /* Check for connection-only requests (i.e. not call specific). */
2397 if (np->header.callNumber == 0) {
2398 switch (np->header.type) {
2399 case RX_PACKET_TYPE_ABORT:
2400 /* What if the supplied error is zero? */
2401 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2402 MUTEX_ENTER(&conn->conn_data_lock);
2404 MUTEX_EXIT(&conn->conn_data_lock);
2406 case RX_PACKET_TYPE_CHALLENGE:
2407 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2408 MUTEX_ENTER(&conn->conn_data_lock);
2410 MUTEX_EXIT(&conn->conn_data_lock);
2412 case RX_PACKET_TYPE_RESPONSE:
2413 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2414 MUTEX_ENTER(&conn->conn_data_lock);
2416 MUTEX_EXIT(&conn->conn_data_lock);
2418 case RX_PACKET_TYPE_PARAMS:
2419 case RX_PACKET_TYPE_PARAMS+1:
2420 case RX_PACKET_TYPE_PARAMS+2:
2421 /* ignore these packet types for now */
2422 MUTEX_ENTER(&conn->conn_data_lock);
2424 MUTEX_EXIT(&conn->conn_data_lock);
2429 /* Should not reach here, unless the peer is broken: send an
2431 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2432 MUTEX_ENTER(&conn->conn_data_lock);
2433 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2435 MUTEX_EXIT(&conn->conn_data_lock);
2440 channel = np->header.cid & RX_CHANNELMASK;
2441 call = conn->call[channel];
2442 #ifdef RX_ENABLE_LOCKS
2444 MUTEX_ENTER(&call->lock);
2445 /* Test to see if call struct is still attached to conn. */
2446 if (call != conn->call[channel]) {
2448 MUTEX_EXIT(&call->lock);
2449 if (type == RX_SERVER_CONNECTION) {
2450 call = conn->call[channel];
2451 /* If we started with no call attached and there is one now,
2452 * another thread is also running this routine and has gotten
2453 * the connection channel. We should drop this packet in the tests
2454 * below. If there was a call on this connection and it's now
2455 * gone, then we'll be making a new call below.
2456 * If there was previously a call and it's now different then
2457 * the old call was freed and another thread running this routine
2458 * has created a call on this channel. One of these two threads
2459 * has a packet for the old call and the code below handles those
2463 MUTEX_ENTER(&call->lock);
2466 /* This packet can't be for this call. If the new call address is
2467 * 0 then no call is running on this channel. If there is a call
2468 * then, since this is a client connection we're getting data for
2469 * it must be for the previous call.
2471 MUTEX_ENTER(&rx_stats_mutex);
2472 rx_stats.spuriousPacketsRead++;
2473 MUTEX_EXIT(&rx_stats_mutex);
2474 MUTEX_ENTER(&conn->conn_data_lock);
2476 MUTEX_EXIT(&conn->conn_data_lock);
2481 currentCallNumber = conn->callNumber[channel];
2483 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2484 if (np->header.callNumber < currentCallNumber) {
2485 MUTEX_ENTER(&rx_stats_mutex);
2486 rx_stats.spuriousPacketsRead++;
2487 MUTEX_EXIT(&rx_stats_mutex);
2488 #ifdef RX_ENABLE_LOCKS
2490 MUTEX_EXIT(&call->lock);
2492 MUTEX_ENTER(&conn->conn_data_lock);
2494 MUTEX_EXIT(&conn->conn_data_lock);
2498 MUTEX_ENTER(&conn->conn_call_lock);
2499 call = rxi_NewCall(conn, channel);
2500 MUTEX_EXIT(&conn->conn_call_lock);
2501 *call->callNumber = np->header.callNumber;
2502 call->state = RX_STATE_PRECALL;
2503 clock_GetTime(&call->queueTime);
2504 hzero(call->bytesSent);
2505 hzero(call->bytesRcvd);
2506 rxi_KeepAliveOn(call);
2508 else if (np->header.callNumber != currentCallNumber) {
2509 /* Wait until the transmit queue is idle before deciding
2510 * whether to reset the current call. Chances are that the
2511 * call will be in either DALLY or HOLD state once the TQ_BUSY
2514 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2515 while ((call->state == RX_STATE_ACTIVE) &&
2516 (call->flags & RX_CALL_TQ_BUSY)) {
2517 call->flags |= RX_CALL_TQ_WAIT;
2518 #ifdef RX_ENABLE_LOCKS
2519 CV_WAIT(&call->cv_tq, &call->lock);
2520 #else /* RX_ENABLE_LOCKS */
2521 osi_rxSleep(&call->tq);
2522 #endif /* RX_ENABLE_LOCKS */
2524 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2525 /* If the new call cannot be taken right now send a busy and set
2526 * the error condition in this call, so that it terminates as
2527 * quickly as possible */
2528 if (call->state == RX_STATE_ACTIVE) {
2529 struct rx_packet *tp;
2531 rxi_CallError(call, RX_CALL_DEAD);
2532 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2533 MUTEX_EXIT(&call->lock);
2534 MUTEX_ENTER(&conn->conn_data_lock);
2536 MUTEX_EXIT(&conn->conn_data_lock);
2539 rxi_ResetCall(call, 0);
2540 *call->callNumber = np->header.callNumber;
2541 call->state = RX_STATE_PRECALL;
2542 clock_GetTime(&call->queueTime);
2543 hzero(call->bytesSent);
2544 hzero(call->bytesRcvd);
2546 * If the number of queued calls exceeds the overload
2547 * threshold then abort this call.
2549 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2550 struct rx_packet *tp;
2552 rxi_CallError(call, rx_BusyError);
2553 tp = rxi_SendCallAbort(call, np, 1, 0);
2554 MUTEX_EXIT(&call->lock);
2555 MUTEX_ENTER(&conn->conn_data_lock);
2557 MUTEX_EXIT(&conn->conn_data_lock);
2560 rxi_KeepAliveOn(call);
2563 /* Continuing call; do nothing here. */
2565 } else { /* we're the client */
2566 /* Ignore all incoming acknowledgements for calls in DALLY state */
2567 if ( call && (call->state == RX_STATE_DALLY)
2568 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2569 MUTEX_ENTER(&rx_stats_mutex);
2570 rx_stats.ignorePacketDally++;
2571 MUTEX_EXIT(&rx_stats_mutex);
2572 #ifdef RX_ENABLE_LOCKS
2574 MUTEX_EXIT(&call->lock);
2577 MUTEX_ENTER(&conn->conn_data_lock);
2579 MUTEX_EXIT(&conn->conn_data_lock);
2583 /* Ignore anything that's not relevant to the current call. If there
2584 * isn't a current call, then no packet is relevant. */
2585 if (!call || (np->header.callNumber != currentCallNumber)) {
2586 MUTEX_ENTER(&rx_stats_mutex);
2587 rx_stats.spuriousPacketsRead++;
2588 MUTEX_EXIT(&rx_stats_mutex);
2589 #ifdef RX_ENABLE_LOCKS
2591 MUTEX_EXIT(&call->lock);
2594 MUTEX_ENTER(&conn->conn_data_lock);
2596 MUTEX_EXIT(&conn->conn_data_lock);
2599 /* If the service security object index stamped in the packet does not
2600 * match the connection's security index, ignore the packet */
2601 if (np->header.securityIndex != conn->securityIndex) {
2602 #ifdef RX_ENABLE_LOCKS
2603 MUTEX_EXIT(&call->lock);
2605 MUTEX_ENTER(&conn->conn_data_lock);
2607 MUTEX_EXIT(&conn->conn_data_lock);
2611 /* If we're receiving the response, then all transmit packets are
2612 * implicitly acknowledged. Get rid of them. */
2613 if (np->header.type == RX_PACKET_TYPE_DATA) {
2614 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2615 /* XXX Hack. Because we must release the global rx lock when
2616 * sending packets (osi_NetSend) we drop all acks while we're
2617 * traversing the tq in rxi_Start sending packets out because
2618 * packets may move to the freePacketQueue as result of being here!
2619 * So we drop these packets until we're safely out of the
2620 * traversing. Really ugly!
2621 * For fine grain RX locking, we set the acked field in the
2622 * packets and let rxi_Start remove them from the transmit queue.
2624 if (call->flags & RX_CALL_TQ_BUSY) {
2625 #ifdef RX_ENABLE_LOCKS
2626 rxi_SetAcksInTransmitQueue(call);
2629 return np; /* xmitting; drop packet */
2633 rxi_ClearTransmitQueue(call, 0);
2635 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2636 rxi_ClearTransmitQueue(call, 0);
2637 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2639 if (np->header.type == RX_PACKET_TYPE_ACK) {
2640 /* now check to see if this is an ack packet acknowledging that the
2641 * server actually *lost* some hard-acked data. If this happens we
2642 * ignore this packet, as it may indicate that the server restarted in
2643 * the middle of a call. It is also possible that this is an old ack
2644 * packet. We don't abort the connection in this case, because this
2645 * *might* just be an old ack packet. The right way to detect a server
2646 * restart in the midst of a call is to notice that the server epoch
2648 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2649 * XXX unacknowledged. I think that this is off-by-one, but
2650 * XXX I don't dare change it just yet, since it will
2651 * XXX interact badly with the server-restart detection
2652 * XXX code in receiveackpacket. */
2653 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2654 MUTEX_ENTER(&rx_stats_mutex);
2655 rx_stats.spuriousPacketsRead++;
2656 MUTEX_EXIT(&rx_stats_mutex);
2657 MUTEX_EXIT(&call->lock);
2658 MUTEX_ENTER(&conn->conn_data_lock);
2660 MUTEX_EXIT(&conn->conn_data_lock);
2664 } /* else not a data packet */
2667 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2668 /* Set remote user defined status from packet */
2669 call->remoteStatus = np->header.userStatus;
2671 /* Note the gap between the expected next packet and the actual
2672 * packet that arrived, when the new packet has a smaller serial number
2673 * than expected. Rioses frequently reorder packets all by themselves,
2674 * so this will be quite important with very large window sizes.
2675 * Skew is checked against 0 here to avoid any dependence on the type of
2676 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2678 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2679 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2680 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2682 MUTEX_ENTER(&conn->conn_data_lock);
2683 skew = conn->lastSerial - np->header.serial;
2684 conn->lastSerial = np->header.serial;
2685 MUTEX_EXIT(&conn->conn_data_lock);
2687 register struct rx_peer *peer;
2689 if (skew > peer->inPacketSkew) {
2690 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2691 peer->inPacketSkew = skew;
2695 /* Now do packet type-specific processing */
2696 switch (np->header.type) {
2697 case RX_PACKET_TYPE_DATA:
2698 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2701 case RX_PACKET_TYPE_ACK:
2702 /* Respond immediately to ack packets requesting acknowledgement
2704 if (np->header.flags & RX_REQUEST_ACK) {
2706 (void) rxi_SendCallAbort(call, 0, 1, 0);
2708 (void) rxi_SendAck(call, 0, np->header.serial,
2709 RX_ACK_PING_RESPONSE, 1);
2711 np = rxi_ReceiveAckPacket(call, np, 1);
2713 case RX_PACKET_TYPE_ABORT:
2714 /* An abort packet: reset the connection, passing the error up to
2716 /* What if error is zero? */
/* NOTE(review): the connection-level abort path reads the error code with
 * rx_GetInt32(np, 0). This line instead dereferences a cast of
 * rx_DataOf(np), which assumes the first data word is contiguous and
 * suitably aligned. */
2717 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2719 case RX_PACKET_TYPE_BUSY:
2722 case RX_PACKET_TYPE_ACKALL:
2723 /* All packets acknowledged, so we can drop all packets previously
2724 * readied for sending */
2725 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2726 /* XXX Hack. Because we must release the global rx lock when
2727 * sending packets (osi_NetSend) we drop all ack pkts while we're
2728 * traversing the tq in rxi_Start sending packets out because
2729 * packets may move to the freePacketQueue as result of being
2730 * here! So we drop these packets until we're safely out of the
2731 * traversing. Really ugly!
2732 * For fine grain RX locking, we set the acked field in the packets
2733 * and let rxi_Start remove the packets from the transmit queue.
2735 if (call->flags & RX_CALL_TQ_BUSY) {
2736 #ifdef RX_ENABLE_LOCKS
2737 rxi_SetAcksInTransmitQueue(call);
2739 #else /* RX_ENABLE_LOCKS */
2741 return np; /* xmitting; drop packet */
2742 #endif /* RX_ENABLE_LOCKS */
2744 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2745 rxi_ClearTransmitQueue(call, 0);
2748 /* Should not reach here, unless the peer is broken: send an abort
2750 rxi_CallError(call, RX_PROTOCOL_ERROR);
2751 np = rxi_SendCallAbort(call, np, 1, 0);
2754 /* Note when this last legitimate packet was received, for keep-alive
2755 * processing. Note, we delay getting the time until now in the hope that
2756 * the packet will be delivered to the user before any get time is required
2757 * (if not, then the time won't actually be re-evaluated here). */
2758 call->lastReceiveTime = clock_Sec();
2759 MUTEX_EXIT(&call->lock);
2760 MUTEX_ENTER(&conn->conn_data_lock);
2762 MUTEX_EXIT(&conn->conn_data_lock);
2766 /* return true if this is an "interesting" connection from the point of view
2767 of someone trying to debug the system */
/* Judging by the visible tests, a connection is interesting when
 * RX_CONN_MAKECALL_WAITING or RX_CONN_DESTROY_ME is set, or when any of
 * its RX_MAXCALLS call slots holds a call in PRECALL/ACTIVE state or in
 * SENDING/RECEIVING mode.
 * NOTE(review): the return statements, the declaration of i and the null
 * check on tcall are among the lines this listing omits (e.g. 2769-2770,
 * 2774, 2777, 2779); consult upstream for them. */
2768 int rxi_IsConnInteresting(struct rx_connection *aconn)
2771 register struct rx_call *tcall;
2773 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2775 for(i=0;i<RX_MAXCALLS;i++) {
2776 tcall = aconn->call[i];
2778 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2780 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2788 /* if this is one of the last few packets AND it wouldn't be used by the
2789 receiving call to immediately satisfy a read request, then drop it on
2790 the floor, since accepting it might prevent a lock-holding thread from
2791 making progress in its reading. If a call has been cleared while in
2792 the precall state then ignore all subsequent packets until the call
2793 is assigned to a thread. */
/* TooLow: the test is evaluated while holding rx_stats_mutex. The visible
 * condition holds when either:
 *  (a) the call was cleared (RX_CALL_CLEARED) while in PRECALL and this
 *      is not packet 1, or
 *  (b) fewer than rxi_dataQuota+2 packets are free, and the packet is not
 *      both within rx_initSendWindow of acall->rnext and destined for a
 *      waiting reader (RX_CALL_READER_WAIT).
 * NOTE(review): the lines that set and return the result (2796-2797,
 * 2805-2806, 2808-2811) are not shown in this listing. */
2795 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2798 MUTEX_ENTER(&rx_stats_mutex);
2799 if (((ap->header.seq != 1) &&
2800 (acall->flags & RX_CALL_CLEARED) &&
2801 (acall->state == RX_STATE_PRECALL)) ||
2802 ((rx_nFreePackets < rxi_dataQuota+2) &&
2803 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2804 && (acall->flags & RX_CALL_READER_WAIT)))) {
2807 MUTEX_EXIT(&rx_stats_mutex);
/*
 * rxi_CheckReachEvent: reachability probe for a connection in
 * RX_CONN_ATTACHWAIT. It runs either directly with event == NULL (as
 * rxi_CheckConnReach does) or as a posted rxevent. In the posted case a
 * connection reference is dropped on entry; this is presumably the one
 * taken when the event was posted, though that line is not shown.
 *
 * It clears conn->checkReachEvent and samples RX_CONN_ATTACHWAIT. The
 * remainder pings a call with RX_ACK_PING and re-posts the event
 * RX_CHECKREACH_TIMEOUT seconds later if none is pending; that work is
 * presumably conditional on the sampled flag. acall's lock is not taken
 * here, so the caller presumably holds it.
 *
 * NOTE(review): this listing omits lines of this function (e.g.
 * 2824-2826, 2832-2836, 2845-2847, 2856). The block comment opened at
 * 2837 has no visible terminator, so the rest of the function as listed
 * reads as comment text. Restore from upstream before relying on it.
 */
2812 static void rxi_CheckReachEvent(struct rxevent *event,
2813 struct rx_connection *conn, struct rx_call *acall)
2815 struct rx_call *call = acall;
2819 MUTEX_ENTER(&conn->conn_data_lock);
2820 conn->checkReachEvent = NULL;
2821 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2822 if (event) conn->refCount--;
2823 MUTEX_EXIT(&conn->conn_data_lock);
2827 MUTEX_ENTER(&conn->conn_call_lock);
2828 MUTEX_ENTER(&conn->conn_data_lock);
2829 for (i=0; i<RX_MAXCALLS; i++) {
2830 struct rx_call *tc = conn->call[i];
2831 if (tc && tc->state == RX_STATE_PRECALL) {
2837 /* Indicate that rxi_CheckReachEvent is no longer running by
2838 * clearing the flag. Must be atomic under conn_data_lock to
2839 * avoid a new call slipping by: rxi_CheckConnReach holds
2840 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2842 conn->flags &= ~RX_CONN_ATTACHWAIT;
2843 MUTEX_EXIT(&conn->conn_data_lock);
2844 MUTEX_EXIT(&conn->conn_call_lock);
2848 if (call != acall) MUTEX_ENTER(&call->lock);
2849 rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
2850 if (call != acall) MUTEX_EXIT(&call->lock);
2852 clock_GetTime(&when);
2853 when.sec += RX_CHECKREACH_TIMEOUT;
2854 MUTEX_ENTER(&conn->conn_data_lock);
2855 if (!conn->checkReachEvent) {
2857 conn->checkReachEvent =
2858 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2860 MUTEX_EXIT(&conn->conn_data_lock);
// rxi_CheckConnReach: decide whether a server call may be attached yet.
// Nothing is checked when the service's checkReach is 0. Otherwise, if the
// peer's lastReachTime is at least RX_CHECKREACH_TTL seconds old, the
// connection is marked RX_CONN_ATTACHWAIT (only once). If no
// checkReachEvent is then pending, rxi_CheckReachEvent() is started to
// ping the peer. TryAttach attaches only when this returns 0.
// The return statements and the assignment of now are among the lines
// this listing omits.
// NOTE(review): conn->checkReachEvent is read after conn_data_lock has
// been released. rxi_CheckReachEvent re-checks it under that lock before
// posting a new event.
// Line comments are used here because, in this listing, the block comment
// opened at 2837 has lost its terminator and is still open at this point.
2865 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2867 struct rx_service *service = conn->service;
2868 struct rx_peer *peer = conn->peer;
2869 afs_uint32 now, lastReach;
2871 if (service->checkReach == 0)
2875 MUTEX_ENTER(&peer->peer_lock);
2876 lastReach = peer->lastReachTime;
2877 MUTEX_EXIT(&peer->peer_lock);
2878 if (now - lastReach < RX_CHECKREACH_TTL)
2881 MUTEX_ENTER(&conn->conn_data_lock);
2882 if (conn->flags & RX_CONN_ATTACHWAIT) {
2883 MUTEX_EXIT(&conn->conn_data_lock);
2886 conn->flags |= RX_CONN_ATTACHWAIT;
2887 MUTEX_EXIT(&conn->conn_data_lock);
2888 if (!conn->checkReachEvent)
2889 rxi_CheckReachEvent(NULL, conn, call);
2894 /* try to attach call, if authentication is complete */
/*
 * Only server connections whose call is in PRECALL are considered. Once
 * RXS_CheckAuthentication() returns 0, the call is handed to
 * rxi_AttachServerProc(). This happens immediately when reachOverride is
 * set; otherwise it waits until rxi_CheckConnReach() returns 0. The
 * visible rxi_ChallengeOn() call presumably sits in the
 * not-yet-authenticated branch; the else line is not shown in this
 * listing.
 */
2895 static void TryAttach(register struct rx_call *acall,
2896 register osi_socket socket, register int *tnop,
2897 register struct rx_call **newcallp, int reachOverride)
2899 struct rx_connection *conn = acall->conn;
2901 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2902 /* Don't attach until we have any req'd. authentication. */
2903 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2904 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2905 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2906 /* Note: this does not necessarily succeed; there
2907 * may not be any proc available
2911 rxi_ChallengeOn(acall->conn);
2916 /* A data packet has been received off the interface. This packet is
2917 * appropriate to the call (the call is in the right state, etc.). This
2918 * routine can return a packet to the caller, for re-use */
/*
 * Visible steps, in order:
 *  1. Count the packet.
 *  2. When receive buffers are short (rxi_OverQuota / TooLow), drop it,
 *     clear the receive queue and schedule a delayed ack.
 *  3. For each packet of a possibly jumbo datagram (split with
 *     rxi_SplitJumboPacket):
 *     - queue it in sequence order on call->rq, or answer it with a
 *       DUPLICATE or EXCEEDS_WINDOW ack;
 *     - track RX_CALL_HAVE_LAST and RX_CALL_RECEIVE_DONE;
 *     - try to attach a server thread.
 *  4. Wake any waiting reader.
 *  5. Choose between an immediate ack, an idle ack and a delayed-ack
 *     event.
 *
 * NOTE(review): arrivalProcArg is passed through an (int) cast below. If
 * it is pointer-sized (its type is not visible here), this truncates it
 * on LP64 targets.
 * NOTE(review): this listing omits many original lines (e.g. 2923,
 * 2925-2927, 2939, 2963-2967). As written, the comments at 2968, 3196
 * and 3224 have lost their opening lines, and the comment at 3152 has
 * lost its closing line.
 */
2920 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
2921 register struct rx_packet *np, int istack, osi_socket socket,
2922 afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2924 int ackNeeded = 0; /* 0 means no, otherwise ack_reason */
2928 afs_uint32 seq, serial, flags;
2930 struct rx_packet *tnp;
2932 MUTEX_ENTER(&rx_stats_mutex);
2933 rx_stats.dataPacketsRead++;
2934 MUTEX_EXIT(&rx_stats_mutex);
2937 /* If there are no packet buffers, drop this new packet, unless we can find
2938 * packet buffers from inactive calls */
2940 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2941 MUTEX_ENTER(&rx_freePktQ_lock);
2942 rxi_NeedMorePackets = TRUE;
2943 MUTEX_EXIT(&rx_freePktQ_lock);
2944 MUTEX_ENTER(&rx_stats_mutex);
2945 rx_stats.noPacketBuffersOnRead++;
2946 MUTEX_EXIT(&rx_stats_mutex);
2947 call->rprev = np->header.serial;
2948 rxi_calltrace(RX_TRACE_DROP, call);
2949 dpf (("packet %x dropped on receipt - quota problems", np));
2951 rxi_ClearReceiveQueue(call);
2952 clock_GetTime(&when);
2953 clock_Add(&when, &rx_softAckDelay);
2954 if (!call->delayedAckEvent ||
2955 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2956 rxevent_Cancel(call->delayedAckEvent, call,
2957 RX_CALL_REFCOUNT_DELAY);
2958 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2959 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2962 /* we've damaged this call already, might as well do it in. */
2968 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2969 * packet is one of several packets transmitted as a single
2970 * datagram. Do not send any soft or hard acks until all packets
2971 * in a jumbogram have been processed. Send negative acks right away.
2973 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2974 /* tnp is non-null when there are more packets in the
2975 * current jumbo gram */
2982 seq = np->header.seq;
2983 serial = np->header.serial;
2984 flags = np->header.flags;
2986 /* If the call is in an error state, send an abort message */
2988 return rxi_SendCallAbort(call, np, istack, 0);
2990 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2991 * AFS 3.5 jumbogram. */
2992 if (flags & RX_JUMBO_PACKET) {
2993 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2998 if (np->header.spare != 0) {
2999 MUTEX_ENTER(&call->conn->conn_data_lock);
3000 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3001 MUTEX_EXIT(&call->conn->conn_data_lock);
3004 /* The usual case is that this is the expected next packet */
3005 if (seq == call->rnext) {
3007 /* Check to make sure it is not a duplicate of one already queued */
3008 if (queue_IsNotEmpty(&call->rq)
3009 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3010 MUTEX_ENTER(&rx_stats_mutex);
3011 rx_stats.dupPacketsRead++;
3012 MUTEX_EXIT(&rx_stats_mutex);
3013 dpf (("packet %x dropped on receipt - duplicate", np));
3014 rxevent_Cancel(call->delayedAckEvent, call,
3015 RX_CALL_REFCOUNT_DELAY);
3016 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3022 /* It's the next packet. Stick it on the receive queue
3023 * for this call. Set newPackets to make sure we wake
3024 * the reader once all packets have been processed */
3025 queue_Prepend(&call->rq, np);
3027 np = NULL; /* We can't use this anymore */
3030 /* If an ack is requested then set a flag to make sure we
3031 * send an acknowledgement for this packet */
3032 if (flags & RX_REQUEST_ACK) {
3033 ackNeeded = RX_ACK_REQUESTED;
3036 /* Keep track of whether we have received the last packet */
3037 if (flags & RX_LAST_PACKET) {
3038 call->flags |= RX_CALL_HAVE_LAST;
3042 /* Check whether we have all of the packets for this call */
3043 if (call->flags & RX_CALL_HAVE_LAST) {
3044 afs_uint32 tseq; /* temporary sequence number */
3045 struct rx_packet *tp; /* Temporary packet pointer */
3046 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3048 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3049 if (tseq != tp->header.seq)
3051 if (tp->header.flags & RX_LAST_PACKET) {
3052 call->flags |= RX_CALL_RECEIVE_DONE;
3059 /* Provide asynchronous notification for those who want it
3060 * (e.g. multi rx) */
3061 if (call->arrivalProc) {
3062 (*call->arrivalProc)(call, call->arrivalProcHandle,
3063 (int) call->arrivalProcArg);
3064 call->arrivalProc = (VOID (*)()) 0;
3067 /* Update last packet received */
3070 /* If there is no server process serving this call, grab
3071 * one, if available. We only need to do this once. If a
3072 * server thread is available, this thread becomes a server
3073 * thread and the server thread becomes a listener thread. */
3075 TryAttach(call, socket, tnop, newcallp, 0);
3078 /* This is not the expected next packet. */
3080 /* Determine whether this is a new or old packet, and if it's
3081 * a new one, whether it fits into the current receive window.
3082 * Also figure out whether the packet was delivered in sequence.
3083 * We use the prev variable to determine whether the new packet
3084 * is the successor of its immediate predecessor in the
3085 * receive queue, and the missing flag to determine whether
3086 * any of this packets predecessors are missing. */
3088 afs_uint32 prev; /* "Previous packet" sequence number */
3089 struct rx_packet *tp; /* Temporary packet pointer */
3090 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3091 int missing; /* Are any predecessors missing? */
3093 /* If the new packet's sequence number has been sent to the
3094 * application already, then this is a duplicate */
3095 if (seq < call->rnext) {
3096 MUTEX_ENTER(&rx_stats_mutex);
3097 rx_stats.dupPacketsRead++;
3098 MUTEX_EXIT(&rx_stats_mutex);
3099 rxevent_Cancel(call->delayedAckEvent, call,
3100 RX_CALL_REFCOUNT_DELAY);
3101 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3107 /* If the sequence number is greater than what can be
3108 * accommodated by the current window, then send a negative
3109 * acknowledge and drop the packet */
3110 if ((call->rnext + call->rwind) <= seq) {
3111 rxevent_Cancel(call->delayedAckEvent, call,
3112 RX_CALL_REFCOUNT_DELAY);
3113 np = rxi_SendAck(call, np, serial,
3114 RX_ACK_EXCEEDS_WINDOW, istack);
3120 /* Look for the packet in the queue of old received packets */
3121 for (prev = call->rnext - 1, missing = 0,
3122 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3123 /*Check for duplicate packet */
3124 if (seq == tp->header.seq) {
3125 MUTEX_ENTER(&rx_stats_mutex);
3126 rx_stats.dupPacketsRead++;
3127 MUTEX_EXIT(&rx_stats_mutex);
3128 rxevent_Cancel(call->delayedAckEvent, call,
3129 RX_CALL_REFCOUNT_DELAY);
3130 np = rxi_SendAck(call, np, serial,
3131 RX_ACK_DUPLICATE, istack);
3136 /* If we find a higher sequence packet, break out and
3137 * insert the new packet here. */
3138 if (seq < tp->header.seq) break;
3139 /* Check for missing packet */
3140 if (tp->header.seq != prev+1) {
3144 prev = tp->header.seq;
3147 /* Keep track of whether we have received the last packet. */
3148 if (flags & RX_LAST_PACKET) {
3149 call->flags |= RX_CALL_HAVE_LAST;
3152 /* It's within the window: add it to the receive queue.
3153 * tp is left by the previous loop either pointing at the
3154 * packet before which to insert the new packet, or at the
3155 * queue head if the queue is empty or the packet should be
3157 queue_InsertBefore(tp, np);
3161 /* Check whether we have all of the packets for this call */
3162 if ((call->flags & RX_CALL_HAVE_LAST)
3163 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3164 afs_uint32 tseq; /* temporary sequence number */
3166 for (tseq = call->rnext,
3167 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3168 if (tseq != tp->header.seq)
3170 if (tp->header.flags & RX_LAST_PACKET) {
3171 call->flags |= RX_CALL_RECEIVE_DONE;
3178 /* We need to send an ack if the packet is out of sequence,
3179 * or if an ack was requested by the peer. */
3180 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3181 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3184 /* Acknowledge the last packet for each call */
3185 if (flags & RX_LAST_PACKET) {
3196 * If the receiver is waiting for an iovec, fill the iovec
3197 * using the data from the receive queue */
3198 if (call->flags & RX_CALL_IOVEC_WAIT) {
3199 didHardAck = rxi_FillReadVec(call, serial);
3200 /* the call may have been aborted */
3209 /* Wakeup the reader if any */
3210 if ((call->flags & RX_CALL_READER_WAIT) &&
3211 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3212 (call->iovNext >= call->iovMax) ||
3213 (call->flags & RX_CALL_RECEIVE_DONE))) {
3214 call->flags &= ~RX_CALL_READER_WAIT;
3215 #ifdef RX_ENABLE_LOCKS
3216 CV_BROADCAST(&call->cv_rq);
3218 osi_rxWakeup(&call->rq);
3224 * Send an ack when requested by the peer, or once every
3225 * rxi_SoftAckRate packets until the last packet has been
3226 * received. Always send a soft ack for the last packet in
3227 * the server's reply. */
3229 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3230 np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3231 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3232 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3233 np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3234 } else if (call->nSoftAcks) {
3235 clock_GetTime(&when);
3236 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3237 clock_Add(&when, &rx_lastAckDelay);
3239 clock_Add(&when, &rx_softAckDelay);
3241 if (!call->delayedAckEvent ||
3242 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3243 rxevent_Cancel(call->delayedAckEvent, call,
3244 RX_CALL_REFCOUNT_DELAY);
3245 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3246 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3249 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3250 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
/* Forward declaration: rxi_ComputeRate is called from
 * rxi_ComputePeerNetStats. It is declared without a parameter list
 * (old-style), so calls to it are not argument-checked. */
3257 static void rxi_ComputeRate();
/*
 * rxi_UpdatePeerReach: record, under peer_lock, that the peer was
 * reached now.
 *
 * If the connection was waiting in RX_CONN_ATTACHWAIT, clear the flag and
 * retry TryAttach() with reachOverride set on every call slot. Each call
 * other than acall is locked around the retry; acall is presumably
 * already locked by the caller.
 *
 * NOTE(review): the loop index declaration, the null check on each call
 * and the else/closing lines are not shown in this listing (e.g.
 * 3270-3271, 3274, 3277, 3282-3284). The final MUTEX_EXIT presumably
 * belongs to the not-waiting branch.
 */
3260 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3262 struct rx_peer *peer = conn->peer;
3264 MUTEX_ENTER(&peer->peer_lock);
3265 peer->lastReachTime = clock_Sec();
3266 MUTEX_EXIT(&peer->peer_lock);
3268 MUTEX_ENTER(&conn->conn_data_lock);
3269 if (conn->flags & RX_CONN_ATTACHWAIT) {
3272 conn->flags &= ~RX_CONN_ATTACHWAIT;
3273 MUTEX_EXIT(&conn->conn_data_lock);
3275 for (i=0; i<RX_MAXCALLS; i++) {
3276 struct rx_call *call = conn->call[i];
3278 if (call != acall) MUTEX_ENTER(&call->lock);
3279 /* tnop can be null if newcallp is null */
3280 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3281 if (call != acall) MUTEX_EXIT(&call->lock);
3285 MUTEX_EXIT(&conn->conn_data_lock);
3288 /* rxi_ComputePeerNetStats
3290 * Called exclusively by rxi_ReceiveAckPacket to compute network link
3291 * estimates (like RTT and throughput) based on ack packets. Caller
3292 * must ensure that the packet in question is the right one (i.e.
3293 * serial number matches).
3296 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3297 struct rx_ackPacket *ap, struct rx_packet *np)
3299 struct rx_peer *peer = call->conn->peer;
3301 /* Use RTT if not delayed by client. */
3302 if (ap->reason != RX_ACK_DELAY)
3303 rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3305 rxi_ComputeRate(peer, call, p, np, ap->reason);
/* rxi_ReceiveAckPacket
 *
 * Process an ACK packet `np` received for `call`.  In the lines visible
 * in this listing it:
 *   - counts the packet in rx_stats.ackPacketsRead;
 *   - parses the ack header (firstPacket, serial, reason), clamping the
 *     number of explicit ack bytes to what is actually present;
 *   - drops transmit-queue packets with seq < firstPacket, which the
 *     peer has passed to its upper level;
 *   - records implicit/explicit acks and nacks on packets still queued
 *     and re-arms the retry time of the unacked ones;
 *   - wakes a sender blocked on window space;
 *   - applies the optional trailer (MTU, receive window, jumbogram size)
 *     the peer appended after the ack bytes;
 *   - runs congestion control (fast recovery, slow start, linear growth,
 *     datagram-size growth);
 *   - finally moves a held server call to DALLY once everything sent is
 *     soft-acked, or otherwise restarts transmission with rxi_Start.
 *
 * np     - the received ack packet; handed back to the caller (the
 *          truncated-ack path returns it unchanged).
 * istack - passed through to rxi_Start.
 *
 * Locking: peer->peer_lock is intended to be held from before the transmit
 * queue scan until just before rxi_Start; in AFS_GLOBAL_RXLOCK_KERNEL
 * builds it is dropped while waiting for RX_CALL_TQ_BUSY to clear.
 *
 * NOTE(review): this listing has lost many lines (braces, #else/#endif,
 * the declarations of nbytes/nAcks/first/serial/offset/missing/acked/
 * nNacked/tSize, and several statements), and several block comments lost
 * their opening or closing line.  The notes below flag places where that
 * changes what is code and what is comment.
 */
3309 /* The real smarts of the whole thing. */
3310 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
3311 struct rx_packet *np, int istack)
3313 struct rx_ackPacket *ap;
3315 register struct rx_packet *tp;
3316 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3317 register struct rx_connection *conn = call->conn;
3318 struct rx_peer *peer = conn->peer;
3321 /* because there are CM's that are bogus, sending weird values for this. */
3322 afs_uint32 skew = 0;
3327 int newAckCount = 0;
3328 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3329 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3331 MUTEX_ENTER(&rx_stats_mutex);
3332 rx_stats.ackPacketsRead++;
3333 MUTEX_EXIT(&rx_stats_mutex);
3334 ap = (struct rx_ackPacket *) rx_DataOf(np);
3335 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
/* NOTE(review): as listed this return is unconditional, which would make
 * everything below unreachable.  It presumably sits under a check that
 * nbytes is negative (the ack header itself is truncated), and that line
 * has been lost.  Confirm. */
3337 return np; /* truncated ack packet */
3339 /* depends on ack packet struct */
3340 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3341 first = ntohl(ap->firstPacket);
3342 serial = ntohl(ap->serial);
3343 /* temporarily disabled -- needs to degrade over time
3344 skew = ntohs(ap->maxSkew); */
3346 /* Ignore ack packets received out of order */
3347 if (first < call->tfirst) {
/* NOTE(review): the body of the out-of-order check above (presumably
 * returning np) is not visible in this listing. */
3351 if (np->header.flags & RX_SLOW_START_OK) {
3352 call->flags |= RX_CALL_SLOW_START_OK;
/* A ping response counts as evidence that the peer is reachable. */
3355 if (ap->reason == RX_ACK_PING_RESPONSE)
3356 rxi_UpdatePeerReach(conn, call);
/* Debug trace of the ack: header fields, then one character per explicit
 * ack byte ('-' for a nack, '*' otherwise).  The surrounding conditional
 * and fprintf call are not visible in this listing. */
3361 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3362 ap->reason, ntohl(ap->previousPacket),
3363 (unsigned int) np->header.seq, (unsigned int) serial,
3364 (unsigned int) skew, ntohl(ap->firstPacket));
3367 for (offset = 0; offset < nAcks; offset++)
3368 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
/* NOTE(review): the comment that starts on the next line has lost its
 * closing line, so as listed it runs on to the end of the "Check for
 * packets" comment and swallows the MUTEX_ENTER of peer->peer_lock and the
 * outPacketSkew update.  The later MUTEX_EXIT of peer_lock would then have
 * no matching enter. */
3374 /* Update the outgoing packet skew value to the latest value of
3375 * the peer's incoming packet skew value. The ack packet, of
3376 * course, could arrive out of order, but that won't affect things
3378 MUTEX_ENTER(&peer->peer_lock);
3379 peer->outPacketSkew = skew;
3381 /* Check for packets that no longer need to be transmitted, and
3382 * discard them. This only applies to packets positively
3383 * acknowledged as having been sent to the peer's upper level.
3384 * All other packets must be retained. So only packets with
3385 * sequence numbers < ap->firstPacket are candidates. */
3386 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3387 if (tp->header.seq >= first) break;
3388 call->tfirst = tp->header.seq + 1;
3389 if (serial && (tp->header.serial == serial ||
3390 tp->firstSerial == serial))
3391 rxi_ComputePeerNetStats(call, tp, ap, np);
/* NOTE(review): the "XXX Hack" comment below has lost its closing line,
 * so as listed it runs on until the #else (RX_ENABLE_LOCKS) line and
 * swallows the RX_PKTFLAG_ACKED / RX_CALL_TQ_BUSY statements that
 * defer freeing while rxi_Start owns the queue. */
3392 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3393 /* XXX Hack. Because we have to release the global rx lock when sending
3394 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3395 * in rxi_Start sending packets out because packets may move to the
3396 * freePacketQueue as result of being here! So we drop these packets until
3397 * we're safely out of the traversing. Really ugly!
3398 * To make it even uglier, if we're using fine grain locking, we can
3399 * set the ack bits in the packets and have rxi_Start remove the packets
3400 * when it's done transmitting.
3402 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3405 if (call->flags & RX_CALL_TQ_BUSY) {
3406 #ifdef RX_ENABLE_LOCKS
3407 tp->flags |= RX_PKTFLAG_ACKED;
3408 call->flags |= RX_CALL_TQ_SOME_ACKED;
3409 #else /* RX_ENABLE_LOCKS */
3411 #endif /* RX_ENABLE_LOCKS */
3413 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* NOTE(review): the removal of tp from call->tq before it is freed is not
 * visible in this listing. */
3416 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3421 /* Give rate detector a chance to respond to ping requests */
3422 if (ap->reason == RX_ACK_PING_RESPONSE) {
/* NOTE(review): rxi_ComputeRate is only forward-declared in this section;
 * confirm it is compiled in (ADAPT_WINDOW?) wherever this call is. */
3423 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3427 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3429 /* Now go through explicit acks/nacks and record the results in
3430 * the waiting packets. These are packets that can't be released
3431 * yet, even with a positive acknowledge. This positive
3432 * acknowledge only means the packet has been received by the
3433 * peer, not that it will be retained long enough to be sent to
3434 * the peer's upper level. In addition, reset the transmit timers
3435 * of any missing packets (those packets that must be missing
3436 * because this packet was out of sequence) */
/* nSoftAcked is reset here; the per-packet increments that rebuild it are
 * not visible in this listing. */
3438 call->nSoftAcked = 0;
3439 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
/* NOTE(review): the comment on the next line has lost its closing line,
 * so as listed it swallows the "tp->header.seq >= first" condition that
 * guards the RTT update in fine-grained-locking kernel builds. */
3440 /* Update round trip time if the ack was stimulated on receipt
3442 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3443 #ifdef RX_ENABLE_LOCKS
3444 if (tp->header.seq >= first)
3445 #endif /* RX_ENABLE_LOCKS */
3446 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3447 if (serial && (tp->header.serial == serial ||
3448 tp->firstSerial == serial))
3449 rxi_ComputePeerNetStats(call, tp, ap, np);
3451 /* Set the acknowledge flag per packet based on the
3452 * information in the ack packet. An acknowledged packet can
3453 * be downgraded when the server has discarded a packet it
3454 * soft-acked previously, or when an ack packet is received
3455 * out of sequence. */
3456 if (tp->header.seq < first) {
3457 /* Implicit ack information */
3458 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3461 tp->flags |= RX_PKTFLAG_ACKED;
3463 else if (tp->header.seq < first + nAcks) {
3464 /* Explicit ack information: set it in the packet appropriately */
3465 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3466 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3468 tp->flags |= RX_PKTFLAG_ACKED;
3476 tp->flags &= ~RX_PKTFLAG_ACKED;
3481 tp->flags &= ~RX_PKTFLAG_ACKED;
3485 /* If packet isn't yet acked, and it has been transmitted at least
3486 * once, reset retransmit time using latest timeout
3487 * ie, this should readjust the retransmit timer for all outstanding
3488 * packets... So we don't just retransmit when we should know better*/
3490 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3491 tp->retryTime = tp->timeSent;
3492 clock_Add(&tp->retryTime, &peer->timeout);
3493 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3494 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3498 /* If the window has been extended by this acknowledge packet,
3499 * then wakeup a sender waiting in alloc for window space, or try
3500 * sending packets now, if he's been sitting on packets due to
3501 * lack of window space */
3502 if (call->tnext < (call->tfirst + call->twind)) {
3503 #ifdef RX_ENABLE_LOCKS
3504 CV_SIGNAL(&call->cv_twind);
3506 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3507 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3508 osi_rxWakeup(&call->twind);
3511 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3512 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3516 /* if the ack packet has a receivelen field hanging off it,
3517 * update our state */
/* Trailer words after the ack bytes, as read below (all network order):
 *   word 0: largest packet the peer will accept from us;
 *   word 1: the peer's "recommended" (natural) MTU;
 *   word 2: the peer's receive window (3-word and 4-word trailers);
 *   word 3: max packets per jumbo datagram (4-word trailer only). */
3518 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3521 /* If the ack packet has a "recommended" size that is less than
3522 * what I am using now, reduce my size to match */
3523 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3524 sizeof(afs_int32), &tSize);
3525 tSize = (afs_uint32) ntohl(tSize);
3526 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3528 /* Get the maximum packet size to send to this peer */
3529 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3531 tSize = (afs_uint32)ntohl(tSize);
3532 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3533 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3535 /* sanity check - peer might have restarted with different params.
3536 * If peer says "send less", dammit, send less... Peer should never
3537 * be unable to accept packets of the size that prior AFS versions would
3538 * send without asking. */
3539 if (peer->maxMTU != tSize) {
3540 peer->maxMTU = tSize;
3541 peer->MTU = MIN(tSize, peer->MTU);
3542 call->MTU = MIN(call->MTU, tSize);
/* Exactly three trailer words: the window may only shrink our send
 * window, and jumbo datagrams are disabled for this peer. */
3546 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3548 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3549 sizeof(afs_int32), &tSize);
3550 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3551 if (tSize < call->twind) { /* smaller than our send */
3552 call->twind = tSize; /* window, we must send less... */
3553 call->ssthresh = MIN(call->twind, call->ssthresh);
/* NOTE(review): the comment on the next line has lost its closing line,
 * so as listed it swallows "maxMTU = peer->maxMTU;" and runs on to the
 * "Did peer restart" comment. */
3556 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3557 * network MTU confused with the loopback MTU. Calculate the
3558 * maximum MTU here for use in the slow start code below.
3560 maxMTU = peer->maxMTU;
3561 /* Did peer restart with older RX version? */
3562 if (peer->maxDgramPackets > 1) {
3563 peer->maxDgramPackets = 1;
/* Four or more trailer words: the send window tracks the peer's receive
 * window in both directions, and jumbo datagrams are sized from word 3. */
3565 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3567 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3568 sizeof(afs_int32), &tSize);
3569 tSize = (afs_uint32) ntohl(tSize);
3571 * As of AFS 3.5 we set the send window to match the receive window.
3573 if (tSize < call->twind) {
3574 call->twind = tSize;
3575 call->ssthresh = MIN(call->twind, call->ssthresh);
3576 } else if (tSize > call->twind) {
3577 call->twind = tSize;
3581 * As of AFS 3.5, a jumbogram is more than one fixed size
3582 * packet transmitted in a single UDP datagram. If the remote
3583 * MTU is smaller than our local MTU then never send a datagram
3584 * larger than the natural MTU.
3586 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3587 sizeof(afs_int32), &tSize);
3588 maxDgramPackets = (afs_uint32) ntohl(tSize);
3589 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3590 maxDgramPackets = MIN(maxDgramPackets,
3591 (int)(peer->ifDgramPackets));
3592 maxDgramPackets = MIN(maxDgramPackets, tSize);
3593 if (maxDgramPackets > 1) {
3594 peer->maxDgramPackets = maxDgramPackets;
3595 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3597 peer->maxDgramPackets = 1;
3598 call->MTU = peer->natMTU;
3600 } else if (peer->maxDgramPackets > 1) {
3601 /* Restarted with lower version of RX */
3602 peer->maxDgramPackets = 1;
/* No trailer at all: treat the peer as an old RX and fall back to
 * OLD_MAX_PACKET_SIZE everywhere. */
3604 } else if (peer->maxDgramPackets > 1 ||
3605 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3606 /* Restarted with lower version of RX */
3607 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3608 peer->natMTU = OLD_MAX_PACKET_SIZE;
3609 peer->MTU = OLD_MAX_PACKET_SIZE;
3610 peer->maxDgramPackets = 1;
3611 peer->nDgramPackets = 1;
3613 call->MTU = OLD_MAX_PACKET_SIZE;
3618 * Calculate how many datagrams were successfully received after
3619 * the first missing packet and adjust the negative ack counter
3624 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3625 if (call->nNacks < nNacked) {
3626 call->nNacks = nNacked;
/* In fast recovery, cwind either grows by one packet or recovery ends and
 * cwind is restored from nextCwind.  NOTE(review): the condition choosing
 * between the two is not visible in this listing. */
3635 if (call->flags & RX_CALL_FAST_RECOVER) {
3637 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3639 call->flags &= ~RX_CALL_FAST_RECOVER;
3640 call->cwind = call->nextCwind;
3641 call->nextCwind = 0;
3644 call->nCwindAcks = 0;
3646 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3647 /* Three negative acks in a row trigger congestion recovery */
3648 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3649 MUTEX_EXIT(&peer->peer_lock);
3650 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3651 /* someone else is waiting to start recovery */
3654 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3655 while (call->flags & RX_CALL_TQ_BUSY) {
3656 call->flags |= RX_CALL_TQ_WAIT;
3657 #ifdef RX_ENABLE_LOCKS
3658 CV_WAIT(&call->cv_tq, &call->lock);
3659 #else /* RX_ENABLE_LOCKS */
3660 osi_rxSleep(&call->tq);
3661 #endif /* RX_ENABLE_LOCKS */
3663 MUTEX_ENTER(&peer->peer_lock);
3664 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* Enter fast recovery: ssthresh becomes half of min(cwind, twind) (with a
 * floor of 4 before halving) and the datagram size is halved (minimum 1). */
3665 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3666 call->flags |= RX_CALL_FAST_RECOVER;
3667 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3668 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3670 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3671 call->nextCwind = call->ssthresh;
3674 peer->MTU = call->MTU;
3675 peer->cwind = call->nextCwind;
3676 peer->nDgramPackets = call->nDgramPackets;
3678 call->congestSeq = peer->congestSeq;
3679 /* Reset the resend times on the packets that were nacked
3680 * so we will retransmit as soon as the window permits*/
/* Walks the queue from the tail.  NOTE(review): part of this loop's body
 * (how `acked` is used) is not visible in this listing. */
3681 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3683 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3684 clock_Zero(&tp->retryTime);
3686 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3691 /* If cwind is smaller than ssthresh, then increase
3692 * the window one packet for each ack we receive (exponential
3694 * If cwind is greater than or equal to ssthresh then increase
3695 * the congestion window by one packet for each cwind acks we
3696 * receive (linear growth). */
3697 if (call->cwind < call->ssthresh) {
3698 call->cwind = MIN((int)call->ssthresh,
3699 (int)(call->cwind + newAckCount));
3700 call->nCwindAcks = 0;
3702 call->nCwindAcks += newAckCount;
3703 if (call->nCwindAcks >= call->cwind) {
3704 call->nCwindAcks = 0;
3705 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3709 * If we have received several acknowledgements in a row then
3710 * it is time to increase the size of our datagrams
/* Datagram growth: add a packet per jumbogram when the peer supports
 * jumbograms, otherwise grow call->MTU by natMTU up to peer->maxMTU. */
3712 if ((int)call->nAcks > rx_nDgramThreshold) {
3713 if (peer->maxDgramPackets > 1) {
3714 if (call->nDgramPackets < peer->maxDgramPackets) {
3715 call->nDgramPackets++;
3717 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3718 } else if (call->MTU < peer->maxMTU) {
3719 call->MTU += peer->natMTU;
3720 call->MTU = MIN(call->MTU, peer->maxMTU);
3726 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3728 /* Servers need to hold the call until all response packets have
3729 * been acknowledged. Soft acks are good enough since clients
3730 * are not allowed to clear their receive queues. */
3731 if (call->state == RX_STATE_HOLD &&
3732 call->tfirst + call->nSoftAcked >= call->tnext) {
3733 call->state = RX_STATE_DALLY;
3734 rxi_ClearTransmitQueue(call, 0);
3735 } else if (!queue_IsEmpty(&call->tq)) {
3736 rxi_Start(0, call, istack);
/* NOTE(review): the function's closing lines (presumably returning np)
 * are not visible in this listing. */
3741 /* Received a response to a challenge packet */
3742 struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn,
3743 register struct rx_packet *np, int istack)
/* Server side only: ignored on client connections and on connections that
 * are already authenticated.  Otherwise the security object checks the
 * response.  On failure the connection is put in error and an abort is
 * sent, reusing np.  On success, calls waiting in RX_STATE_PRECALL are
 * offered to a server thread and peer reachability is refreshed.
 *
 * conn   - connection the response arrived on.
 * np     - the response packet; reused for the abort on failure.
 * istack - passed through to rxi_SendConnectionAbort.
 *
 * NOTE(review): the declarations of error and i, the "return np" lines,
 * the "if (error)" test, its else branch and the "if (call)" null-slot
 * check are not visible in this listing. */
3747 /* Ignore the packet if we're the client */
3748 if (conn->type == RX_CLIENT_CONNECTION) return np;
3750 /* If already authenticated, ignore the packet (it's probably a retry) */
3751 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
/* NOTE(review): the "return np" this if should guard is missing, so as
 * listed the if guards the RXS_CheckResponse assignment below instead. */
3754 /* Otherwise, have the security object evaluate the response packet */
3755 error = RXS_CheckResponse(conn->securityObject, conn, np);
3757 /* If the response is invalid, reset the connection, sending
3758 * an abort to the peer */
/* rxi_SendConnectionAbort expects conn_data_lock to be held. */
3762 rxi_ConnectionError(conn, error);
3763 MUTEX_ENTER(&conn->conn_data_lock);
3764 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3765 MUTEX_EXIT(&conn->conn_data_lock);
3769 /* If the response is valid, any calls waiting to attach
3770 * servers can now do so */
3773 for (i=0; i<RX_MAXCALLS; i++) {
3774 struct rx_call *call = conn->call[i];
3776 MUTEX_ENTER(&call->lock);
3777 if (call->state == RX_STATE_PRECALL)
3778 rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
3779 /* tnop can be null if newcallp is null */
3780 MUTEX_EXIT(&call->lock);
/* NOTE(review): the comment on the next line has lost its closing line, so
 * as listed the rxi_UpdatePeerReach call is swallowed by it (the comment
 * runs on into the following function's header comment). */
3784 /* Update the peer reachability information, just in case
3785 * some calls went into attach-wait while we were waiting
3786 * for authentication..
3788 rxi_UpdatePeerReach(conn, NULL);
3793 /* A client has received an authentication challenge: the security
3794 * object is asked to cough up a respectable response packet to send
3795 * back to the server. The server is responsible for retrying the
3796 * challenge if it fails to get a response. */
3798 struct rx_packet *rxi_ReceiveChallengePacket(register struct rx_connection *conn,
3799 register struct rx_packet *np, int istack)
3803 /* Ignore the challenge if we're the server */
3804 if (conn->type == RX_SERVER_CONNECTION) return np;
3806 /* Ignore the challenge if the connection is otherwise idle; someone's
3807 * trying to use us as an oracle. */
3808 if (!rxi_HasActiveCalls(conn)) return np;
3810 /* Send the security object the challenge packet. It is expected to fill
3811 * in the response. */
3812 error = RXS_GetResponse(conn->securityObject, conn, np);
3814 /* If the security object is unable to return a valid response, reset the
3815 * connection and send an abort to the peer. Otherwise send the response
3816 * packet to the peer connection. */
3818 rxi_ConnectionError(conn, error);
3819 MUTEX_ENTER(&conn->conn_data_lock);
3820 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3821 MUTEX_EXIT(&conn->conn_data_lock);
3824 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3825 RX_PACKET_TYPE_RESPONSE, NULL, -1, istack);
3831 /* Find an available server process to service the current request in
3832 * the given call structure. If one isn't available, queue up this
3833 * call so it eventually gets one */
/* Does nothing if the call is already RX_STATE_ACTIVE.  Runs under
 * rx_serverPool_lock.  The caller in rxi_ReceiveResponsePacket holds
 * call->lock around this call.
 *
 * socket/tnop/newcallp support the "hot thread" handoff described below.
 * It is only attempted when rx_enable_hot_thread is set and both newcallp
 * and the idle entry's socketp are non-NULL.  Callers passing a NULL
 * newcallp never take that path, and tnop may then be NULL too.
 *
 * NOTE(review): several lines are missing from this listing (braces, the
 * else separating the queue-up and dispatch paths, the statistics
 * updates between the rx_stats_mutex enter/exit pairs, and the removal of
 * the chosen idle entry and of the call from its wait queue). */
3834 void rxi_AttachServerProc(register struct rx_call *call,
3835 register osi_socket socket, register int *tnop, register struct rx_call **newcallp)
3837 register struct rx_serverQueueEntry *sq;
3838 register struct rx_service *service = call->conn->service;
3839 register int haveQuota = 0;
3841 /* May already be attached */
3842 if (call->state == RX_STATE_ACTIVE) return;
3844 MUTEX_ENTER(&rx_serverPool_lock);
3846 haveQuota = QuotaOK(service);
3847 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
/* NOTE(review): the comment on the next line has lost its closing line, so
 * as listed it swallows the RX_ENABLE_LOCKS ReturnToServerPool call. */
3848 /* If there are no processes available to service this call,
3849 * put the call on the incoming call queue (unless it's
3850 * already on the queue).
3852 #ifdef RX_ENABLE_LOCKS
3854 ReturnToServerPool(service);
3855 #endif /* RX_ENABLE_LOCKS */
/* Queue the call for a server thread at most once; RX_CALL_WAIT_PROC
 * marks it as already queued. */
3857 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3858 call->flags |= RX_CALL_WAIT_PROC;
3859 MUTEX_ENTER(&rx_stats_mutex);
3861 MUTEX_EXIT(&rx_stats_mutex);
3862 rxi_calltrace(RX_CALL_ARRIVAL, call);
3863 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3864 queue_Append(&rx_incomingCallQueue, call);
/* Otherwise hand the call to the first idle server thread. */
3868 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
/* NOTE(review): the comment on the next line has lost its closing line, so
 * as listed it swallows the hot-thread handoff and the RX_CALL_WAIT_PROC
 * check, running on to the "Conservative" comment. */
3870 /* If hot threads are enabled, and both newcallp and sq->socketp
3871 * are non-null, then this thread will process the call, and the
3872 * idle server thread will start listening on this thread's socket.
3875 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3878 *sq->socketp = socket;
3879 clock_GetTime(&call->startTime);
3880 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3884 if (call->flags & RX_CALL_WAIT_PROC) {
3885 /* Conservative: I don't think this should happen */
3886 call->flags &= ~RX_CALL_WAIT_PROC;
3887 MUTEX_ENTER(&rx_stats_mutex);
3889 MUTEX_EXIT(&rx_stats_mutex);
3892 call->state = RX_STATE_ACTIVE;
3893 call->mode = RX_MODE_RECEIVING;
3894 if (call->flags & RX_CALL_CLEARED) {
3895 /* send an ack now to start the packet flow up again */
3896 call->flags &= ~RX_CALL_CLEARED;
3897 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
/* NOTE(review): only part of the RX_ENABLE_LOCKS / non-locks accounting
 * (server thread wakeup, nRequestsRunning bookkeeping) is visible here. */
3899 #ifdef RX_ENABLE_LOCKS
3902 service->nRequestsRunning++;
3903 if (service->nRequestsRunning <= service->minProcs)
3909 MUTEX_EXIT(&rx_serverPool_lock);
3912 /* Delay the sending of an acknowledge event for a short while, while
3913 * a new call is being prepared (in the case of a client) or a reply
3914 * is being prepared (in the case of a server). Rather than sending
3915 * an ack packet, an ACKALL packet is sent. */
3916 void rxi_AckAll(struct rxevent *event, register struct rx_call *call, char *dummy)
3918 #ifdef RX_ENABLE_LOCKS
3920 MUTEX_ENTER(&call->lock);
3921 call->delayedAckEvent = NULL;
3922 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3924 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3925 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3927 MUTEX_EXIT(&call->lock);
3928 #else /* RX_ENABLE_LOCKS */
3929 if (event) call->delayedAckEvent = NULL;
3930 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3931 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3932 #endif /* RX_ENABLE_LOCKS */
3935 void rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call, char *dummy)
3937 #ifdef RX_ENABLE_LOCKS
3939 MUTEX_ENTER(&call->lock);
3940 if (event == call->delayedAckEvent)
3941 call->delayedAckEvent = NULL;
3942 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3944 (void) rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
3946 MUTEX_EXIT(&call->lock);
3947 #else /* RX_ENABLE_LOCKS */
3948 if (event) call->delayedAckEvent = NULL;
3949 (void) rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
3950 #endif /* RX_ENABLE_LOCKS */
/* rxi_SetAcksInTransmitQueue (RX_ENABLE_LOCKS builds only)
 *
 * Deferred variant of clearing the transmit queue:
 *   - every queued packet is flagged RX_PKTFLAG_ACKED;
 *   - RX_CALL_TQ_CLEARME and RX_CALL_TQ_SOME_ACKED are set so rxi_Start
 *     can free the packets later;
 *   - the resend and keepalive events are cancelled;
 *   - tfirst is advanced to tnext and nSoftAcked is zeroed;
 *   - any fast recovery is ended;
 *   - a sender waiting on cv_twind is signalled.
 *
 * NOTE(review): the header comment below has lost its closing line.  As
 * listed, the whole function body up to the #endif (RX_ENABLE_LOCKS) line
 * is inside that comment.  The function's braces and loop-body lines are
 * also missing. */
3954 #ifdef RX_ENABLE_LOCKS
3955 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3956 * clearing them out.
3958 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call)
3960 register struct rx_packet *p, *tp;
3963 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3966 p->flags |= RX_PKTFLAG_ACKED;
3970 call->flags |= RX_CALL_TQ_CLEARME;
3971 call->flags |= RX_CALL_TQ_SOME_ACKED;
3974 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3975 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3976 call->tfirst = call->tnext;
3977 call->nSoftAcked = 0;
3979 if (call->flags & RX_CALL_FAST_RECOVER) {
3980 call->flags &= ~RX_CALL_FAST_RECOVER;
3981 call->cwind = call->nextCwind;
3982 call->nextCwind = 0;
3985 CV_SIGNAL(&call->cv_twind);
3987 #endif /* RX_ENABLE_LOCKS */
3989 /* Clear out the transmit queue for the current call (all packets have
3990 * been received by peer) */
/* force - when 0 and, in AFS_GLOBAL_RXLOCK_KERNEL builds, rxi_Start is
 *         traversing the queue (RX_CALL_TQ_BUSY), the packets are only
 *         flagged RX_PKTFLAG_ACKED and the call is marked
 *         RX_CALL_TQ_CLEARME / RX_CALL_TQ_SOME_ACKED for later cleanup.
 *         Otherwise the queue is walked directly.
 * In both cases, the resend and keepalive events are cancelled.  All sent
 * data is treated as acknowledged (tfirst = tnext), fast recovery is ended,
 * and a window waiter is woken.
 * NOTE(review): braces, the else between the two paths, and the body of
 * the direct queue walk (presumably dequeuing and freeing each packet)
 * are not visible in this listing. */
3991 void rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
3993 register struct rx_packet *p, *tp;
3995 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3996 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3998 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4001 p->flags |= RX_PKTFLAG_ACKED;
4005 call->flags |= RX_CALL_TQ_CLEARME;
4006 call->flags |= RX_CALL_TQ_SOME_ACKED;
4009 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4010 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4016 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4017 call->flags &= ~RX_CALL_TQ_CLEARME;
4019 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* Reset transmit-side timers and sequence state. */
4021 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4022 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4023 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4024 call->nSoftAcked = 0;
4026 if (call->flags & RX_CALL_FAST_RECOVER) {
4027 call->flags &= ~RX_CALL_FAST_RECOVER;
4028 call->cwind = call->nextCwind;
/* Wake a sender waiting for window space (condition variable or sleep
 * channel, depending on RX_ENABLE_LOCKS). */
4031 #ifdef RX_ENABLE_LOCKS
4032 CV_SIGNAL(&call->cv_twind);
4034 osi_rxWakeup(&call->twind);
/* rxi_ClearReceiveQueue
 *
 * Discard whatever is on the call's receive queue, counting each packet in
 * rx_packetReclaims, and clear RX_CALL_RECEIVE_DONE / RX_CALL_HAVE_LAST.
 * A call still in RX_STATE_PRECALL is marked RX_CALL_CLEARED, which
 * rxi_AttachServerProc uses to send an ack and restart the packet flow.
 * NOTE(review): braces and most of the per-packet loop body (presumably
 * dequeuing and freeing each packet) are not visible in this listing. */
4038 void rxi_ClearReceiveQueue(register struct rx_call *call)
4040 register struct rx_packet *p, *tp;
4041 if (queue_IsNotEmpty(&call->rq)) {
4042 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4047 rx_packetReclaims++;
4049 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4051 if (call->state == RX_STATE_PRECALL) {
4052 call->flags |= RX_CALL_CLEARED;
4056 /* Send an abort packet for the specified call */
/* packet - optional packet to reuse for the abort; the result of
 *          rxi_SendSpecial is stored back into it.
 * istack - passed through to rxi_SendSpecial.
 * force  - send immediately even if the abort threshold is reached.
 *
 * abortCount is reset whenever the error being reported changes.  The
 * abort is sent at once if any of these hold:
 *   - force is set;
 *   - rxi_callAbortThreshhold is 0;
 *   - the call is still under that threshold.
 * Sending at once cancels any pending delayed abort.  Otherwise, if none
 * is pending, rxi_SendDelayedCallAbort is scheduled rxi_callAbortDelay ms
 * out, holding an RX_CALL_REFCOUNT_ABORT reference.
 * NOTE(review): the local declarations (error, when), braces, the
 * abortCount increment, the rest of the rxevent_Post call and the final
 * return are not visible in this listing. */
4057 struct rx_packet *rxi_SendCallAbort(register struct rx_call *call,
4058 struct rx_packet *packet, int istack, int force)
4066 /* Clients should never delay abort messages */
4067 if (rx_IsClientConn(call->conn))
/* NOTE(review): the statement this if guards (presumably forcing the
 * abort) is missing, so as listed it guards the abortCode check below. */
4070 if (call->abortCode != call->error) {
4071 call->abortCode = call->error;
4072 call->abortCount = 0;
4075 if (force || rxi_callAbortThreshhold == 0 ||
4076 call->abortCount < rxi_callAbortThreshhold) {
4077 if (call->delayedAbortEvent) {
4078 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4080 error = htonl(call->error);
4082 packet = rxi_SendSpecial(call, call->conn, packet,
4083 RX_PACKET_TYPE_ABORT, (char *)&error,
4084 sizeof(error), istack);
4085 } else if (!call->delayedAbortEvent) {
4086 clock_GetTime(&when);
4087 clock_Addmsec(&when, rxi_callAbortDelay);
4088 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4089 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
/* NOTE(review): the header comment below has lost its closing line.  As
 * listed, it runs on through the function signature up to the "Clients
 * should never delay" comment.  Local declarations, braces, the abortCount
 * update, the rest of the rxevent_Post call and the final return are also
 * not visible.  Unlike rxi_SendCallAbort, the delayed path visible here
 * takes no reference before posting the event; confirm that is intended. */
4095 /* Send an abort packet for the specified connection. Packet is an
4096 * optional pointer to a packet that can be used to send the abort.
4097 * Once the number of abort messages reaches the threshhold, an
4098 * event is scheduled to send the abort. Setting the force flag
4099 * overrides sending delayed abort messages.
4101 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4102 * to send the abort packet.
4104 struct rx_packet *rxi_SendConnectionAbort(register struct rx_connection *conn,
4105 struct rx_packet *packet, int istack, int force)
4113 /* Clients should never delay abort messages */
4114 if (rx_IsClientConn(conn))
/* NOTE(review): the statement this if guards (presumably forcing the
 * abort) is missing, so as listed it guards the send/schedule logic. */
4117 if (force || rxi_connAbortThreshhold == 0 ||
4118 conn->abortCount < rxi_connAbortThreshhold) {
4119 if (conn->delayedAbortEvent) {
4120 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4122 error = htonl(conn->error);
/* conn_data_lock is released only around the send itself. */
4124 MUTEX_EXIT(&conn->conn_data_lock);
4125 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4126 RX_PACKET_TYPE_ABORT, (char *)&error,
4127 sizeof(error), istack);
4128 MUTEX_ENTER(&conn->conn_data_lock);
4129 } else if (!conn->delayedAbortEvent) {
4130 clock_GetTime(&when);
4131 clock_Addmsec(&when, rxi_connAbortDelay);
4132 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
/* rxi_ConnectionError
 *
 * Under conn_data_lock, cancel the connection's challenge event and any
 * pending reachability check, clearing RX_CONN_ATTACHWAIT.  Then, with
 * that lock released, put every call on the connection into error via
 * rxi_CallError while holding that call's lock.  Finally record the error
 * on the connection and count it in rx_stats.fatalErrors.
 *
 * NOTE(review): the header comment below has lost its closing line, so
 * as listed it runs on through this whole function (and beyond).  The
 * "if (error)" wrapper, the declaration of i, the "if (call)" null-slot
 * check and the braces are not visible either. */
4138 /* Associate an error with all of the calls owned by a connection. Called
4139 * with error non-zero. This is only for really fatal things, like
4140 * bad authentication responses. The connection itself is set in
4141 * error at this point, so that future packets received will be
4143 void rxi_ConnectionError(register struct rx_connection *conn,
4144 register afs_int32 error)
4148 MUTEX_ENTER(&conn->conn_data_lock);
4149 if (conn->challengeEvent)
4150 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4151 if (conn->checkReachEvent) {
4152 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4153 conn->checkReachEvent = 0;
4154 conn->flags &= ~RX_CONN_ATTACHWAIT;
4157 MUTEX_EXIT(&conn->conn_data_lock);
4158 for (i=0; i<RX_MAXCALLS; i++) {
4159 struct rx_call *call = conn->call[i];
4161 MUTEX_ENTER(&call->lock);
4162 rxi_CallError(call, error);
4163 MUTEX_EXIT(&call->lock);
4166 conn->error = error;
4167 MUTEX_ENTER(&rx_stats_mutex);
4168 rx_stats.fatalErrors++;
4169 MUTEX_EXIT(&rx_stats_mutex);
4173 void rxi_CallError(register struct rx_call *call, afs_int32 error)
4175 if (call->error) error = call->error;
4176 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4177 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4178 rxi_ResetCall(call, 0);
4181 rxi_ResetCall(call, 0);
4183 call->error = error;
4184 call->mode = RX_MODE_ERROR;
/* rxi_ResetCall -- in the lines visible in this listing it:
 *   - notifies and clears any arrivalProc;
 *   - sends any pending delayed abort immediately (force = 1);
 *   - folds this call's cwind/MTU/nDgramPackets back into the peer when
 *     no other call has changed peer->congestSeq, then reloads the
 *     call's MTU, cwind, ssthresh, nDgramPackets and congestSeq from the
 *     peer;
 *   - clears the receive queue and, unless rxi_Start owns the transmit
 *     queue in an AFS_GLOBAL_RXLOCK_KERNEL build, the transmit queue;
 *   - resets window sizes, sequence numbers and ack counters;
 *   - wakes readers, packet waiters and window waiters;
 *   - removes the call from whatever queue it is on;
 *   - turns keepalives off and cancels any delayed ack.
 * `newcall` is not referenced in the visible lines; `flags` is used but
 * its declaration is not visible.
 * NOTE(review): the description comment below has lost its closing line
 * and the matching "#ifdef ADAPT_WINDOW" for the "#endif" after it is
 * not visible; braces and several statements are missing as well. */
4187 /* Reset various fields in a call structure, and wakeup waiting
4188 * processes. Some fields aren't changed: state & mode are not
4189 * touched (these must be set by the caller), and bufptr, nLeft, and
4190 * nFree are not reset, since these fields are manipulated by
4191 * unprotected macros, and may only be reset by non-interrupting code.
4194 /* this code requires that call->conn be set properly as a pre-condition. */
4195 #endif /* ADAPT_WINDOW */
4197 void rxi_ResetCall(register struct rx_call *call, register int newcall)
4200 register struct rx_peer *peer;
4201 struct rx_packet *packet;
4203 /* Notify anyone who is waiting for asynchronous packet arrival */
/* NOTE(review): arrivalProcArg is cast to int here; if the field is wider
 * than int (e.g. a pointer on LP64) this truncates -- confirm its type. */
4204 if (call->arrivalProc) {
4205 (*call->arrivalProc)(call, call->arrivalProcHandle, (int) call->arrivalProcArg);
4206 call->arrivalProc = (VOID (*)()) 0;
/* A pending delayed abort is cancelled and the abort sent right away.
 * NOTE(review): no check that rxi_AllocPacket succeeded is visible. */
4209 if (call->delayedAbortEvent) {
4210 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4211 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4213 rxi_SendCallAbort(call, packet, 0, 1);
4214 rxi_FreePacket(packet);
/* NOTE(review): the next four lines are comment text whose opening and
 * closing delimiter lines are missing from this listing. */
4219 * Update the peer with the congestion information in this call
4220 * so other calls on this connection can pick up where this call
4221 * left off. If the congestion sequence numbers don't match then
4222 * another call experienced a retransmission.
4224 peer = call->conn->peer;
4225 MUTEX_ENTER(&peer->peer_lock);
4227 if (call->congestSeq == peer->congestSeq) {
4228 peer->cwind = MAX(peer->cwind, call->cwind);
4229 peer->MTU = MAX(peer->MTU, call->MTU);
4230 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4233 call->abortCode = 0;
4234 call->abortCount = 0;
4236 if (peer->maxDgramPackets > 1) {
4237 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4239 call->MTU = peer->MTU;
4241 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4242 call->ssthresh = rx_maxSendWindow;
4243 call->nDgramPackets = peer->nDgramPackets;
4244 call->congestSeq = peer->congestSeq;
4245 MUTEX_EXIT(&peer->peer_lock);
/* Snapshot the flags before they are reset so the wakeups below can
 * still see which waiters were present. */
4247 flags = call->flags;
4248 rxi_ClearReceiveQueue(call);
4249 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4250 if (call->flags & RX_CALL_TQ_BUSY) {
4251 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4252 call->flags |= (flags & RX_CALL_TQ_WAIT);
4254 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4256 rxi_ClearTransmitQueue(call, 0);
4257 queue_Init(&call->tq);
4260 queue_Init(&call->rq);
4262 call->rwind = rx_initReceiveWindow;
4263 call->twind = rx_initSendWindow;
4264 call->nSoftAcked = 0;
4265 call->nextCwind = 0;
4268 call->nCwindAcks = 0;
4269 call->nSoftAcks = 0;
4270 call->nHardAcks = 0;
4272 call->tfirst = call->rnext = call->tnext = 1;
4274 call->lastAcked = 0;
4275 call->localStatus = call->remoteStatus = 0;
4277 if (flags & RX_CALL_READER_WAIT) {
4278 #ifdef RX_ENABLE_LOCKS
4279 CV_BROADCAST(&call->cv_rq);
4281 osi_rxWakeup(&call->rq);
4284 if (flags & RX_CALL_WAIT_PACKETS) {
4285 MUTEX_ENTER(&rx_freePktQ_lock);
4286 rxi_PacketsUnWait(); /* XXX */
4287 MUTEX_EXIT(&rx_freePktQ_lock);
4290 #ifdef RX_ENABLE_LOCKS
4291 CV_SIGNAL(&call->cv_twind);
4293 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4294 osi_rxWakeup(&call->twind);
4297 #ifdef RX_ENABLE_LOCKS
/* NOTE(review): the comment on the next line has lost its closing line, so
 * as listed it swallows the whole call_queue_lock removal sequence up to
 * the #else (RX_ENABLE_LOCKS) line. */
4298 /* The following ensures that we don't mess with any queue while some
4299 * other thread might also be doing so. The call_queue_lock field is
4300 * only modified under the call lock. If the call is in the process
4301 * of being removed from a queue, the call is not locked until the
4302 * queue lock is dropped and only then is the call_queue_lock field
4303 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4304 * Note that any other routine which removes a call from a queue has to
4305 * obtain the queue lock before examining the queue and removing the call.
4307 if (call->call_queue_lock) {
4308 MUTEX_ENTER(call->call_queue_lock);
4309 if (queue_IsOnQueue(call)) {
4311 if (flags & RX_CALL_WAIT_PROC) {
4312 MUTEX_ENTER(&rx_stats_mutex);
4314 MUTEX_EXIT(&rx_stats_mutex);
4317 MUTEX_EXIT(call->call_queue_lock);
4318 CLEAR_CALL_QUEUE_LOCK(call);
4320 #else /* RX_ENABLE_LOCKS */
4321 if (queue_IsOnQueue(call)) {
4323 if (flags & RX_CALL_WAIT_PROC)
4326 #endif /* RX_ENABLE_LOCKS */
/* Stop the call's periodic activity. */
4328 rxi_KeepAliveOff(call);
4329 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4332 /* Send an acknowledge for the indicated packet (seq,serial) of the
4333 * indicated call, for the indicated reason (reason). This
4334 * acknowledge will specifically acknowledge receiving the packet, and
4335 * will also specify which other packets for this call have been
4336 * received. This routine returns the packet that was used to the
4337 * caller. The caller is responsible for freeing it or re-using it.
4338 * This acknowledgement also returns the highest sequence number
4339 * actually read out by the higher level to the sender; the sender
4340 * promises to keep around packets that have not been read by the
4341 * higher level yet (unless, of course, the sender decides to abort
4342 * the call altogether). Any of p, seq, serial, pflags, or reason may
4343 * be set to zero without ill effect. That is, if they are zero, they
4344 * will not convey any information.
4345 * NOW there is a trailer field, after the ack where it will safely be
4346 * ignored by mundanes, which indicates the maximum size packet this
4347 * host can swallow. */
4349 register struct rx_packet *optionalPacket; use to send ack (or null)
4350 int seq; Sequence number of the packet we are acking
4351 int serial; Serial number of the packet
4352 int pflags; Flags field from packet header
4353 int reason; Reason an acknowledge was prompted
/* NOTE(review): the description above predates the current signature; it
 * still mentions seq and pflags, which rxi_SendAck no longer takes.
 * Current parameters:
 *   call           - call whose receive state (call->rq, rnext, rprev,
 *                    rwind) is reported to the peer.
 *   optionalPacket - packet to reuse for the ack, or NULL. When no packet
 *                    is supplied, one is taken from
 *                    rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL) and freed
 *                    again before returning.
 *   serial         - copied (network byte order) into ap->serial.
 *   reason         - stored in ap->reason. RX_ACK_PING additionally sets
 *                    RX_REQUEST_ACK and stamps call->pingRequestTime.
 *   istack         - passed through unchanged to rxi_Send.
 * Every exit path returns optionalPacket.
 * Three early exits skip sending the ack:
 *   - no packet could be allocated;
 *   - the data buffer could not be grown (rxi_AllocDataBuf);
 *   - the ack header plus trailer is not contiguous (rx_Contiguous).
 * The two receive-queue sanity checks in the scan loop also fail the call
 * with RX_CALL_DEAD (rxi_CallError) before returning. */
4356 struct rx_packet *rxi_SendAck(register struct rx_call *call,
4357 register struct rx_packet *optionalPacket, int serial,
4358 int reason, int istack)
4360 struct rx_ackPacket *ap;
4361 register struct rx_packet *rqp;
4362 register struct rx_packet *nxp; /* For queue_Scan */
4363 register struct rx_packet *p;
/* Once the reader has consumed at least one packet (rnext > 1), widen the
 * advertised receive window to rx_maxReceiveWindow. Then reset the
 * soft/hard ack counters and record rnext as the last acked position. */
4368 * Open the receive window once a thread starts reading packets
4370 if (call->rnext > 1) {
4371 call->rwind = rx_maxReceiveWindow;
4374 call->nHardAcks = 0;
4375 call->nSoftAcks = 0;
4376 if (call->rnext > call->lastAcked)
4377 call->lastAcked = call->rnext;
4381 rx_computelen(p, p->length); /* reset length, you never know */
4382 } /* where that's been... */
4384 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4385 /* We won't send the ack, but don't panic. */
4386 return optionalPacket;
/* Make sure the packet can hold the ack header, an rwind-sized ack array
 * and the four afs_int32 trailer words written further down. */
4389 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4391 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4392 if (!optionalPacket) rxi_FreePacket(p);
4393 return optionalPacket;
4395 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4396 if (rx_Contiguous(p)<templ) {
4397 if (!optionalPacket) rxi_FreePacket(p);
4398 return optionalPacket;
4400 } /* MTUXXX failing to send an ack is very serious. We should */
4401 /* try as hard as possible to send even a partial ack; it's */
4402 /* better than nothing. */
/* Fill in the fixed ack header directly in the packet's data area.
 * Multi-byte fields are converted to network byte order. */
4404 ap = (struct rx_ackPacket *) rx_DataOf(p);
4405 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4406 ap->reason = reason;
4408 /* The skew computation used to be bogus, I think it's better now. */
4409 /* We should start paying attention to skew. XXX */
4410 ap->serial = htonl(serial);
4411 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4413 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4414 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4416 /* No fear of running out of ack packet here because there can only be at most
4417 * one window full of unacknowledged packets. The window size must be constrained
4418 * to be less than the maximum ack size, of course. Also, an ack should always
4419 * fit into a single packet -- it should not ever be fragmented. */
/* Build ap->acks[] starting at sequence call->rnext. Each gap in the
 * receive queue produces RX_ACK_TYPE_NACK entries, and each queued packet
 * produces one RX_ACK_TYPE_ACK. A queue that looks corrupt, or an ack
 * array longer than the window, kills the call instead. */
4420 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4421 if (!rqp || !call->rq.next
4422 || (rqp->header.seq > (call->rnext + call->rwind))) {
4423 if (!optionalPacket) rxi_FreePacket(p);
4424 rxi_CallError(call, RX_CALL_DEAD);
4425 return optionalPacket;
4428 while (rqp->header.seq > call->rnext + offset)
4429 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4430 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4432 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4433 if (!optionalPacket) rxi_FreePacket(p);
4434 rxi_CallError(call, RX_CALL_DEAD);
4435 return optionalPacket;
/* Payload length = ack header + offset ack bytes + trailer. The trailer
 * is four afs_int32 words, each in network byte order:
 *   1. rxi_AdjustMaxMTU(peer->ifMTU, rx_maxReceiveSize)
 *   2. peer->ifMTU
 *   3. call->rwind
 *   4. peer->ifDgramPackets */
4440 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4442 /* these are new for AFS 3.3 */
4443 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4444 templ = htonl(templ);
4445 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4446 templ = htonl(call->conn->peer->ifMTU);
4447 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4449 /* new for AFS 3.4 */
4450 templ = htonl(call->rwind);
4451 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4453 /* new for AFS 3.5 */
4454 templ = htonl(call->conn->peer->ifDgramPackets);
4455 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
/* Address the packet to this call and mark it as an ack. */
4457 p->header.serviceId = call->conn->serviceId;
4458 p->header.cid = (call->conn->cid | call->channel);
4459 p->header.callNumber = *call->callNumber;
4461 p->header.securityIndex = call->conn->securityIndex;
4462 p->header.epoch = call->conn->epoch;
4463 p->header.type = RX_PACKET_TYPE_ACK;
4464 p->header.flags = RX_SLOW_START_OK;
4465 if (reason == RX_ACK_PING) {
4466 p->header.flags |= RX_REQUEST_ACK;
4468 clock_GetTime(&call->pingRequestTime);
4471 if (call->conn->type == RX_CLIENT_CONNECTION)
4472 p->header.flags |= RX_CLIENT_INITIATED;
4476 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4477 ap->reason, ntohl(ap->previousPacket),
4478 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4480 for (offset = 0; offset < ap->nAcks; offset++)
4481 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
/* Find the data iovec in which the p->length payload ends (vec 0 is the
 * header). Temporarily shorten that iovec so that only p->length bytes
 * are sent, then restore its original length after rxi_Send returns. */
4488 register int i, nbytes = p->length;
4490 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4491 if (nbytes <= p->wirevec[i].iov_len) {
4492 register int savelen, saven;
4494 savelen = p->wirevec[i].iov_len;
4496 p->wirevec[i].iov_len = nbytes;
4498 rxi_Send(call, p, istack);
4499 p->wirevec[i].iov_len = savelen;
4503 else nbytes -= p->wirevec[i].iov_len;
4506 MUTEX_ENTER(&rx_stats_mutex);
4507 rx_stats.ackPacketsSent++;
4508 MUTEX_EXIT(&rx_stats_mutex);
4509 if (!optionalPacket) rxi_FreePacket(p);
4510 return optionalPacket; /* Return packet for re-use by caller */
/* rxi_SendList: transmit the first len entries of list together.
 *   list, len - packets to send. Only list[len-1] is given RX_REQUEST_ACK,
 *               per the "last packet in the list" comment below.
 *   istack    - passed to rxi_SendPacketList / rxi_SendPacket.
 *   moreFlag  - when set, the final packet also carries RX_MORE_PACKETS;
 *               every earlier packet in the list always does.
 *   now       - recorded as each packet's timeSent. It is also recorded as
 *               firstSent for packets with no serial yet (first send).
 *   retryTime - base retransmit time copied into each packet.
 *               Retransmissions extend it by (backoff << 8) ms.
 *   resending - when set, the packets are also counted in peer->reSends.
 * Expects call->lock held on entry. The lock is released around the
 * actual send and re-acquired before returning. */
4513 /* Send all of the packets in the list in single datagram */
4514 static void rxi_SendList(struct rx_call *call, struct rx_packet **list,
4515 int len, int istack, int moreFlag, struct clock *now,
4516 struct clock *retryTime, int resending)
4521 struct rx_connection *conn = call->conn;
4522 struct rx_peer *peer = conn->peer;
4524 MUTEX_ENTER(&peer->peer_lock);
4526 if (resending) peer->reSends += len;
4527 MUTEX_ENTER(&rx_stats_mutex);
4528 rx_stats.dataPacketsSent += len;
4529 MUTEX_EXIT(&rx_stats_mutex);
4530 MUTEX_EXIT(&peer->peer_lock);
/* Whether this batch ends the call's data stream. This affects the extra
 * ack wait applied to server-side packets below. */
4532 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4536 /* Set the packet flags and schedule the resend events */
4537 /* Only request an ack for the last packet in the list */
4538 for (i = 0 ; i < len ; i++) {
4539 list[i]->retryTime = *retryTime;
4540 if (list[i]->header.serial) {
4541 /* Exponentially backoff retry times */
4542 if (list[i]->backoff < MAXBACKOFF) {
4543 /* so it can't stay == 0 */
4544 list[i]->backoff = (list[i]->backoff << 1) +1;
4546 else list[i]->backoff++;
4547 clock_Addmsec(&(list[i]->retryTime),
4548 ((afs_uint32) list[i]->backoff) << 8);
4551 /* Wait a little extra for the ack on the last packet */
4552 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4553 clock_Addmsec(&(list[i]->retryTime), 400);
4556 /* Record the time sent */
4557 list[i]->timeSent = *now;
4559 /* Ask for an ack on retransmitted packets, on every other packet
4560 * if the peer doesn't support slow start. Ask for an ack on every
4561 * packet until the congestion window reaches the ack rate. */
4562 if (list[i]->header.serial) {
4564 MUTEX_ENTER(&rx_stats_mutex);
4565 rx_stats.dataPacketsReSent++;
4566 MUTEX_EXIT(&rx_stats_mutex);
4568 /* improved RTO calculation- not Karn */
4569 list[i]->firstSent = *now;
4571 && (call->cwind <= (u_short)(conn->ackRate+1)
4572 || (!(call->flags & RX_CALL_SLOW_START_OK)
4573 && (list[i]->header.seq & 1)))) {
/* NOTE(review): peer->reSends and rx_stats.dataPacketsSent were already
 * increased by len before this loop, and are increased again here once per
 * packet. That looks like double counting; confirm before relying on
 * these statistics. */
4578 MUTEX_ENTER(&peer->peer_lock);
4580 if (resending) peer->reSends++;
4581 MUTEX_ENTER(&rx_stats_mutex);
4582 rx_stats.dataPacketsSent++;
4583 MUTEX_EXIT(&rx_stats_mutex);
4584 MUTEX_EXIT(&peer->peer_lock);
4586 /* Tag this packet as not being the last in this group,
4587 * for the receiver's benefit */
4588 if (i < len-1 || moreFlag) {
4589 list[i]->header.flags |= RX_MORE_PACKETS;
4592 /* Install the new retransmit time for the packet, and
4593 * record the time sent */
4594 list[i]->timeSent = *now;
4598 list[len-1]->header.flags |= RX_REQUEST_ACK;
4601 /* Since we're about to send a data packet to the peer, it's
4602 * safe to nuke any scheduled end-of-packets ack */
4603 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
/* Take an RX_CALL_REFCOUNT_SEND reference and drop call->lock while the
 * packets go out, then re-lock and release the reference. */
4605 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4606 MUTEX_EXIT(&call->lock);
4608 rxi_SendPacketList(call, conn, list, len, istack);
4610 rxi_SendPacket(call, conn, list[0], istack);
4612 MUTEX_ENTER(&call->lock);
4613 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4615 /* Update last send time for this call (for keep-alive
4616 * processing), and for the connection (so that we can discover
4617 * idle connections) */
4618 conn->lastSendTime = call->lastSendTime = clock_Sec();
/* rxi_SendXmitList: split the len ready packets in list into datagram
 * groups and send each group with rxi_SendList.
 * Groups are flushed when the current packet hits any of these limits:
 *   - peer->maxDgramPackets, call->nDgramPackets or call->cwind;
 *   - the packet is a retransmission (non-zero serial);
 *   - the packet is not exactly RX_JUMBOBUFFERSIZE long.
 * Packets already flagged RX_PKTFLAG_ACKED are skipped. Sending stops
 * early once call->error is set or RX_CALL_FAST_RECOVER_WAIT is raised.
 * now, retryTime and resending are passed through to rxi_SendList.
 * NOTE(review): rule 4 below says "cwind/4", but the flush test compares
 * cnt against call->cwind itself. Rule 2 ("two iovecs") appears to be
 * enforced only through the RX_JUMBOBUFFERSIZE length checks. Confirm
 * which is intended. */
4621 /* When sending packets we need to follow these rules:
4622 * 1. Never send more than maxDgramPackets in a jumbogram.
4623 * 2. Never send a packet with more than two iovecs in a jumbogram.
4624 * 3. Never send a retransmitted packet in a jumbogram.
4625 * 4. Never send more than cwind/4 packets in a jumbogram
4626 * We always keep the last list we should have sent so we
4627 * can set the RX_MORE_PACKETS flags correctly.
4629 static void rxi_SendXmitList(struct rx_call *call, struct rx_packet **list,
4630 int len, int istack, struct clock *now, struct clock *retryTime,
4633 int i, cnt, lastCnt = 0;
4634 struct rx_packet **listP, **lastP = 0;
4635 struct rx_peer *peer = call->conn->peer;
4636 int morePackets = 0;
4638 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4639 /* Does the current packet force us to flush the current list? */
4641 && (list[i]->header.serial
4642 || (list[i]->flags & RX_PKTFLAG_ACKED)
4643 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4645 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4646 /* If the call enters an error state stop sending, or if
4647 * we entered congestion recovery mode, stop sending */
4648 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4656 /* Add the current packet to the list if it hasn't been acked.
4657 * Otherwise adjust the list pointer to skip the current packet. */
4658 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4660 /* Do we need to flush the list? */
4661 if (cnt >= (int)peer->maxDgramPackets
4662 || cnt >= (int)call->nDgramPackets
4663 || cnt >= (int)call->cwind
4664 || list[i]->header.serial
4665 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4667 rxi_SendList(call, lastP, lastCnt, istack, 1,
4668 now, retryTime, resending);
4669 /* If the call enters an error state stop sending, or if
4670 * we entered congestion recovery mode, stop sending */
4671 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4681 osi_Panic("rxi_SendList error");
4687 /* Send the whole list when the call is in receive mode, when
4688 * the call is in eof mode, when we are in fast recovery mode,
4689 * and when we have the last packet */
4690 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4691 || call->mode == RX_MODE_RECEIVING
4692 || call->mode == RX_MODE_EOF
4693 || (call->flags & RX_CALL_FAST_RECOVER)) {
4694 /* Check for the case where the current list contains
4695 * an acked packet. Since we always send retransmissions
4696 * in a separate packet, we only need to check the first
4697 * packet in the list */
4698 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4702 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4703 now, retryTime, resending);
4704 /* If the call enters an error state stop sending, or if
4705 * we entered congestion recovery mode, stop sending */
4706 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4710 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4712 } else if (lastCnt > 0) {
4713 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4717 #ifdef RX_ENABLE_LOCKS
4718 /* Call rxi_Start, below, but with the call lock held. */
/* Wrapper for callers that do not hold call->lock. It acquires call->lock,
 * invokes rxi_Start with the same event, call and istack arguments, and
 * releases the lock again. Only compiled with RX_ENABLE_LOCKS. */
4719 void rxi_StartUnlocked(struct rxevent *event, register struct rx_call *call,
4722 MUTEX_ENTER(&call->lock);
4723 rxi_Start(event, call, istack);
4724 MUTEX_EXIT(&call->lock);
4726 #endif /* RX_ENABLE_LOCKS */
/* rxi_Start: send or retransmit packets from call->tq.
 *   event  - non-NULL when run from a timer. If it is call->resendEvent,
 *            the RX_CALL_REFCOUNT_RESEND reference is dropped and
 *            resendEvent is cleared.
 *   call   - the call whose transmit queue is serviced. The caller holds
 *            call->lock; rxi_StartUnlocked takes it for callers that do
 *            not.
 *   istack - passed through to rxi_SendXmitList and to the reposted
 *            resend event.
 * Packets whose retryTime has arrived, within the window limit
 * MIN(twind, nSoftAcked + cwind) above tfirst, are gathered into a
 * temporary xmitList. That list holds at most MIN(twind, cwind) entries
 * and is allocated with osi_Alloc and freed after use. It is handed to
 * rxi_SendXmitList. Afterwards a new resend event is posted for the retry
 * time of the first unacknowledged packet inside the window.
 * Under AFS_GLOBAL_RXLOCK_KERNEL, RX_CALL_TQ_BUSY appears to ensure that
 * only one thread walks the queue at a time. A concurrent caller instead
 * sets RX_CALL_NEED_START, which makes the active thread loop again. */
4728 /* This routine is called when new packets are readied for
4729 * transmission and when retransmission may be necessary, or when the
4730 * transmission window or burst count are favourable. This should be
4731 * better optimized for new packets, the usual case, now that we've
4732 * got rid of queues of send packets. XXXXXXXXXXX */
4733 void rxi_Start(struct rxevent *event, register struct rx_call *call,
4736 struct rx_packet *p;
4737 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4738 struct rx_peer *peer = call->conn->peer;
4739 struct clock now, retryTime;
4743 struct rx_packet **xmitList;
4746 /* If rxi_Start is being called as a result of a resend event,
4747 * then make sure that the event pointer is removed from the call
4748 * structure, since there is no longer a per-call retransmission
4750 if (event && event == call->resendEvent) {
4751 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4752 call->resendEvent = NULL;
4754 if (queue_IsEmpty(&call->tq)) {
4758 /* Timeouts trigger congestion recovery */
4759 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4760 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4761 /* someone else is waiting to start recovery */
4764 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4765 while (call->flags & RX_CALL_TQ_BUSY) {
4766 call->flags |= RX_CALL_TQ_WAIT;
4767 #ifdef RX_ENABLE_LOCKS
4768 CV_WAIT(&call->cv_tq, &call->lock);
4769 #else /* RX_ENABLE_LOCKS */
4770 osi_rxSleep(&call->tq);
4771 #endif /* RX_ENABLE_LOCKS */
4773 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4774 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4775 call->flags |= RX_CALL_FAST_RECOVER;
4776 if (peer->maxDgramPackets > 1) {
4777 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4779 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4781 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4782 call->nDgramPackets = 1;
4784 call->nextCwind = 1;
4787 MUTEX_ENTER(&peer->peer_lock);
4788 peer->MTU = call->MTU;
4789 peer->cwind = call->cwind;
4790 peer->nDgramPackets = 1;
4792 call->congestSeq = peer->congestSeq;
4793 MUTEX_EXIT(&peer->peer_lock);
4794 /* Clear retry times on packets. Otherwise, it's possible for
4795 * some packets in the queue to force resends at rates faster
4796 * than recovery rates.
4798 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4799 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4800 clock_Zero(&p->retryTime);
4805 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4806 MUTEX_ENTER(&rx_stats_mutex);
4807 rx_tq_debug.rxi_start_in_error ++;
4808 MUTEX_EXIT(&rx_stats_mutex);
4813 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4814 /* Get clock to compute the re-transmit time for any packets
4815 * in this burst. Note, if we back off, it's reasonable to
4816 * back off all of the packets in the same manner, even if
4817 * some of them have been retransmitted more times than more
4818 * recent additions */
4819 clock_GetTime(&now);
4820 retryTime = now; /* initialize before use */
4821 MUTEX_ENTER(&peer->peer_lock);
4822 clock_Add(&retryTime, &peer->timeout);
4823 MUTEX_EXIT(&peer->peer_lock);
4825 /* Send (or resend) any packets that need it, subject to
4826 * window restrictions and congestion burst control
4827 * restrictions. Ask for an ack on the last packet sent in
4828 * this burst. For now, we're relying upon the window being
4829 * considerably bigger than the largest number of packets that
4830 * are typically sent at once by one initial call to
4831 * rxi_Start. This is probably bogus (perhaps we should ask
4832 * for an ack when we're half way through the current
4833 * window?). Also, for non file transfer applications, this
4834 * may end up asking for an ack for every packet. Bogus. XXXX
4837 * But check whether we're here recursively, and let the other guy
4840 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4841 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4842 call->flags |= RX_CALL_TQ_BUSY;
4844 call->flags &= ~RX_CALL_NEED_START;
4845 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* Collect the packets that are due for (re)transmission into a temporary
 * array. Its size is MIN(twind, cwind); overflowing it is treated as a
 * fatal inconsistency. The array is freed once rxi_SendXmitList
 * returns. */
4847 maxXmitPackets = MIN(call->twind, call->cwind);
4848 xmitList = (struct rx_packet **)
4849 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4850 if (xmitList == NULL)
4851 osi_Panic("rxi_Start, failed to allocate xmit list");
4852 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4853 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4854 /* We shouldn't be sending packets if a thread is waiting
4855 * to initiate congestion recovery */
4858 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4859 /* Only send one packet during fast recovery */
4862 if ((p->flags & RX_PKTFLAG_FREE) ||
4863 (!queue_IsEnd(&call->tq, nxp)
4864 && (nxp->flags & RX_PKTFLAG_FREE)) ||
4865 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4866 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4867 osi_Panic("rxi_Start: xmit queue clobbered");
4869 if (p->flags & RX_PKTFLAG_ACKED) {
4870 MUTEX_ENTER(&rx_stats_mutex);
4871 rx_stats.ignoreAckedPacket++;
4872 MUTEX_EXIT(&rx_stats_mutex);
4873 continue; /* Ignore this packet if it has been acknowledged */
4876 /* Turn off all flags except these ones, which are the same
4877 * on each transmission */
4878 p->header.flags &= RX_PRESET_FLAGS;
4880 if (p->header.seq >= call->tfirst +
4881 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4882 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4883 /* Note: if we're waiting for more window space, we can
4884 * still send retransmits; hence we don't return here, but
4885 * break out to schedule a retransmit event */
4886 dpf(("call %d waiting for window", *(call->callNumber)));
4890 /* Transmit the packet if it needs to be sent. */
4891 if (!clock_Lt(&now, &p->retryTime)) {
4892 if (nXmitPackets == maxXmitPackets) {
4893 osi_Panic("rxi_Start: xmit list overflowed");
4895 xmitList[nXmitPackets++] = p;
4899 /* xmitList now hold pointers to all of the packets that are
4900 * ready to send. Now we loop to send the packets */
4901 if (nXmitPackets > 0) {
4902 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4903 &now, &retryTime, resending);
4905 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4907 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4909 * TQ references no longer protected by this flag; they must remain
4910 * protected by the global lock.
4912 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4913 call->flags &= ~RX_CALL_TQ_BUSY;
4914 if (call->flags & RX_CALL_TQ_WAIT) {
4915 call->flags &= ~RX_CALL_TQ_WAIT;
4916 #ifdef RX_ENABLE_LOCKS
4917 CV_BROADCAST(&call->cv_tq);
4918 #else /* RX_ENABLE_LOCKS */
4919 osi_rxWakeup(&call->tq);
4920 #endif /* RX_ENABLE_LOCKS */
4925 /* We went into the error state while sending packets. Now is
4926 * the time to reset the call. This will also inform the using
4927 * process that the call is in an error state.
4929 MUTEX_ENTER(&rx_stats_mutex);
4930 rx_tq_debug.rxi_start_aborted ++;
4931 MUTEX_EXIT(&rx_stats_mutex);
4932 call->flags &= ~RX_CALL_TQ_BUSY;
4933 if (call->flags & RX_CALL_TQ_WAIT) {
4934 call->flags &= ~RX_CALL_TQ_WAIT;
4935 #ifdef RX_ENABLE_LOCKS
4936 CV_BROADCAST(&call->cv_tq);
4937 #else /* RX_ENABLE_LOCKS */
4938 osi_rxWakeup(&call->tq);
4939 #endif /* RX_ENABLE_LOCKS */
4941 rxi_CallError(call, call->error);
4944 #ifdef RX_ENABLE_LOCKS
4945 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4946 register int missing;
4947 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4948 /* Some packets have received acks. If they all have, we can clear
4949 * the transmit queue.
4951 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4952 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
4960 call->flags |= RX_CALL_TQ_CLEARME;
4962 #endif /* RX_ENABLE_LOCKS */
4963 /* Don't bother doing retransmits if the TQ is cleared. */
4964 if (call->flags & RX_CALL_TQ_CLEARME) {
4965 rxi_ClearTransmitQueue(call, 1);
4967 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4970 /* Always post a resend event, if there is anything in the
4971 * queue, and resend is possible. There should be at least
4972 * one unacknowledged packet in the queue ... otherwise none
4973 * of these packets should be on the queue in the first place.
4975 if (call->resendEvent) {
4976 /* Cancel the existing event and post a new one */
4977 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4980 /* The retry time is the retry time on the first unacknowledged
4981 * packet inside the current window */
4982 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4983 /* Don't set timers for packets outside the window */
4984 if (p->header.seq >= call->tfirst + call->twind) {
4988 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
4990 retryTime = p->retryTime;
4995 /* Post a new event to re-run rxi_Start when retries may be needed */
4996 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4997 #ifdef RX_ENABLE_LOCKS
4998 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4999 call->resendEvent = rxevent_Post(&retryTime,
5001 (void *)call, (void *)istack);
5002 #else /* RX_ENABLE_LOCKS */
5003 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5004 (void *)call, (void *)istack);
5005 #endif /* RX_ENABLE_LOCKS */
5008 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5009 } while (call->flags & RX_CALL_NEED_START);
5011 * TQ references no longer protected by this flag; they must remain
5012 * protected by the global lock.
5014 call->flags &= ~RX_CALL_TQ_BUSY;
5015 if (call->flags & RX_CALL_TQ_WAIT) {
5016 call->flags &= ~RX_CALL_TQ_WAIT;
5017 #ifdef RX_ENABLE_LOCKS
5018 CV_BROADCAST(&call->cv_tq);
5019 #else /* RX_ENABLE_LOCKS */
5020 osi_rxWakeup(&call->tq);
5021 #endif /* RX_ENABLE_LOCKS */
5024 call->flags |= RX_CALL_NEED_START;
5026 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5028 if (call->resendEvent) {
5029 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
/* rxi_Send: transmit a single packet p on call.
 * Steps, in order:
 *   1. Stamp p with call->localStatus.
 *   2. Let the connection's security object adjust p (RXS_SendPacket).
 *   3. Cancel any pending delayed-ack event.
 *   4. Hand p to rxi_SendPacket with call->lock released; an
 *      RX_CALL_REFCOUNT_SEND reference is held across the unlocked
 *      window.
 *   5. Record the send time on both the call and its connection.
 * istack is passed through to rxi_SendPacket. Expects call->lock held on
 * entry. */
5034 /* Also adjusts the keep alive parameters for the call, to reflect
5035 * that we have just sent a packet (so keep alives aren't sent
5037 void rxi_Send(register struct rx_call *call, register struct rx_packet *p,
5040 register struct rx_connection *conn = call->conn;
5042 /* Stamp each packet with the user supplied status */
5043 p->header.userStatus = call->localStatus;
5045 /* Allow the security object controlling this call's security to
5046 * make any last-minute changes to the packet */
5047 RXS_SendPacket(conn->securityObject, call, p);
5049 /* Since we're about to send SOME sort of packet to the peer, it's
5050 * safe to nuke any scheduled end-of-packets ack */
5051 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5053 /* Actually send the packet, filling in more connection-specific fields */
5054 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5055 MUTEX_EXIT(&call->lock);
5056 rxi_SendPacket(call, conn, p, istack);
5057 MUTEX_ENTER(&call->lock);
5058 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5060 /* Update last send time for this call (for keep-alive
5061 * processing), and for the connection (so that we can discover
5062 * idle connections) */
5063 conn->lastSendTime = call->lastSendTime = clock_Sec();
/* rxi_CheckCall: apply the call's timeout rules.
 *  - Nothing received for deadTime seconds. deadTime is
 *    conn->secondsUntilDead plus an allowance from peer->rtt and
 *    peer->rtt_dev, rounded up to whole seconds. An active call is failed
 *    with RX_CALL_DEAD. With RX_ENABLE_LOCKS, the delayed-ack, resend and
 *    keep-alive events are cancelled, and the call is freed with
 *    rxi_FreeCall if its refCount is zero.
 *  - call->startWait is set and older than conn->idleDeadTime: an active
 *    call is failed with RX_CALL_TIMEOUT.
 *  - conn->hardDeadTime has elapsed since call->startTime: an active call
 *    is failed with RX_CALL_TIMEOUT.
 * haveCTLock (RX_ENABLE_LOCKS builds only) is forwarded to rxi_FreeCall. */
5067 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5068 * that things are fine. Also called periodically to guarantee that nothing
5069 * falls through the cracks (e.g. (error + dally) connections have keepalive
5070 * turned off. Returns 0 if conn is well, -1 otherwise. If otherwise, call
5072 * haveCTLock Set if calling from rxi_ReapConnections
5074 #ifdef RX_ENABLE_LOCKS
5075 int rxi_CheckCall(register struct rx_call *call, int haveCTLock)
5076 #else /* RX_ENABLE_LOCKS */
5077 int rxi_CheckCall(register struct rx_call *call)
5078 #endif /* RX_ENABLE_LOCKS */
5080 register struct rx_connection *conn = call->conn;
5082 afs_uint32 deadTime;
/* While rxi_Start owns the transmit queue (RX_CALL_TQ_BUSY), leave the
 * call alone; rxi_Start resets it itself if it hits an error. The flag is
 * only maintained under AFS_GLOBAL_RXLOCK_KERNEL, which is the macro every
 * other RX_CALL_TQ_BUSY site in this file is guarded by. This guard must
 * use the same macro, or the early return is never compiled in. */
5084 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5085 if (call->flags & RX_CALL_TQ_BUSY) {
5086 /* Call is active and will be reset by rxi_Start if it's
5087 * in an error state.
5092 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5093 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5094 ((afs_uint32)conn->peer->rtt >> 3) +
5095 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
5097 /* These are computed to the second (+- 1 second). But that's
5098 * good enough for these values, which should be a significant
5099 * number of seconds. */
5100 if (now > (call->lastReceiveTime + deadTime)) {
5101 if (call->state == RX_STATE_ACTIVE) {
5102 rxi_CallError(call, RX_CALL_DEAD);
5106 #ifdef RX_ENABLE_LOCKS
5107 /* Cancel pending events */
5108 rxevent_Cancel(call->delayedAckEvent, call,
5109 RX_CALL_REFCOUNT_DELAY);
5110 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5111 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5112 if (call->refCount == 0) {
5113 rxi_FreeCall(call, haveCTLock);
5117 #else /* RX_ENABLE_LOCKS */
5120 #endif /* RX_ENABLE_LOCKS */
5122 /* Non-active calls are destroyed if they are not responding
5123 * to pings; active calls are simply flagged in error, so the
5124 * attached process can die reasonably gracefully. */
5126 /* see if we have a non-activity timeout */
5127 if (call->startWait && conn->idleDeadTime
5128 && ((call->startWait + conn->idleDeadTime) < now)) {
5129 if (call->state == RX_STATE_ACTIVE) {
5130 rxi_CallError(call, RX_CALL_TIMEOUT);
5134 /* see if we have a hard timeout */
5135 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5136 if (call->state == RX_STATE_ACTIVE)
5137 rxi_CallError(call, RX_CALL_TIMEOUT);
/* rxi_KeepAliveEvent: timer handler for call->keepAliveEvent.
 * Takes call->lock and drops the RX_CALL_REFCOUNT_ALIVE reference taken
 * when the event was posted. Clears call->keepAliveEvent if it refers to
 * this event.
 * Stops early if rxi_CheckCall reports a problem with the call, or if the
 * call is in RX_STATE_DALLY. Otherwise, when more than
 * conn->secondsUntilPing seconds have passed since the last send, it sends
 * an RX_ACK_PING ack. In that case it then reschedules itself through
 * rxi_ScheduleKeepAliveEvent and releases call->lock. */
5144 /* When a call is in progress, this routine is called occasionally to
5145 * make sure that some traffic has arrived (or been sent to) the peer.
5146 * If nothing has arrived in a reasonable amount of time, the call is
5147 * declared dead; if nothing has been sent for a while, we send a
5148 * keep-alive packet (if we're actually trying to keep the call alive)
5150 void rxi_KeepAliveEvent(struct rxevent *event, register struct rx_call *call,
5153 struct rx_connection *conn;
5156 MUTEX_ENTER(&call->lock);
5157 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5158 if (event == call->keepAliveEvent)
5159 call->keepAliveEvent = NULL;
5162 #ifdef RX_ENABLE_LOCKS
5163 if(rxi_CheckCall(call, 0)) {
5164 MUTEX_EXIT(&call->lock);
5167 #else /* RX_ENABLE_LOCKS */
5168 if (rxi_CheckCall(call)) return;
5169 #endif /* RX_ENABLE_LOCKS */
5171 /* Don't try to keep alive dallying calls */
5172 if (call->state == RX_STATE_DALLY) {
5173 MUTEX_EXIT(&call->lock);
5178 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5179 /* Don't try to send keepalives if there is unacknowledged data */
5180 /* the rexmit code should be good enough, this little hack
5181 * doesn't quite work XXX */
/* NOTE(review): despite the comment above, no unacknowledged-data check
 * is made here; the ping is sent whenever the idle threshold is
 * exceeded. */
5182 (void) rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
5184 rxi_ScheduleKeepAliveEvent(call);
5185 MUTEX_EXIT(&call->lock);
/* rxi_ScheduleKeepAliveEvent: if no keep-alive event is pending, post
 * rxi_KeepAliveEvent for conn->secondsUntilPing seconds from now.
 * An RX_CALL_REFCOUNT_ALIVE reference is taken for the event;
 * rxi_KeepAliveEvent releases it when it fires. Does nothing if
 * call->keepAliveEvent is already set. */
5189 void rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
5191 if (!call->keepAliveEvent) {
5193 clock_GetTime(&when);
5194 when.sec += call->conn->secondsUntilPing;
5195 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5196 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5200 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
/* rxi_KeepAliveOn: start keep-alive monitoring for call. Sets both
 * lastReceiveTime and lastSendTime to the current second and schedules the
 * keep-alive timer. */
5201 void rxi_KeepAliveOn(register struct rx_call *call)
5203 /* Pretend last packet received was received now--i.e. if another
5204 * packet isn't received within the keep alive time, then the call
5205 * will die; Initialize last send time to the current time--even
5206 * if a packet hasn't been sent yet. This will guarantee that a
5207 * keep-alive is sent within the ping time */
5208 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5209 rxi_ScheduleKeepAliveEvent(call);
5212 /* This routine is called to send connection abort messages
5213 * that have been delayed to throttle looping clients. */
/* Handler for conn->delayedAbortEvent.
 * Under conn->conn_data_lock, clears the event pointer and captures
 * conn->error in network byte order. Then sends that error to the peer as
 * an RX_PACKET_TYPE_ABORT special packet with no associated call (call
 * argument 0). The packet comes from
 * rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL) and is freed afterwards. */
5214 void rxi_SendDelayedConnAbort(struct rxevent *event, register struct rx_connection *conn,
5218 struct rx_packet *packet;
5220 MUTEX_ENTER(&conn->conn_data_lock);
5221 conn->delayedAbortEvent = NULL;
5222 error = htonl(conn->error);
5224 MUTEX_EXIT(&conn->conn_data_lock);
5225 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5227 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5228 RX_PACKET_TYPE_ABORT, (char *)&error,
5230 rxi_FreePacket(packet);
5234 /* This routine is called to send call abort messages
5235 * that have been delayed to throttle looping clients. */
/* Handler for call->delayedAbortEvent.
 * Holding call->lock for the whole routine, it:
 *   - clears the event pointer;
 *   - sends call->error (network byte order) to the peer as an
 *     RX_PACKET_TYPE_ABORT special packet for this call;
 *   - frees the packet. */
5236 void rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
5240 struct rx_packet *packet;
5242 MUTEX_ENTER(&call->lock);
5243 call->delayedAbortEvent = NULL;
5244 error = htonl(call->error);
5246 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5248 packet = rxi_SendSpecial(call, call->conn, packet,
5249 RX_PACKET_TYPE_ABORT, (char *)&error,
5251 rxi_FreePacket(packet);
5253 MUTEX_EXIT(&call->lock);
5256 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5257 * seconds) to ask the client to authenticate itself. The routine
5258 * issues a challenge to the client, which is obtained from the
5259 * security object associated with the connection */
/* Handler for conn->challengeEvent.
 * atries carries the remaining number of challenge attempts through the
 * event's void * argument; each repost passes tries-1.
 * The handler clears conn->challengeEvent. If RXS_CheckAuthentication
 * still reports a failure, there are two outcomes:
 *   - Authentication has failed for too long (the tries test itself is
 *     not visible in this listing). Every call on the connection that is
 *     still in RX_STATE_PRECALL is failed with RX_CALL_DEAD and sent a call
 *     abort, with conn_call_lock and each call's lock held.
 *   - Otherwise, a fresh challenge from RXS_GetChallenge is sent as an
 *     RX_PACKET_TYPE_CHALLENGE packet, and the handler reposts itself
 *     RX_CHALLENGE_TIMEOUT seconds later. */
5260 void rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
5263 int tries = (int) atries;
5264 conn->challengeEvent = NULL;
5265 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5266 register struct rx_packet *packet;
5270 /* We've failed to authenticate for too long.
5271 * Reset any calls waiting for authentication;
5272 * they are all in RX_STATE_PRECALL.
5276 MUTEX_ENTER(&conn->conn_call_lock);
5277 for (i=0; i<RX_MAXCALLS; i++) {
5278 struct rx_call *call = conn->call[i];
5280 MUTEX_ENTER(&call->lock);
5281 if (call->state == RX_STATE_PRECALL) {
5282 rxi_CallError(call, RX_CALL_DEAD);
5283 rxi_SendCallAbort(call, NULL, 0, 0);
5285 MUTEX_EXIT(&call->lock);
5288 MUTEX_EXIT(&conn->conn_call_lock);
5292 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5294 /* If there's no packet available, do this later. */
5295 RXS_GetChallenge(conn->securityObject, conn, packet);
5296 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5297 RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
5298 rxi_FreePacket(packet);
5300 clock_GetTime(&when);
5301 when.sec += RX_CHALLENGE_TIMEOUT;
5302 conn->challengeEvent =
5303 rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
5307 /* Call this routine to start requesting the client to authenticate
5308 * itself. This will continue until authentication is established,
5309 * the call times out, or an invalid response is returned. The
5310 * security object associated with the connection is asked to create
5311 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5312 * defined earlier. */
/* Does nothing if a challenge event is already pending. Otherwise it has
 * the security object create a challenge, then runs rxi_ChallengeEvent
 * immediately (event NULL) with RX_CHALLENGE_MAXTRIES attempts. */
5313 void rxi_ChallengeOn(register struct rx_connection *conn)
5315 if (!conn->challengeEvent) {
5316 RXS_CreateChallenge(conn->securityObject, conn);
5317 rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
5322 /* Compute round trip time of the packet provided, in *rttp.
5325 /* rxi_ComputeRoundTripTime is called with peer locked. */
5326 /* sentp and/or peer may be null */
5327 void rxi_ComputeRoundTripTime(register struct rx_packet *p,
5328 register struct clock *sentp, register struct rx_peer *peer)
5330 struct clock thisRtt, *rttp = &thisRtt;
/* NOTE(review): temptime is declared under AFS_ALPHA_LINUX22_ENV but used
 * under AFS_ALPHA_LINUX20_ENV below. A build that defines only the latter
 * would reference an undeclared variable; confirm the two guards were
 * meant to be identical. */
5332 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5333 /* making year 2038 bugs to get this running now - stroucki */
5334 struct timeval temptime;
5336 register int rtt_timeout;
5338 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5339 /* yet again. This was the worst Heisenbug of the port - stroucki */
5340 clock_GetTime(&temptime);
5341 rttp->sec=(afs_int32)temptime.tv_sec;
5342 rttp->usec=(afs_int32)temptime.tv_usec;
5344 clock_GetTime(rttp);
/* rttp becomes (now - sentp); a sample spanning a backwards clock step is
 * ignored.
 * NOTE(review): the header says sentp and peer may be null, yet in the
 * lines visible here sentp goes straight to clock_Lt and clock_Sub and
 * peer is dereferenced for the per-peer estimate. Confirm callers never
 * pass NULL, or add guards. */
5346 if (clock_Lt(rttp, sentp)) {
5348 return; /* somebody set the clock back, don't count this time. */
5350 clock_Sub(rttp, sentp);
/* Fold the sample into the global min / max / total RTT statistics under
 * rx_stats_mutex. A sample above the current maximum that also exceeds 60
 * seconds is treated as a forward clock step: the mutex is released and the
 * routine returns before the totals or the per-peer estimate are updated. */
5351 MUTEX_ENTER(&rx_stats_mutex);
5352 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5353 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5354 if (rttp->sec > 60) {
5355 MUTEX_EXIT(&rx_stats_mutex);
5356 return; /* somebody set the clock ahead */
5358 rx_stats.maxRtt = *rttp;
5360 clock_Add(&rx_stats.totalRtt, rttp);
5361 rx_stats.nRttSamples++;
5362 MUTEX_EXIT(&rx_stats_mutex);
5364 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5366 /* Apply VanJacobson round-trip estimations */
5371 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5372 * srtt is stored as fixed point with 3 bits after the binary
5373 * point (i.e., scaled by 8). The following magic is
5374 * equivalent to the smoothing algorithm in rfc793 with an
5375 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5376 * srtt*8 = srtt*8 + rtt - srtt
5377 * srtt = srtt + rtt/8 - srtt/8
/* delta = this sample in ms minus the unscaled smoothed RTT (peer->rtt is
 * scaled by 8, hence the shift by 3). */
5380 delta = MSEC(rttp) - (peer->rtt >> 3);
5384 * We accumulate a smoothed rtt variance (actually, a smoothed
5385 * mean difference), then set the retransmit timer to smoothed
5386 * rtt + 4 times the smoothed variance (was 2x in van's original
5387 * paper, but 4x works better for me, and apparently for him as
5389 * rttvar is stored as
5390 * fixed point with 2 bits after the binary point (scaled by
5391 * 4). The following is equivalent to rfc793 smoothing with
5392 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5393 * replaces rfc793's wired-in beta.
5394 * dev*4 = dev*4 + (|actual - expected| - dev)
/* peer->rtt_dev is scaled by 4, hence the shift by 2. */
5400 delta -= (peer->rtt_dev >> 2);
5401 peer->rtt_dev += delta;
5404 /* I don't have a stored RTT so I start with this value. Since I'm
5405 * probably just starting a call, and will be pushing more data down
5406 * this, I expect congestion to increase rapidly. So I fudge a
5407 * little, and I set deviance to half the rtt. In practice,
5408 * deviance tends to approach something a little less than
5409 * half the smoothed rtt. */
5410 peer->rtt = (MSEC(rttp) << 3) + 8;
5411 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
/* NOTE(review): the dpf() trace at the end of this routine prints
 * peer->timeout.usec, a microsecond count, with %0.3d after the decimal
 * point, so a timeout of 1.005 s is shown as 1.5000. Printing usec / 1000
 * would give the intended milliseconds. */
5413 /* the timeout is RTT + 4*MDEV + 0.35 sec This is because one end or
5414 * the other of these connections is usually in a user process, and can
5415 * be switched and/or swapped out. So on fast, reliable networks, the
5416 * timeout would otherwise be too short.
5418 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5419 clock_Zero(&(peer->timeout));
5420 clock_Addmsec(&(peer->timeout), rtt_timeout);
5422 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5423 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5424 (peer->timeout.sec),(peer->timeout.usec)) );
5428 /* Find all server connections that have not been active for a long time, and
5430 void rxi_ReapConnections(void)
5433 clock_GetTime(&now);
5435 /* Find server connection structures that haven't been used for
5436 * greater than rx_idleConnectionTime */
/* Overview: this routine makes three passes. It reaps idle server
 * connections, reaps idle peers together with their RPC statistics
 * records, and wakes threads waiting for free packets. It then re-posts
 * itself RX_REAP_TIME seconds after the time sampled on entry. */
5437 { struct rx_connection **conn_ptr, **conn_end;
5438 int i, havecalls = 0;
5439 MUTEX_ENTER(&rx_connHashTable_lock);
5440 for (conn_ptr = &rx_connHashTable[0],
5441 conn_end = &rx_connHashTable[rx_hashTableSize];
5442 conn_ptr < conn_end; conn_ptr++) {
5443 struct rx_connection *conn, *next;
5444 struct rx_call *call;
5448 for (conn = *conn_ptr; conn; conn = next) {
5449 /* XXX -- Shouldn't the connection be locked? */
5452 for(i=0;i<RX_MAXCALLS;i++) {
5453 call = conn->call[i];
5456 MUTEX_ENTER(&call->lock);
5457 #ifdef RX_ENABLE_LOCKS
5458 result = rxi_CheckCall(call, 1);
5459 #else /* RX_ENABLE_LOCKS */
5460 result = rxi_CheckCall(call);
5461 #endif /* RX_ENABLE_LOCKS */
5462 MUTEX_EXIT(&call->lock);
5464 /* If CheckCall freed the call, it might
5465 * have destroyed the connection as well,
5466 * which screws up the linked lists.
5472 if (conn->type == RX_SERVER_CONNECTION) {
5473 /* This only actually destroys the connection if
5474 * there are no outstanding calls */
/* Destroy only when the connection has no calls, no references, and has
 * not sent anything for rx_idleConnectionTime seconds. */
5475 MUTEX_ENTER(&conn->conn_data_lock);
5476 if (!havecalls && !conn->refCount &&
5477 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5478 conn->refCount++; /* it will be decr in rx_DestroyConn */
5479 MUTEX_EXIT(&conn->conn_data_lock);
5480 #ifdef RX_ENABLE_LOCKS
5481 rxi_DestroyConnectionNoLock(conn);
5482 #else /* RX_ENABLE_LOCKS */
5483 rxi_DestroyConnection(conn);
5484 #endif /* RX_ENABLE_LOCKS */
5486 #ifdef RX_ENABLE_LOCKS
5488 MUTEX_EXIT(&conn->conn_data_lock);
5490 #endif /* RX_ENABLE_LOCKS */
5494 #ifdef RX_ENABLE_LOCKS
/* Drain rx_connCleanup_list, releasing rx_connHashTable_lock around each
 * rxi_CleanupConnection call and taking it again afterwards. */
5495 while (rx_connCleanup_list) {
5496 struct rx_connection *conn;
5497 conn = rx_connCleanup_list;
5498 rx_connCleanup_list = rx_connCleanup_list->next;
5499 MUTEX_EXIT(&rx_connHashTable_lock);
5500 rxi_CleanupConnection(conn);
5501 MUTEX_ENTER(&rx_connHashTable_lock);
5503 MUTEX_EXIT(&rx_connHashTable_lock);
5504 #endif /* RX_ENABLE_LOCKS */
5507 /* Find any peer structures that haven't been used (haven't had an
5508 * associated connection) for greater than rx_idlePeerTime */
5509 { struct rx_peer **peer_ptr, **peer_end;
/* Lock order for this pass: rx_rpc_stats, then rx_peerHashTable_lock, then
 * each peer_lock via MUTEX_TRYENTER. A peer is reaped only when that call
 * returned nonzero, so a peer whose lock cannot be obtained is left alone
 * on this pass. */
5511 MUTEX_ENTER(&rx_rpc_stats);
5512 MUTEX_ENTER(&rx_peerHashTable_lock);
5513 for (peer_ptr = &rx_peerHashTable[0],
5514 peer_end = &rx_peerHashTable[rx_hashTableSize];
5515 peer_ptr < peer_end; peer_ptr++) {
5516 struct rx_peer *peer, *next, *prev;
5517 for (prev = peer = *peer_ptr; peer; peer = next) {
5519 code = MUTEX_TRYENTER(&peer->peer_lock);
5520 if ((code) && (peer->refCount == 0)
5521 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5522 rx_interface_stat_p rpc_stat, nrpc_stat;
/* The peer is unreferenced and has been idle longer than rx_idlePeerTime.
 * Release and destroy its lock, then free every statistics record on
 * peer->rpcStats, lowering rxi_rpc_peer_stat_cnt by each record's
 * function count. */
5524 MUTEX_EXIT(&peer->peer_lock);
5525 MUTEX_DESTROY(&peer->peer_lock);
5526 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5527 rx_interface_stat)) {
5528 unsigned int num_funcs;
5529 if (!rpc_stat) break;
5530 queue_Remove(&rpc_stat->queue_header);
5531 queue_Remove(&rpc_stat->all_peers);
5532 num_funcs = rpc_stat->stats[0].func_total;
5533 space = sizeof(rx_interface_stat_t) +
5534 rpc_stat->stats[0].func_total *
5535 sizeof(rx_function_entry_v1_t);
5537 rxi_Free(rpc_stat, space);
5538 rxi_rpc_peer_stat_cnt -= num_funcs;
5541 MUTEX_ENTER(&rx_stats_mutex);
5542 rx_stats.nPeerStructs--;
5543 MUTEX_EXIT(&rx_stats_mutex);
5544 if (prev == *peer_ptr) {
5553 MUTEX_EXIT(&peer->peer_lock);
5559 MUTEX_EXIT(&rx_peerHashTable_lock);
5560 MUTEX_EXIT(&rx_rpc_stats);
5563 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5564 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5565 GC, just below. Really, we shouldn't have to keep moving packets from
5566 one place to another, but instead ought to always know if we can
5567 afford to hold onto a packet in its particular use. */
5568 MUTEX_ENTER(&rx_freePktQ_lock);
5569 if (rx_waitingForPackets) {
5570 rx_waitingForPackets = 0;
5571 #ifdef RX_ENABLE_LOCKS
5572 CV_BROADCAST(&rx_waitingForPackets_cv);
5574 osi_rxWakeup(&rx_waitingForPackets);
5577 MUTEX_EXIT(&rx_freePktQ_lock);
/* Re-schedule this routine RX_REAP_TIME seconds after the time sampled on
 * entry.
 * NOTE(review): rxi_ReapConnections takes no parameters, yet it is posted
 * as an event handler just like rxi_ChallengeEvent, which takes
 * (event, conn, arg). If rxevent presumably invokes handlers with those
 * arguments, calling this function through that pointer type is undefined
 * behaviour in ISO C; consider giving it the handler signature. */
5579 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5580 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5584 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5585 * rx.h is sort of strange this is better. This is called with a security
5586 * object before it is discarded. Each connection using a security object has
5587 * its own refcount to the object so it won't actually be freed until the last
5588 * connection is destroyed.
5590 * This is the only rxs module call. A hold could also be written but no one
// Function form of the RXS_Close macro: returns the result of RXS_Close
// applied to aobj.
5593 int rxs_Release (struct rx_securityClass *aobj)
5595 return RXS_Close (aobj);
// Size constants for the rate estimator that follows. RXRATE_PKT_OH is the
// fixed per-packet overhead (RX_HEADER_SIZE + RX_IPUDP_SIZE).
// RXRATE_SMALL_PKT adds a full struct rx_ackPacket to it, and
// RXRATE_AVG_SMALL_PKT adds half of one.
// RXRATE_LARGE_PKT is RXRATE_SMALL_PKT plus 256 bytes.
5599 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5600 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5601 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5602 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5604 /* Adjust our estimate of the transmission rate to this peer, given
5605 * that the packet p was just acked. We can adjust peer->timeout and
5606 * call->twind. Pragmatically, this is called
5607 * only with packets of maximal length.
5608 * Called with peer and call locked.
5611 static void rxi_ComputeRate(register struct rx_peer *peer,
5612 register struct rx_call *call, struct rx_packet *p,
5613 struct rx_packet *ackp, u_char ackReason)
5615 afs_int32 xferSize, xferMs;
5616 register afs_int32 minTime;
5619 /* Count down packets */
5620 if (peer->rateFlag > 0) peer->rateFlag--;
5621 /* Do nothing until we're enabled */
5622 if (peer->rateFlag != 0) return;
5623 if (!call->conn) return;
5625 /* Count only when the ack seems legitimate */
5626 switch (ackReason) {
5627 case RX_ACK_REQUESTED:
5628 xferSize = p->length + RX_HEADER_SIZE +
5629 call->conn->securityMaxTrailerSize;
5633 case RX_ACK_PING_RESPONSE:
5634 if (p) /* want the response to ping-request, not data send */
5636 clock_GetTime(&newTO);
5637 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5638 clock_Sub(&newTO, &call->pingRequestTime);
5639 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5643 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5650 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5651 ntohl(peer->host), ntohs(peer->port),
5652 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5653 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5656 /* Track only packets that are big enough. */
/* NOTE(review): the RX_ACK_PING_RESPONSE case only wants samples from a
 * ping response, not from a data send (see the test on p in that case).
 * So p appears to be NULL whenever that path reaches this point, yet
 * p->length is dereferenced just below. Confirm, and skip this size
 * filter when p is NULL. */
5657 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5661 /* absorb RTT data (in milliseconds) for these big packets */
/* smRtt (ms) takes the first sample as is; after that it is updated as
 * ((15 * smRtt) + xferMs + 4) >> 4. It is kept at least 1 because
 * (smRtt << 1) is used as a divisor further down. */
5662 if (peer->smRtt == 0) {
5663 peer->smRtt = xferMs;
5665 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5666 if (!peer->smRtt) peer->smRtt = 1;
5669 if (peer->countDown) {
5673 peer->countDown = 10; /* recalculate only every so often */
5675 /* In practice, we can measure only the RTT for full packets,
5676 * because of the way Rx acks the data that it receives. (If it's
5677 * smaller than a full packet, it often gets implicitly acked
5678 * either by the call response (from a server) or by the next call
5679 * (from a client), and either case confuses transmission times
5680 * with processing times.) Therefore, replace the above
5681 * more-sophisticated processing with a simpler version, where the
5682 * smoothed RTT is kept for full-size packets, and the time to
5683 * transmit a windowful of full-size packets is simply RTT *
5684 * windowSize. Again, we take two steps:
5685 - ensure the timeout is large enough for a single packet's RTT;
5686 - ensure that the window is small enough to fit in the desired timeout.*/
5688 /* First, the timeout check. */
5689 minTime = peer->smRtt;
5690 /* Get a reasonable estimate for a timeout period */
5692 newTO.sec = minTime / 1000;
5693 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5695 /* Increase the timeout period so that we can always do at least
5696 * one packet exchange */
5697 if (clock_Gt(&newTO, &peer->timeout)) {
5699 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5700 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5701 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5704 peer->timeout = newTO;
5707 /* Now, get an estimate for the transmit window size. */
5708 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5709 /* Now, convert to the number of full packets that could fit in a
5710 * reasonable fraction of that interval */
/* Each full packet is budgeted 2 * smRtt ms. xferSize keeps the unclamped
 * packet count for the timeout cut-back test further down. */
5711 minTime /= (peer->smRtt << 1);
5712 xferSize = minTime; /* (make a copy) */
5714 /* Now clamp the size to reasonable bounds. */
5715 if (minTime <= 1) minTime = 1;
5716 else if (minTime > rx_Window) minTime = rx_Window;
/* NOTE(review): the window update that consumed the clamped minTime is
 * commented out below, so in the visible code the clamped value is never
 * read afterwards. */
5717 /* if (minTime != peer->maxWindow) {
5718 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5719 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5720 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5722 peer->maxWindow = minTime;
5723 elide... call->twind = minTime;
5727 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5728 * Discern this by calculating the timeout necessary for rx_Window
5730 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5731 /* calculate estimate for transmission interval in milliseconds */
5732 minTime = rx_Window * peer->smRtt;
5733 if (minTime < 1000) {
5734 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5735 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5736 peer->timeout.usec, peer->smRtt,
5739 newTO.sec = 0; /* cut back on timeout by half a second */
5740 newTO.usec = 500000;
5741 clock_Sub(&peer->timeout, &newTO);
5746 } /* end of rxi_ComputeRate */
5747 #endif /* ADAPT_WINDOW */
5755 /* Don't call this debugging routine directly; use dpf */
/* Writes a timestamp of the form seconds.milliseconds taken from the
 * current clock to rx_Log, then the caller format with its fifteen int
 * arguments.
 * NOTE(review): every argument is declared int, yet several dpf() format
 * strings in this file use %ld, %lu and %lx. Those conversions read a long
 * and are undefined where long is wider than int (for example LP64). A
 * stdarg-based implementation would avoid the mismatch. */
5757 rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int a9, int a10,
5758 int a11, int a12, int a13, int a14, int a15)
5761 clock_GetTime(&now);
5762 fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5763 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5770 * This function is used to process the rx_stats structure that is local
5771 * to a process as well as an rx_stats structure received from a remote
5772 * process (via rxdebug). Therefore, it needs to do minimal version
5775 void rx_PrintTheseStats (FILE *file, struct rx_stats *s, int size,
5776 afs_int32 freePackets, char version)
/* Report when the byte size supplied by the caller differs from this
 * build of struct rx_stats.
 * NOTE(review): sizeof yields a size_t but is printed with %d; cast it to
 * int (or use %zu where C99 printf is available). */
5780 if (size != sizeof(struct rx_stats)) {
5782 "Unexpected size of stats structure: was %d, expected %d\n",
5783 size, sizeof(struct rx_stats));
5787 "rx stats: free packets %d, allocs %d, ",
/* Stats whose version is at least RX_DEBUGI_VERSION_W_NEWPACKETTYPES also
 * carry the cbuf allocation-failure counters, printed in the longer form. */
5791 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5793 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5794 s->receivePktAllocFailures,
5795 s->receiveCbufPktAllocFailures,
5796 s->sendPktAllocFailures,
5797 s->sendCbufPktAllocFailures,
5798 s->specialPktAllocFailures);
5801 "alloc-failures(rcv %d,send %d,ack %d)\n",
5802 s->receivePktAllocFailures,
5803 s->sendPktAllocFailures,
5804 s->specialPktAllocFailures);
5809 "bogusReads %d (last from host %x), "
5815 s->bogusPacketOnRead,
5818 s->noPacketBuffersOnRead,
5822 fprintf(file, " packets read: ");
5823 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5829 fprintf(file, "\n");
5832 " other read counters: data %d, "
5840 s->spuriousPacketsRead,
5841 s->ignorePacketDally);
5843 fprintf(file, " packets sent: ");
5844 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5850 fprintf(file, "\n");
5853 " other send counters: ack %d, "
5854 "data %d (not resends), "
5857 "acked&ignored %d\n",
5860 s->dataPacketsReSent,
5861 s->dataPacketsPushed,
5862 s->ignoreAckedPacket);
5865 " \t(these should be small) sendFailed %d, "
5868 (int) s->fatalErrors);
/* RTT figures are printed only once at least one sample exists, which also
 * keeps the average from dividing by zero. */
5870 if (s->nRttSamples) {
5872 " Average rtt is %0.3f, with %d samples\n",
5873 clock_Float(&s->totalRtt)/s->nRttSamples,
5877 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5878 clock_Float(&s->minRtt),
5879 clock_Float(&s->maxRtt));
5883 " %d server connections, "
5884 "%d client connections, "
5887 "%d free call structs\n",
5892 s->nFreeCallStructs);
5894 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5896 " %d clock updates\n",
5902 /* for backward compatibility */
5903 void rx_PrintStats(FILE *file)
/* Print the local rx_stats counters to file, holding rx_stats_mutex for
 * the duration of the dump and reporting the current RX_DEBUGI_VERSION. */
5905 MUTEX_ENTER(&rx_stats_mutex);
5906 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5907 MUTEX_EXIT(&rx_stats_mutex);
/* Print selected parameters of one peer to file, among them burst size and
 * wait, retry time, and the maximum in and out packet skew. */
5910 void rx_PrintPeerStats(FILE *file, struct rx_peer *peer)
5915 "burst wait %u.%d.\n",
5918 (int) peer->burstSize,
5919 (int) peer->burstWait.sec,
5920 (int) peer->burstWait.usec);
5924 "retry time %u.%06d, "
5928 (int) peer->timeout.sec,
5929 (int) peer->timeout.usec,
5935 "max in packet skew %d, "
5936 "max out packet skew %d\n",
5938 (int) peer->inPacketSkew,
5939 (int) peer->outPacketSkew);
5942 #ifdef AFS_PTHREAD_ENV
5944 * This mutex protects the following static variables:
/* NOTE(review): the pthread lock and unlock calls are made inside assert().
 * If assert here is the standard assert.h macro, a build with NDEBUG
 * defined would compile the calls away and leave the debug statics
 * unprotected. Make the call outside the assertion and check its result
 * separately. Without AFS_PTHREAD_ENV both macros expand to nothing. */
5948 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
5949 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
5951 #define LOCK_RX_DEBUG
5952 #define UNLOCK_RX_DEBUG
5953 #endif /* AFS_PTHREAD_ENV */
/* Send one rx debug request (an rx header followed by inputLength bytes of
 * inputData) on socket to remoteAddr and remotePort, and wait for a reply
 * whose callNumber matches the request. Returns -1 if no matching reply
 * arrives within about 20 seconds. At most outputLength bytes of the reply
 * payload are copied into outputData. remoteAddr and remotePort are stored
 * into the sockaddr unchanged, so they are presumably already in network
 * byte order. */
5955 static int MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr,
5956 afs_uint16 remotePort, u_char type, void *inputData, size_t inputLength,
5957 void *outputData, size_t outputLength)
5959 static afs_int32 counter = 100;
5961 struct rx_header theader;
5963 register afs_int32 code;
5965 struct sockaddr_in taddr, faddr;
5970 endTime = time(0) + 20; /* try for 20 seconds */
5974 tp = &tbuffer[sizeof(struct rx_header)];
5975 taddr.sin_family = AF_INET;
5976 taddr.sin_port = remotePort;
5977 taddr.sin_addr.s_addr = remoteAddr;
5978 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
5979 taddr.sin_len = sizeof(struct sockaddr_in);
/* Request header: epoch 999, callNumber taken from the static counter, a
 * single client-initiated last packet of the requested type, service 0. */
5982 memset(&theader, 0, sizeof(theader));
5983 theader.epoch = htonl(999);
5985 theader.callNumber = htonl(counter);
5988 theader.type = type;
5989 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
5990 theader.serviceId = 0;
/* NOTE(review): there is no visible check that inputLength fits in tbuffer
 * after the header before the copy below. */
5992 memcpy(tbuffer, &theader, sizeof(theader));
5993 memcpy(tp, inputData, inputLength);
5994 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
5995 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
5997 /* see if there's a packet available */
5999 FD_SET(socket, &imask);
6002 code = select(socket+1, &imask, 0, 0, &tv);
6004 /* now receive a packet */
6005 faddrLen = sizeof(struct sockaddr_in);
6006 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6007 (struct sockaddr *) &faddr, &faddrLen);
6009 memcpy(&theader, tbuffer, sizeof(struct rx_header));
6010 if (counter == ntohl(theader.callNumber)) break;
6013 /* see if we've timed out */
6014 if (endTime < time(0)) return -1;
/* NOTE(review): code is a signed byte count and outputLength a size_t, so
 * the comparison below is done in unsigned arithmetic. If a reply shorter
 * than struct rx_header were ever accepted, the negative code would
 * compare as huge and be replaced by outputLength, copying stale buffer
 * contents as if they were payload. Reject short replies explicitly. */
6016 code -= sizeof(struct rx_header);
6017 if (code > outputLength) code = outputLength;
6018 memcpy(outputData, tp, code);
/* Fetch a server's struct rx_debugStats with an RX_DEBUGI_GETSTATS debug
 * request. *supportedValues is cleared and then receives one
 * RX_SERVER_DEBUG_* bit for each capability implied by the reported
 * stat->version. The nFreePackets, packetReclaims, callsExecuted, nWaiting
 * and idleThreads fields are converted to host byte order. */
6022 afs_int32 rx_GetServerDebug(osi_socket socket, afs_uint32 remoteAddr,
6023 afs_uint16 remotePort, struct rx_debugStats *stat, afs_uint32 *supportedValues)
6025 struct rx_debugIn in;
6028 *supportedValues = 0;
6029 in.type = htonl(RX_DEBUGI_GETSTATS);
6032 rc = MakeDebugCall(socket,
6035 RX_PACKET_TYPE_DEBUG,
6042 * If the call was successful, fixup the version and indicate
6043 * what contents of the stat structure are valid.
6044 * Also do net to host conversion of fields here.
6048 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6049 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6051 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6052 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6054 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6055 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6057 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6058 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6060 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6061 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6063 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6064 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6066 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6067 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6070 stat->nFreePackets = ntohl(stat->nFreePackets);
6071 stat->packetReclaims = ntohl(stat->packetReclaims);
6072 stat->callsExecuted = ntohl(stat->callsExecuted);
6073 stat->nWaiting = ntohl(stat->nWaiting);
6074 stat->idleThreads = ntohl(stat->idleThreads);
/* Fetch a server's struct rx_stats with an RX_DEBUGI_RXSTATS request. The
 * structure is zeroed first and afterwards walked as an array of afs_int32
 * words for network-to-host conversion. *supportedValues is cleared. */
6080 afs_int32 rx_GetServerStats(osi_socket socket, afs_uint32 remoteAddr,
6081 afs_uint16 remotePort, struct rx_stats *stat, afs_uint32 *supportedValues)
6083 struct rx_debugIn in;
6084 afs_int32 *lp = (afs_int32 *) stat;
6089 * supportedValues is currently unused, but added to allow future
6090 * versioning of this function.
6093 *supportedValues = 0;
6094 in.type = htonl(RX_DEBUGI_RXSTATS);
6096 memset(stat, 0, sizeof(*stat));
6098 rc = MakeDebugCall(socket,
6101 RX_PACKET_TYPE_DEBUG,
6110 * Do net to host conversion here
6113 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
/* Ask a server for its version string by sending an RX_PACKET_TYPE_VERSION
 * packet, and return whatever MakeDebugCall returns. version and
 * version_length are presumably passed on as the output buffer and its
 * size. */
6121 afs_int32 rx_GetServerVersion(osi_socket socket, afs_uint32 remoteAddr,
6122 afs_uint16 remotePort, size_t version_length, char *version)
6125 return MakeDebugCall(socket,
6128 RX_PACKET_TYPE_VERSION,
/* Fetch one connection description (struct rx_debugConn) from a server.
 * allConnections selects RX_DEBUGI_GETALLCONN instead of RX_DEBUGI_GETCONN,
 * and *nextConnection is sent as the index and then advanced by one.
 * Replies in the old layout (RX_SERVER_DEBUG_OLD_CONN set in
 * debugSupportedValues) are rearranged into the current structure. The
 * numeric fields, apart from host and port, are converted to host byte
 * order. */
6135 afs_int32 rx_GetServerConnections(osi_socket socket, afs_uint32 remoteAddr,
6136 afs_uint16 remotePort, afs_int32 *nextConnection, int allConnections,
6137 afs_uint32 debugSupportedValues, struct rx_debugConn *conn, afs_uint32 *supportedValues)
6139 struct rx_debugIn in;
6144 * supportedValues is currently unused, but added to allow future
6145 * versioning of this function.
6148 *supportedValues = 0;
6149 if (allConnections) {
6150 in.type = htonl(RX_DEBUGI_GETALLCONN);
6152 in.type = htonl(RX_DEBUGI_GETCONN);
6154 in.index = htonl(*nextConnection);
6155 memset(conn, 0, sizeof(*conn));
6157 rc = MakeDebugCall(socket,
6160 RX_PACKET_TYPE_DEBUG,
6167 *nextConnection += 1;
6170 * Convert old connection format to new structure.
/* NOTE(review): vL aliases the same buffer as conn, so each MOVEvL copies
 * one field to another offset within that single buffer. This is correct
 * only if no assignment overwrites a source field that a later MOVEvL
 * still reads; confirm against the two structure layouts. Reading the
 * buffer through both struct types also conflicts with strict aliasing. */
6173 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6174 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6175 #define MOVEvL(a) (conn->a = vL->a)
6177 /* any old or unrecognized version... */
6178 for (i=0;i<RX_MAXCALLS;i++) {
6179 MOVEvL(callState[i]);
6180 MOVEvL(callMode[i]);
6181 MOVEvL(callFlags[i]);
6182 MOVEvL(callOther[i]);
6184 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6185 MOVEvL(secStats.type);
6186 MOVEvL(secStats.level);
6187 MOVEvL(secStats.flags);
6188 MOVEvL(secStats.expires);
6189 MOVEvL(secStats.packetsReceived);
6190 MOVEvL(secStats.packetsSent);
6191 MOVEvL(secStats.bytesReceived);
6192 MOVEvL(secStats.bytesSent);
6197 * Do net to host conversion here
6199 * I don't convert host or port since we are most likely
6200 * going to want these in NBO.
6202 conn->cid = ntohl(conn->cid);
6203 conn->serial = ntohl(conn->serial);
6204 for(i=0;i<RX_MAXCALLS;i++) {
6205 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6207 conn->error = ntohl(conn->error);
6208 conn->secStats.flags = ntohl(conn->secStats.flags);
6209 conn->secStats.expires = ntohl(conn->secStats.expires);
6210 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6211 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6212 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6213 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6214 conn->epoch = ntohl(conn->epoch);
6215 conn->natMTU = ntohl(conn->natMTU);
/* Fetch one peer description (struct rx_debugPeer) from a server with an
 * RX_DEBUGI_GETPEER request for index *nextPeer, then convert its numeric
 * fields to host byte order (ntohs for the 16-bit ones, ntohl for the
 * 32-bit ones), leaving host and port in network order. */
6221 afs_int32 rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
6222 afs_int32 *nextPeer, afs_uint32 debugSupportedValues, struct rx_debugPeer *peer,
6223 afs_uint32 *supportedValues)
6225 struct rx_debugIn in;
6229 * supportedValues is currently unused, but added to allow future
6230 * versioning of this function.
6233 *supportedValues = 0;
6234 in.type = htonl(RX_DEBUGI_GETPEER);
6235 in.index = htonl(*nextPeer);
6236 memset(peer, 0, sizeof(*peer));
6238 rc = MakeDebugCall(socket,
6241 RX_PACKET_TYPE_DEBUG,
6251 * Do net to host conversion here
6253 * I don't convert host or port since we are most likely
6254 * going to want these in NBO.
6256 peer->ifMTU = ntohs(peer->ifMTU);
6257 peer->idleWhen = ntohl(peer->idleWhen);
6258 peer->refCount = ntohs(peer->refCount);
6259 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6260 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6261 peer->rtt = ntohl(peer->rtt);
6262 peer->rtt_dev = ntohl(peer->rtt_dev);
6263 peer->timeout.sec = ntohl(peer->timeout.sec);
6264 peer->timeout.usec = ntohl(peer->timeout.usec);
6265 peer->nSent = ntohl(peer->nSent);
6266 peer->reSends = ntohl(peer->reSends);
6267 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6268 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6269 peer->rateFlag = ntohl(peer->rateFlag);
6270 peer->natMTU = ntohs(peer->natMTU);
6271 peer->maxMTU = ntohs(peer->maxMTU);
6272 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6273 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6274 peer->MTU = ntohs(peer->MTU);
6275 peer->cwind = ntohs(peer->cwind);
6276 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6277 peer->congestSeq = ntohs(peer->congestSeq);
6278 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6279 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6280 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6281 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6286 #endif /* RXDEBUG */
/* Release global rx state. In order, this:
 * - frees free call structures, idle server queue entries, peers with
 *   their RPC statistics, services, connections with their calls, and
 *   free server queue entries;
 * - destroys the global mutexes;
 * - frees the connection and peer hash tables and all packets;
 * - resets the data quota and process counters.
 * Returns immediately when rxinit_status is 1 (already shut down). */
6288 void shutdown_rx(void)
6290 struct rx_serverQueueEntry *np;
6293 register struct rx_call *call;
6294 register struct rx_serverQueueEntry *sq;
6298 if (rxinit_status == 1) {
6300 return; /* Already shutdown. */
6305 #ifndef AFS_PTHREAD_ENV
6306 FD_ZERO(&rx_selectMask);
6307 #endif /* AFS_PTHREAD_ENV */
6308 rxi_dataQuota = RX_MAX_QUOTA;
6309 #ifndef AFS_PTHREAD_ENV
6311 #endif /* AFS_PTHREAD_ENV */
6314 #ifndef AFS_PTHREAD_ENV
6315 #ifndef AFS_USE_GETTIMEOFDAY
6317 #endif /* AFS_USE_GETTIMEOFDAY */
6318 #endif /* AFS_PTHREAD_ENV */
6320 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6321 call = queue_First(&rx_freeCallQueue, rx_call);
6323 rxi_Free(call, sizeof(struct rx_call));
6326 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6327 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6333 struct rx_peer **peer_ptr, **peer_end;
6334 for (peer_ptr = &rx_peerHashTable[0],
6335 peer_end = &rx_peerHashTable[rx_hashTableSize];
6336 peer_ptr < peer_end; peer_ptr++) {
6337 struct rx_peer *peer, *next;
6338 for (peer = *peer_ptr; peer; peer = next) {
6339 rx_interface_stat_p rpc_stat, nrpc_stat;
6341 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6342 rx_interface_stat)) {
6343 unsigned int num_funcs;
6344 if (!rpc_stat) break;
6345 queue_Remove(&rpc_stat->queue_header);
6346 queue_Remove(&rpc_stat->all_peers);
6347 num_funcs = rpc_stat->stats[0].func_total;
6348 space = sizeof(rx_interface_stat_t) +
6349 rpc_stat->stats[0].func_total *
6350 sizeof(rx_function_entry_v1_t);
6352 rxi_Free(rpc_stat, space);
6353 MUTEX_ENTER(&rx_rpc_stats);
6354 rxi_rpc_peer_stat_cnt -= num_funcs;
6355 MUTEX_EXIT(&rx_rpc_stats);
6359 MUTEX_ENTER(&rx_stats_mutex);
6360 rx_stats.nPeerStructs--;
6361 MUTEX_EXIT(&rx_stats_mutex);
6365 for (i = 0; i<RX_MAX_SERVICES; i++) {
6367 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6369 for (i = 0; i < rx_hashTableSize; i++) {
6370 register struct rx_connection *tc, *ntc;
6371 MUTEX_ENTER(&rx_connHashTable_lock);
6372 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6374 for (j = 0; j < RX_MAXCALLS; j++) {
6376 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6379 rxi_Free(tc, sizeof(*tc));
6381 MUTEX_EXIT(&rx_connHashTable_lock);
6384 MUTEX_ENTER(&freeSQEList_lock);
/* rx_FreeSQEList is threaded through the first word of each entry. */
6386 while ((np = rx_FreeSQEList)) {
6387 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6388 MUTEX_DESTROY(&np->lock);
6389 rxi_Free(np, sizeof(*np));
6392 MUTEX_EXIT(&freeSQEList_lock);
6393 MUTEX_DESTROY(&freeSQEList_lock);
6394 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6395 MUTEX_DESTROY(&rx_connHashTable_lock);
6396 MUTEX_DESTROY(&rx_peerHashTable_lock);
6397 MUTEX_DESTROY(&rx_serverPool_lock);
6399 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6400 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6402 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6403 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6405 rxi_FreeAllPackets();
6407 MUTEX_ENTER(&rx_stats_mutex);
6408 rxi_dataQuota = RX_MAX_QUOTA;
6409 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6410 MUTEX_EXIT(&rx_stats_mutex);
6416 #ifdef RX_ENABLE_LOCKS
/* Panic with the message msg unless MUTEX_ISMINE reports that lockaddr is
 * held by the caller. */
6417 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6419 if (!MUTEX_ISMINE(lockaddr))
6420 osi_Panic("Lock not held: %s", msg);
6422 #endif /* RX_ENABLE_LOCKS */
6427 * Routines to implement connection specific data.
/* Allocate the next connection-specific data key under rxi_keyCreate_lock,
 * and record rtn as the destructor for values stored under that key
 * (rx_SetSpecific calls it when a value is replaced). Presumably returns
 * the new key.
 * NOTE(review): the realloc result is stored straight back into
 * rxi_keyCreate_destructor. On allocation failure the old array leaks and
 * the next store writes through a null pointer. Keep the old pointer until
 * realloc succeeds. */
6430 int rx_KeyCreate(rx_destructor_t rtn)
6433 MUTEX_ENTER(&rxi_keyCreate_lock);
6434 key = rxi_keyCreate_counter++;
6435 rxi_keyCreate_destructor = (rx_destructor_t *)
6436 realloc((void *)rxi_keyCreate_destructor,
6437 (key+1) * sizeof(rx_destructor_t));
6438 rxi_keyCreate_destructor[key] = rtn;
6439 MUTEX_EXIT(&rxi_keyCreate_lock);
/* Store ptr as connection-specific value number key on conn, under
 * conn_data_lock. The conn->specific array is allocated or grown to key+1
 * slots on demand, and the intermediate new slots are set to NULL. When
 * key already lies within the array, a non-NULL old value is first passed
 * to the destructor registered for key, if there is one.
 * NOTE(review): the malloc and realloc results are not checked, and realloc
 * overwrites conn->specific directly. An allocation failure therefore
 * leads to a null dereference and leaks the old array. */
6443 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6446 MUTEX_ENTER(&conn->conn_data_lock);
6447 if (!conn->specific) {
6448 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6449 for (i = 0 ; i < key ; i++)
6450 conn->specific[i] = NULL;
6451 conn->nSpecific = key+1;
6452 conn->specific[key] = ptr;
6453 } else if (key >= conn->nSpecific) {
6454 conn->specific = (void **)
6455 realloc(conn->specific,(key+1)*sizeof(void *));
6456 for (i = conn->nSpecific ; i < key ; i++)
6457 conn->specific[i] = NULL;
6458 conn->nSpecific = key+1;
6459 conn->specific[key] = ptr;
6461 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6462 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6463 conn->specific[key] = ptr;
6465 MUTEX_EXIT(&conn->conn_data_lock);
/* Return the value stored under key on conn, read under conn_data_lock.
 * Keys at or beyond conn->nSpecific take a separate branch, presumably
 * yielding NULL (that branch body is not visible here).
 * NOTE(review): a negative key is not rejected before indexing. */
6468 void *rx_GetSpecific(struct rx_connection *conn, int key)
6471 MUTEX_ENTER(&conn->conn_data_lock);
6472 if (key >= conn->nSpecific)
6475 ptr = conn->specific[key];
6476 MUTEX_EXIT(&conn->conn_data_lock);
6480 #endif /* !KERNEL */
6483 * processStats is a queue used to store the statistics for the local
6484 * process. Its contents are similar to the contents of the rpcStats
6485 * queue on a rx_peer structure, but the actual data stored within
6486 * this queue contains totals across the lifetime of the process (assuming
6487 * the stats have not been reset) - unlike the per peer structures
6488 * which can come and go based upon the peer lifetime.
/* Initialized as an empty circular list: both links point at the queue
 * head itself. */
6491 static struct rx_queue processStats = {&processStats,&processStats};
6494 * peerStats is a queue used to store the statistics for all peer structs.
6495 * Its contents are the union of all the peer rpcStats queues.
/* Also initialized as an empty circular list. */
6498 static struct rx_queue peerStats = {&peerStats,&peerStats};
6501 * rxi_monitor_processStats is used to turn process wide stat collection
/* Starts at 0, presumably meaning collection is disabled. */
6505 static int rxi_monitor_processStats = 0;
6508 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
6511 static int rxi_monitor_peerStats = 0;
6514 * rxi_AddRpcStat - given all of the information for a particular rpc
6515 * call, create (if needed) and update the stat totals for the rpc.
6519 * IN stats - the queue of stats that will be updated with the new value
6521 * IN rxInterface - a unique number that identifies the rpc interface
6523 * IN currentFunc - the index of the function being invoked
6525 * IN totalFunc - the total number of functions in this interface
6527 * IN queueTime - the amount of time this function waited for a thread
6529 * IN execTime - the amount of time this function invocation took to execute
6531 * IN bytesSent - the number bytes sent by this invocation
6533 * IN bytesRcvd - the number bytes received by this invocation
6535 * IN isServer - if true, this invocation was made to a server
6537 * IN remoteHost - the ip address of the remote host
6539 * IN remotePort - the port of the remote host
6541 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6543 * INOUT counter - if a new stats structure is allocated, the counter will
6544 * be updated with the new number of allocated stat structures
/*
 * Summary: look up the rx_interface_stat_t on the stats queue whose first
 * entry matches (rxInterface, isServer).  If there is none, allocate one
 * with totalFunc per-function entries, initialize it and prepend it to
 * stats (and, when addToPeerList is non-zero, to the global peerStats
 * queue).  Then add this invocation's count, byte totals and queue and
 * execution times into entry currentFunc.
 *
 * No lock is taken in the visible lines; the visible caller
 * (rx_IncrementTimeAndCount) holds rx_rpc_stats and the peer's lock.
 *
 * Note on "counter" above: it is advanced by totalFunc for each new
 * structure, i.e. it counts per-function entries rather than structures.
 * The rx_Retrieve*RPCStats functions size their output buffers from it.
 *
 * NOTE(review): this listing does not show the declarations of isServer
 * and addToPeerList (both used below), the allocation-failure branch, or
 * the return statements; confirm them against the full source.
 */
6551 static int rxi_AddRpcStat(
6552 struct rx_queue *stats,
6553 afs_uint32 rxInterface,
6554 afs_uint32 currentFunc,
6555 afs_uint32 totalFunc,
6556 struct clock *queueTime,
6557 struct clock *execTime,
6558 afs_hyper_t *bytesSent,
6559 afs_hyper_t *bytesRcvd,
6561 afs_uint32 remoteHost,
6562 afs_uint32 remotePort,
6564 unsigned int *counter)
6567 rx_interface_stat_p rpc_stat, nrpc_stat;
6570 * See if there's already a structure for this interface
/* Linear scan; only stats[0] is examined to identify a structure. */
6573 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6574 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6575 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6579 * Didn't find a match so allocate a new structure and add it to the
/*
 * queue_IsEnd presumably signals that the scan ran off the end without a
 * match; the remaining tests repeat the match condition defensively.
 */
6583 if (queue_IsEnd(stats, rpc_stat) ||
6584 (rpc_stat == NULL) ||
6585 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6586 (rpc_stat->stats[0].remote_is_server != isServer)) {
/* sizeof(rx_interface_stat_t) plus one rx_function_entry_v1_t per function. */
6590 space = sizeof(rx_interface_stat_t) + totalFunc *
6591 sizeof(rx_function_entry_v1_t);
6593 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6594 if (rpc_stat == NULL) {
6598 *counter += totalFunc;
6599 for(i=0;i<totalFunc;i++) {
6600 rpc_stat->stats[i].remote_peer = remoteHost;
6601 rpc_stat->stats[i].remote_port = remotePort;
6602 rpc_stat->stats[i].remote_is_server = isServer;
6603 rpc_stat->stats[i].interfaceId = rxInterface;
6604 rpc_stat->stats[i].func_total = totalFunc;
6605 rpc_stat->stats[i].func_index = i;
6606 hzero(rpc_stat->stats[i].invocations);
6607 hzero(rpc_stat->stats[i].bytes_sent);
6608 hzero(rpc_stat->stats[i].bytes_rcvd);
6609 rpc_stat->stats[i].queue_time_sum.sec = 0;
6610 rpc_stat->stats[i].queue_time_sum.usec = 0;
6611 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6612 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
/*
 * Minimum times start at a large sentinel so that the first recorded
 * time is smaller and replaces it (see the clock_Lt tests below).
 */
6613 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6614 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6615 rpc_stat->stats[i].queue_time_max.sec = 0;
6616 rpc_stat->stats[i].queue_time_max.usec = 0;
6617 rpc_stat->stats[i].execution_time_sum.sec = 0;
6618 rpc_stat->stats[i].execution_time_sum.usec = 0;
6619 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6620 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6621 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6622 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6623 rpc_stat->stats[i].execution_time_max.sec = 0;
6624 rpc_stat->stats[i].execution_time_max.usec = 0;
/*
 * The structure is linked onto stats directly and, optionally, onto
 * peerStats through its all_peers member.  Code that scans peerStats
 * must convert the all_peers pointer back to the structure with offsetof.
 */
6626 queue_Prepend(stats, rpc_stat);
6627 if (addToPeerList) {
6628 queue_Prepend(&peerStats, &rpc_stat->all_peers);
/*
 * NOTE(review): currentFunc indexes stats[] below with no visible check
 * against totalFunc, nor against the func_total of a structure found by
 * the scan above (which matches only interfaceId and remote_is_server).
 * An out-of-range value would write past the allocation; confirm callers
 * guarantee currentFunc < totalFunc and a fixed totalFunc per interface.
 */
6633 * Increment the stats for this function
6636 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6637 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6638 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6639 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6640 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6641 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6642 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6644 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6645 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6647 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6648 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6649 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6650 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6652 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6653 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6661 * rx_IncrementTimeAndCount - increment the times and count for a particular
6666 * IN peer - the peer who invoked the rpc
6668 * IN rxInterface - a unique number that identifies the rpc interface
6670 * IN currentFunc - the index of the function being invoked
6672 * IN totalFunc - the total number of functions in this interface
6674 * IN queueTime - the amount of time this function waited for a thread
6676 * IN execTime - the amount of time this function invocation took to execute
6678 * IN bytesSent - the number bytes sent by this invocation
6680 * IN bytesRcvd - the number bytes received by this invocation
6682 * IN isServer - if true, this invocation was made to a server
/*
 * Summary: record one completed RPC in every statistics set that is
 * currently enabled.  That is the peer's own rpcStats queue when
 * rxi_monitor_peerStats is set, and the process-wide processStats queue
 * when rxi_monitor_processStats is set.
 */
6689 void rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
6690 afs_uint32 currentFunc, afs_uint32 totalFunc, struct clock *queueTime,
6691 struct clock *execTime, afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd, int isServer)
/* Lock order: rx_rpc_stats first, then the peer's peer_lock. */
6694 MUTEX_ENTER(&rx_rpc_stats);
6695 MUTEX_ENTER(&peer->peer_lock);
/*
 * Whatever status rxi_AddRpcStat returns is discarded in both calls
 * below, so a failure there silently drops this sample.  The middle
 * arguments of each call are not visible in this listing.
 */
6697 if (rxi_monitor_peerStats) {
6698 rxi_AddRpcStat(&peer->rpcStats,
6710 &rxi_rpc_peer_stat_cnt);
/* Process-wide totals, accumulated on processStats. */
6713 if (rxi_monitor_processStats) {
6714 rxi_AddRpcStat(&processStats,
6726 &rxi_rpc_process_stat_cnt);
6729 MUTEX_EXIT(&peer->peer_lock);
6730 MUTEX_EXIT(&rx_rpc_stats);
6735 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6739 * IN callerVersion - the rpc stat version of the caller.
6741 * IN count - the number of entries to marshall.
6743 * IN stats - pointer to stats to be marshalled.
6745 * OUT ptr - Where to store the marshalled data.
/*
 * Summary: write count entries starting at the afs_uint32 cursor *ptrP,
 * 28 words per entry in the order shown below.  Each afs_hyper_t counter
 * is emitted high word then low word, and each struct clock as sec then
 * usec.
 *
 * "ptr" in the header above is the ptrP parameter.  callerVersion is not
 * consulted by the visible loop, so every caller receives this one layout.
 * The rx_Retrieve*RPCStats callers pass the same &ptr for successive
 * interfaces, so the advanced cursor is presumably stored back through
 * ptrP after the loop; that statement is not visible in this listing.
 */
6751 void rx_MarshallProcessRPCStats(afs_uint32 callerVersion,
6752 int count, rx_function_entry_v1_t *stats, afs_uint32 **ptrP)
6758 * We only support the first version
6760 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6761 *(ptr++) = stats->remote_peer;
6762 *(ptr++) = stats->remote_port;
6763 *(ptr++) = stats->remote_is_server;
6764 *(ptr++) = stats->interfaceId;
6765 *(ptr++) = stats->func_total;
6766 *(ptr++) = stats->func_index;
6767 *(ptr++) = hgethi(stats->invocations);
6768 *(ptr++) = hgetlo(stats->invocations);
6769 *(ptr++) = hgethi(stats->bytes_sent);
6770 *(ptr++) = hgetlo(stats->bytes_sent);
6771 *(ptr++) = hgethi(stats->bytes_rcvd);
6772 *(ptr++) = hgetlo(stats->bytes_rcvd);
6773 *(ptr++) = stats->queue_time_sum.sec;
6774 *(ptr++) = stats->queue_time_sum.usec;
6775 *(ptr++) = stats->queue_time_sum_sqr.sec;
6776 *(ptr++) = stats->queue_time_sum_sqr.usec;
6777 *(ptr++) = stats->queue_time_min.sec;
6778 *(ptr++) = stats->queue_time_min.usec;
6779 *(ptr++) = stats->queue_time_max.sec;
6780 *(ptr++) = stats->queue_time_max.usec;
6781 *(ptr++) = stats->execution_time_sum.sec;
6782 *(ptr++) = stats->execution_time_sum.usec;
6783 *(ptr++) = stats->execution_time_sum_sqr.sec;
6784 *(ptr++) = stats->execution_time_sum_sqr.usec;
6785 *(ptr++) = stats->execution_time_min.sec;
6786 *(ptr++) = stats->execution_time_min.usec;
6787 *(ptr++) = stats->execution_time_max.sec;
6788 *(ptr++) = stats->execution_time_max.usec;
6794 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6799 * IN callerVersion - the rpc stat version of the caller
6801 * OUT myVersion - the rpc stat version of this function
6803 * OUT clock_sec - local time seconds
6805 * OUT clock_usec - local time microseconds
6807 * OUT allocSize - the number of bytes allocated to contain stats
6809 * OUT statCount - the number stats retrieved from this process.
6811 * OUT stats - the actual stats retrieved from this process.
6815 * Returns void. If successful, stats will != NULL.
/*
 * Summary: when process-wide collection is enabled, snapshot every
 * per-function entry on processStats into a newly rxi_Alloc'd buffer
 * returned through stats.  Release it with rx_FreeRPCStats.
 *
 * NOTE: the "Returns void" line above is inaccurate; the function is
 * declared to return int.  Its return statements are not visible in this
 * listing, so the meaning of the value is not documented here.
 */
6818 int rx_RetrieveProcessRPCStats(afs_uint32 callerVersion,
6819 afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
6820 size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
/* Reported before the enabled check, so callers see it either way. */
6830 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6833 * Check to see if stats are enabled
6836 MUTEX_ENTER(&rx_rpc_stats);
6837 if (!rxi_monitor_processStats) {
6838 MUTEX_EXIT(&rx_rpc_stats);
6842 clock_GetTime(&now);
6843 *clock_sec = now.sec;
6844 *clock_usec = now.usec;
6847 * Allocate the space based upon the caller version
6849 * If the client is at an older version than we are,
6850 * we return the statistic data in the older data format, but
6851 * we still return our version number so the client knows we
6852 * are maintaining more data than it can retrieve.
6855 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
/*
 * The buffer holds rxi_rpc_process_stat_cnt entries of
 * sizeof(rx_function_entry_v1_t) bytes each, while
 * rx_MarshallProcessRPCStats writes 28 afs_uint32 words per entry.  This
 * assumes the marshalled form is no larger than the struct; TODO confirm.
 * The counter and the queue are both read under rx_rpc_stats, which the
 * visible writers also hold, so they stay consistent during the copy.
 */
6856 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6857 *statCount = rxi_rpc_process_stat_cnt;
6860 * This can't happen yet, but in the future version changes
6861 * can be handled by adding additional code here
6865 if (space > (size_t) 0) {
/*
 * NOTE(review): the check on rxi_Alloc's result is not visible in this
 * listing; confirm marshalling is skipped when it returns NULL.
 */
6867 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6870 rx_interface_stat_p rpc_stat, nrpc_stat;
6873 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6874 rx_interface_stat)) {
6876 * Copy the data based upon the caller version
6878 rx_MarshallProcessRPCStats(callerVersion,
6879 rpc_stat->stats[0].func_total,
6880 rpc_stat->stats, &ptr);
6886 MUTEX_EXIT(&rx_rpc_stats);
6891 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6895 * IN callerVersion - the rpc stat version of the caller
6897 * OUT myVersion - the rpc stat version of this function
6899 * OUT clock_sec - local time seconds
6901 * OUT clock_usec - local time microseconds
6903 * OUT allocSize - the number of bytes allocated to contain stats
6905 * OUT statCount - the number of stats retrieved from the individual
6908 * OUT stats - the actual stats retrieved from the individual peer structures.
6912 * Returns void. If successful, stats will != NULL.
/*
 * Summary: when per-peer collection is enabled, snapshot every
 * per-function entry reachable from the global peerStats queue into a
 * newly rxi_Alloc'd buffer returned through stats.  Release it with
 * rx_FreeRPCStats.
 *
 * NOTE: the "Returns void" line above is inaccurate; the function is
 * declared to return int.  Its return statements are not visible in this
 * listing, so the meaning of the value is not documented here.
 */
6915 int rx_RetrievePeerRPCStats(afs_uint32 callerVersion,
6916 afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
6917 size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
/* Reported before the enabled check, so callers see it either way. */
6927 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6930 * Check to see if stats are enabled
6933 MUTEX_ENTER(&rx_rpc_stats);
6934 if (!rxi_monitor_peerStats) {
6935 MUTEX_EXIT(&rx_rpc_stats);
6939 clock_GetTime(&now);
6940 *clock_sec = now.sec;
6941 *clock_usec = now.usec;
6944 * Allocate the space based upon the caller version
6946 * If the client is at an older version than we are,
6947 * we return the statistic data in the older data format, but
6948 * we still return our version number so the client knows we
6949 * are maintaining more data than it can retrieve.
6952 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
/*
 * The buffer holds rxi_rpc_peer_stat_cnt entries of
 * sizeof(rx_function_entry_v1_t) bytes each, while
 * rx_MarshallProcessRPCStats writes 28 afs_uint32 words per entry.  This
 * assumes the marshalled form is no larger than the struct; TODO confirm.
 */
6953 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
6954 *statCount = rxi_rpc_peer_stat_cnt;
6957 * This can't happen yet, but in the future version changes
6958 * can be handled by adding additional code here
6962 if (space > (size_t) 0) {
/*
 * NOTE(review): the check on rxi_Alloc's result is not visible in this
 * listing; confirm marshalling is skipped when it returns NULL.
 */
6964 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6967 rx_interface_stat_p rpc_stat, nrpc_stat;
6970 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
6971 rx_interface_stat)) {
6973 * We have to fix the offset of rpc_stat since we are
6974 * keeping this structure on two rx_queues. The rx_queue
6975 * package assumes that the rx_queue member is the first
6976 * member of the structure. That is, rx_queue assumes that
6977 * any one item is only on one queue at a time. We are
6978 * breaking that assumption and so we have to do a little
6979 * math to fix our pointers.
/*
 * Overwriting the loop cursor is presumably safe because queue_Scan has
 * already captured the following element in nrpc_stat.
 */
6982 fix_offset = (char *) rpc_stat;
6983 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
6984 rpc_stat = (rx_interface_stat_p) fix_offset;
6987 * Copy the data based upon the caller version
6989 rx_MarshallProcessRPCStats(callerVersion,
6990 rpc_stat->stats[0].func_total,
6991 rpc_stat->stats, &ptr);
6997 MUTEX_EXIT(&rx_rpc_stats);
7002 * rx_FreeRPCStats - free memory allocated by
7003 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7007 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7008 * rx_RetrievePeerRPCStats
7010 * IN allocSize - the number of bytes in stats.
7017 void rx_FreeRPCStats(afs_uint32 *stats, size_t allocSize)
7019 rxi_Free(stats, allocSize);
7023 * rx_queryProcessRPCStats - see if process rpc stat collection is
7024 * currently enabled.
7030 * Returns 0 if stats are not enabled != 0 otherwise
7033 int rx_queryProcessRPCStats(void)
7036 MUTEX_ENTER(&rx_rpc_stats);
7037 rc = rxi_monitor_processStats;
7038 MUTEX_EXIT(&rx_rpc_stats);
7043 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7049 * Returns 0 if stats are not enabled != 0 otherwise
7052 int rx_queryPeerRPCStats(void)
7055 MUTEX_ENTER(&rx_rpc_stats);
7056 rc = rxi_monitor_peerStats;
7057 MUTEX_EXIT(&rx_rpc_stats);
7062 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7071 void rx_enableProcessRPCStats(void)
7073 MUTEX_ENTER(&rx_rpc_stats);
7074 rx_enable_stats = 1;
7075 rxi_monitor_processStats = 1;
7076 MUTEX_EXIT(&rx_rpc_stats);
7080 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7089 void rx_enablePeerRPCStats(void)
7091 MUTEX_ENTER(&rx_rpc_stats);
7092 rx_enable_stats = 1;
7093 rxi_monitor_peerStats = 1;
7094 MUTEX_EXIT(&rx_rpc_stats);
7098 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7107 void rx_disableProcessRPCStats(void)
7109 rx_interface_stat_p rpc_stat, nrpc_stat;
7112 MUTEX_ENTER(&rx_rpc_stats);
7115 * Turn off process statistics and if peer stats is also off, turn
7119 rxi_monitor_processStats = 0;
7120 if (rxi_monitor_peerStats == 0) {
7121 rx_enable_stats = 0;
7124 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7125 unsigned int num_funcs = 0;
7126 if (!rpc_stat) break;
7127 queue_Remove(rpc_stat);
7128 num_funcs = rpc_stat->stats[0].func_total;
7129 space = sizeof(rx_interface_stat_t) +
7130 rpc_stat->stats[0].func_total *
7131 sizeof(rx_function_entry_v1_t);
7133 rxi_Free(rpc_stat, space);
7134 rxi_rpc_process_stat_cnt -= num_funcs;
7136 MUTEX_EXIT(&rx_rpc_stats);
7140 * rx_disablePeerRPCStats - stop rpc stat collection for peers
/*
 * Summary: turn per-peer collection off.  Also turn off the global
 * rx_enable_stats switch when process-wide collection is already off.
 * Then free the rpcStats structures of every peer in rx_peerHashTable,
 * reducing rxi_rpc_peer_stat_cnt as it goes.
 */
7149 void rx_disablePeerRPCStats(void)
7151 struct rx_peer **peer_ptr, **peer_end;
7154 MUTEX_ENTER(&rx_rpc_stats);
7157 * Turn off peer statistics and if process stats is also off, turn
7161 rxi_monitor_peerStats = 0;
7162 if (rxi_monitor_processStats == 0) {
7163 rx_enable_stats = 0;
/* Lock order: rx_rpc_stats, rx_peerHashTable_lock, then each peer_lock. */
7166 MUTEX_ENTER(&rx_peerHashTable_lock);
7167 for (peer_ptr = &rx_peerHashTable[0],
7168 peer_end = &rx_peerHashTable[rx_hashTableSize];
7169 peer_ptr < peer_end; peer_ptr++) {
7170 struct rx_peer *peer, *next, *prev;
7171 for (prev = peer = *peer_ptr; peer; peer = next) {
/*
 * The peer lock is only try-locked.  NOTE(review): the statement that
 * tests "code" is not visible in this listing.  Presumably a peer whose
 * lock is busy is skipped, which would leave its statistics allocated and
 * counted in rxi_rpc_peer_stat_cnt after collection has been disabled.
 */
7173 code = MUTEX_TRYENTER(&peer->peer_lock);
7175 rx_interface_stat_p rpc_stat, nrpc_stat;
7177 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7178 rx_interface_stat)) {
7179 unsigned int num_funcs = 0;
7180 if (!rpc_stat) break;
/*
 * Each structure sits on two queues: the peer's rpcStats (via
 * queue_header) and the global peerStats (via all_peers).  It is
 * therefore unlinked from both before being freed.
 */
7181 queue_Remove(&rpc_stat->queue_header);
7182 queue_Remove(&rpc_stat->all_peers);
7183 num_funcs = rpc_stat->stats[0].func_total;
7184 space = sizeof(rx_interface_stat_t) +
7185 rpc_stat->stats[0].func_total *
7186 sizeof(rx_function_entry_v1_t);
7188 rxi_Free(rpc_stat, space);
7189 rxi_rpc_peer_stat_cnt -= num_funcs;
7191 MUTEX_EXIT(&peer->peer_lock);
/*
 * NOTE(review): the prev / *peer_ptr bookkeeping that follows is not
 * visible in this listing.  Confirm it only advances prev and does not
 * unlink the peer from its hash chain; this function should free
 * statistics, not remove peers.
 */
7192 if (prev == *peer_ptr) {
7204 MUTEX_EXIT(&rx_peerHashTable_lock);
7205 MUTEX_EXIT(&rx_rpc_stats);
7209 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7214 * IN clearFlag - flag indicating which stats to clear
7221 void rx_clearProcessRPCStats(afs_uint32 clearFlag)
7223 rx_interface_stat_p rpc_stat, nrpc_stat;
7225 MUTEX_ENTER(&rx_rpc_stats);
7227 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7228 unsigned int num_funcs = 0, i;
7229 num_funcs = rpc_stat->stats[0].func_total;
7230 for(i=0;i<num_funcs;i++) {
7231 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7232 hzero(rpc_stat->stats[i].invocations);
7234 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7235 hzero(rpc_stat->stats[i].bytes_sent);
7237 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7238 hzero(rpc_stat->stats[i].bytes_rcvd);
7240 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7241 rpc_stat->stats[i].queue_time_sum.sec = 0;
7242 rpc_stat->stats[i].queue_time_sum.usec = 0;
7244 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7245 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7246 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7248 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7249 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7250 rpc_stat->stats[i].queue_time_min.usec= 9999999;
7252 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7253 rpc_stat->stats[i].queue_time_max.sec = 0;
7254 rpc_stat->stats[i].queue_time_max.usec = 0;
7256 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7257 rpc_stat->stats[i].execution_time_sum.sec = 0;
7258 rpc_stat->stats[i].execution_time_sum.usec = 0;
7260 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7261 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7262 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7264 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7265 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7266 rpc_stat->stats[i].execution_time_min.usec= 9999999;
7268 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7269 rpc_stat->stats[i].execution_time_max.sec = 0;
7270 rpc_stat->stats[i].execution_time_max.usec = 0;
7275 MUTEX_EXIT(&rx_rpc_stats);
7279 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7284 * IN clearFlag - flag indicating which stats to clear
7291 void rx_clearPeerRPCStats(afs_uint32 clearFlag)
7293 rx_interface_stat_p rpc_stat, nrpc_stat;
7295 MUTEX_ENTER(&rx_rpc_stats);
7297 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7298 unsigned int num_funcs = 0, i;
7301 * We have to fix the offset of rpc_stat since we are
7302 * keeping this structure on two rx_queues. The rx_queue
7303 * package assumes that the rx_queue member is the first
7304 * member of the structure. That is, rx_queue assumes that
7305 * any one item is only on one queue at a time. We are
7306 * breaking that assumption and so we have to do a little
7307 * math to fix our pointers.
7310 fix_offset = (char *) rpc_stat;
7311 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7312 rpc_stat = (rx_interface_stat_p) fix_offset;
7314 num_funcs = rpc_stat->stats[0].func_total;
7315 for(i=0;i<num_funcs;i++) {
7316 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7317 hzero(rpc_stat->stats[i].invocations);
7319 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7320 hzero(rpc_stat->stats[i].bytes_sent);
7322 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7323 hzero(rpc_stat->stats[i].bytes_rcvd);
7325 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7326 rpc_stat->stats[i].queue_time_sum.sec = 0;
7327 rpc_stat->stats[i].queue_time_sum.usec = 0;
7329 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7330 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7331 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7333 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7334 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7335 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7337 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7338 rpc_stat->stats[i].queue_time_max.sec = 0;
7339 rpc_stat->stats[i].queue_time_max.usec = 0;
7341 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7342 rpc_stat->stats[i].execution_time_sum.sec = 0;
7343 rpc_stat->stats[i].execution_time_sum.usec = 0;
7345 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7346 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7347 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7349 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7350 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7351 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7353 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7354 rpc_stat->stats[i].execution_time_max.sec = 0;
7355 rpc_stat->stats[i].execution_time_max.usec = 0;
7360 MUTEX_EXIT(&rx_rpc_stats);
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.  It stays NULL
 * until rx_SetRxStatUserOk() installs a routine.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;

/*
 * rx_SetRxStatUserOk - install the authorization routine consulted by
 * rx_RxStatUserOk().  Passing NULL clears it.
 */
void rx_SetRxStatUserOk(int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}

/*
 * rx_RxStatUserOk - decide whether call may enable/disable/clear RX
 * statistics.
 *
 * IN call - the call to authorize; passed unchanged to the installed
 *           routine.
 *
 * Returns the installed routine's verdict, or 0 (not authorized) when no
 * routine has been installed.  The explicit NULL check denies by default
 * and never calls through a NULL function pointer.
 */
int rx_RxStatUserOk(struct rx_call *call)
{
    if (rxi_rxstat_userok == NULL)
        return 0;
    return rxi_rxstat_userok(call);
}