/*
 * Copyright 2000, International Business Machines Corporation and others.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */
/* RX: Extended Remote Procedure Call */

#include <afsconfig.h>
#include "afs/param.h"
#include <afs/param.h>

#include "afs/sysincludes.h"
#include "afsincludes.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "netinet/in.h"
#include "afs/afs_args.h"
#include "afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "sys/debug.h"
#endif /* AFS_ALPHA_ENV */
#include "afs/sysincludes.h"
#include "afsincludes.h"
#include "rx_kmutex.h"
#include "rx_kernel.h"
#include "rx_globals.h"
#define AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define AFSOP_STOP_AFS		211	/* Stop AFS process */
#define AFSOP_STOP_BKG		212	/* Stop BKG process */
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include <afs/rxgen_consts.h>
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

/* Local static routines */
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
#ifdef RX_ENABLE_LOCKS
static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif

#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables.
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void)
{
    assert(pthread_mutex_init(&rx_clock_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rx_init_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&epoch_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rx_event_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&des_init_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&des_random_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&event_handler_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&listener_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rx_if_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
                              (const pthread_mutexattr_t *)0) == 0);
    assert(pthread_mutex_init(&rx_debug_mutex,
                              (const pthread_mutexattr_t *)0) == 0);

    assert(pthread_cond_init(&rx_event_handler_cond,
                             (const pthread_condattr_t *)0) == 0);
    assert(pthread_cond_init(&rx_listener_cond,
                             (const pthread_condattr_t *)0) == 0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
    assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
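/*
 * Illustrative sketch (not in the original source): any public entry point
 * can invoke INIT_PTHREAD_LOCKS first; pthread_once then guarantees that
 * rxi_InitPthread runs exactly once, no matter which routine a
 * multithreaded application happens to call first.  The function name
 * below is made up for the example.
 */
#if 0
void rx_SomeEntryPoint(void)
{
    INIT_PTHREAD_LOCKS
    /* epoch_mutex, rx_init_mutex, etc. are now safe to use */
}
#endif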
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else /* AFS_PTHREAD_ENV */
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
 */
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */
/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit
** window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of these
 * tiers:
 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * These are independent of each other:
 *	rx_freeCallQueue_lock
 *	serverQueueEntry->lock
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer->lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 *		    field at the same time.
 */
/*
 * Do we need a lock to protect the peer field in the conn structure?
 *	conn->peer was previously a constant for all intents and purposes, so
 *	had no lock protecting the field.  The multihomed client delta
 *	introduced an RX code change: the peer field in the connection
 *	structure is now set to the remote interface from which the last
 *	packet for this connection was sent out.  This may become an issue
 *	if further changes are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C)  (C)->call_queue_lock = NULL
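/*
 * Illustrative sketch (not in the original source) of the convention the
 * comment above describes.  The wait queue and its lock below are
 * hypothetical names for the example, not the actual enqueue path.
 */
#if 0
/* Putting a call on a queue: record the queue's lock under call->lock. */
MUTEX_ENTER(&example_queue_lock);
SET_CALL_QUEUE_LOCK(call, &example_queue_lock);
queue_Append(&example_queue, call);
MUTEX_EXIT(&example_queue_lock);

/* Removing it (e.g. in rxi_ResetCall): take the recorded lock, clear it. */
{
    afs_kmutex_t *qlock = call->call_queue_lock;
    if (qlock) {
        MUTEX_ENTER(qlock);
        queue_Remove(call);
        CLEAR_CALL_QUEUE_LOCK(call);
        MUTEX_EXIT(qlock);
    }
}
#endif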
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */

struct rx_serverQueueEntry *rx_waitForPacket = 0;

/* ------------Exported Interfaces------------- */
/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */
void rx_SetEpoch (afs_uint32 epoch)

/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    char *htable, *ptable;
    int tmp_status;
    struct timeval tv;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        return tmp_status;	/* Already started; return previous error code. */
    }
    if (afs_winsockInit() < 0)
        return -1;
    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short) port);
    if (rx_socket == OSI_NULLSOCKET) {
#ifdef RX_ENABLE_LOCKS
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
               MUTEX_DEFAULT, 0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER - 10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
        osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *));	/* XXXXX */
    memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *));	/* XXXXX */
    memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);

#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif
#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    struct sockaddr_in addr;
    int addrlen = sizeof(addr);
    if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
        return -1;
    }
    rx_port = addr.sin_port;
#endif
    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);	/* Start time of this package, rxkad
				 * will provide a randomer value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    return tmp_status;
}
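/*
 * Illustrative sketch (not in the original source): minimal client-side
 * initialization.  Passing 0 lets the kernel pick an ephemeral UDP port,
 * as described above rx_Init.
 */
#if 0
if (rx_Init(0) != 0) {
    fprintf(stderr, "rx_Init failed\n");
    exit(1);
}
#endif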
/* Called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or it would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(register struct rx_service *aservice)
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
        return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (aservice->nRequestsRunning <= aservice->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        MUTEX_EXIT(&rx_stats_mutex);
        return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}
static void ReturnToServerPool(register struct rx_service *aservice)
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs)
        rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static int QuotaOK(register struct rx_service *aservice)
{
    int rc = 0;

    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs)
        return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs)
        return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit)
        rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(int nExistingProcs)
{
    register struct rx_service *service;
    register int i;
    int diff, maxdiff = 0, nProcs = 0;
    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee. */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0)
            break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff)
            maxdiff = diff;
    }
    nProcs += maxdiff;		/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i < nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
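/*
 * Worked example (not in the original source): with two installed services,
 * A (minProcs 2, maxProcs 5) and B (minProcs 1, maxProcs 3), the loop above
 * computes nProcs = 2 + 1 = 3 and maxdiff = max(5-2, 3-1) = 3, for 6 threads
 * in total; if one proc already exists (nExistingProcs == 1), then
 * rxi_StartServerProc is invoked 5 times.
 */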
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool. */
void rx_StartServer(int donateMe)
{
    register struct rx_service *service;
    register int i, nProcs = 0;
    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0)
            break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
         */
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();
#ifdef AFS_PTHREAD_ENV
        pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
        LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
        sprintf(name, "srv_%d", ++nProcs);
        if (registerProgram)
            (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
        rx_ServerProc();	/* Never returns */
}
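/*
 * Illustrative sketch (not in the original source): typical server bring-up.
 * The port, service id, service name, and request handler are invented for
 * the example; rxnull_NewServerSecurityObject() supplies the no-op security
 * class, and ports are assumed to be in network byte order.
 */
#if 0
struct rx_securityClass *sc[1];
struct rx_service *svc;

if (rx_Init(htons(1234)) != 0)
    exit(1);
sc[0] = rxnull_NewServerSecurityObject();
svc = rx_NewService(0, 4711, "example", sc, 1, ExampleRequestProc);
if (!svc)
    exit(1);
rx_StartServer(1);		/* donate this thread; never returns */
#endif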
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *rx_NewConnection(register afs_uint32 shost,
        u_short sport, u_short sservice,
        register struct rx_securityClass *securityObject,
        int serviceSecurityIndex)
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));
    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->specific = NULL;
    conn->challengeEvent = NULL;
    conn->delayedAbortEvent = NULL;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++;		/* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
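/*
 * Illustrative sketch (not in the original source): creating a client
 * connection with the null security class.  Host, port, and service id
 * are invented; the host address and port are in network byte order.
 */
#if 0
struct rx_securityClass *sc = rxnull_NewClientSecurityObject();
struct rx_connection *conn =
    rx_NewConnection(htonl(0x7f000001),	/* 127.0.0.1 */
                     htons(1234),	/* server's UDP port */
                     4711,		/* service id registered by the server */
                     sc, 0);		/* security object and its index */
#endif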
void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead / 6;
}
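/*
 * Worked example (not in the original source): with the default
 * rx_connDeadTime of 12, secondsUntilDead = MAX(12, 6) = 12 and
 * secondsUntilPing = 12 / 6 = 2, so roughly five consecutive keepalives
 * can be lost before the connection is declared dead.
 */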
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(struct rx_connection *conn)
{
    int i;
    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount++;
            MUTEX_EXIT(&rx_stats_mutex);
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
    else
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
        for (i = 0; i < conn->nSpecific; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        }
        free(conn->specific);
    }
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(register struct rx_connection *conn)
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
        MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        conn->refCount--;
    else {
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
        if (call) {
            havecalls = 1;
            if (conn->type == RX_CLIENT_CONNECTION) {
                MUTEX_ENTER(&call->lock);
                if (call->delayedAckEvent) {
                    /* Push the final acknowledgment out now--there
                     * won't be a subsequent call to acknowledge the
                     * last reply packets */
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    if (call->state == RX_STATE_PRECALL ||
                        call->state == RX_STATE_ACTIVE) {
                        rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
                    } else {
                        rxi_AckAll(NULL, call, 0);
                    }
                }
                MUTEX_EXIT(&call->lock);
            }
        }
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
        } else {
            /* Someone is accessing a packet right now. */
            havecalls = 1;
        }
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }
    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
        if (packet) {
            MUTEX_ENTER(&conn->conn_data_lock);
            rxi_SendConnectionAbort(conn, packet, 0, 1);
            MUTEX_EXIT(&conn->conn_data_lock);
            rxi_FreePacket(packet);
        }
    }
    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
                                           conn->epoch, conn->type)];
    for (; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;
            break;
        }
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if (rxLastConn == conn)
        rxLastConn = 0;
    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent)
        rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
    if (conn->checkReachEvent)
        rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}

/* Externally available version */
void rx_DestroyConnection(register struct rx_connection *conn)
{
    rxi_DestroyConnection(conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(register struct rx_connection *conn)
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;
    dpf(("rx_NewCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);

    /*
     * Check if there are others waiting for a new call.
     * If so, let them go first to avoid starving them.
     * This is a fairly simple scheme, and might not be
     * a complete solution for large numbers of waiters.
     */
    if (conn->makeCallWaiters) {
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#endif
    }
    for (;;) {
        for (i = 0; i < RX_MAXCALLS; i++) {
            call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_DALLY) {
                    rxi_ResetCall(call, 0);
                    (*call->callNumber)++;
                    break;
                }
                MUTEX_EXIT(&call->lock);
            } else {
                call = rxi_NewCall(conn, i);
                break;
            }
        }
        if (i < RX_MAXCALLS) {
            break;
        }
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);

        conn->makeCallWaiters++;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#endif
        conn->makeCallWaiters--;
    }

    /*
     * Wake up anyone else who might be giving us a chance to
     * run (see code above that avoids resource starvation).
     */
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&conn->conn_call_cv);
#endif
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
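/*
 * Illustrative sketch (not in the original source): one client RPC round
 * trip on an established connection.  The four-byte request/reply framing
 * is invented; real applications normally go through rxgen-generated stubs
 * rather than raw rx_Write/rx_Read.
 */
#if 0
struct rx_call *call = rx_NewCall(conn);
afs_int32 code;
char request[4] = { 'p', 'i', 'n', 'g' }, reply[4];

if (rx_Write(call, request, sizeof(request)) != sizeof(request)) {
    code = rx_EndCall(call, RX_PROTOCOL_ERROR);	/* abort code is illustrative */
} else {
    (void) rx_Read(call, reply, sizeof(reply));
    code = rx_EndCall(call, 0);	/* returns the call's final error code */
}
#endif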
int rxi_HasActiveCalls(register struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i])) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {
                return 1;
            }
        }
    }
    return 0;
}
int rxi_GetCallNumberVector(register struct rx_connection *aconn,
                            register afs_int32 *aint32s)
{
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i] + 1;
        else
            aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

int rxi_SetCallNumberVector(register struct rx_connection *aconn,
                            register afs_int32 *aint32s)
{
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
        else
            aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on error.
 * Arguments:
 *	char *serviceName;	Name for identification purposes (e.g. the
 *				service name might be used for probing for
 *				statistics) */
struct rx_service *rx_NewService(u_short port, u_short serviceId,
        char *serviceName,
        struct rx_securityClass **securityObjects,
        int nSecurityObjects,
        afs_int32 (*serviceProc)(struct rx_call *acall))
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
        (osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
         serviceName);
        return 0;
    }
    if (port == 0) {
        if (rx_port == 0) {
            (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
            return 0;
        }
        port = rx_port;
        socket = rx_socket;
    }

    tservice = rxi_AllocService();
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (service) {
            if (port == service->servicePort) {
                if (service->serviceId == serviceId) {
                    /* The identical service has already been
                     * installed; if the caller was intending to
                     * change the security classes used by this
                     * service, he/she loses. */
                    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
                     serviceName, serviceId, service->serviceName);
                    rxi_FreeService(tservice);
                    return service;
                }
                /* Different service, same port: re-use the socket
                 * which is bound to the same port */
                socket = service->socket;
            }
        } else {
            if (socket == OSI_NULLSOCKET) {
                /* If we don't already have a socket (from another
                 * service on same port) get a new one */
                socket = rxi_GetUDPSocket(port);
                if (socket == OSI_NULLSOCKET) {
                    rxi_FreeService(tservice);
                    return 0;
                }
            }
            service = tservice;
            service->socket = socket;
            service->servicePort = port;
            service->serviceId = serviceId;
            service->serviceName = serviceName;
            service->nSecurityObjects = nSecurityObjects;
            service->securityObjects = securityObjects;
            service->minProcs = 0;
            service->maxProcs = 1;
            service->idleDeadTime = 60;
            service->connDeadTime = rx_connDeadTime;
            service->executeRequestProc = serviceProc;
            service->checkReach = 0;
            rx_services[i] = service;	/* not visible until now */
            return service;
        }
    }
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
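/*
 * Illustrative sketch (not in the original source): the shape of an
 * executeRequestProc handler.  The four-byte echo protocol is invented;
 * rxgen-generated ExecuteRequest routines normally fill this role.
 */
#if 0
static afs_int32 ExampleRequestProc(struct rx_call *acall)
{
    char buf[4];

    if (rx_Read(acall, buf, sizeof(buf)) != sizeof(buf))
        return RX_PROTOCOL_ERROR;	/* becomes the call's error code */
    (void) rx_Write(acall, buf, sizeof(buf));
    return 0;				/* success */
}
#endif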
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;
    for (;;) {
        if (newcall) {
            call = newcall;
            newcall = NULL;
        } else {
            call = rx_GetCall(threadID, tservice, socketp);
            if (socketp && *socketp != OSI_NULLSOCKET) {
                /* We are now a listener thread */
                return;
            }
        }

        /* if server is restarting (typically smooth shutdown) then do not
         * allow any new calls.
         */
        if (rx_tranquil && (call != NULL)) {
            MUTEX_ENTER(&call->lock);
            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
            MUTEX_EXIT(&call->lock);
            continue;
        }
        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
            AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
            afs_termState = AFSOP_STOP_AFS;
            afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
            AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
            return;
        }

        tservice = call->conn->service;

        if (tservice->beforeProc)
            (*tservice->beforeProc)(call);

        code = call->conn->service->executeRequestProc(call);

        if (tservice->afterProc)
            (*tservice->afterProc)(call, code);

        rx_EndCall(call, code);
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_nCalls++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs(void)
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/*
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 *
 * 1. schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * What to do if there is a full window, but the last packet isn't here?
 * 3. preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4. Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */

/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *)0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }

    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
    }
    while (1) {
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
            register struct rx_call *tcall, *ncall;
            choice2 = (struct rx_call *)0;
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
            /* One thread will process calls FCFS (to prevent starvation),
             * while the other threads may run ahead looking for calls which
             * have all their input data available immediately.  This helps
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
                service = tcall->conn->service;
                if (!QuotaOK(service)) {
                    continue;
                }
                if (!tno || !tcall->queue_item_header.next) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1) {
                        if (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET)) {
                            call = tcall;
                        } else if (rxi_2dchoice && !choice2 &&
                                   !(tcall->flags & RX_CALL_CLEARED) &&
                                   (tcall->rprev > rxi_HardAckRate)) {
                            choice2 = tcall;
                        } else
                            rxi_md2cnt++;
                    }
                }
                if (call) {
                    break;
                } else {
                    ReturnToServerPool(service);
                }
            }
        }
        if (call) {
            queue_Remove(call);
            rxi_ServerThreadSelectingCall = 1;
            MUTEX_EXIT(&rx_serverPool_lock);
            MUTEX_ENTER(&call->lock);
            MUTEX_ENTER(&rx_serverPool_lock);

            if (queue_IsEmpty(&call->rq) ||
                queue_First(&call->rq, rx_packet)->header.seq != 1)
                rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

            CLEAR_CALL_QUEUE_LOCK(call);
            if (call->error) {
                MUTEX_EXIT(&call->lock);
                ReturnToServerPool(service);
                rxi_ServerThreadSelectingCall = 0;
                CV_SIGNAL(&rx_serverPool_cv);
                call = (struct rx_call *)0;
                continue;
            }
            call->flags &= (~RX_CALL_WAIT_PROC);
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting--;
            MUTEX_EXIT(&rx_stats_mutex);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            MUTEX_EXIT(&rx_serverPool_lock);
            break;
        } else {
            /* If there are no eligible incoming calls, add this process
             * to the idle server queue, to wait for one */
            sq->newcall = 0;
            if (socketp) {
                *socketp = OSI_NULLSOCKET;
            }
            sq->socketp = socketp;
            queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
            rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */

            do {
                CV_WAIT(&sq->cv, &rx_serverPool_lock);
                if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                    MUTEX_EXIT(&rx_serverPool_lock);
                    return (struct rx_call *)0;
                }
            } while (!(call = sq->newcall) &&
                     !(socketp && *socketp != OSI_NULLSOCKET));
            MUTEX_EXIT(&rx_serverPool_lock);
            if (call) {
                MUTEX_ENTER(&call->lock);
            }
            break;
        }
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));

        CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
        MUTEX_EXIT(&call->lock);
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *)0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)
            rxi_minDeficit++;
        rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *)0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                        call = tcall;
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                        choice2 = tcall;
                    } else
                        rxi_md2cnt++;
                }
                if (call) {
                    break;
                }
            }
        }
    }
    if (call) {
        queue_Remove(call);
        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (service->nRequestsRunning <= service->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        /* MUTEX_EXIT(&call->lock); */
    } else {
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
        sq->newcall = 0;
        if (socketp) {
            *socketp = OSI_NULLSOCKET;
        }
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
        do {
            osi_rxSleep(sq);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                return (struct rx_call *)0;
            }
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);
    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(register struct rx_call *call,
        register VOID (*proc)(register struct rx_call *call,
                              register struct multi_handle *mh,
                              register int index),
        register VOID *handle, register VOID *arg)
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller. */
afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;	/* Temporary packet pointer */
    register struct rx_packet *nxp;	/* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);
    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)())0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
         */
        rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        }
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        }
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
        } else {
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
        }
    }
    else {	/* Client connection */
        char dummy;
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        }

        /* If we had an outstanding delayed ack, be nice to the server
         * and force-send it now.
         */
        if (call->delayedAckEvent) {
            rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = NULL;
            rxi_SendDelayedAck(NULL, call, NULL);
        }
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call.  The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure.  We don't want to signal until
         * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
         */
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_BUSY;
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#endif
        }
#ifdef RX_ENABLE_LOCKS
        else {
            MUTEX_EXIT(&conn->conn_data_lock);
        }
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and nFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *)0;
    }
    call->nLeft = call->nFree = call->curlen = 0;
    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
        queue_Remove(tp);
        rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
        MUTEX_EXIT(&conn->conn_call_lock);
        conn->flags &= ~RX_CONN_BUSY;
    }
    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}

#if !defined(KERNEL)
/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize(void)
{
    register struct rx_connection **conn_ptr, **conn_end;

    if (rxinit_status == 1) {
        return;			/* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                next = conn->next;
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    conn->refCount++;
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
                }
            }
        }
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
}
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void rxi_PacketsUnWait(void)
{
    if (!rx_waitingForPackets) {
        return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return;			/* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(register osi_socket socket,
                                   register u_short serviceId)
{
    register struct rx_service **sp;

    for (sp = &rx_services[0]; *sp; sp++) {
        if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
            return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller.  Returns the call with mutex locked. */
struct rx_call *rxi_NewCall(register struct rx_connection *conn,
                            register int channel)
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
        if (!(cp->flags & RX_CALL_TQ_BUSY)) {
            call = cp;
            break;
        }
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        queue_Remove(call);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nFreeCallStructs--;
        MUTEX_EXIT(&rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Now, if TQ wasn't cleared earlier, do it now. */
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 0);
            queue_Init(&call->tq);
        }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Bind the call to its connection structure */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    else {
        call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
        MUTEX_ENTER(&call->lock);
        CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
        CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
        CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nCallStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
        /* Initialize once-only items */
        queue_Init(&call->tq);
        queue_Init(&call->rq);
        queue_Init(&call->iovq);
        /* Bind the call to its connection structure (prereq for reset) */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0)
        *call->callNumber = 1;

    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 * haveCTLock set if called from rxi_ReapConnections
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(register struct rx_call *call)
#endif /* RX_ENABLE_LOCKS */
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;
2039 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2040 (*call->callNumber)++;
2041 rxi_ResetCall(call, 0);
2042 call->conn->call[channel] = (struct rx_call *) 0;
2044 MUTEX_ENTER(&rx_freeCallQueue_lock);
2045 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2046 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2047 /* A call may be free even though its transmit queue is still in use.
2048 * Since we search the call list from head to tail, put busy calls at
2049 * the head of the list, and idle calls at the tail.
2051 if (call->flags & RX_CALL_TQ_BUSY)
2052 queue_Prepend(&rx_freeCallQueue, call);
2054 queue_Append(&rx_freeCallQueue, call);
2055 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2056 queue_Append(&rx_freeCallQueue, call);
2057 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2058 MUTEX_ENTER(&rx_stats_mutex);
2059 rx_stats.nFreeCallStructs++;
2060 MUTEX_EXIT(&rx_stats_mutex);
2062 MUTEX_EXIT(&rx_freeCallQueue_lock);
2064 /* Destroy the connection if it was previously slated for
2065 * destruction, i.e. the Rx client code previously called
2066 * rx_DestroyConnection (client connections), or
2067 * rxi_ReapConnections called the same routine (server
2068 * connections). Only do this, however, if there are no
2069 * outstanding calls. Note that for fine grain locking, there appears
2070 * to be a deadlock in that rxi_FreeCall has a call locked and
2071 * DestroyConnectionNoLock locks each call in the conn. But note a
2072 * few lines up where we have removed this call from the conn.
2073 * If someone else destroys a connection, they either have no
2074 * call lock held or are going through this section of code.
2076 if (conn->flags & RX_CONN_DESTROY_ME) {
2077 MUTEX_ENTER(&conn->conn_data_lock);
2079 MUTEX_EXIT(&conn->conn_data_lock);
2080 #ifdef RX_ENABLE_LOCKS
2082 rxi_DestroyConnectionNoLock(conn);
2084 rxi_DestroyConnection(conn);
2085 #else /* RX_ENABLE_LOCKS */
2086 rxi_DestroyConnection(conn);
2087 #endif /* RX_ENABLE_LOCKS */
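/* Illustrative sketch, not part of rx: the free-call list discipline used
 * above -- calls whose transmit queue is still busy are prepended, idle
 * calls appended, and allocation scans head to tail skipping busy entries.
 * Toy singly linked list with hypothetical names: */
#include <stddef.h>

struct toy_call {
    int tq_busy;                    /* RX_CALL_TQ_BUSY analogue */
    struct toy_call *next;
};

static struct toy_call *toy_free_head, *toy_free_tail;

static void toy_free_call(struct toy_call *c)
{
    if (c->tq_busy) {               /* busy TQ: prepend, like queue_Prepend */
        c->next = toy_free_head;
        toy_free_head = c;
        if (!toy_free_tail) toy_free_tail = c;
    } else {                        /* idle: append, like queue_Append */
        c->next = NULL;
        if (toy_free_tail) toy_free_tail->next = c;
        else toy_free_head = c;
        toy_free_tail = c;
    }
}

/* First call whose transmit queue is no longer busy, or NULL.
 * The caller is expected to unlink the entry it takes. */
static struct toy_call *toy_grab_call(void)
{
    struct toy_call *c;
    for (c = toy_free_head; c; c = c->next)
        if (!c->tq_busy)
            return c;
    return NULL;
}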
2091 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2092 char *rxi_Alloc(register size_t size)
2096 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2097 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2100 int glockOwner = ISAFS_GLOCK();
2104 MUTEX_ENTER(&rx_stats_mutex);
2105 rxi_Alloccnt++; rxi_Allocsize += size;
2106 MUTEX_EXIT(&rx_stats_mutex);
2107 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2108 if (size > AFS_SMALLOCSIZ) {
2109 p = (char *) osi_AllocMediumSpace(size);
2111 p = (char *) osi_AllocSmall(size, 1);
2112 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2117 p = (char *) osi_Alloc(size);
2119 if (!p) osi_Panic("rxi_Alloc error");
2124 void rxi_Free(void *addr, register size_t size)
2126 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2127 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2130 int glockOwner = ISAFS_GLOCK();
2134 MUTEX_ENTER(&rx_stats_mutex);
2135 rxi_Alloccnt--; rxi_Allocsize -= size;
2136 MUTEX_EXIT(&rx_stats_mutex);
2137 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2138 if (size > AFS_SMALLOCSIZ)
2139 osi_FreeMediumSpace(addr);
2141 osi_FreeSmall(addr);
2142 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2147 osi_Free(addr, size);
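/* Illustrative sketch, not part of rx: rxi_Free takes the allocation's
 * size because the allocator above keeps running count/byte statistics
 * and, on some kernels, chooses a small- vs. medium-block pool by size.
 * A caller therefore pairs the calls like this (hypothetical toy type): */
struct toy_record { int a, b; };

static void toy_use_alloc(void)
{
    struct toy_record *r =
        (struct toy_record *) rxi_Alloc(sizeof(struct toy_record));
    r->a = r->b = 0;
    /* ... use r ... */
    rxi_Free(r, sizeof(struct toy_record));   /* same size as allocated */
}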
2151 /* Find the peer process represented by the supplied (host,port)
2152 * combination. If there is no appropriate active peer structure, a
2153 * new one will be allocated and initialized
2154 * The origPeer, if set, is a pointer to a peer structure on which the
2155 * refcount will be decremented. This is used to replace the peer
2156 * structure hanging off a connection structure */
2157 struct rx_peer *rxi_FindPeer(register afs_uint32 host,
2158 register u_short port, struct rx_peer *origPeer, int create)
2160 register struct rx_peer *pp;
2162 hashIndex = PEER_HASH(host, port);
2163 MUTEX_ENTER(&rx_peerHashTable_lock);
2164 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2165 if ((pp->host == host) && (pp->port == port)) break;
2169 pp = rxi_AllocPeer(); /* This bzero's *pp */
2170 pp->host = host; /* set here or in InitPeerParams is zero */
2172 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2173 queue_Init(&pp->congestionQueue);
2174 queue_Init(&pp->rpcStats);
2175 pp->next = rx_peerHashTable[hashIndex];
2176 rx_peerHashTable[hashIndex] = pp;
2177 rxi_InitPeerParams(pp);
2178 MUTEX_ENTER(&rx_stats_mutex);
2179 rx_stats.nPeerStructs++;
2180 MUTEX_EXIT(&rx_stats_mutex);
2187 origPeer->refCount--;
2188 MUTEX_EXIT(&rx_peerHashTable_lock);
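/* Illustrative sketch, not part of rx: the find-or-create hash pattern
 * that rxi_FindPeer uses above, reduced to a toy integer-keyed table.
 * Names are hypothetical; the real code hashes (host, port) with
 * PEER_HASH and holds rx_peerHashTable_lock around the table. */
#include <stdlib.h>

#define TOY_HASHSIZE 256

struct toy_peer { unsigned int key; struct toy_peer *next; };
static struct toy_peer *toy_peer_table[TOY_HASHSIZE];

static struct toy_peer *toy_find_peer(unsigned int key, int create)
{
    unsigned int h = key % TOY_HASHSIZE;
    struct toy_peer *p;
    for (p = toy_peer_table[h]; p; p = p->next)
        if (p->key == key)
            return p;                    /* existing entry */
    if (!create)
        return NULL;
    p = (struct toy_peer *) calloc(1, sizeof(*p)); /* zeroed, like rxi_AllocPeer */
    if (!p)
        return NULL;
    p->key = key;
    p->next = toy_peer_table[h];         /* push onto head of hash chain */
    toy_peer_table[h] = p;
    return p;
}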
2193 /* Find the connection at (host, port) started at epoch, and with the
2194 * given connection id. Creates the server connection if necessary.
2195 * The type specifies whether a client connection or a server
2196 * connection is desired. In both cases, (host, port) specify the
2197 * peer's (host, port) pair. Client connections are not made
2198 * automatically by this routine. The parameter socket gives the
2199 * socket descriptor on which the packet was received. This is used,
2200 * in the case of server connections, to check that *new* connections
2201 * come via a valid (port, serviceId). Finally, the securityIndex
2202 * parameter must match the existing index for the connection. If a
2203 * server connection is created, it will be created using the supplied
2204 * index, if the index is valid for this service */
2205 struct rx_connection *rxi_FindConnection(osi_socket socket,
2206 register afs_int32 host, register u_short port, u_short serviceId,
2207 afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2209 int hashindex, flag;
2210 register struct rx_connection *conn;
2211 struct rx_peer *peer;
2212 hashindex = CONN_HASH(host, port, cid, epoch, type);
2213 MUTEX_ENTER(&rx_connHashTable_lock);
2214 rxLastConn ? (conn = rxLastConn, flag = 0) :
2215 (conn = rx_connHashTable[hashindex], flag = 1);
2217 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2218 && (epoch == conn->epoch)) {
2219 register struct rx_peer *pp = conn->peer;
2220 if (securityIndex != conn->securityIndex) {
2221 /* this isn't supposed to happen, but someone could forge a packet
2222 like this, and there seems to be some CM bug that makes this
2223 happen from time to time -- in which case, the fileserver
2225 MUTEX_EXIT(&rx_connHashTable_lock);
2226 return (struct rx_connection *) 0;
2228 /* epoch's high order bits mean route for security reasons only on
2229 * the cid, not the host and port fields.
2231 if (conn->epoch & 0x80000000) break;
2232 if (((type == RX_CLIENT_CONNECTION)
2233 || (pp->host == host)) && (pp->port == port))
2238 /* the connection rxLastConn that was used the last time is not the
2239 ** one we are looking for now. Hence, start searching in the hash
2240 ** table */
2241 conn = rx_connHashTable[hashindex];
2247 struct rx_service *service;
2248 if (type == RX_CLIENT_CONNECTION) {
2249 MUTEX_EXIT(&rx_connHashTable_lock);
2250 return (struct rx_connection *) 0;
2252 service = rxi_FindService(socket, serviceId);
2253 if (!service || (securityIndex >= service->nSecurityObjects)
2254 || (service->securityObjects[securityIndex] == 0)) {
2255 MUTEX_EXIT(&rx_connHashTable_lock);
2256 return (struct rx_connection *) 0;
2258 conn = rxi_AllocConnection(); /* This bzero's the connection */
2259 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2261 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2263 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2264 conn->next = rx_connHashTable[hashindex];
2265 rx_connHashTable[hashindex] = conn;
2266 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2267 conn->type = RX_SERVER_CONNECTION;
2268 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2269 conn->epoch = epoch;
2270 conn->cid = cid & RX_CIDMASK;
2271 /* conn->serial = conn->lastSerial = 0; */
2272 /* conn->timeout = 0; */
2273 conn->ackRate = RX_FAST_ACK_RATE;
2274 conn->service = service;
2275 conn->serviceId = serviceId;
2276 conn->securityIndex = securityIndex;
2277 conn->securityObject = service->securityObjects[securityIndex];
2278 conn->nSpecific = 0;
2279 conn->specific = NULL;
2280 rx_SetConnDeadTime(conn, service->connDeadTime);
2281 /* Notify security object of the new connection */
2282 RXS_NewConnection(conn->securityObject, conn);
2283 /* XXXX Connection timeout? */
2284 if (service->newConnProc) (*service->newConnProc)(conn);
2285 MUTEX_ENTER(&rx_stats_mutex);
2286 rx_stats.nServerConns++;
2287 MUTEX_EXIT(&rx_stats_mutex);
2291 /* Ensure that the peer structure is set up in such a way that
2292 ** replies in this connection go back to that remote interface
2293 ** from which the last packet was sent out. If this packet's
2294 ** source IP address does not match the peer struct for this conn,
2295 ** then drop the refCount on conn->peer and get a new peer structure.
2296 ** We can check the host,port field in the peer structure without the
2297 ** rx_peerHashTable_lock because the peer structure has its refCount
2298 ** incremented and the only time the host,port in the peer struct gets
2299 ** updated is when the peer structure is created.
2301 if (conn->peer->host == host )
2302 peer = conn->peer; /* no change to the peer structure */
2304 peer = rxi_FindPeer(host, port, conn->peer, 1);
2307 MUTEX_ENTER(&conn->conn_data_lock);
2310 MUTEX_EXIT(&conn->conn_data_lock);
2312 rxLastConn = conn; /* store this connection as the last conn used */
2313 MUTEX_EXIT(&rx_connHashTable_lock);
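/* Illustrative sketch, not part of rx: the one-entry cache in front of
 * the hash lookup above. rxLastConn remembers the connection that
 * matched last time; only on a miss does the code fall back to the hash
 * chain. Toy version with hypothetical names: */
struct toy_c { unsigned int cid; struct toy_c *next; };
static struct toy_c *toy_last_c;
static struct toy_c *toy_c_table[256];

static struct toy_c *toy_find_c(unsigned int cid)
{
    struct toy_c *c = toy_last_c;
    if (c && c->cid == cid)
        return c;                        /* fast path: cache hit */
    for (c = toy_c_table[cid % 256]; c; c = c->next)
        if (c->cid == cid)
            break;
    if (c)
        toy_last_c = c;                  /* remember for the next packet */
    return c;
}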
2317 /* There are two packet tracing routines available for testing and monitoring
2318 * Rx. One is called just after every packet is received and the other is
2319 * called just before every packet is sent. Received packets, have had their
2320 * headers decoded, and packets to be sent have not yet had their headers
2321 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2322 * containing the network address. Both can be modified. The return value, if
2323 * non-zero, indicates that the packet should be dropped. */
2325 int (*rx_justReceived)() = 0;
2326 int (*rx_almostSent)() = 0;
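/* Illustrative sketch, not part of rx: a receive-side tracer with the
 * shape these hooks expect. Hypothetical example -- it drops packets
 * from one port (7001 chosen arbitrarily) and keeps everything else: */
static int toy_trace_rcv(struct rx_packet *p, struct sockaddr_in *addr)
{
    if (ntohs(addr->sin_port) == 7001)
        return 1;                        /* non-zero: drop the packet */
    return 0;                            /* zero: keep the packet */
}
/* A test harness would install it with rx_justReceived = toy_trace_rcv;
 * a cast may be needed, since the hook is declared with an empty
 * parameter list. */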
2328 /* A packet has been received off the interface. Np is the packet, socket is
2329 * the socket number it was received from (useful in determining which service
2330 * this packet corresponds to), and (host, port) reflect the host,port of the
2331 * sender. This call returns the packet to the caller if it is finished with
2332 * it, rather than de-allocating it, just as a small performance hack */
2334 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
2335 osi_socket socket, afs_uint32 host, u_short port,
2336 int *tnop, struct rx_call **newcallp)
2338 register struct rx_call *call;
2339 register struct rx_connection *conn;
2341 afs_uint32 currentCallNumber;
2347 struct rx_packet *tnp;
2350 /* We don't print out the packet until now because (1) the time may not be
2351 * accurate enough until now in the lwp implementation (rx_Listener only gets
2352 * the time after the packet is read) and (2) from a protocol point of view,
2353 * this is the first time the packet has been seen */
2354 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2355 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2356 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2357 np->header.serial, packetType, host, port, np->header.serviceId,
2358 np->header.epoch, np->header.cid, np->header.callNumber,
2359 np->header.seq, np->header.flags, np));
2362 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2363 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2366 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2367 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2370 /* If an input tracer function is defined, call it with the packet and
2371 * network address. Note this function may modify its arguments. */
2372 if (rx_justReceived) {
2373 struct sockaddr_in addr;
2375 addr.sin_family = AF_INET;
2376 addr.sin_port = port;
2377 addr.sin_addr.s_addr = host;
2378 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2379 addr.sin_len = sizeof(addr);
2380 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2381 drop = (*rx_justReceived) (np, &addr);
2382 /* drop packet if return value is non-zero */
2383 if (drop) return np;
2384 port = addr.sin_port; /* in case fcn changed addr */
2385 host = addr.sin_addr.s_addr;
2389 /* If packet was not sent by the client, then *we* must be the client */
2390 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2391 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2393 /* Find the connection (or fabricate one, if we're the server & if
2394 * necessary) associated with this packet */
2395 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2396 np->header.cid, np->header.epoch, type,
2397 np->header.securityIndex);
2400 /* If no connection found or fabricated, just ignore the packet.
2401 * (An argument could be made for sending an abort packet for
2406 MUTEX_ENTER(&conn->conn_data_lock);
2407 if (conn->maxSerial < np->header.serial)
2408 conn->maxSerial = np->header.serial;
2409 MUTEX_EXIT(&conn->conn_data_lock);
2411 /* If the connection is in an error state, send an abort packet and ignore
2412 * the incoming packet */
2414 /* Don't respond to an abort packet--we don't want loops! */
2415 MUTEX_ENTER(&conn->conn_data_lock);
2416 if (np->header.type != RX_PACKET_TYPE_ABORT)
2417 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2419 MUTEX_EXIT(&conn->conn_data_lock);
2423 /* Check for connection-only requests (i.e. not call specific). */
2424 if (np->header.callNumber == 0) {
2425 switch (np->header.type) {
2426 case RX_PACKET_TYPE_ABORT:
2427 /* What if the supplied error is zero? */
2428 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2429 MUTEX_ENTER(&conn->conn_data_lock);
2431 MUTEX_EXIT(&conn->conn_data_lock);
2433 case RX_PACKET_TYPE_CHALLENGE:
2434 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2435 MUTEX_ENTER(&conn->conn_data_lock);
2437 MUTEX_EXIT(&conn->conn_data_lock);
2439 case RX_PACKET_TYPE_RESPONSE:
2440 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2441 MUTEX_ENTER(&conn->conn_data_lock);
2443 MUTEX_EXIT(&conn->conn_data_lock);
2445 case RX_PACKET_TYPE_PARAMS:
2446 case RX_PACKET_TYPE_PARAMS+1:
2447 case RX_PACKET_TYPE_PARAMS+2:
2448 /* ignore these packet types for now */
2449 MUTEX_ENTER(&conn->conn_data_lock);
2451 MUTEX_EXIT(&conn->conn_data_lock);
2456 /* Should not reach here, unless the peer is broken: send an
2458 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2459 MUTEX_ENTER(&conn->conn_data_lock);
2460 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2462 MUTEX_EXIT(&conn->conn_data_lock);
2467 channel = np->header.cid & RX_CHANNELMASK;
2468 call = conn->call[channel];
2469 #ifdef RX_ENABLE_LOCKS
2471 MUTEX_ENTER(&call->lock);
2472 /* Test to see if call struct is still attached to conn. */
2473 if (call != conn->call[channel]) {
2475 MUTEX_EXIT(&call->lock);
2476 if (type == RX_SERVER_CONNECTION) {
2477 call = conn->call[channel];
2478 /* If we started with no call attached and there is one now,
2479 * another thread is also running this routine and has gotten
2480 * the connection channel. We should drop this packet in the tests
2481 * below. If there was a call on this connection and it's now
2482 * gone, then we'll be making a new call below.
2483 * If there was previously a call and it's now different, then
2484 * the old call was freed and another thread running this routine
2485 * has created a call on this channel. One of these two threads
2486 * has a packet for the old call and the code below handles those
2490 MUTEX_ENTER(&call->lock);
2493 /* This packet can't be for this call. If the new call address is
2494 * 0 then no call is running on this channel. If there is a call
2495 * then, since this is a client connection we're getting data for
2496 * it must be for the previous call.
2498 MUTEX_ENTER(&rx_stats_mutex);
2499 rx_stats.spuriousPacketsRead++;
2500 MUTEX_EXIT(&rx_stats_mutex);
2501 MUTEX_ENTER(&conn->conn_data_lock);
2503 MUTEX_EXIT(&conn->conn_data_lock);
2508 currentCallNumber = conn->callNumber[channel];
2510 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2511 if (np->header.callNumber < currentCallNumber) {
2512 MUTEX_ENTER(&rx_stats_mutex);
2513 rx_stats.spuriousPacketsRead++;
2514 MUTEX_EXIT(&rx_stats_mutex);
2515 #ifdef RX_ENABLE_LOCKS
2517 MUTEX_EXIT(&call->lock);
2519 MUTEX_ENTER(&conn->conn_data_lock);
2521 MUTEX_EXIT(&conn->conn_data_lock);
2525 MUTEX_ENTER(&conn->conn_call_lock);
2526 call = rxi_NewCall(conn, channel);
2527 MUTEX_EXIT(&conn->conn_call_lock);
2528 *call->callNumber = np->header.callNumber;
2529 call->state = RX_STATE_PRECALL;
2530 clock_GetTime(&call->queueTime);
2531 hzero(call->bytesSent);
2532 hzero(call->bytesRcvd);
2533 rxi_KeepAliveOn(call);
2535 else if (np->header.callNumber != currentCallNumber) {
2536 /* Wait until the transmit queue is idle before deciding
2537 * whether to reset the current call. Chances are that the
2538 * call will be in either DALLY or HOLD state once the TQ_BUSY
2541 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2542 while ((call->state == RX_STATE_ACTIVE) &&
2543 (call->flags & RX_CALL_TQ_BUSY)) {
2544 call->flags |= RX_CALL_TQ_WAIT;
2545 #ifdef RX_ENABLE_LOCKS
2546 CV_WAIT(&call->cv_tq, &call->lock);
2547 #else /* RX_ENABLE_LOCKS */
2548 osi_rxSleep(&call->tq);
2549 #endif /* RX_ENABLE_LOCKS */
2551 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2552 /* If the new call cannot be taken right now send a busy and set
2553 * the error condition in this call, so that it terminates as
2554 * quickly as possible */
2555 if (call->state == RX_STATE_ACTIVE) {
2556 struct rx_packet *tp;
2558 rxi_CallError(call, RX_CALL_DEAD);
2559 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2560 MUTEX_EXIT(&call->lock);
2561 MUTEX_ENTER(&conn->conn_data_lock);
2563 MUTEX_EXIT(&conn->conn_data_lock);
2566 rxi_ResetCall(call, 0);
2567 *call->callNumber = np->header.callNumber;
2568 call->state = RX_STATE_PRECALL;
2569 clock_GetTime(&call->queueTime);
2570 hzero(call->bytesSent);
2571 hzero(call->bytesRcvd);
2573 * If the number of queued calls exceeds the overload
2574 * threshold then abort this call.
2576 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2577 struct rx_packet *tp;
2579 rxi_CallError(call, rx_BusyError);
2580 tp = rxi_SendCallAbort(call, np, 1, 0);
2581 MUTEX_EXIT(&call->lock);
2582 MUTEX_ENTER(&conn->conn_data_lock);
2584 MUTEX_EXIT(&conn->conn_data_lock);
2587 rxi_KeepAliveOn(call);
2590 /* Continuing call; do nothing here. */
2592 } else { /* we're the client */
2593 /* Ignore all incoming acknowledgements for calls in DALLY state */
2594 if ( call && (call->state == RX_STATE_DALLY)
2595 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2596 MUTEX_ENTER(&rx_stats_mutex);
2597 rx_stats.ignorePacketDally++;
2598 MUTEX_EXIT(&rx_stats_mutex);
2599 #ifdef RX_ENABLE_LOCKS
2601 MUTEX_EXIT(&call->lock);
2604 MUTEX_ENTER(&conn->conn_data_lock);
2606 MUTEX_EXIT(&conn->conn_data_lock);
2610 /* Ignore anything that's not relevant to the current call. If there
2611 * isn't a current call, then no packet is relevant. */
2612 if (!call || (np->header.callNumber != currentCallNumber)) {
2613 MUTEX_ENTER(&rx_stats_mutex);
2614 rx_stats.spuriousPacketsRead++;
2615 MUTEX_EXIT(&rx_stats_mutex);
2616 #ifdef RX_ENABLE_LOCKS
2618 MUTEX_EXIT(&call->lock);
2621 MUTEX_ENTER(&conn->conn_data_lock);
2623 MUTEX_EXIT(&conn->conn_data_lock);
2626 /* If the service security object index stamped in the packet does not
2627 * match the connection's security index, ignore the packet */
2628 if (np->header.securityIndex != conn->securityIndex) {
2629 #ifdef RX_ENABLE_LOCKS
2630 MUTEX_EXIT(&call->lock);
2632 MUTEX_ENTER(&conn->conn_data_lock);
2634 MUTEX_EXIT(&conn->conn_data_lock);
2638 /* If we're receiving the response, then all transmit packets are
2639 * implicitly acknowledged. Get rid of them. */
2640 if (np->header.type == RX_PACKET_TYPE_DATA) {
2641 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2642 /* XXX Hack. Because we must release the global rx lock when
2643 * sending packets (osi_NetSend) we drop all acks while we're
2644 * traversing the tq in rxi_Start sending packets out because
2645 * packets may move to the freePacketQueue as result of being here!
2646 * So we drop these packets until we're safely out of the
2647 * traversing. Really ugly!
2648 * For fine grain RX locking, we set the acked field in the
2649 * packets and let rxi_Start remove them from the transmit queue.
2651 if (call->flags & RX_CALL_TQ_BUSY) {
2652 #ifdef RX_ENABLE_LOCKS
2653 rxi_SetAcksInTransmitQueue(call);
2656 return np; /* xmitting; drop packet */
2660 rxi_ClearTransmitQueue(call, 0);
2662 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2663 rxi_ClearTransmitQueue(call, 0);
2664 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2666 if (np->header.type == RX_PACKET_TYPE_ACK) {
2667 /* now check to see if this is an ack packet acknowledging that the
2668 * server actually *lost* some hard-acked data. If this happens we
2669 * ignore this packet, as it may indicate that the server restarted in
2670 * the middle of a call. It is also possible that this is an old ack
2671 * packet. We don't abort the connection in this case, because this
2672 * *might* just be an old ack packet. The right way to detect a server
2673 * restart in the midst of a call is to notice that the server epoch
2674 * has changed. */
2675 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2676 * XXX unacknowledged. I think that this is off-by-one, but
2677 * XXX I don't dare change it just yet, since it will
2678 * XXX interact badly with the server-restart detection
2679 * XXX code in receiveackpacket. */
2680 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2681 MUTEX_ENTER(&rx_stats_mutex);
2682 rx_stats.spuriousPacketsRead++;
2683 MUTEX_EXIT(&rx_stats_mutex);
2684 MUTEX_EXIT(&call->lock);
2685 MUTEX_ENTER(&conn->conn_data_lock);
2687 MUTEX_EXIT(&conn->conn_data_lock);
2691 } /* else not a data packet */
2694 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2695 /* Set remote user defined status from packet */
2696 call->remoteStatus = np->header.userStatus;
2698 /* Note the gap between the expected next packet and the actual
2699 * packet that arrived, when the new packet has a smaller serial number
2700 * than expected. Rioses frequently reorder packets all by themselves,
2701 * so this will be quite important with very large window sizes.
2702 * Skew is checked against 0 here to avoid any dependence on the type of
2703 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2704 * true.
2705 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2706 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2707 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2709 MUTEX_ENTER(&conn->conn_data_lock);
2710 skew = conn->lastSerial - np->header.serial;
2711 conn->lastSerial = np->header.serial;
2712 MUTEX_EXIT(&conn->conn_data_lock);
2714 register struct rx_peer *peer;
2716 if (skew > peer->inPacketSkew) {
2717 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2718 peer->inPacketSkew = skew;
2722 /* Now do packet type-specific processing */
2723 switch (np->header.type) {
2724 case RX_PACKET_TYPE_DATA:
2725 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2728 case RX_PACKET_TYPE_ACK:
2729 /* Respond immediately to ack packets requesting acknowledgement
2731 if (np->header.flags & RX_REQUEST_ACK) {
2732 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2733 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2735 np = rxi_ReceiveAckPacket(call, np, 1);
2737 case RX_PACKET_TYPE_ABORT:
2738 /* An abort packet: reset the connection, passing the error up to
2740 /* What if error is zero? */
2741 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2743 case RX_PACKET_TYPE_BUSY:
2746 case RX_PACKET_TYPE_ACKALL:
2747 /* All packets acknowledged, so we can drop all packets previously
2748 * readied for sending */
2749 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2750 /* XXX Hack. Because we can't release the global rx lock when
2751 * sending packets (osi_NetSend) we drop all ack pkts while we're
2752 * traversing the tq in rxi_Start sending packets out because
2753 * packets may move to the freePacketQueue as result of being
2754 * here! So we drop these packets until we're safely out of the
2755 * traversing. Really ugly!
2756 * For fine grain RX locking, we set the acked field in the packets
2757 * and let rxi_Start remove the packets from the transmit queue.
2759 if (call->flags & RX_CALL_TQ_BUSY) {
2760 #ifdef RX_ENABLE_LOCKS
2761 rxi_SetAcksInTransmitQueue(call);
2763 #else /* RX_ENABLE_LOCKS */
2765 return np; /* xmitting; drop packet */
2766 #endif /* RX_ENABLE_LOCKS */
2768 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2769 rxi_ClearTransmitQueue(call, 0);
2772 /* Should not reach here, unless the peer is broken: send an abort
2774 rxi_CallError(call, RX_PROTOCOL_ERROR);
2775 np = rxi_SendCallAbort(call, np, 1, 0);
2778 /* Note when this last legitimate packet was received, for keep-alive
2779 * processing. Note, we delay getting the time until now in the hope that
2780 * the packet will be delivered to the user before any get time is required
2781 * (if not, then the time won't actually be re-evaluated here). */
2782 call->lastReceiveTime = clock_Sec();
2783 MUTEX_EXIT(&call->lock);
2784 MUTEX_ENTER(&conn->conn_data_lock);
2786 MUTEX_EXIT(&conn->conn_data_lock);
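/* Illustrative sketch, not part of rx: the smoothed skew that the MTUXXX
 * note above asks for -- an exponentially weighted moving average with
 * beta = 1/8, in the spirit of the RTT code it cites. Hypothetical
 * helper, not the current behavior (the code above keeps a maximum): */
static void toy_smooth_skew(int *smoothed, int sample)
{
    if (sample < 0)
        sample = 0;                 /* only positive skew is interesting */
    /* new = old + (sample - old)/8, i.e. beta = 1/8 */
    *smoothed += (sample - *smoothed) / 8;
}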
2790 /* return true if this is an "interesting" connection from the point of view
2791 of someone trying to debug the system */
2792 int rxi_IsConnInteresting(struct rx_connection *aconn)
2795 register struct rx_call *tcall;
2797 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2799 for(i=0;i<RX_MAXCALLS;i++) {
2800 tcall = aconn->call[i];
2802 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2804 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2812 /* if this is one of the last few packets AND it wouldn't be used by the
2813 receiving call to immediately satisfy a read request, then drop it on
2814 the floor, since accepting it might prevent a lock-holding thread from
2815 making progress in its reading. If a call has been cleared while in
2816 the precall state then ignore all subsequent packets until the call
2817 is assigned to a thread. */
2819 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2822 MUTEX_ENTER(&rx_stats_mutex);
2823 if (((ap->header.seq != 1) &&
2824 (acall->flags & RX_CALL_CLEARED) &&
2825 (acall->state == RX_STATE_PRECALL)) ||
2826 ((rx_nFreePackets < rxi_dataQuota+2) &&
2827 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2828 && (acall->flags & RX_CALL_READER_WAIT)))) {
2831 MUTEX_EXIT(&rx_stats_mutex);
2836 static void rxi_CheckReachEvent(struct rxevent *event,
2837 struct rx_connection *conn, struct rx_call *acall)
2839 struct rx_call *call = acall;
2843 MUTEX_ENTER(&conn->conn_data_lock);
2844 conn->checkReachEvent = NULL;
2845 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2846 if (event) conn->refCount--;
2847 MUTEX_EXIT(&conn->conn_data_lock);
2851 MUTEX_ENTER(&conn->conn_call_lock);
2852 MUTEX_ENTER(&conn->conn_data_lock);
2853 for (i=0; i<RX_MAXCALLS; i++) {
2854 struct rx_call *tc = conn->call[i];
2855 if (tc && tc->state == RX_STATE_PRECALL) {
2861 /* Indicate that rxi_CheckReachEvent is no longer running by
2862 * clearing the flag. Must be atomic under conn_data_lock to
2863 * avoid a new call slipping by: rxi_CheckConnReach holds
2864 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2866 conn->flags &= ~RX_CONN_ATTACHWAIT;
2867 MUTEX_EXIT(&conn->conn_data_lock);
2868 MUTEX_EXIT(&conn->conn_call_lock);
2872 if (call != acall) MUTEX_ENTER(&call->lock);
2873 rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
2874 if (call != acall) MUTEX_EXIT(&call->lock);
2876 clock_GetTime(&when);
2877 when.sec += RX_CHECKREACH_TIMEOUT;
2878 MUTEX_ENTER(&conn->conn_data_lock);
2879 if (!conn->checkReachEvent) {
2881 conn->checkReachEvent =
2882 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2884 MUTEX_EXIT(&conn->conn_data_lock);
2889 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2891 struct rx_service *service = conn->service;
2892 struct rx_peer *peer = conn->peer;
2893 afs_uint32 now, lastReach;
2895 if (service->checkReach == 0)
2899 MUTEX_ENTER(&peer->peer_lock);
2900 lastReach = peer->lastReachTime;
2901 MUTEX_EXIT(&peer->peer_lock);
2902 if (now - lastReach < RX_CHECKREACH_TTL)
2905 MUTEX_ENTER(&conn->conn_data_lock);
2906 if (conn->flags & RX_CONN_ATTACHWAIT) {
2907 MUTEX_EXIT(&conn->conn_data_lock);
2910 conn->flags |= RX_CONN_ATTACHWAIT;
2911 MUTEX_EXIT(&conn->conn_data_lock);
2912 if (!conn->checkReachEvent)
2913 rxi_CheckReachEvent(NULL, conn, call);
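/* Illustrative sketch, not part of rx: the reachability gate above in
 * miniature -- a wall-clock TTL check deciding whether a new incoming
 * call may attach immediately or must first be probed with a ping.
 * Hypothetical names; RX_CHECKREACH_TTL is the real constant mimicked. */
static int toy_needs_reach_check(unsigned int now_sec,
                                 unsigned int last_reach_sec,
                                 unsigned int ttl_sec)
{
    if (now_sec - last_reach_sec < ttl_sec)
        return 0;   /* reached recently enough: attach without probing */
    return 1;       /* stale: send RX_ACK_PING and wait for the response */
}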
2918 /* try to attach call, if authentication is complete */
2919 static void TryAttach(register struct rx_call *acall,
2920 register osi_socket socket, register int *tnop,
2921 register struct rx_call **newcallp, int reachOverride)
2923 struct rx_connection *conn = acall->conn;
2925 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2926 /* Don't attach until we have any req'd. authentication. */
2927 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2928 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2929 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2930 /* Note: this does not necessarily succeed; there
2931 * may not be any proc available
2935 rxi_ChallengeOn(acall->conn);
2940 /* A data packet has been received off the interface. This packet is
2941 * appropriate to the call (the call is in the right state, etc.). This
2942 * routine can return a packet to the caller, for re-use */
2944 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
2945 register struct rx_packet *np, int istack, osi_socket socket,
2946 afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2952 afs_uint32 seq, serial, flags;
2954 struct rx_packet *tnp;
2956 MUTEX_ENTER(&rx_stats_mutex);
2957 rx_stats.dataPacketsRead++;
2958 MUTEX_EXIT(&rx_stats_mutex);
2961 /* If there are no packet buffers, drop this new packet, unless we can find
2962 * packet buffers from inactive calls */
2964 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2965 MUTEX_ENTER(&rx_freePktQ_lock);
2966 rxi_NeedMorePackets = TRUE;
2967 MUTEX_EXIT(&rx_freePktQ_lock);
2968 MUTEX_ENTER(&rx_stats_mutex);
2969 rx_stats.noPacketBuffersOnRead++;
2970 MUTEX_EXIT(&rx_stats_mutex);
2971 call->rprev = np->header.serial;
2972 rxi_calltrace(RX_TRACE_DROP, call);
2973 dpf (("packet %x dropped on receipt - quota problems", np));
2975 rxi_ClearReceiveQueue(call);
2976 clock_GetTime(&when);
2977 clock_Add(&when, &rx_softAckDelay);
2978 if (!call->delayedAckEvent ||
2979 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2980 rxevent_Cancel(call->delayedAckEvent, call,
2981 RX_CALL_REFCOUNT_DELAY);
2982 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2983 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2986 /* we've damaged this call already, might as well do it in. */
2992 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2993 * packet is one of several packets transmitted as a single
2994 * datagram. Do not send any soft or hard acks until all packets
2995 * in a jumbogram have been processed. Send negative acks right away.
2997 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2998 /* tnp is non-null when there are more packets in the
2999 * current jumbo gram */
3006 seq = np->header.seq;
3007 serial = np->header.serial;
3008 flags = np->header.flags;
3010 /* If the call is in an error state, send an abort message */
3012 return rxi_SendCallAbort(call, np, istack, 0);
3014 /* The RX_JUMBO_PACKET is set in all but the last packet in each
3015 * AFS 3.5 jumbogram. */
3016 if (flags & RX_JUMBO_PACKET) {
3017 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3022 if (np->header.spare != 0) {
3023 MUTEX_ENTER(&call->conn->conn_data_lock);
3024 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3025 MUTEX_EXIT(&call->conn->conn_data_lock);
3028 /* The usual case is that this is the expected next packet */
3029 if (seq == call->rnext) {
3031 /* Check to make sure it is not a duplicate of one already queued */
3032 if (queue_IsNotEmpty(&call->rq)
3033 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3034 MUTEX_ENTER(&rx_stats_mutex);
3035 rx_stats.dupPacketsRead++;
3036 MUTEX_EXIT(&rx_stats_mutex);
3037 dpf (("packet %x dropped on receipt - duplicate", np));
3038 rxevent_Cancel(call->delayedAckEvent, call,
3039 RX_CALL_REFCOUNT_DELAY);
3040 np = rxi_SendAck(call, np, seq, serial,
3041 flags, RX_ACK_DUPLICATE, istack);
3047 /* It's the next packet. Stick it on the receive queue
3048 * for this call. Set newPackets to make sure we wake
3049 * the reader once all packets have been processed */
3050 queue_Prepend(&call->rq, np);
3052 np = NULL; /* We can't use this anymore */
3055 /* If an ack is requested then set a flag to make sure we
3056 * send an acknowledgement for this packet */
3057 if (flags & RX_REQUEST_ACK) {
3061 /* Keep track of whether we have received the last packet */
3062 if (flags & RX_LAST_PACKET) {
3063 call->flags |= RX_CALL_HAVE_LAST;
3067 /* Check whether we have all of the packets for this call */
3068 if (call->flags & RX_CALL_HAVE_LAST) {
3069 afs_uint32 tseq; /* temporary sequence number */
3070 struct rx_packet *tp; /* Temporary packet pointer */
3071 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3073 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3074 if (tseq != tp->header.seq)
3076 if (tp->header.flags & RX_LAST_PACKET) {
3077 call->flags |= RX_CALL_RECEIVE_DONE;
3084 /* Provide asynchronous notification for those who want it
3085 * (e.g. multi rx) */
3086 if (call->arrivalProc) {
3087 (*call->arrivalProc)(call, call->arrivalProcHandle,
3088 (int) call->arrivalProcArg);
3089 call->arrivalProc = (VOID (*)()) 0;
3092 /* Update last packet received */
3095 /* If there is no server process serving this call, grab
3096 * one, if available. We only need to do this once. If a
3097 * server thread is available, this thread becomes a server
3098 * thread and the server thread becomes a listener thread. */
3100 TryAttach(call, socket, tnop, newcallp, 0);
3103 /* This is not the expected next packet. */
3105 /* Determine whether this is a new or old packet, and if it's
3106 * a new one, whether it fits into the current receive window.
3107 * Also figure out whether the packet was delivered in sequence.
3108 * We use the prev variable to determine whether the new packet
3109 * is the successor of its immediate predecessor in the
3110 * receive queue, and the missing flag to determine whether
3111 * any of this packet's predecessors are missing. */
3113 afs_uint32 prev; /* "Previous packet" sequence number */
3114 struct rx_packet *tp; /* Temporary packet pointer */
3115 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3116 int missing; /* Are any predecessors missing? */
3118 /* If the new packet's sequence number has been sent to the
3119 * application already, then this is a duplicate */
3120 if (seq < call->rnext) {
3121 MUTEX_ENTER(&rx_stats_mutex);
3122 rx_stats.dupPacketsRead++;
3123 MUTEX_EXIT(&rx_stats_mutex);
3124 rxevent_Cancel(call->delayedAckEvent, call,
3125 RX_CALL_REFCOUNT_DELAY);
3126 np = rxi_SendAck(call, np, seq, serial,
3127 flags, RX_ACK_DUPLICATE, istack);
3133 /* If the sequence number is greater than what can be
3134 * accommodated by the current window, then send a negative
3135 * acknowledge and drop the packet */
3136 if ((call->rnext + call->rwind) <= seq) {
3137 rxevent_Cancel(call->delayedAckEvent, call,
3138 RX_CALL_REFCOUNT_DELAY);
3139 np = rxi_SendAck(call, np, seq, serial,
3140 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3146 /* Look for the packet in the queue of old received packets */
3147 for (prev = call->rnext - 1, missing = 0,
3148 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3149 /*Check for duplicate packet */
3150 if (seq == tp->header.seq) {
3151 MUTEX_ENTER(&rx_stats_mutex);
3152 rx_stats.dupPacketsRead++;
3153 MUTEX_EXIT(&rx_stats_mutex);
3154 rxevent_Cancel(call->delayedAckEvent, call,
3155 RX_CALL_REFCOUNT_DELAY);
3156 np = rxi_SendAck(call, np, seq, serial,
3157 flags, RX_ACK_DUPLICATE, istack);
3162 /* If we find a higher sequence packet, break out and
3163 * insert the new packet here. */
3164 if (seq < tp->header.seq) break;
3165 /* Check for missing packet */
3166 if (tp->header.seq != prev+1) {
3170 prev = tp->header.seq;
3173 /* Keep track of whether we have received the last packet. */
3174 if (flags & RX_LAST_PACKET) {
3175 call->flags |= RX_CALL_HAVE_LAST;
3178 /* It's within the window: add it to the receive queue.
3179 * tp is left by the previous loop either pointing at the
3180 * packet before which to insert the new packet, or at the
3181 * queue head if the queue is empty or the packet should be
3183 queue_InsertBefore(tp, np);
3187 /* Check whether we have all of the packets for this call */
3188 if ((call->flags & RX_CALL_HAVE_LAST)
3189 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3190 afs_uint32 tseq; /* temporary sequence number */
3192 for (tseq = call->rnext,
3193 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3194 if (tseq != tp->header.seq)
3196 if (tp->header.flags & RX_LAST_PACKET) {
3197 call->flags |= RX_CALL_RECEIVE_DONE;
3204 /* We need to send an ack if the packet is out of sequence,
3205 * or if an ack was requested by the peer. */
3206 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3210 /* Acknowledge the last packet for each call */
3211 if (flags & RX_LAST_PACKET) {
3222 * If the receiver is waiting for an iovec, fill the iovec
3223 * using the data from the receive queue */
3224 if (call->flags & RX_CALL_IOVEC_WAIT) {
3225 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3226 /* the call may have been aborted */
3235 /* Wakeup the reader if any */
3236 if ((call->flags & RX_CALL_READER_WAIT) &&
3237 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3238 (call->iovNext >= call->iovMax) ||
3239 (call->flags & RX_CALL_RECEIVE_DONE))) {
3240 call->flags &= ~RX_CALL_READER_WAIT;
3241 #ifdef RX_ENABLE_LOCKS
3242 CV_BROADCAST(&call->cv_rq);
3244 osi_rxWakeup(&call->rq);
3250 * Send an ack when requested by the peer, or once every
3251 * rxi_SoftAckRate packets until the last packet has been
3252 * received. Always send a soft ack for the last packet in
3253 * the server's reply. */
3255 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3256 np = rxi_SendAck(call, np, seq, serial, flags,
3257 RX_ACK_REQUESTED, istack);
3258 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3259 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3260 np = rxi_SendAck(call, np, seq, serial, flags,
3261 RX_ACK_IDLE, istack);
3262 } else if (call->nSoftAcks) {
3263 clock_GetTime(&when);
3264 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3265 clock_Add(&when, &rx_lastAckDelay);
3267 clock_Add(&when, &rx_softAckDelay);
3269 if (!call->delayedAckEvent ||
3270 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3271 rxevent_Cancel(call->delayedAckEvent, call,
3272 RX_CALL_REFCOUNT_DELAY);
3273 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3274 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3277 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3278 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
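/* Illustrative sketch, not part of rx: the ack policy implemented just
 * above, written out as a decision table. The toy_* names are
 * hypothetical; the outcomes correspond to RX_ACK_REQUESTED, RX_ACK_IDLE,
 * and the delayed-ack event respectively. */
enum toy_ack_action { TOY_ACK_NOW, TOY_ACK_IDLE_NOW, TOY_ACK_DELAYED, TOY_ACK_NONE };

static enum toy_ack_action toy_ack_policy(int peer_requested_ack,
                                          int soft_acks_pending,
                                          int soft_ack_rate,
                                          int receive_done)
{
    if (peer_requested_ack)
        return TOY_ACK_NOW;         /* peer asked: ack immediately */
    if (soft_acks_pending > soft_ack_rate)
        return TOY_ACK_IDLE_NOW;    /* one ack per rxi_SoftAckRate packets */
    if (soft_acks_pending)
        return TOY_ACK_DELAYED;     /* schedule a delayed-ack event */
    if (receive_done)
        return TOY_ACK_NOW;         /* last packet: always soft ack */
    return TOY_ACK_NONE;
}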
3285 static void rxi_ComputeRate();
3288 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3290 struct rx_peer *peer = conn->peer;
3292 MUTEX_ENTER(&peer->peer_lock);
3293 peer->lastReachTime = clock_Sec();
3294 MUTEX_EXIT(&peer->peer_lock);
3296 MUTEX_ENTER(&conn->conn_data_lock);
3297 if (conn->flags & RX_CONN_ATTACHWAIT) {
3300 conn->flags &= ~RX_CONN_ATTACHWAIT;
3301 MUTEX_EXIT(&conn->conn_data_lock);
3303 for (i=0; i<RX_MAXCALLS; i++) {
3304 struct rx_call *call = conn->call[i];
3306 if (call != acall) MUTEX_ENTER(&call->lock);
3307 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3308 if (call != acall) MUTEX_EXIT(&call->lock);
3312 MUTEX_EXIT(&conn->conn_data_lock);
3315 /* The real smarts of the whole thing. */
3316 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
3317 struct rx_packet *np, int istack)
3319 struct rx_ackPacket *ap;
3321 register struct rx_packet *tp;
3322 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3323 register struct rx_connection *conn = call->conn;
3324 struct rx_peer *peer = conn->peer;
3327 /* because there are CMs that are bogus, sending weird values for this. */
3328 afs_uint32 skew = 0;
3333 int newAckCount = 0;
3334 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3335 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3337 MUTEX_ENTER(&rx_stats_mutex);
3338 rx_stats.ackPacketsRead++;
3339 MUTEX_EXIT(&rx_stats_mutex);
3340 ap = (struct rx_ackPacket *) rx_DataOf(np);
3341 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3343 return np; /* truncated ack packet */
3345 /* depends on ack packet struct */
3346 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3347 first = ntohl(ap->firstPacket);
3348 serial = ntohl(ap->serial);
3349 /* temporarily disabled -- needs to degrade over time
3350 skew = ntohs(ap->maxSkew); */
3352 /* Ignore ack packets received out of order */
3353 if (first < call->tfirst) {
3357 if (np->header.flags & RX_SLOW_START_OK) {
3358 call->flags |= RX_CALL_SLOW_START_OK;
3361 if (ap->reason == RX_ACK_PING_RESPONSE)
3362 rxi_UpdatePeerReach(conn, call);
3367 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3368 ap->reason, ntohl(ap->previousPacket),
3369 (unsigned int) np->header.seq, (unsigned int) serial,
3370 (unsigned int) skew, ntohl(ap->firstPacket));
3373 for (offset = 0; offset < nAcks; offset++)
3374 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3380 /* if a server connection has been re-created, it doesn't remember what
3381 serial # it was up to. An ack will tell us, since the serial field
3382 contains the largest serial received by the other side */
3383 MUTEX_ENTER(&conn->conn_data_lock);
3384 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3385 conn->serial = serial+1;
3387 MUTEX_EXIT(&conn->conn_data_lock);
3389 /* Update the outgoing packet skew value to the latest value of
3390 * the peer's incoming packet skew value. The ack packet, of
3391 * course, could arrive out of order, but that won't affect things
3393 MUTEX_ENTER(&peer->peer_lock);
3394 peer->outPacketSkew = skew;
3396 /* Check for packets that no longer need to be transmitted, and
3397 * discard them. This only applies to packets positively
3398 * acknowledged as having been sent to the peer's upper level.
3399 * All other packets must be retained. So only packets with
3400 * sequence numbers < ap->firstPacket are candidates. */
3401 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3402 if (tp->header.seq >= first) break;
3403 call->tfirst = tp->header.seq + 1;
3404 if (tp->header.serial == serial) {
3405 /* Use RTT if not delayed by client. */
3406 if (ap->reason != RX_ACK_DELAY)
3407 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3409 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3412 else if (tp->firstSerial == serial) {
3413 /* Use RTT if not delayed by client. */
3414 if (ap->reason != RX_ACK_DELAY)
3415 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3417 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3420 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3421 /* XXX Hack. Because we have to release the global rx lock when sending
3422 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3423 * in rxi_Start sending packets out because packets may move to the
3424 * freePacketQueue as result of being here! So we drop these packets until
3425 * we're safely out of the traversing. Really ugly!
3426 * To make it even uglier, if we're using fine grain locking, we can
3427 * set the ack bits in the packets and have rxi_Start remove the packets
3428 * when it's done transmitting.
3430 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3433 if (call->flags & RX_CALL_TQ_BUSY) {
3434 #ifdef RX_ENABLE_LOCKS
3435 tp->flags |= RX_PKTFLAG_ACKED;
3436 call->flags |= RX_CALL_TQ_SOME_ACKED;
3437 #else /* RX_ENABLE_LOCKS */
3439 #endif /* RX_ENABLE_LOCKS */
3441 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3444 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3449 /* Give rate detector a chance to respond to ping requests */
3450 if (ap->reason == RX_ACK_PING_RESPONSE) {
3451 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3455 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3457 /* Now go through explicit acks/nacks and record the results in
3458 * the waiting packets. These are packets that can't be released
3459 * yet, even with a positive acknowledge. This positive
3460 * acknowledge only means the packet has been received by the
3461 * peer, not that it will be retained long enough to be sent to
3462 * the peer's upper level. In addition, reset the transmit timers
3463 * of any missing packets (those packets that must be missing
3464 * because this packet was out of sequence) */
3466 call->nSoftAcked = 0;
3467 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3468 /* Update round trip time if the ack was stimulated on receipt
3470 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3471 #ifdef RX_ENABLE_LOCKS
3472 if (tp->header.seq >= first) {
3473 #endif /* RX_ENABLE_LOCKS */
3474 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3475 if (tp->header.serial == serial) {
3476 /* Use RTT if not delayed by client. */
3477 if (ap->reason != RX_ACK_DELAY)
3478 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3480 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3483 else if ((tp->firstSerial == serial)) {
3484 /* Use RTT if not delayed by client. */
3485 if (ap->reason != RX_ACK_DELAY)
3486 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3488 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3491 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3492 #ifdef RX_ENABLE_LOCKS
3494 #endif /* RX_ENABLE_LOCKS */
3495 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3497 /* Set the acknowledge flag per packet based on the
3498 * information in the ack packet. An acknowledged packet can
3499 * be downgraded when the server has discarded a packet it
3500 * soacked previously, or when an ack packet is received
3501 * out of sequence. */
3502 if (tp->header.seq < first) {
3503 /* Implicit ack information */
3504 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3507 tp->flags |= RX_PKTFLAG_ACKED;
3509 else if (tp->header.seq < first + nAcks) {
3510 /* Explicit ack information: set it in the packet appropriately */
3511 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3512 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3514 tp->flags |= RX_PKTFLAG_ACKED;
3522 tp->flags &= ~RX_PKTFLAG_ACKED;
3527 tp->flags &= ~RX_PKTFLAG_ACKED;
3531 /* If packet isn't yet acked, and it has been transmitted at least
3532 * once, reset retransmit time using latest timeout
3533 * i.e., this should readjust the retransmit timer for all outstanding
3534 * packets... So we don't just retransmit when we should know better. */
3536 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3537 tp->retryTime = tp->timeSent;
3538 clock_Add(&tp->retryTime, &peer->timeout);
3539 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3540 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3544 /* If the window has been extended by this acknowledge packet,
3545 * then wakeup a sender waiting in alloc for window space, or try
3546 * sending packets now, if he's been sitting on packets due to
3547 * lack of window space */
3548 if (call->tnext < (call->tfirst + call->twind)) {
3549 #ifdef RX_ENABLE_LOCKS
3550 CV_SIGNAL(&call->cv_twind);
3552 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3553 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3554 osi_rxWakeup(&call->twind);
3557 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3558 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3562 /* if the ack packet has a receivelen field hanging off it,
3563 * update our state */
3564 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3567 /* If the ack packet has a "recommended" size that is less than
3568 * what I am using now, reduce my size to match */
3569 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3570 sizeof(afs_int32), &tSize);
3571 tSize = (afs_uint32) ntohl(tSize);
3572 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3574 /* Get the maximum packet size to send to this peer */
3575 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3577 tSize = (afs_uint32)ntohl(tSize);
3578 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3579 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3581 /* sanity check - peer might have restarted with different params.
3582 * If peer says "send less", dammit, send less... Peer should never
3583 * be unable to accept packets of the size that prior AFS versions would
3584 * send without asking. */
3585 if (peer->maxMTU != tSize) {
3586 peer->maxMTU = tSize;
3587 peer->MTU = MIN(tSize, peer->MTU);
3588 call->MTU = MIN(call->MTU, tSize);
3592 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3594 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3595 sizeof(afs_int32), &tSize);
3596 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3597 if (tSize < call->twind) { /* smaller than our send */
3598 call->twind = tSize; /* window, we must send less... */
3599 call->ssthresh = MIN(call->twind, call->ssthresh);
3602 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3603 * network MTU confused with the loopback MTU. Calculate the
3604 * maximum MTU here for use in the slow start code below.
3606 maxMTU = peer->maxMTU;
3607 /* Did peer restart with older RX version? */
3608 if (peer->maxDgramPackets > 1) {
3609 peer->maxDgramPackets = 1;
3611 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3613 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3614 sizeof(afs_int32), &tSize);
3615 tSize = (afs_uint32) ntohl(tSize);
3617 * As of AFS 3.5 we set the send window to match the receive window.
3619 if (tSize < call->twind) {
3620 call->twind = tSize;
3621 call->ssthresh = MIN(call->twind, call->ssthresh);
3622 } else if (tSize > call->twind) {
3623 call->twind = tSize;
3627 * As of AFS 3.5, a jumbogram is more than one fixed size
3628 * packet transmitted in a single UDP datagram. If the remote
3629 * MTU is smaller than our local MTU then never send a datagram
3630 * larger than the natural MTU.
3632 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3633 sizeof(afs_int32), &tSize);
3634 maxDgramPackets = (afs_uint32) ntohl(tSize);
3635 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3636 maxDgramPackets = MIN(maxDgramPackets,
3637 (int)(peer->ifDgramPackets));
3638 maxDgramPackets = MIN(maxDgramPackets, tSize);
3639 if (maxDgramPackets > 1) {
3640 peer->maxDgramPackets = maxDgramPackets;
3641 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3643 peer->maxDgramPackets = 1;
3644 call->MTU = peer->natMTU;
3646 } else if (peer->maxDgramPackets > 1) {
3647 /* Restarted with lower version of RX */
3648 peer->maxDgramPackets = 1;
3650 } else if (peer->maxDgramPackets > 1 ||
3651 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3652 /* Restarted with lower version of RX */
3653 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3654 peer->natMTU = OLD_MAX_PACKET_SIZE;
3655 peer->MTU = OLD_MAX_PACKET_SIZE;
3656 peer->maxDgramPackets = 1;
3657 peer->nDgramPackets = 1;
3659 call->MTU = OLD_MAX_PACKET_SIZE;
3664 * Calculate how many datagrams were successfully received after
3665 * the first missing packet and adjust the negative ack counter
3670 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3671 if (call->nNacks < nNacked) {
3672 call->nNacks = nNacked;
3681 if (call->flags & RX_CALL_FAST_RECOVER) {
3683 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3685 call->flags &= ~RX_CALL_FAST_RECOVER;
3686 call->cwind = call->nextCwind;
3687 call->nextCwind = 0;
3690 call->nCwindAcks = 0;
3692 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3693 /* Three negative acks in a row trigger congestion recovery */
3694 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3695 MUTEX_EXIT(&peer->peer_lock);
3696 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3697 /* someone else is waiting to start recovery */
3700 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3701 while (call->flags & RX_CALL_TQ_BUSY) {
3702 call->flags |= RX_CALL_TQ_WAIT;
3703 #ifdef RX_ENABLE_LOCKS
3704 CV_WAIT(&call->cv_tq, &call->lock);
3705 #else /* RX_ENABLE_LOCKS */
3706 osi_rxSleep(&call->tq);
3707 #endif /* RX_ENABLE_LOCKS */
3709 MUTEX_ENTER(&peer->peer_lock);
3710 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3711 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3712 call->flags |= RX_CALL_FAST_RECOVER;
3713 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3714 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3716 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3717 call->nextCwind = call->ssthresh;
3720 peer->MTU = call->MTU;
3721 peer->cwind = call->nextCwind;
3722 peer->nDgramPackets = call->nDgramPackets;
3724 call->congestSeq = peer->congestSeq;
3725 /* Reset the resend times on the packets that were nacked
3726 * so we will retransmit as soon as the window permits. */
3727 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3729 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3730 clock_Zero(&tp->retryTime);
3732 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3737 /* If cwind is smaller than ssthresh, then increase
3738 * the window one packet for each ack we receive (exponential
3739 * growth).
3740 * If cwind is greater than or equal to ssthresh then increase
3741 * the congestion window by one packet for each cwind acks we
3742 * receive (linear growth). */
3743 if (call->cwind < call->ssthresh) {
3744 call->cwind = MIN((int)call->ssthresh,
3745 (int)(call->cwind + newAckCount));
3746 call->nCwindAcks = 0;
3748 call->nCwindAcks += newAckCount;
3749 if (call->nCwindAcks >= call->cwind) {
3750 call->nCwindAcks = 0;
3751 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3755 * If we have received several acknowledgements in a row then
3756 * it is time to increase the size of our datagrams
3758 if ((int)call->nAcks > rx_nDgramThreshold) {
3759 if (peer->maxDgramPackets > 1) {
3760 if (call->nDgramPackets < peer->maxDgramPackets) {
3761 call->nDgramPackets++;
3763 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3764 } else if (call->MTU < peer->maxMTU) {
3765 call->MTU += peer->natMTU;
3766 call->MTU = MIN(call->MTU, peer->maxMTU);
3772 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3774 /* Servers need to hold the call until all response packets have
3775 * been acknowledged. Soft acks are good enough since clients
3776 * are not allowed to clear their receive queues. */
3777 if (call->state == RX_STATE_HOLD &&
3778 call->tfirst + call->nSoftAcked >= call->tnext) {
3779 call->state = RX_STATE_DALLY;
3780 rxi_ClearTransmitQueue(call, 0);
3781 } else if (!queue_IsEmpty(&call->tq)) {
3782 rxi_Start(0, call, istack);
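/* Illustrative sketch, not part of rx: the window-growth rule from the
 * comment above -- exponential growth below ssthresh (one packet per new
 * ack), linear growth at or above it (one packet per cwind acks).
 * Hypothetical toy state mirroring cwind/ssthresh/nCwindAcks: */
struct toy_cc { int cwind, ssthresh, cwind_acks, max_wind; };

static void toy_on_new_acks(struct toy_cc *cc, int new_acks)
{
    if (cc->cwind < cc->ssthresh) {          /* slow start */
        cc->cwind += new_acks;
        if (cc->cwind > cc->ssthresh)
            cc->cwind = cc->ssthresh;        /* clamp at threshold */
        cc->cwind_acks = 0;
    } else {                                 /* congestion avoidance */
        cc->cwind_acks += new_acks;
        if (cc->cwind_acks >= cc->cwind) {
            cc->cwind_acks = 0;
            if (cc->cwind < cc->max_wind)    /* rx_maxSendWindow analogue */
                cc->cwind++;
        }
    }
}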
3787 /* Received a response to a challenge packet */
3788 struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn,
3789 register struct rx_packet *np, int istack)
3793 /* Ignore the packet if we're the client */
3794 if (conn->type == RX_CLIENT_CONNECTION) return np;
3796 /* If already authenticated, ignore the packet (it's probably a retry) */
3797 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3800 /* Otherwise, have the security object evaluate the response packet */
3801 error = RXS_CheckResponse(conn->securityObject, conn, np);
3803 /* If the response is invalid, reset the connection, sending
3804 * an abort to the peer */
3808 rxi_ConnectionError(conn, error);
3809 MUTEX_ENTER(&conn->conn_data_lock);
3810 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3811 MUTEX_EXIT(&conn->conn_data_lock);
3815 /* If the response is valid, any calls waiting to attach
3816 * servers can now do so */
3819 for (i=0; i<RX_MAXCALLS; i++) {
3820 struct rx_call *call = conn->call[i];
3822 MUTEX_ENTER(&call->lock);
3823 if (call->state == RX_STATE_PRECALL)
3824 rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
3825 MUTEX_EXIT(&call->lock);
3829 /* Update the peer reachability information, just in case
3830 * some calls went into attach-wait while we were waiting
3831 * for authentication. */
3833 rxi_UpdatePeerReach(conn, NULL);
3838 /* A client has received an authentication challenge: the security
3839 * object is asked to cough up a respectable response packet to send
3840 * back to the server. The server is responsible for retrying the
3841 * challenge if it fails to get a response. */
3843 struct rx_packet *rxi_ReceiveChallengePacket(register struct rx_connection *conn,
3844 register struct rx_packet *np, int istack)
3848 /* Ignore the challenge if we're the server */
3849 if (conn->type == RX_SERVER_CONNECTION) return np;
3851 /* Ignore the challenge if the connection is otherwise idle; someone's
3852 * trying to use us as an oracle. */
3853 if (!rxi_HasActiveCalls(conn)) return np;
3855 /* Send the security object the challenge packet. It is expected to fill
3856 * in the response. */
3857 error = RXS_GetResponse(conn->securityObject, conn, np);
3859 /* If the security object is unable to return a valid response, reset the
3860 * connection and send an abort to the peer. Otherwise send the response
3861 * packet to the peer connection. */
3863 rxi_ConnectionError(conn, error);
3864 MUTEX_ENTER(&conn->conn_data_lock);
3865 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3866 MUTEX_EXIT(&conn->conn_data_lock);
3869 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3870 RX_PACKET_TYPE_RESPONSE, NULL, -1, istack);
3876 /* Find an available server process to service the current request in
3877 * the given call structure. If one isn't available, queue up this
3878 * call so it eventually gets one */
3879 void rxi_AttachServerProc(register struct rx_call *call,
3880 register osi_socket socket, register int *tnop, register struct rx_call **newcallp)
3882 register struct rx_serverQueueEntry *sq;
3883 register struct rx_service *service = call->conn->service;
3884 #ifdef RX_ENABLE_LOCKS
3885 register int haveQuota = 0;
3886 #endif /* RX_ENABLE_LOCKS */
3887 /* May already be attached */
3888 if (call->state == RX_STATE_ACTIVE) return;
3890 MUTEX_ENTER(&rx_serverPool_lock);
3891 #ifdef RX_ENABLE_LOCKS
3892 while(rxi_ServerThreadSelectingCall) {
3893 MUTEX_EXIT(&call->lock);
3894 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3895 MUTEX_EXIT(&rx_serverPool_lock);
3896 MUTEX_ENTER(&call->lock);
3897 MUTEX_ENTER(&rx_serverPool_lock);
3898 /* Call may have been attached */
3899 if (call->state == RX_STATE_ACTIVE) return;
3902 haveQuota = QuotaOK(service);
3903 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3904 /* If there are no processes available to service this call,
3905 * put the call on the incoming call queue (unless it's
3906 * already on the queue).
3909 ReturnToServerPool(service);
3910 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3911 call->flags |= RX_CALL_WAIT_PROC;
3912 MUTEX_ENTER(&rx_stats_mutex);
3914 MUTEX_EXIT(&rx_stats_mutex);
3915 rxi_calltrace(RX_CALL_ARRIVAL, call);
3916 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3917 queue_Append(&rx_incomingCallQueue, call);
3920 #else /* RX_ENABLE_LOCKS */
3921 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3922 /* If there are no processes available to service this call,
3923 * put the call on the incoming call queue (unless it's
3924 * already on the queue).
3926 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3927 call->flags |= RX_CALL_WAIT_PROC;
3929 rxi_calltrace(RX_CALL_ARRIVAL, call);
3930 queue_Append(&rx_incomingCallQueue, call);
3933 #endif /* RX_ENABLE_LOCKS */
3935 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3937 /* If hot threads are enabled, and both newcallp and sq->socketp
3938 * are non-null, then this thread will process the call, and the
3939 * idle server thread will start listening on this thread's socket.
3940 */
3942 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3945 *sq->socketp = socket;
3946 clock_GetTime(&call->startTime);
3947 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3951 if (call->flags & RX_CALL_WAIT_PROC) {
3952 /* Conservative: I don't think this should happen */
3953 call->flags &= ~RX_CALL_WAIT_PROC;
3954 MUTEX_ENTER(&rx_stats_mutex);
3956 MUTEX_EXIT(&rx_stats_mutex);
3959 call->state = RX_STATE_ACTIVE;
3960 call->mode = RX_MODE_RECEIVING;
3961 if (call->flags & RX_CALL_CLEARED) {
3962 /* send an ack now to start the packet flow up again */
3963 call->flags &= ~RX_CALL_CLEARED;
3964 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3966 #ifdef RX_ENABLE_LOCKS
3969 service->nRequestsRunning++;
3970 if (service->nRequestsRunning <= service->minProcs)
3976 MUTEX_EXIT(&rx_serverPool_lock);
3979 /* Delay the sending of an acknowledge event for a short while, while
3980 * a new call is being prepared (in the case of a client) or a reply
3981 * is being prepared (in the case of a server). Rather than sending
3982 * an ack packet, an ACKALL packet is sent. */
3983 void rxi_AckAll(struct rxevent *event, register struct rx_call *call, char *dummy)
3985 #ifdef RX_ENABLE_LOCKS
3987 MUTEX_ENTER(&call->lock);
3988 call->delayedAckEvent = NULL;
3989 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3991 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3992 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3994 MUTEX_EXIT(&call->lock);
3995 #else /* RX_ENABLE_LOCKS */
3996 if (event) call->delayedAckEvent = NULL;
3997 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3998 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3999 #endif /* RX_ENABLE_LOCKS */
4002 void rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call, char *dummy)
4004 #ifdef RX_ENABLE_LOCKS
4006 MUTEX_ENTER(&call->lock);
4007 if (event == call->delayedAckEvent)
4008 call->delayedAckEvent = NULL;
4009 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
4011 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4013 MUTEX_EXIT(&call->lock);
4014 #else /* RX_ENABLE_LOCKS */
4015 if (event) call->delayedAckEvent = NULL;
4016 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4017 #endif /* RX_ENABLE_LOCKS */
4021 #ifdef RX_ENABLE_LOCKS
4022 /* Set ack in all packets in transmit queue. rxi_Start will deal with
4023 * clearing them out.
4024 */
4025 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call)
4027 register struct rx_packet *p, *tp;
4030 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4033 p->flags |= RX_PKTFLAG_ACKED;
4037 call->flags |= RX_CALL_TQ_CLEARME;
4038 call->flags |= RX_CALL_TQ_SOME_ACKED;
4041 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4042 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4043 call->tfirst = call->tnext;
4044 call->nSoftAcked = 0;
4046 if (call->flags & RX_CALL_FAST_RECOVER) {
4047 call->flags &= ~RX_CALL_FAST_RECOVER;
4048 call->cwind = call->nextCwind;
4049 call->nextCwind = 0;
4052 CV_SIGNAL(&call->cv_twind);
4054 #endif /* RX_ENABLE_LOCKS */
4056 /* Clear out the transmit queue for the current call (all packets have
4057 * been received by peer) */
4058 void rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
4060 register struct rx_packet *p, *tp;
4062 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4063 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4065 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4068 p->flags |= RX_PKTFLAG_ACKED;
4072 call->flags |= RX_CALL_TQ_CLEARME;
4073 call->flags |= RX_CALL_TQ_SOME_ACKED;
4076 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4077 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4083 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4084 call->flags &= ~RX_CALL_TQ_CLEARME;
4086 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4088 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4089 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4090 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4091 call->nSoftAcked = 0;
4093 if (call->flags & RX_CALL_FAST_RECOVER) {
4094 call->flags &= ~RX_CALL_FAST_RECOVER;
4095 call->cwind = call->nextCwind;
4098 #ifdef RX_ENABLE_LOCKS
4099 CV_SIGNAL(&call->cv_twind);
4101 osi_rxWakeup(&call->twind);
4105 void rxi_ClearReceiveQueue(register struct rx_call *call)
4107 register struct rx_packet *p, *tp;
4108 if (queue_IsNotEmpty(&call->rq)) {
4109 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4114 rx_packetReclaims++;
4116 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4118 if (call->state == RX_STATE_PRECALL) {
4119 call->flags |= RX_CALL_CLEARED;
4123 /* Send an abort packet for the specified call */
4124 struct rx_packet *rxi_SendCallAbort(register struct rx_call *call,
4125 struct rx_packet *packet, int istack, int force)
4133 /* Clients should never delay abort messages */
4134 if (rx_IsClientConn(call->conn))
4135 force = 1;
4137 if (call->abortCode != call->error) {
4138 call->abortCode = call->error;
4139 call->abortCount = 0;
4142 if (force || rxi_callAbortThreshhold == 0 ||
4143 call->abortCount < rxi_callAbortThreshhold) {
4144 if (call->delayedAbortEvent) {
4145 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4147 error = htonl(call->error);
4149 packet = rxi_SendSpecial(call, call->conn, packet,
4150 RX_PACKET_TYPE_ABORT, (char *)&error,
4151 sizeof(error), istack);
4152 } else if (!call->delayedAbortEvent) {
4153 clock_GetTime(&when);
4154 clock_Addmsec(&when, rxi_callAbortDelay);
4155 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4156 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4162 /* Send an abort packet for the specified connection. Packet is an
4163 * optional pointer to a packet that can be used to send the abort.
4164 * Once the number of abort messages reaches the threshold, an
4165 * event is scheduled to send the abort. Setting the force flag
4166 * overrides sending delayed abort messages.
4168 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4169 * to send the abort packet.
4170 */
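/*
 * Illustration of the throttle below: with rxi_connAbortThreshhold == N
 * and rxi_connAbortDelay == D (ms), the first N aborts on a connection
 * are sent immediately; after that, at most one delayed-abort event is
 * pending at a time, so a looping client draws at most one abort per
 * D ms instead of one per offending packet.
 */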
4171 struct rx_packet *rxi_SendConnectionAbort(register struct rx_connection *conn,
4172 struct rx_packet *packet, int istack, int force)
4180 /* Clients should never delay abort messages */
4181 if (rx_IsClientConn(conn))
4182 force = 1;
4184 if (force || rxi_connAbortThreshhold == 0 ||
4185 conn->abortCount < rxi_connAbortThreshhold) {
4186 if (conn->delayedAbortEvent) {
4187 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4189 error = htonl(conn->error);
4191 MUTEX_EXIT(&conn->conn_data_lock);
4192 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4193 RX_PACKET_TYPE_ABORT, (char *)&error,
4194 sizeof(error), istack);
4195 MUTEX_ENTER(&conn->conn_data_lock);
4196 } else if (!conn->delayedAbortEvent) {
4197 clock_GetTime(&when);
4198 clock_Addmsec(&when, rxi_connAbortDelay);
4199 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4205 /* Associate an error with all of the calls owned by a connection. Called
4206 * with error non-zero. This is only for really fatal things, like
4207 * bad authentication responses. The connection itself is set in
4208 * error at this point, so that future packets received will be
4209 * discarded. */
4210 void rxi_ConnectionError(register struct rx_connection *conn,
4211 register afs_int32 error)
4215 MUTEX_ENTER(&conn->conn_data_lock);
4216 if (conn->challengeEvent)
4217 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4218 if (conn->checkReachEvent) {
4219 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4220 conn->checkReachEvent = 0;
4221 conn->flags &= ~RX_CONN_ATTACHWAIT;
4224 MUTEX_EXIT(&conn->conn_data_lock);
4225 for (i=0; i<RX_MAXCALLS; i++) {
4226 struct rx_call *call = conn->call[i];
4228 MUTEX_ENTER(&call->lock);
4229 rxi_CallError(call, error);
4230 MUTEX_EXIT(&call->lock);
4233 conn->error = error;
4234 MUTEX_ENTER(&rx_stats_mutex);
4235 rx_stats.fatalErrors++;
4236 MUTEX_EXIT(&rx_stats_mutex);
4240 void rxi_CallError(register struct rx_call *call, afs_int32 error)
4242 if (call->error) error = call->error;
4243 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4244 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4245 rxi_ResetCall(call, 0);
4246 }
4247 #else
4248 rxi_ResetCall(call, 0);
4249 #endif
4250 call->error = error;
4251 call->mode = RX_MODE_ERROR;
4254 /* Reset various fields in a call structure, and wakeup waiting
4255 * processes. Some fields aren't changed: state & mode are not
4256 * touched (these must be set by the caller), and bufptr, nLeft, and
4257 * nFree are not reset, since these fields are manipulated by
4258 * unprotected macros, and may only be reset by non-interrupting code.
4259 */
4260 #ifdef ADAPT_WINDOW
4261 /* this code requires that call->conn be set properly as a pre-condition. */
4262 #endif /* ADAPT_WINDOW */
4264 void rxi_ResetCall(register struct rx_call *call, register int newcall)
4267 register struct rx_peer *peer;
4268 struct rx_packet *packet;
4270 /* Notify anyone who is waiting for asynchronous packet arrival */
4271 if (call->arrivalProc) {
4272 (*call->arrivalProc)(call, call->arrivalProcHandle, (int) call->arrivalProcArg);
4273 call->arrivalProc = (VOID (*)()) 0;
4276 if (call->delayedAbortEvent) {
4277 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4278 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4280 rxi_SendCallAbort(call, packet, 0, 1);
4281 rxi_FreePacket(packet);
4286 * Update the peer with the congestion information in this call
4287 * so other calls on this connection can pick up where this call
4288 * left off. If the congestion sequence numbers don't match then
4289 * another call experienced a retransmission.
4291 peer = call->conn->peer;
4292 MUTEX_ENTER(&peer->peer_lock);
4294 if (call->congestSeq == peer->congestSeq) {
4295 peer->cwind = MAX(peer->cwind, call->cwind);
4296 peer->MTU = MAX(peer->MTU, call->MTU);
4297 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4300 call->abortCode = 0;
4301 call->abortCount = 0;
4303 if (peer->maxDgramPackets > 1) {
4304 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4306 call->MTU = peer->MTU;
4308 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4309 call->ssthresh = rx_maxSendWindow;
4310 call->nDgramPackets = peer->nDgramPackets;
4311 call->congestSeq = peer->congestSeq;
4312 MUTEX_EXIT(&peer->peer_lock);
4314 flags = call->flags;
4315 rxi_ClearReceiveQueue(call);
4316 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4317 if (call->flags & RX_CALL_TQ_BUSY) {
4318 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4319 call->flags |= (flags & RX_CALL_TQ_WAIT);
4321 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4323 rxi_ClearTransmitQueue(call, 0);
4324 queue_Init(&call->tq);
4327 queue_Init(&call->rq);
4329 call->rwind = rx_initReceiveWindow;
4330 call->twind = rx_initSendWindow;
4331 call->nSoftAcked = 0;
4332 call->nextCwind = 0;
4335 call->nCwindAcks = 0;
4336 call->nSoftAcks = 0;
4337 call->nHardAcks = 0;
4339 call->tfirst = call->rnext = call->tnext = 1;
4341 call->lastAcked = 0;
4342 call->localStatus = call->remoteStatus = 0;
4344 if (flags & RX_CALL_READER_WAIT) {
4345 #ifdef RX_ENABLE_LOCKS
4346 CV_BROADCAST(&call->cv_rq);
4348 osi_rxWakeup(&call->rq);
4351 if (flags & RX_CALL_WAIT_PACKETS) {
4352 MUTEX_ENTER(&rx_freePktQ_lock);
4353 rxi_PacketsUnWait(); /* XXX */
4354 MUTEX_EXIT(&rx_freePktQ_lock);
4357 #ifdef RX_ENABLE_LOCKS
4358 CV_SIGNAL(&call->cv_twind);
4360 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4361 osi_rxWakeup(&call->twind);
4364 #ifdef RX_ENABLE_LOCKS
4365 /* The following ensures that we don't mess with any queue while some
4366 * other thread might also be doing so. The call_queue_lock field
4367 * is only modified under the call lock. If the call is in the process
4368 * of being removed from a queue, the call is not locked until
4369 * the queue lock is dropped and only then is the call_queue_lock field
4370 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4371 * Note that any other routine which removes a call from a queue has to
4372 * obtain the queue lock before examining the queue and removing the call.
4373 */
4374 if (call->call_queue_lock) {
4375 MUTEX_ENTER(call->call_queue_lock);
4376 if (queue_IsOnQueue(call)) {
4378 if (flags & RX_CALL_WAIT_PROC) {
4379 MUTEX_ENTER(&rx_stats_mutex);
4381 MUTEX_EXIT(&rx_stats_mutex);
4384 MUTEX_EXIT(call->call_queue_lock);
4385 CLEAR_CALL_QUEUE_LOCK(call);
4387 #else /* RX_ENABLE_LOCKS */
4388 if (queue_IsOnQueue(call)) {
4390 if (flags & RX_CALL_WAIT_PROC)
4393 #endif /* RX_ENABLE_LOCKS */
4395 rxi_KeepAliveOff(call);
4396 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4399 /* Send an acknowledge for the indicated packet (seq,serial) of the
4400 * indicated call, for the indicated reason (reason). This
4401 * acknowledge will specifically acknowledge receiving the packet, and
4402 * will also specify which other packets for this call have been
4403 * received. This routine returns the packet that was used to the
4404 * caller. The caller is responsible for freeing it or re-using it.
4405 * This acknowledgement also returns the highest sequence number
4406 * actually read out by the higher level to the sender; the sender
4407 * promises to keep around packets that have not been read by the
4408 * higher level yet (unless, of course, the sender decides to abort
4409 * the call altogether). Any of p, seq, serial, pflags, or reason may
4410 * be set to zero without ill effect. That is, if they are zero, they
4411 * will not convey any information.
4412 * NOW there is a trailer field, after the ack where it will safely be
4413 * ignored by mundanes, which indicates the maximum size packet this
4414 * host can swallow. */
4416 /* optionalPacket      use to send ack (or null)
4417 * seq                 Sequence number of the packet we are acking
4418 * serial              Serial number of the packet
4419 * pflags              Flags field from packet header
4420 * reason              Reason an acknowledge was prompted
4421 */
4423 struct rx_packet *rxi_SendAck(register struct rx_call *call,
4424 register struct rx_packet *optionalPacket, int seq, int serial,
4425 int pflags, int reason, int istack)
4427 struct rx_ackPacket *ap;
4428 register struct rx_packet *rqp;
4429 register struct rx_packet *nxp; /* For queue_Scan */
4430 register struct rx_packet *p;
4435 * Open the receive window once a thread starts reading packets
4437 if (call->rnext > 1) {
4438 call->rwind = rx_maxReceiveWindow;
4441 call->nHardAcks = 0;
4442 call->nSoftAcks = 0;
4443 if (call->rnext > call->lastAcked)
4444 call->lastAcked = call->rnext;
4448 rx_computelen(p, p->length); /* reset length, you never know */
4449 } /* where that's been... */
4451 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4452 /* We won't send the ack, but don't panic. */
4453 return optionalPacket;
4456 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4458 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4459 if (!optionalPacket) rxi_FreePacket(p);
4460 return optionalPacket;
4462 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4463 if (rx_Contiguous(p)<templ) {
4464 if (!optionalPacket) rxi_FreePacket(p);
4465 return optionalPacket;
4467 } /* MTUXXX failing to send an ack is very serious. We should */
4468 /* try as hard as possible to send even a partial ack; it's */
4469 /* better than nothing. */
4471 ap = (struct rx_ackPacket *) rx_DataOf(p);
4472 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4473 ap->reason = reason;
4475 /* The skew computation used to be bogus, I think it's better now. */
4476 /* We should start paying attention to skew. XXX */
4477 ap->serial = htonl(call->conn->maxSerial);
4478 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4480 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4481 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4483 /* No fear of running out of room in the ack packet here because there can only be at most
4484 * one window full of unacknowledged packets. The window size must be constrained
4485 * to be less than the maximum ack size, of course. Also, an ack should always
4486 * fit into a single packet -- it should not ever be fragmented. */
4487 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4488 if (!rqp || !call->rq.next
4489 || (rqp->header.seq > (call->rnext + call->rwind))) {
4490 if (!optionalPacket) rxi_FreePacket(p);
4491 rxi_CallError(call, RX_CALL_DEAD);
4492 return optionalPacket;
4495 while (rqp->header.seq > call->rnext + offset)
4496 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4497 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4499 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4500 if (!optionalPacket) rxi_FreePacket(p);
4501 rxi_CallError(call, RX_CALL_DEAD);
4502 return optionalPacket;
4507 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
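/*
 * Illustration of the ack bitmap built above: if rnext == 10 and the
 * receive queue holds packets 10, 11 and 13, the scan emits
 * acks[] = { ACK, ACK, NACK, ACK } with firstPacket == 10, telling the
 * sender that sequence 12 is the only hole in the window so far.
 */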
4509 /* these are new for AFS 3.3 */
4510 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4511 templ = htonl(templ);
4512 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4513 templ = htonl(call->conn->peer->ifMTU);
4514 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4516 /* new for AFS 3.4 */
4517 templ = htonl(call->rwind);
4518 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4520 /* new for AFS 3.5 */
4521 templ = htonl(call->conn->peer->ifDgramPackets);
4522 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
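/*
 * Layout of the ack trailer written above, starting at
 * rx_AckDataSize(offset), four afs_int32 fields in network order:
 *   [0] rxi_AdjustMaxMTU(ifMTU, rx_maxReceiveSize)   (new in AFS 3.3)
 *   [1] the interface MTU, ifMTU                     (new in AFS 3.3)
 *   [2] the receive window, rwind                    (new in AFS 3.4)
 *   [3] ifDgramPackets                               (new in AFS 3.5)
 * Older peers simply ignore trailer fields they do not understand.
 */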
4524 p->header.serviceId = call->conn->serviceId;
4525 p->header.cid = (call->conn->cid | call->channel);
4526 p->header.callNumber = *call->callNumber;
4527 p->header.seq = seq;
4528 p->header.securityIndex = call->conn->securityIndex;
4529 p->header.epoch = call->conn->epoch;
4530 p->header.type = RX_PACKET_TYPE_ACK;
4531 p->header.flags = RX_SLOW_START_OK;
4532 if (reason == RX_ACK_PING) {
4533 p->header.flags |= RX_REQUEST_ACK;
4535 clock_GetTime(&call->pingRequestTime);
4538 if (call->conn->type == RX_CLIENT_CONNECTION)
4539 p->header.flags |= RX_CLIENT_INITIATED;
4543 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4544 ap->reason, ntohl(ap->previousPacket),
4545 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4547 for (offset = 0; offset < ap->nAcks; offset++)
4548 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4555 register int i, nbytes = p->length;
4557 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4558 if (nbytes <= p->wirevec[i].iov_len) {
4559 register int savelen, saven;
4561 savelen = p->wirevec[i].iov_len;
4563 p->wirevec[i].iov_len = nbytes;
4565 rxi_Send(call, p, istack);
4566 p->wirevec[i].iov_len = savelen;
4570 else nbytes -= p->wirevec[i].iov_len;
4573 MUTEX_ENTER(&rx_stats_mutex);
4574 rx_stats.ackPacketsSent++;
4575 MUTEX_EXIT(&rx_stats_mutex);
4576 if (!optionalPacket) rxi_FreePacket(p);
4577 return optionalPacket; /* Return packet for re-use by caller */
4580 /* Send all of the packets in the list in a single datagram */
4581 static void rxi_SendList(struct rx_call *call, struct rx_packet **list,
4582 int len, int istack, int moreFlag, struct clock *now,
4583 struct clock *retryTime, int resending)
4588 struct rx_connection *conn = call->conn;
4589 struct rx_peer *peer = conn->peer;
4591 MUTEX_ENTER(&peer->peer_lock);
4593 if (resending) peer->reSends += len;
4594 MUTEX_ENTER(&rx_stats_mutex);
4595 rx_stats.dataPacketsSent += len;
4596 MUTEX_EXIT(&rx_stats_mutex);
4597 MUTEX_EXIT(&peer->peer_lock);
4599 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4603 /* Set the packet flags and schedule the resend events */
4604 /* Only request an ack for the last packet in the list */
4605 for (i = 0 ; i < len ; i++) {
4606 list[i]->retryTime = *retryTime;
4607 if (list[i]->header.serial) {
4608 /* Exponentially backoff retry times */
4609 if (list[i]->backoff < MAXBACKOFF) {
4610 /* so it can't stay == 0 */
4611 list[i]->backoff = (list[i]->backoff << 1) + 1;
4612 }
4613 else list[i]->backoff++;
4614 clock_Addmsec(&(list[i]->retryTime),
4615 ((afs_uint32) list[i]->backoff) << 8);
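/*
 * Worked example of the backoff: the shift-and-increment sequence runs
 * 0 -> 1 -> 3 -> 7 -> 15 ... (capped at MAXBACKOFF), and each value
 * pushes the retry time out by backoff << 8 ms, i.e. roughly an extra
 * 0.25 s, 0.75 s, 1.8 s, 3.8 s per successive retransmission, giving an
 * approximately exponential retransmit schedule.
 */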
4618 /* Wait a little extra for the ack on the last packet */
4619 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4620 clock_Addmsec(&(list[i]->retryTime), 400);
4623 /* Record the time sent */
4624 list[i]->timeSent = *now;
4626 /* Ask for an ack on retransmitted packets, on every other packet
4627 * if the peer doesn't support slow start. Ask for an ack on every
4628 * packet until the congestion window reaches the ack rate. */
4629 if (list[i]->header.serial) {
4631 MUTEX_ENTER(&rx_stats_mutex);
4632 rx_stats.dataPacketsReSent++;
4633 MUTEX_EXIT(&rx_stats_mutex);
4635 /* improved RTO calculation- not Karn */
4636 list[i]->firstSent = *now;
4638 && (call->cwind <= (u_short)(conn->ackRate+1)
4639 || (!(call->flags & RX_CALL_SLOW_START_OK)
4640 && (list[i]->header.seq & 1)))) {
4645 MUTEX_ENTER(&peer->peer_lock);
4647 if (resending) peer->reSends++;
4648 MUTEX_ENTER(&rx_stats_mutex);
4649 rx_stats.dataPacketsSent++;
4650 MUTEX_EXIT(&rx_stats_mutex);
4651 MUTEX_EXIT(&peer->peer_lock);
4653 /* Tag this packet as not being the last in this group,
4654 * for the receiver's benefit */
4655 if (i < len-1 || moreFlag) {
4656 list[i]->header.flags |= RX_MORE_PACKETS;
4659 /* Install the new retransmit time for the packet, and
4660 * record the time sent */
4661 list[i]->timeSent = *now;
4665 list[len-1]->header.flags |= RX_REQUEST_ACK;
4668 /* Since we're about to send a data packet to the peer, it's
4669 * safe to nuke any scheduled end-of-packets ack */
4670 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4672 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4673 MUTEX_EXIT(&call->lock);
4675 rxi_SendPacketList(conn, list, len, istack);
4677 rxi_SendPacket(conn, list[0], istack);
4679 MUTEX_ENTER(&call->lock);
4680 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4682 /* Update last send time for this call (for keep-alive
4683 * processing), and for the connection (so that we can discover
4684 * idle connections) */
4685 conn->lastSendTime = call->lastSendTime = clock_Sec();
4688 /* When sending packets we need to follow these rules:
4689 * 1. Never send more than maxDgramPackets in a jumbogram.
4690 * 2. Never send a packet with more than two iovecs in a jumbogram.
4691 * 3. Never send a retransmitted packet in a jumbogram.
4692 * 4. Never send more than cwind/4 packets in a jumbogram
4693 * We always keep the last list we should have sent so we
4694 * can set the RX_MORE_PACKETS flags correctly.
4695 */
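/*
 * Illustration of the rules: a retransmission (header.serial != 0), an
 * already-acked packet, or a packet that is not exactly
 * RX_JUMBOBUFFERSIZE long each force the list built so far to be
 * flushed as its own datagram, so a jumbogram only ever carries a run
 * of fresh, full-size, unacked packets.
 */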
4696 static void rxi_SendXmitList(struct rx_call *call, struct rx_packet **list,
4697 int len, int istack, struct clock *now, struct clock *retryTime,
4700 int i, cnt, lastCnt = 0;
4701 struct rx_packet **listP, **lastP = 0;
4702 struct rx_peer *peer = call->conn->peer;
4703 int morePackets = 0;
4705 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4706 /* Does the current packet force us to flush the current list? */
4707 if (cnt > 0
4708 && (list[i]->header.serial
4709 || (list[i]->flags & RX_PKTFLAG_ACKED)
4710 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4712 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4713 /* If the call enters an error state stop sending, or if
4714 * we entered congestion recovery mode, stop sending */
4715 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4723 /* Add the current packet to the list if it hasn't been acked.
4724 * Otherwise adjust the list pointer to skip the current packet. */
4725 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4727 /* Do we need to flush the list? */
4728 if (cnt >= (int)peer->maxDgramPackets
4729 || cnt >= (int)call->nDgramPackets
4730 || cnt >= (int)call->cwind
4731 || list[i]->header.serial
4732 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4734 rxi_SendList(call, lastP, lastCnt, istack, 1,
4735 now, retryTime, resending);
4736 /* If the call enters an error state stop sending, or if
4737 * we entered congestion recovery mode, stop sending */
4738 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4748 osi_Panic("rxi_SendList error");
4754 /* Send the whole list when the call is in receive mode, when
4755 * the call is in eof mode, when we are in fast recovery mode,
4756 * and when we have the last packet */
4757 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4758 || call->mode == RX_MODE_RECEIVING
4759 || call->mode == RX_MODE_EOF
4760 || (call->flags & RX_CALL_FAST_RECOVER)) {
4761 /* Check for the case where the current list contains
4762 * an acked packet. Since we always send retransmissions
4763 * in a separate packet, we only need to check the first
4764 * packet in the list */
4765 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4769 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4770 now, retryTime, resending);
4771 /* If the call enters an error state stop sending, or if
4772 * we entered congestion recovery mode, stop sending */
4773 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4777 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4779 } else if (lastCnt > 0) {
4780 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4784 #ifdef RX_ENABLE_LOCKS
4785 /* Call rxi_Start, below, but with the call lock held. */
4786 void rxi_StartUnlocked(struct rxevent *event, register struct rx_call *call,
4789 MUTEX_ENTER(&call->lock);
4790 rxi_Start(event, call, istack);
4791 MUTEX_EXIT(&call->lock);
4793 #endif /* RX_ENABLE_LOCKS */
4795 /* This routine is called when new packets are readied for
4796 * transmission and when retransmission may be necessary, or when the
4797 * transmission window or burst count are favourable. This should be
4798 * better optimized for new packets, the usual case, now that we've
4799 * got rid of queues of send packets. XXXXXXXXXXX */
4800 void rxi_Start(struct rxevent *event, register struct rx_call *call,
4803 struct rx_packet *p;
4804 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4805 struct rx_peer *peer = call->conn->peer;
4806 struct clock now, retryTime;
4810 struct rx_packet **xmitList;
4813 /* If rxi_Start is being called as a result of a resend event,
4814 * then make sure that the event pointer is removed from the call
4815 * structure, since there is no longer a per-call retransmission
4816 * event pending. */
4817 if (event && event == call->resendEvent) {
4818 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4819 call->resendEvent = NULL;
4821 if (queue_IsEmpty(&call->tq)) {
4825 /* Timeouts trigger congestion recovery */
4826 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4827 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4828 /* someone else is waiting to start recovery */
4829 return;
4830 }
4831 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4832 while (call->flags & RX_CALL_TQ_BUSY) {
4833 call->flags |= RX_CALL_TQ_WAIT;
4834 #ifdef RX_ENABLE_LOCKS
4835 CV_WAIT(&call->cv_tq, &call->lock);
4836 #else /* RX_ENABLE_LOCKS */
4837 osi_rxSleep(&call->tq);
4838 #endif /* RX_ENABLE_LOCKS */
4840 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4841 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4842 call->flags |= RX_CALL_FAST_RECOVER;
4843 if (peer->maxDgramPackets > 1) {
4844 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4846 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4848 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4849 call->nDgramPackets = 1;
4850 call->cwind = 1;
4851 call->nextCwind = 1;
4854 MUTEX_ENTER(&peer->peer_lock);
4855 peer->MTU = call->MTU;
4856 peer->cwind = call->cwind;
4857 peer->nDgramPackets = 1;
4859 call->congestSeq = peer->congestSeq;
4860 MUTEX_EXIT(&peer->peer_lock);
4861 /* Clear retry times on packets. Otherwise, it's possible for
4862 * some packets in the queue to force resends at rates faster
4863 * than recovery rates.
4864 */
4865 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4866 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4867 clock_Zero(&p->retryTime);
4872 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4873 MUTEX_ENTER(&rx_stats_mutex);
4874 rx_tq_debug.rxi_start_in_error ++;
4875 MUTEX_EXIT(&rx_stats_mutex);
4880 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4881 /* Get clock to compute the re-transmit time for any packets
4882 * in this burst. Note, if we back off, it's reasonable to
4883 * back off all of the packets in the same manner, even if
4884 * some of them have been retransmitted more times than more
4885 * recent additions */
4886 clock_GetTime(&now);
4887 retryTime = now; /* initialize before use */
4888 MUTEX_ENTER(&peer->peer_lock);
4889 clock_Add(&retryTime, &peer->timeout);
4890 MUTEX_EXIT(&peer->peer_lock);
4892 /* Send (or resend) any packets that need it, subject to
4893 * window restrictions and congestion burst control
4894 * restrictions. Ask for an ack on the last packet sent in
4895 * this burst. For now, we're relying upon the window being
4896 * considerably bigger than the largest number of packets that
4897 * are typically sent at once by one initial call to
4898 * rxi_Start. This is probably bogus (perhaps we should ask
4899 * for an ack when we're half way through the current
4900 * window?). Also, for non file transfer applications, this
4901 * may end up asking for an ack for every packet. Bogus. XXXX
4904 * But check whether we're here recursively, and let the other guy
4905 * do the work. */
4907 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4908 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4909 call->flags |= RX_CALL_TQ_BUSY;
4911 call->flags &= ~RX_CALL_NEED_START;
4912 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4914 maxXmitPackets = MIN(call->twind, call->cwind);
4915 xmitList = (struct rx_packet **)
4916 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4917 if (xmitList == NULL)
4918 osi_Panic("rxi_Start, failed to allocate xmit list");
4919 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4920 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4921 /* We shouldn't be sending packets if a thread is waiting
4922 * to initiate congestion recovery */
4923 break;
4924 }
4925 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4926 /* Only send one packet during fast recovery */
4927 break;
4928 }
4929 if ((p->flags & RX_PKTFLAG_FREE) ||
4930 (!queue_IsEnd(&call->tq, nxp)
4931 && (nxp->flags & RX_PKTFLAG_FREE)) ||
4932 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4933 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4934 osi_Panic("rxi_Start: xmit queue clobbered");
4936 if (p->flags & RX_PKTFLAG_ACKED) {
4937 MUTEX_ENTER(&rx_stats_mutex);
4938 rx_stats.ignoreAckedPacket++;
4939 MUTEX_EXIT(&rx_stats_mutex);
4940 continue; /* Ignore this packet if it has been acknowledged */
4943 /* Turn off all flags except these ones, which are the same
4944 * on each transmission */
4945 p->header.flags &= RX_PRESET_FLAGS;
4947 if (p->header.seq >= call->tfirst +
4948 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4949 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4950 /* Note: if we're waiting for more window space, we can
4951 * still send retransmits; hence we don't return here, but
4952 * break out to schedule a retransmit event */
4953 dpf(("call %d waiting for window", *(call->callNumber)));
4954 break;
4955 }
4957 /* Transmit the packet if it needs to be sent. */
4958 if (!clock_Lt(&now, &p->retryTime)) {
4959 if (nXmitPackets == maxXmitPackets) {
4960 osi_Panic("rxi_Start: xmit list overflowed");
4962 xmitList[nXmitPackets++] = p;
4966 /* xmitList now hold pointers to all of the packets that are
4967 * ready to send. Now we loop to send the packets */
4968 if (nXmitPackets > 0) {
4969 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4970 &now, &retryTime, resending);
4972 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4974 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4975 /*
4976 * TQ references no longer protected by this flag; they must remain
4977 * protected by the global lock.
4978 */
4979 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4980 call->flags &= ~RX_CALL_TQ_BUSY;
4981 if (call->flags & RX_CALL_TQ_WAIT) {
4982 call->flags &= ~RX_CALL_TQ_WAIT;
4983 #ifdef RX_ENABLE_LOCKS
4984 CV_BROADCAST(&call->cv_tq);
4985 #else /* RX_ENABLE_LOCKS */
4986 osi_rxWakeup(&call->tq);
4987 #endif /* RX_ENABLE_LOCKS */
4992 /* We went into the error state while sending packets. Now is
4993 * the time to reset the call. This will also inform the using
4994 * process that the call is in an error state.
4995 */
4996 MUTEX_ENTER(&rx_stats_mutex);
4997 rx_tq_debug.rxi_start_aborted ++;
4998 MUTEX_EXIT(&rx_stats_mutex);
4999 call->flags &= ~RX_CALL_TQ_BUSY;
5000 if (call->flags & RX_CALL_TQ_WAIT) {
5001 call->flags &= ~RX_CALL_TQ_WAIT;
5002 #ifdef RX_ENABLE_LOCKS
5003 CV_BROADCAST(&call->cv_tq);
5004 #else /* RX_ENABLE_LOCKS */
5005 osi_rxWakeup(&call->tq);
5006 #endif /* RX_ENABLE_LOCKS */
5008 rxi_CallError(call, call->error);
5011 #ifdef RX_ENABLE_LOCKS
5012 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
5013 register int missing;
5014 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
5015 /* Some packets have received acks. If they all have, we can clear
5016 * the transmit queue.
5017 */
5018 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5019 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
5027 call->flags |= RX_CALL_TQ_CLEARME;
5029 #endif /* RX_ENABLE_LOCKS */
5030 /* Don't bother doing retransmits if the TQ is cleared. */
5031 if (call->flags & RX_CALL_TQ_CLEARME) {
5032 rxi_ClearTransmitQueue(call, 1);
5034 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5037 /* Always post a resend event, if there is anything in the
5038 * queue, and resend is possible. There should be at least
5039 * one unacknowledged packet in the queue ... otherwise none
5040 * of these packets should be on the queue in the first place.
5041 */
5042 if (call->resendEvent) {
5043 /* Cancel the existing event and post a new one */
5044 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5047 /* The retry time is the retry time on the first unacknowledged
5048 * packet inside the current window */
5049 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5050 /* Don't set timers for packets outside the window */
5051 if (p->header.seq >= call->tfirst + call->twind) {
5052 break;
5053 }
5055 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
5056 haveEvent = 1;
5057 retryTime = p->retryTime;
5058 break;
5059 }
5062 /* Post a new event to re-run rxi_Start when retries may be needed */
5063 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
5064 #ifdef RX_ENABLE_LOCKS
5065 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
5066 call->resendEvent = rxevent_Post(&retryTime,
5068 (void *)call, (void *)istack);
5069 #else /* RX_ENABLE_LOCKS */
5070 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5071 (void *)call, (void *)istack);
5072 #endif /* RX_ENABLE_LOCKS */
5075 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5076 } while (call->flags & RX_CALL_NEED_START);
5077 /*
5078 * TQ references no longer protected by this flag; they must remain
5079 * protected by the global lock.
5080 */
5081 call->flags &= ~RX_CALL_TQ_BUSY;
5082 if (call->flags & RX_CALL_TQ_WAIT) {
5083 call->flags &= ~RX_CALL_TQ_WAIT;
5084 #ifdef RX_ENABLE_LOCKS
5085 CV_BROADCAST(&call->cv_tq);
5086 #else /* RX_ENABLE_LOCKS */
5087 osi_rxWakeup(&call->tq);
5088 #endif /* RX_ENABLE_LOCKS */
5091 call->flags |= RX_CALL_NEED_START;
5093 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5095 if (call->resendEvent) {
5096 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5101 /* Also adjusts the keep alive parameters for the call, to reflect
5102 * that we have just sent a packet (so keep alives aren't sent
5103 * immediately). */
5104 void rxi_Send(register struct rx_call *call, register struct rx_packet *p,
5107 register struct rx_connection *conn = call->conn;
5109 /* Stamp each packet with the user supplied status */
5110 p->header.userStatus = call->localStatus;
5112 /* Allow the security object controlling this call's security to
5113 * make any last-minute changes to the packet */
5114 RXS_SendPacket(conn->securityObject, call, p);
5116 /* Since we're about to send SOME sort of packet to the peer, it's
5117 * safe to nuke any scheduled end-of-packets ack */
5118 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5120 /* Actually send the packet, filling in more connection-specific fields */
5121 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5122 MUTEX_EXIT(&call->lock);
5123 rxi_SendPacket(conn, p, istack);
5124 MUTEX_ENTER(&call->lock);
5125 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5127 /* Update last send time for this call (for keep-alive
5128 * processing), and for the connection (so that we can discover
5129 * idle connections) */
5130 conn->lastSendTime = call->lastSendTime = clock_Sec();
5134 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5135 * that things are fine. Also called periodically to guarantee that nothing
5136 * falls through the cracks (e.g. (error + dally) connections have keepalive
5137 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, the call
5138 * may be freed.
5139 * haveCTLock Set if calling from rxi_ReapConnections
5140 */
5141 #ifdef RX_ENABLE_LOCKS
5142 int rxi_CheckCall(register struct rx_call *call, int haveCTLock)
5143 #else /* RX_ENABLE_LOCKS */
5144 int rxi_CheckCall(register struct rx_call *call)
5145 #endif /* RX_ENABLE_LOCKS */
5147 register struct rx_connection *conn = call->conn;
5148 register struct rx_service *tservice;
5150 afs_uint32 deadTime;
5152 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5153 if (call->flags & RX_CALL_TQ_BUSY) {
5154 /* Call is active and will be reset by rxi_Start if it's
5155 * in an error state.
5156 */
5160 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5161 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5162 ((afs_uint32)conn->peer->rtt >> 3) +
5163 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
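/*
 * Worked example (illustrative values):
 *   conn->secondsUntilDead == 12            -> 12 << 10 == 12288
 *   conn->peer->rtt == 800 (eighth-ms)      -> 800 >> 3 ==   100 (srtt 100 ms)
 *   conn->peer->rtt_dev == 100 (quarter-ms) -> 100 << 1 ==   200 (8*MDEV, 25 ms MDEV)
 * deadTime = (12288 + 100 + 200 + 1023) >> 10 == 13 seconds.
 */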
5165 /* These are computed to the second (+- 1 second). But that's
5166 * good enough for these values, which should be a significant
5167 * number of seconds. */
5168 if (now > (call->lastReceiveTime + deadTime)) {
5169 if (call->state == RX_STATE_ACTIVE) {
5170 rxi_CallError(call, RX_CALL_DEAD);
5174 #ifdef RX_ENABLE_LOCKS
5175 /* Cancel pending events */
5176 rxevent_Cancel(call->delayedAckEvent, call,
5177 RX_CALL_REFCOUNT_DELAY);
5178 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5179 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5180 if (call->refCount == 0) {
5181 rxi_FreeCall(call, haveCTLock);
5185 #else /* RX_ENABLE_LOCKS */
5188 #endif /* RX_ENABLE_LOCKS */
5190 /* Non-active calls are destroyed if they are not responding
5191 * to pings; active calls are simply flagged in error, so the
5192 * attached process can die reasonably gracefully. */
5194 /* see if we have a non-activity timeout */
5195 tservice = conn->service;
5196 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5197 && tservice->idleDeadTime
5198 && ((call->startWait + tservice->idleDeadTime) < now)) {
5199 if (call->state == RX_STATE_ACTIVE) {
5200 rxi_CallError(call, RX_CALL_TIMEOUT);
5204 /* see if we have a hard timeout */
5205 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5206 if (call->state == RX_STATE_ACTIVE)
5207 rxi_CallError(call, RX_CALL_TIMEOUT);
5214 /* When a call is in progress, this routine is called occasionally to
5215 * make sure that some traffic has arrived (or been sent to) the peer.
5216 * If nothing has arrived in a reasonable amount of time, the call is
5217 * declared dead; if nothing has been sent for a while, we send a
5218 * keep-alive packet (if we're actually trying to keep the call alive).
5219 */
5220 void rxi_KeepAliveEvent(struct rxevent *event, register struct rx_call *call,
5223 struct rx_connection *conn;
5226 MUTEX_ENTER(&call->lock);
5227 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5228 if (event == call->keepAliveEvent)
5229 call->keepAliveEvent = NULL;
5232 #ifdef RX_ENABLE_LOCKS
5233 if(rxi_CheckCall(call, 0)) {
5234 MUTEX_EXIT(&call->lock);
5237 #else /* RX_ENABLE_LOCKS */
5238 if (rxi_CheckCall(call)) return;
5239 #endif /* RX_ENABLE_LOCKS */
5241 /* Don't try to keep alive dallying calls */
5242 if (call->state == RX_STATE_DALLY) {
5243 MUTEX_EXIT(&call->lock);
5248 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5249 /* Don't try to send keepalives if there is unacknowledged data */
5250 /* the rexmit code should be good enough, this little hack
5251 * doesn't quite work XXX */
5252 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5254 rxi_ScheduleKeepAliveEvent(call);
5255 MUTEX_EXIT(&call->lock);
5259 void rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
5261 if (!call->keepAliveEvent) {
5263 clock_GetTime(&when);
5264 when.sec += call->conn->secondsUntilPing;
5265 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5266 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5270 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5271 void rxi_KeepAliveOn(register struct rx_call *call)
5273 /* Pretend last packet received was received now--i.e. if another
5274 * packet isn't received within the keep alive time, then the call
5275 * will die; Initialize last send time to the current time--even
5276 * if a packet hasn't been sent yet. This will guarantee that a
5277 * keep-alive is sent within the ping time */
5278 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5279 rxi_ScheduleKeepAliveEvent(call);
5282 /* This routine is called to send connection abort messages
5283 * that have been delayed to throttle looping clients. */
5284 void rxi_SendDelayedConnAbort(struct rxevent *event, register struct rx_connection *conn,
5288 struct rx_packet *packet;
5290 MUTEX_ENTER(&conn->conn_data_lock);
5291 conn->delayedAbortEvent = NULL;
5292 error = htonl(conn->error);
5294 MUTEX_EXIT(&conn->conn_data_lock);
5295 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5297 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5298 RX_PACKET_TYPE_ABORT, (char *)&error,
5300 rxi_FreePacket(packet);
5304 /* This routine is called to send call abort messages
5305 * that have been delayed to throttle looping clients. */
5306 void rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
5310 struct rx_packet *packet;
5312 MUTEX_ENTER(&call->lock);
5313 call->delayedAbortEvent = NULL;
5314 error = htonl(call->error);
5316 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5318 packet = rxi_SendSpecial(call, call->conn, packet,
5319 RX_PACKET_TYPE_ABORT, (char *)&error,
5321 rxi_FreePacket(packet);
5323 MUTEX_EXIT(&call->lock);
5326 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5327 * seconds) to ask the client to authenticate itself. The routine
5328 * issues a challenge to the client, which is obtained from the
5329 * security object associated with the connection */
5330 void rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
5333 int tries = (int) atries;
5334 conn->challengeEvent = NULL;
5335 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5336 register struct rx_packet *packet;
5340 /* We've failed to authenticate for too long.
5341 * Reset any calls waiting for authentication;
5342 * they are all in RX_STATE_PRECALL.
5346 MUTEX_ENTER(&conn->conn_call_lock);
5347 for (i=0; i<RX_MAXCALLS; i++) {
5348 struct rx_call *call = conn->call[i];
5350 MUTEX_ENTER(&call->lock);
5351 if (call->state == RX_STATE_PRECALL) {
5352 rxi_CallError(call, RX_CALL_DEAD);
5353 rxi_SendCallAbort(call, NULL, 0, 0);
5355 MUTEX_EXIT(&call->lock);
5358 MUTEX_EXIT(&conn->conn_call_lock);
5362 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5364 /* If there's no packet available, do this later. */
5365 RXS_GetChallenge(conn->securityObject, conn, packet);
5366 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5367 RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
5368 rxi_FreePacket(packet);
5370 clock_GetTime(&when);
5371 when.sec += RX_CHALLENGE_TIMEOUT;
5372 conn->challengeEvent =
5373 rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
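/*
 * Taken together: rxi_ChallengeOn seeds this event with
 * RX_CHALLENGE_MAXTRIES attempts; each pass that still finds the
 * connection unauthenticated re-posts itself RX_CHALLENGE_TIMEOUT
 * seconds out with tries-1, until a valid response arrives or the
 * tries are exhausted and the waiting calls are aborted above.
 */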
5377 /* Call this routine to start requesting the client to authenticate
5378 * itself. This will continue until authentication is established,
5379 * the call times out, or an invalid response is returned. The
5380 * security object associated with the connection is asked to create
5381 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5382 * defined earlier. */
5383 void rxi_ChallengeOn(register struct rx_connection *conn)
5385 if (!conn->challengeEvent) {
5386 RXS_CreateChallenge(conn->securityObject, conn);
5387 rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
5392 /* Compute round trip time of the packet provided, in *rttp. */
5395 /* rxi_ComputeRoundTripTime is called with peer locked. */
5396 /* sentp and/or peer may be null */
5397 void rxi_ComputeRoundTripTime(register struct rx_packet *p,
5398 register struct clock *sentp, register struct rx_peer *peer)
5400 struct clock thisRtt, *rttp = &thisRtt;
5402 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5403 /* making year 2038 bugs to get this running now - stroucki */
5404 struct timeval temptime;
5406 register int rtt_timeout;
5408 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5409 /* yet again. This was the worst Heisenbug of the port - stroucki */
5410 clock_GetTime(&temptime);
5411 rttp->sec=(afs_int32)temptime.tv_sec;
5412 rttp->usec=(afs_int32)temptime.tv_usec;
5414 clock_GetTime(rttp);
5416 if (clock_Lt(rttp, sentp)) {
5418 return; /* somebody set the clock back, don't count this time. */
5420 clock_Sub(rttp, sentp);
5421 MUTEX_ENTER(&rx_stats_mutex);
5422 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5423 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5424 if (rttp->sec > 60) {
5425 MUTEX_EXIT(&rx_stats_mutex);
5426 return; /* somebody set the clock ahead */
5428 rx_stats.maxRtt = *rttp;
5430 clock_Add(&rx_stats.totalRtt, rttp);
5431 rx_stats.nRttSamples++;
5432 MUTEX_EXIT(&rx_stats_mutex);
5434 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5436 /* Apply VanJacobson round-trip estimations */
5441 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5442 * srtt is stored as fixed point with 3 bits after the binary
5443 * point (i.e., scaled by 8). The following magic is
5444 * equivalent to the smoothing algorithm in rfc793 with an
5445 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5446 * srtt*8 = srtt*8 + rtt - srtt
5447 * srtt = srtt + rtt/8 - srtt/8
5450 delta = MSEC(rttp) - (peer->rtt >> 3);
5454 * We accumulate a smoothed rtt variance (actually, a smoothed
5455 * mean difference), then set the retransmit timer to smoothed
5456 * rtt + 4 times the smoothed variance (was 2x in van's original
5457 * paper, but 4x works better for me, and apparently for him as
5459 * rttvar is stored as
5460 * fixed point with 2 bits after the binary point (scaled by
5461 * 4). The following is equivalent to rfc793 smoothing with
5462 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5463 * replaces rfc793's wired-in beta.
5464 * dev*4 = dev*4 + (|actual - expected| - dev)
5470 delta -= (peer->rtt_dev >> 2);
5471 peer->rtt_dev += delta;
5474 /* I don't have a stored RTT so I start with this value. Since I'm
5475 * probably just starting a call, and will be pushing more data down
5476 * this, I expect congestion to increase rapidly. So I fudge a
5477 * little, and I set deviance to half the rtt. In practice,
5478 * deviance tends to approach something a little less than
5479 * half the smoothed rtt. */
5480 peer->rtt = (MSEC(rttp) << 3) + 8;
5481 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5483 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5484 * the other of these connections is usually in a user process, and can
5485 * be switched and/or swapped out. So on fast, reliable networks, the
5486 * timeout would otherwise be too short.
5488 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5489 clock_Zero(&(peer->timeout));
5490 clock_Addmsec(&(peer->timeout), rtt_timeout);
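/*
 * Worked example (illustrative values): stored srtt == 100 ms
 * (peer->rtt == 800) and rtt_dev == 80 (a 20 ms mean deviation), when a
 * 120 ms sample arrives: delta == 120 - 100 == 20, so srtt is nudged to
 * about 102.5 ms by the alpha-7/8 smoothing described above; |delta|
 * minus (rtt_dev >> 2) is 0, so the deviation is unchanged, and the
 * timeout becomes roughly 102 + 80 + 350 == 532 ms.
 */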
5492 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5493 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5494 (peer->timeout.sec),(peer->timeout.usec)) );
5498 /* Find all server connections that have not been active for a long time, and
5499 * toss them */
5500 void rxi_ReapConnections(void)
5503 clock_GetTime(&now);
5505 /* Find server connection structures that haven't been used for
5506 * greater than rx_idleConnectionTime */
5507 { struct rx_connection **conn_ptr, **conn_end;
5508 int i, havecalls = 0;
5509 MUTEX_ENTER(&rx_connHashTable_lock);
5510 for (conn_ptr = &rx_connHashTable[0],
5511 conn_end = &rx_connHashTable[rx_hashTableSize];
5512 conn_ptr < conn_end; conn_ptr++) {
5513 struct rx_connection *conn, *next;
5514 struct rx_call *call;
5518 for (conn = *conn_ptr; conn; conn = next) {
5519 /* XXX -- Shouldn't the connection be locked? */
5522 for(i=0;i<RX_MAXCALLS;i++) {
5523 call = conn->call[i];
5526 MUTEX_ENTER(&call->lock);
5527 #ifdef RX_ENABLE_LOCKS
5528 result = rxi_CheckCall(call, 1);
5529 #else /* RX_ENABLE_LOCKS */
5530 result = rxi_CheckCall(call);
5531 #endif /* RX_ENABLE_LOCKS */
5532 MUTEX_EXIT(&call->lock);
5534 /* If CheckCall freed the call, it might
5535 * have destroyed the connection as well,
5536 * which screws up the linked lists.
5537 */
5542 if (conn->type == RX_SERVER_CONNECTION) {
5543 /* This only actually destroys the connection if
5544 * there are no outstanding calls */
5545 MUTEX_ENTER(&conn->conn_data_lock);
5546 if (!havecalls && !conn->refCount &&
5547 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5548 conn->refCount++; /* it will be decr in rx_DestroyConn */
5549 MUTEX_EXIT(&conn->conn_data_lock);
5550 #ifdef RX_ENABLE_LOCKS
5551 rxi_DestroyConnectionNoLock(conn);
5552 #else /* RX_ENABLE_LOCKS */
5553 rxi_DestroyConnection(conn);
5554 #endif /* RX_ENABLE_LOCKS */
5556 #ifdef RX_ENABLE_LOCKS
5558 MUTEX_EXIT(&conn->conn_data_lock);
5560 #endif /* RX_ENABLE_LOCKS */
5564 #ifdef RX_ENABLE_LOCKS
5565 while (rx_connCleanup_list) {
5566 struct rx_connection *conn;
5567 conn = rx_connCleanup_list;
5568 rx_connCleanup_list = rx_connCleanup_list->next;
5569 MUTEX_EXIT(&rx_connHashTable_lock);
5570 rxi_CleanupConnection(conn);
5571 MUTEX_ENTER(&rx_connHashTable_lock);
5573 MUTEX_EXIT(&rx_connHashTable_lock);
5574 #endif /* RX_ENABLE_LOCKS */
5577 /* Find any peer structures that haven't been used (haven't had an
5578 * associated connection) for greater than rx_idlePeerTime */
5579 { struct rx_peer **peer_ptr, **peer_end;
5581 MUTEX_ENTER(&rx_rpc_stats);
5582 MUTEX_ENTER(&rx_peerHashTable_lock);
5583 for (peer_ptr = &rx_peerHashTable[0],
5584 peer_end = &rx_peerHashTable[rx_hashTableSize];
5585 peer_ptr < peer_end; peer_ptr++) {
5586 struct rx_peer *peer, *next, *prev;
5587 for (prev = peer = *peer_ptr; peer; peer = next) {
5589 code = MUTEX_TRYENTER(&peer->peer_lock);
5590 if ((code) && (peer->refCount == 0)
5591 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5592 rx_interface_stat_p rpc_stat, nrpc_stat;
5594 MUTEX_EXIT(&peer->peer_lock);
5595 MUTEX_DESTROY(&peer->peer_lock);
5596 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5597 rx_interface_stat)) {
5598 unsigned int num_funcs;
5599 if (!rpc_stat) break;
5600 queue_Remove(&rpc_stat->queue_header);
5601 queue_Remove(&rpc_stat->all_peers);
5602 num_funcs = rpc_stat->stats[0].func_total;
5603 space = sizeof(rx_interface_stat_t) +
5604 rpc_stat->stats[0].func_total *
5605 sizeof(rx_function_entry_v1_t);
5607 rxi_Free(rpc_stat, space);
5608 rxi_rpc_peer_stat_cnt -= num_funcs;
5611 MUTEX_ENTER(&rx_stats_mutex);
5612 rx_stats.nPeerStructs--;
5613 MUTEX_EXIT(&rx_stats_mutex);
5614 if (prev == *peer_ptr) {
5623 MUTEX_EXIT(&peer->peer_lock);
5629 MUTEX_EXIT(&rx_peerHashTable_lock);
5630 MUTEX_EXIT(&rx_rpc_stats);
5633 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5634 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5635 GC, just below. Really, we shouldn't have to keep moving packets from
5636 one place to another, but instead ought to always know if we can
5637 afford to hold onto a packet in its particular use. */
5638 MUTEX_ENTER(&rx_freePktQ_lock);
5639 if (rx_waitingForPackets) {
5640 rx_waitingForPackets = 0;
5641 #ifdef RX_ENABLE_LOCKS
5642 CV_BROADCAST(&rx_waitingForPackets_cv);
5644 osi_rxWakeup(&rx_waitingForPackets);
5647 MUTEX_EXIT(&rx_freePktQ_lock);
5649 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5650 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
/* rxs_Release - This isn't strictly necessary but, since the macro name
 * from rx.h is sort of strange, this is better. This is called with a
 * security object before it is discarded. Each connection using a
 * security object has its own refcount to the object, so it won't
 * actually be freed until the last connection is destroyed.
 *
 * This is the only rxs module call. A hold could also be written, but no
 * one needs it. */
5663 int rxs_Release (struct rx_securityClass *aobj)
5665 return RXS_Close (aobj);
5669 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5670 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5671 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5672 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
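/* For orientation (sizes hedged; exact values depend on the build):
 * RX_HEADER_SIZE is 28 bytes and RX_IPUDP_SIZE is 28 (20 bytes IP +
 * 8 bytes UDP), so RXRATE_PKT_OH works out to 56 bytes of per-packet
 * overhead; the SMALL/LARGE variants then add the ack-packet body and
 * a 256-byte data allowance on top of that. */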
5674 /* Adjust our estimate of the transmission rate to this peer, given
5675 * that the packet p was just acked. We can adjust peer->timeout and
5676 * call->twind. Pragmatically, this is called
5677 * only with packets of maximal length.
5678 * Called with peer and call locked.
5681 static void rxi_ComputeRate(register struct rx_peer *peer,
5682 register struct rx_call *call, struct rx_packet *p,
5683 struct rx_packet *ackp, u_char ackReason)
afs_int32 xferSize, xferMs;
register afs_int32 minTime;
struct clock newTO;	/* used below for the new timeout estimate */
5689 /* Count down packets */
5690 if (peer->rateFlag > 0) peer->rateFlag--;
5691 /* Do nothing until we're enabled */
5692 if (peer->rateFlag != 0) return;
5693 if (!call->conn) return;
5695 /* Count only when the ack seems legitimate */
5696 switch (ackReason) {
5697 case RX_ACK_REQUESTED:
5698 xferSize = p->length + RX_HEADER_SIZE +
5699 call->conn->securityMaxTrailerSize;
5703 case RX_ACK_PING_RESPONSE:
5704 if (p) /* want the response to ping-request, not data send */
5706 clock_GetTime(&newTO);
5707 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5708 clock_Sub(&newTO, &call->pingRequestTime);
5709 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5713 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5720 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5721 ntohl(peer->host), ntohs(peer->port),
5722 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5723 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5726 /* Track only packets that are big enough. */
5727 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5731 /* absorb RTT data (in milliseconds) for these big packets */
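/* Once initialized, the smoothing below is an exponentially weighted
 * moving average with gain 1/16: new = (15*old + sample + 4) >> 4,
 * where the +4 partially compensates for truncation by the shift. */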
5732 if (peer->smRtt == 0) {
5733 peer->smRtt = xferMs;
5735 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5736 if (!peer->smRtt) peer->smRtt = 1;
5739 if (peer->countDown) {
5743 peer->countDown = 10; /* recalculate only every so often */
5745 /* In practice, we can measure only the RTT for full packets,
5746 * because of the way Rx acks the data that it receives. (If it's
5747 * smaller than a full packet, it often gets implicitly acked
5748 * either by the call response (from a server) or by the next call
5749 * (from a client), and either case confuses transmission times
5750 * with processing times.) Therefore, replace the above
5751 * more-sophisticated processing with a simpler version, where the
5752 * smoothed RTT is kept for full-size packets, and the time to
5753 * transmit a windowful of full-size packets is simply RTT *
5754 * windowSize. Again, we take two steps:
5755 - ensure the timeout is large enough for a single packet's RTT;
5756 - ensure that the window is small enough to fit in the desired timeout.*/
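/* Worked example (illustrative numbers only): with smRtt = 50 ms and a
 * current timeout of 2 s, the window computation below yields
 * minTime = 2000 ms / (50 ms * 2) = 20 full-size packets, which is then
 * clamped to the range [1, rx_Window]. */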
5758 /* First, the timeout check. */
5759 minTime = peer->smRtt;
5760 /* Get a reasonable estimate for a timeout period */
5762 newTO.sec = minTime / 1000;
5763 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5765 /* Increase the timeout period so that we can always do at least
5766 * one packet exchange */
5767 if (clock_Gt(&newTO, &peer->timeout)) {
5769 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5770 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5771 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5774 peer->timeout = newTO;
5777 /* Now, get an estimate for the transmit window size. */
5778 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5779 /* Now, convert to the number of full packets that could fit in a
5780 * reasonable fraction of that interval */
5781 minTime /= (peer->smRtt << 1);
5782 xferSize = minTime; /* (make a copy) */
5784 /* Now clamp the size to reasonable bounds. */
5785 if (minTime <= 1) minTime = 1;
5786 else if (minTime > rx_Window) minTime = rx_Window;
/* if (minTime != peer->maxWindow) {
 *     dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
 *          ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
 *          peer->timeout.sec, peer->timeout.usec, peer->smRtt,
 * } */
peer->maxWindow = minTime;
/* elide... call->twind = minTime;  (left commented out: this fragment is
 * not valid C, and the ADAPT_WINDOW block it lives in is not normally
 * compiled) */
5797 /* Cut back on the peer timeout if it had earlier grown unreasonably.
* Discern this by calculating the timeout necessary for rx_Window
* packets. */
5800 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5801 /* calculate estimate for transmission interval in milliseconds */
5802 minTime = rx_Window * peer->smRtt;
5803 if (minTime < 1000) {
5804 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5805 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5806 peer->timeout.usec, peer->smRtt,
5809 newTO.sec = 0; /* cut back on timeout by half a second */
5810 newTO.usec = 500000;
5811 clock_Sub(&peer->timeout, &newTO);
5816 } /* end of rxi_ComputeRate */
5817 #endif /* ADAPT_WINDOW */
5825 /* Don't call this debugging routine directly; use dpf */
5827 rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int a9, int a10,
5828 int a11, int a12, int a13, int a14, int a15)
5831 clock_GetTime(&now);
5832 fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5833 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
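/* dpf is a macro that expands to an rxi_DebugPrint call only when rx
 * debugging is enabled; call sites use double parentheses, e.g.
 *
 *     dpf(("packet %d acked", serial));
 *
 * so that one macro argument can carry the whole printf-style list. */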
5840 * This function is used to process the rx_stats structure that is local
5841 * to a process as well as an rx_stats structure received from a remote
* process (via rxdebug). Therefore, it needs to do minimal version
* checking.
*/
5845 void rx_PrintTheseStats (FILE *file, struct rx_stats *s, int size,
5846 afs_int32 freePackets, char version)
if (size != sizeof(struct rx_stats)) {
	fprintf(file,
		"Unexpected size of stats structure: was %d, expected %d\n",
		size, (int) sizeof(struct rx_stats));
5857 "rx stats: free packets %d, allocs %d, ",
5861 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5863 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5864 s->receivePktAllocFailures,
5865 s->receiveCbufPktAllocFailures,
5866 s->sendPktAllocFailures,
5867 s->sendCbufPktAllocFailures,
5868 s->specialPktAllocFailures);
5871 "alloc-failures(rcv %d,send %d,ack %d)\n",
5872 s->receivePktAllocFailures,
5873 s->sendPktAllocFailures,
5874 s->specialPktAllocFailures);
5879 "bogusReads %d (last from host %x), "
5885 s->bogusPacketOnRead,
5888 s->noPacketBuffersOnRead,
5892 fprintf(file, " packets read: ");
5893 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5899 fprintf(file, "\n");
5902 " other read counters: data %d, "
5910 s->spuriousPacketsRead,
5911 s->ignorePacketDally);
5913 fprintf(file, " packets sent: ");
5914 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5920 fprintf(file, "\n");
5923 " other send counters: ack %d, "
5924 "data %d (not resends), "
5927 "acked&ignored %d\n",
5930 s->dataPacketsReSent,
5931 s->dataPacketsPushed,
5932 s->ignoreAckedPacket);
5935 " \t(these should be small) sendFailed %d, "
5938 (int) s->fatalErrors);
5940 if (s->nRttSamples) {
5942 " Average rtt is %0.3f, with %d samples\n",
5943 clock_Float(&s->totalRtt)/s->nRttSamples,
5947 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5948 clock_Float(&s->minRtt),
5949 clock_Float(&s->maxRtt));
5953 " %d server connections, "
5954 "%d client connections, "
5957 "%d free call structs\n",
5962 s->nFreeCallStructs);
5964 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5966 " %d clock updates\n",
5972 /* for backward compatibility */
5973 void rx_PrintStats(FILE *file)
5975 MUTEX_ENTER(&rx_stats_mutex);
5976 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5977 MUTEX_EXIT(&rx_stats_mutex);
5980 void rx_PrintPeerStats(FILE *file, struct rx_peer *peer)
5985 "burst wait %u.%d.\n",
5988 (int) peer->burstSize,
5989 (int) peer->burstWait.sec,
5990 (int) peer->burstWait.usec);
5994 "retry time %u.%06d, "
5998 (int) peer->timeout.sec,
5999 (int) peer->timeout.usec,
6005 "max in packet skew %d, "
6006 "max out packet skew %d\n",
6008 (int) peer->inPacketSkew,
6009 (int) peer->outPacketSkew);
6012 #ifdef AFS_PTHREAD_ENV
6014 * This mutex protects the following static variables:
6018 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
6019 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
6021 #define LOCK_RX_DEBUG
6022 #define UNLOCK_RX_DEBUG
6023 #endif /* AFS_PTHREAD_ENV */
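/* Note: wrapping pthread_mutex_lock() in assert() means the lock
 * operation itself would be compiled away if this file were ever built
 * with NDEBUG defined; the pattern relies on assertions staying
 * enabled. */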
6025 static int MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr,
6026 afs_uint16 remotePort, u_char type, void *inputData, size_t inputLength,
6027 void *outputData, size_t outputLength)
6029 static afs_int32 counter = 100;
6031 struct rx_header theader;
6033 register afs_int32 code;
6035 struct sockaddr_in taddr, faddr;
6040 endTime = time(0) + 20; /* try for 20 seconds */
6044 tp = &tbuffer[sizeof(struct rx_header)];
6045 taddr.sin_family = AF_INET;
6046 taddr.sin_port = remotePort;
6047 taddr.sin_addr.s_addr = remoteAddr;
6048 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
6049 taddr.sin_len = sizeof(struct sockaddr_in);
6052 memset(&theader, 0, sizeof(theader));
6053 theader.epoch = htonl(999);
6055 theader.callNumber = htonl(counter);
6058 theader.type = type;
6059 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
6060 theader.serviceId = 0;
6062 memcpy(tbuffer, &theader, sizeof(theader));
6063 memcpy(tp, inputData, inputLength);
6064 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
6065 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
6067 /* see if there's a packet available */
6069 FD_SET(socket, &imask);
6072 code = select(socket+1, &imask, 0, 0, &tv);
6074 /* now receive a packet */
6075 faddrLen = sizeof(struct sockaddr_in);
6076 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6077 (struct sockaddr *) &faddr, &faddrLen);
6079 memcpy(&theader, tbuffer, sizeof(struct rx_header));
6080 if (counter == ntohl(theader.callNumber)) break;
6083 /* see if we've timed out */
6084 if (endTime < time(0)) return -1;
6086 code -= sizeof(struct rx_header);
6087 if (code > outputLength) code = outputLength;
6088 memcpy(outputData, tp, code);
6092 afs_int32 rx_GetServerDebug(osi_socket socket, afs_uint32 remoteAddr,
6093 afs_uint16 remotePort, struct rx_debugStats *stat, afs_uint32 *supportedValues)
6095 struct rx_debugIn in;
6098 *supportedValues = 0;
6099 in.type = htonl(RX_DEBUGI_GETSTATS);
6102 rc = MakeDebugCall(socket,
6105 RX_PACKET_TYPE_DEBUG,
* If the call was successful, fix up the version and indicate
* what contents of the stat structure are valid.
6114 * Also do net to host conversion of fields here.
6118 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6119 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6121 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6122 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6124 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6125 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6127 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6128 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6130 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6131 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6133 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6134 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6136 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6137 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6140 stat->nFreePackets = ntohl(stat->nFreePackets);
6141 stat->packetReclaims = ntohl(stat->packetReclaims);
6142 stat->callsExecuted = ntohl(stat->callsExecuted);
6143 stat->nWaiting = ntohl(stat->nWaiting);
6144 stat->idleThreads = ntohl(stat->idleThreads);
6150 afs_int32 rx_GetServerStats(osi_socket socket, afs_uint32 remoteAddr,
6151 afs_uint16 remotePort, struct rx_stats *stat, afs_uint32 *supportedValues)
6153 struct rx_debugIn in;
6154 afs_int32 *lp = (afs_int32 *) stat;
6159 * supportedValues is currently unused, but added to allow future
6160 * versioning of this function.
6163 *supportedValues = 0;
6164 in.type = htonl(RX_DEBUGI_RXSTATS);
6166 memset(stat, 0, sizeof(*stat));
6168 rc = MakeDebugCall(socket,
6171 RX_PACKET_TYPE_DEBUG,
6180 * Do net to host conversion here
6183 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6191 afs_int32 rx_GetServerVersion(osi_socket socket, afs_uint32 remoteAddr,
6192 afs_uint16 remotePort, size_t version_length, char *version)
6195 return MakeDebugCall(socket,
6198 RX_PACKET_TYPE_VERSION,
6205 afs_int32 rx_GetServerConnections(osi_socket socket, afs_uint32 remoteAddr,
6206 afs_uint16 remotePort, afs_int32 *nextConnection, int allConnections,
6207 afs_uint32 debugSupportedValues, struct rx_debugConn *conn, afs_uint32 *supportedValues)
6209 struct rx_debugIn in;
6214 * supportedValues is currently unused, but added to allow future
6215 * versioning of this function.
6218 *supportedValues = 0;
6219 if (allConnections) {
6220 in.type = htonl(RX_DEBUGI_GETALLCONN);
6222 in.type = htonl(RX_DEBUGI_GETCONN);
6224 in.index = htonl(*nextConnection);
6225 memset(conn, 0, sizeof(*conn));
6227 rc = MakeDebugCall(socket,
6230 RX_PACKET_TYPE_DEBUG,
6237 *nextConnection += 1;
6240 * Convert old connection format to new structure.
6243 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6244 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6245 #define MOVEvL(a) (conn->a = vL->a)
6247 /* any old or unrecognized version... */
6248 for (i=0;i<RX_MAXCALLS;i++) {
6249 MOVEvL(callState[i]);
6250 MOVEvL(callMode[i]);
6251 MOVEvL(callFlags[i]);
6252 MOVEvL(callOther[i]);
6254 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6255 MOVEvL(secStats.type);
6256 MOVEvL(secStats.level);
6257 MOVEvL(secStats.flags);
6258 MOVEvL(secStats.expires);
6259 MOVEvL(secStats.packetsReceived);
6260 MOVEvL(secStats.packetsSent);
6261 MOVEvL(secStats.bytesReceived);
6262 MOVEvL(secStats.bytesSent);
6267 * Do net to host conversion here
6269 * I don't convert host or port since we are most likely
6270 * going to want these in NBO.
6272 conn->cid = ntohl(conn->cid);
6273 conn->serial = ntohl(conn->serial);
6274 for(i=0;i<RX_MAXCALLS;i++) {
6275 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6277 conn->error = ntohl(conn->error);
6278 conn->secStats.flags = ntohl(conn->secStats.flags);
6279 conn->secStats.expires = ntohl(conn->secStats.expires);
6280 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6281 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6282 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6283 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6284 conn->epoch = ntohl(conn->epoch);
6285 conn->natMTU = ntohl(conn->natMTU);
6291 afs_int32 rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
6292 afs_int32 *nextPeer, afs_uint32 debugSupportedValues, struct rx_debugPeer *peer,
6293 afs_uint32 *supportedValues)
6295 struct rx_debugIn in;
6299 * supportedValues is currently unused, but added to allow future
6300 * versioning of this function.
6303 *supportedValues = 0;
6304 in.type = htonl(RX_DEBUGI_GETPEER);
6305 in.index = htonl(*nextPeer);
6306 memset(peer, 0, sizeof(*peer));
6308 rc = MakeDebugCall(socket,
6311 RX_PACKET_TYPE_DEBUG,
6321 * Do net to host conversion here
6323 * I don't convert host or port since we are most likely
6324 * going to want these in NBO.
6326 peer->ifMTU = ntohs(peer->ifMTU);
6327 peer->idleWhen = ntohl(peer->idleWhen);
6328 peer->refCount = ntohs(peer->refCount);
6329 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6330 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6331 peer->rtt = ntohl(peer->rtt);
6332 peer->rtt_dev = ntohl(peer->rtt_dev);
6333 peer->timeout.sec = ntohl(peer->timeout.sec);
6334 peer->timeout.usec = ntohl(peer->timeout.usec);
6335 peer->nSent = ntohl(peer->nSent);
6336 peer->reSends = ntohl(peer->reSends);
6337 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6338 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6339 peer->rateFlag = ntohl(peer->rateFlag);
6340 peer->natMTU = ntohs(peer->natMTU);
6341 peer->maxMTU = ntohs(peer->maxMTU);
6342 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6343 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6344 peer->MTU = ntohs(peer->MTU);
6345 peer->cwind = ntohs(peer->cwind);
6346 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6347 peer->congestSeq = ntohs(peer->congestSeq);
6348 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6349 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6350 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6351 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
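/* Hedged sketch of how a client such as rxdebug might fetch a remote
 * peer entry with the routine above (the helper name is illustrative;
 * iteration re-calls with the advanced index): */
#if 0
static void example_first_peer(osi_socket sock, afs_uint32 host,
			       afs_uint16 port)
{
    struct rx_debugPeer peer;
    afs_uint32 supported;
    afs_int32 index = 0;

    if (rx_GetServerPeers(sock, host, port, &index, 0, &peer,
			  &supported) >= 0) {
	/* peer is filled in, converted to host byte order except for
	 * peer.host and peer.port, and index has been advanced for the
	 * next call */
    }
}
#endif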
6356 #endif /* RXDEBUG */
6358 void shutdown_rx(void)
6360 struct rx_serverQueueEntry *np;
6362 register struct rx_call *call;
6363 register struct rx_serverQueueEntry *sq;
6366 if (rxinit_status == 1) {
return;	/* Already shut down. */
6373 #ifndef AFS_PTHREAD_ENV
6374 FD_ZERO(&rx_selectMask);
6375 #endif /* AFS_PTHREAD_ENV */
6376 rxi_dataQuota = RX_MAX_QUOTA;
6377 #ifndef AFS_PTHREAD_ENV
6379 #endif /* AFS_PTHREAD_ENV */
6382 #ifndef AFS_PTHREAD_ENV
6383 #ifndef AFS_USE_GETTIMEOFDAY
6385 #endif /* AFS_USE_GETTIMEOFDAY */
6386 #endif /* AFS_PTHREAD_ENV */
6388 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6389 call = queue_First(&rx_freeCallQueue, rx_call);
6391 rxi_Free(call, sizeof(struct rx_call));
6394 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6395 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6401 struct rx_peer **peer_ptr, **peer_end;
6402 for (peer_ptr = &rx_peerHashTable[0],
6403 peer_end = &rx_peerHashTable[rx_hashTableSize];
6404 peer_ptr < peer_end; peer_ptr++) {
6405 struct rx_peer *peer, *next;
6406 for (peer = *peer_ptr; peer; peer = next) {
6407 rx_interface_stat_p rpc_stat, nrpc_stat;
6409 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6410 rx_interface_stat)) {
6411 unsigned int num_funcs;
6412 if (!rpc_stat) break;
6413 queue_Remove(&rpc_stat->queue_header);
6414 queue_Remove(&rpc_stat->all_peers);
6415 num_funcs = rpc_stat->stats[0].func_total;
6416 space = sizeof(rx_interface_stat_t) +
6417 rpc_stat->stats[0].func_total *
6418 sizeof(rx_function_entry_v1_t);
6420 rxi_Free(rpc_stat, space);
6421 MUTEX_ENTER(&rx_rpc_stats);
6422 rxi_rpc_peer_stat_cnt -= num_funcs;
6423 MUTEX_EXIT(&rx_rpc_stats);
6427 MUTEX_ENTER(&rx_stats_mutex);
6428 rx_stats.nPeerStructs--;
6429 MUTEX_EXIT(&rx_stats_mutex);
6433 for (i = 0; i<RX_MAX_SERVICES; i++) {
6435 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6437 for (i = 0; i < rx_hashTableSize; i++) {
6438 register struct rx_connection *tc, *ntc;
6439 MUTEX_ENTER(&rx_connHashTable_lock);
6440 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6442 for (j = 0; j < RX_MAXCALLS; j++) {
6444 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6447 rxi_Free(tc, sizeof(*tc));
6449 MUTEX_EXIT(&rx_connHashTable_lock);
6452 MUTEX_ENTER(&freeSQEList_lock);
6454 while ((np = rx_FreeSQEList)) {
6455 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6456 MUTEX_DESTROY(&np->lock);
6457 rxi_Free(np, sizeof(*np));
6460 MUTEX_EXIT(&freeSQEList_lock);
6461 MUTEX_DESTROY(&freeSQEList_lock);
6462 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6463 MUTEX_DESTROY(&rx_connHashTable_lock);
6464 MUTEX_DESTROY(&rx_peerHashTable_lock);
6465 MUTEX_DESTROY(&rx_serverPool_lock);
6467 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6468 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6470 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6471 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6473 rxi_FreeAllPackets();
6475 MUTEX_ENTER(&rx_stats_mutex);
6476 rxi_dataQuota = RX_MAX_QUOTA;
6477 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6478 MUTEX_EXIT(&rx_stats_mutex);
6484 #ifdef RX_ENABLE_LOCKS
6485 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6487 if (!MUTEX_ISMINE(lockaddr))
6488 osi_Panic("Lock not held: %s", msg);
6490 #endif /* RX_ENABLE_LOCKS */
* Routines to implement connection-specific data.
6498 int rx_KeyCreate(rx_destructor_t rtn)
6501 MUTEX_ENTER(&rxi_keyCreate_lock);
6502 key = rxi_keyCreate_counter++;
6503 rxi_keyCreate_destructor = (rx_destructor_t *)
6504 realloc((void *)rxi_keyCreate_destructor,
6505 (key+1) * sizeof(rx_destructor_t));
6506 rxi_keyCreate_destructor[key] = rtn;
6507 MUTEX_EXIT(&rxi_keyCreate_lock);
6511 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6514 MUTEX_ENTER(&conn->conn_data_lock);
6515 if (!conn->specific) {
6516 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6517 for (i = 0 ; i < key ; i++)
6518 conn->specific[i] = NULL;
6519 conn->nSpecific = key+1;
6520 conn->specific[key] = ptr;
6521 } else if (key >= conn->nSpecific) {
6522 conn->specific = (void **)
6523 realloc(conn->specific,(key+1)*sizeof(void *));
6524 for (i = conn->nSpecific ; i < key ; i++)
6525 conn->specific[i] = NULL;
6526 conn->nSpecific = key+1;
6527 conn->specific[key] = ptr;
6529 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6530 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6531 conn->specific[key] = ptr;
6533 MUTEX_EXIT(&conn->conn_data_lock);
6536 void *rx_GetSpecific(struct rx_connection *conn, int key)
6539 MUTEX_ENTER(&conn->conn_data_lock);
6540 if (key >= conn->nSpecific)
6543 ptr = conn->specific[key];
6544 MUTEX_EXIT(&conn->conn_data_lock);
6548 #endif /* !KERNEL */
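/* A minimal usage sketch for the connection-specific-data routines
 * above (not compiled; the destructor and state type are hypothetical,
 * and malloc/free are assumed from <stdlib.h>): */
#if 0
static void example_destroy_state(void *ptr)
{
    free(ptr);
}

static void example_tag_connection(struct rx_connection *conn)
{
    static int example_key = -1;
    int *state;

    if (example_key < 0)
	example_key = rx_KeyCreate(example_destroy_state);
    rx_SetSpecific(conn, example_key, malloc(sizeof(int)));
    state = (int *) rx_GetSpecific(conn, example_key);
    /* the destructor runs when the slot is overwritten (see
     * rx_SetSpecific above) or when the connection is cleaned up */
}
#endif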
* processStats is a queue used to store the statistics for the local
* process. Its contents are similar to the contents of the rpcStats
* queue on an rx_peer structure, but the data stored within this queue
* contains totals across the lifetime of the process (assuming the
* stats have not been reset) - unlike the per-peer structures, which
* can come and go based upon the peer's lifetime.
6559 static struct rx_queue processStats = {&processStats,&processStats};
6562 * peerStats is a queue used to store the statistics for all peer structs.
6563 * Its contents are the union of all the peer rpcStats queues.
6566 static struct rx_queue peerStats = {&peerStats,&peerStats};
* rxi_monitor_processStats is used to turn process-wide stat collection
* on and off.
*/
static int rxi_monitor_processStats = 0;
* rxi_monitor_peerStats is used to turn per-peer stat collection on and off.
*/
static int rxi_monitor_peerStats = 0;
6582 * rxi_AddRpcStat - given all of the information for a particular rpc
6583 * call, create (if needed) and update the stat totals for the rpc.
6587 * IN stats - the queue of stats that will be updated with the new value
6589 * IN rxInterface - a unique number that identifies the rpc interface
6591 * IN currentFunc - the index of the function being invoked
6593 * IN totalFunc - the total number of functions in this interface
6595 * IN queueTime - the amount of time this function waited for a thread
6597 * IN execTime - the amount of time this function invocation took to execute
* IN bytesSent - the number of bytes sent by this invocation
* IN bytesRcvd - the number of bytes received by this invocation
6603 * IN isServer - if true, this invocation was made to a server
6605 * IN remoteHost - the ip address of the remote host
6607 * IN remotePort - the port of the remote host
6609 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6611 * INOUT counter - if a new stats structure is allocated, the counter will
6612 * be updated with the new number of allocated stat structures
6619 static int rxi_AddRpcStat(
6620 struct rx_queue *stats,
6621 afs_uint32 rxInterface,
6622 afs_uint32 currentFunc,
6623 afs_uint32 totalFunc,
6624 struct clock *queueTime,
6625 struct clock *execTime,
6626 afs_hyper_t *bytesSent,
6627 afs_hyper_t *bytesRcvd,
6629 afs_uint32 remoteHost,
6630 afs_uint32 remotePort,
6632 unsigned int *counter)
6635 rx_interface_stat_p rpc_stat, nrpc_stat;
6638 * See if there's already a structure for this interface
6641 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6642 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6643 (rpc_stat->stats[0].remote_is_server == isServer)) break;
* Didn't find a match, so allocate a new structure and add it to the
* queue.
*/
6651 if (queue_IsEnd(stats, rpc_stat) ||
6652 (rpc_stat == NULL) ||
6653 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6654 (rpc_stat->stats[0].remote_is_server != isServer)) {
6658 space = sizeof(rx_interface_stat_t) + totalFunc *
6659 sizeof(rx_function_entry_v1_t);
6661 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6662 if (rpc_stat == NULL) {
6666 *counter += totalFunc;
6667 for(i=0;i<totalFunc;i++) {
6668 rpc_stat->stats[i].remote_peer = remoteHost;
6669 rpc_stat->stats[i].remote_port = remotePort;
6670 rpc_stat->stats[i].remote_is_server = isServer;
6671 rpc_stat->stats[i].interfaceId = rxInterface;
6672 rpc_stat->stats[i].func_total = totalFunc;
6673 rpc_stat->stats[i].func_index = i;
6674 hzero(rpc_stat->stats[i].invocations);
6675 hzero(rpc_stat->stats[i].bytes_sent);
6676 hzero(rpc_stat->stats[i].bytes_rcvd);
6677 rpc_stat->stats[i].queue_time_sum.sec = 0;
6678 rpc_stat->stats[i].queue_time_sum.usec = 0;
6679 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6680 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6681 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6682 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6683 rpc_stat->stats[i].queue_time_max.sec = 0;
6684 rpc_stat->stats[i].queue_time_max.usec = 0;
6685 rpc_stat->stats[i].execution_time_sum.sec = 0;
6686 rpc_stat->stats[i].execution_time_sum.usec = 0;
6687 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6688 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6689 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6690 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6691 rpc_stat->stats[i].execution_time_max.sec = 0;
6692 rpc_stat->stats[i].execution_time_max.usec = 0;
6694 queue_Prepend(stats, rpc_stat);
6695 if (addToPeerList) {
6696 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6701 * Increment the stats for this function
6704 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6705 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6706 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6707 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6708 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6709 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6710 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6712 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6713 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6715 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6716 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6717 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6718 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6720 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6721 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6729 * rx_IncrementTimeAndCount - increment the times and count for a particular
6734 * IN peer - the peer who invoked the rpc
6736 * IN rxInterface - a unique number that identifies the rpc interface
6738 * IN currentFunc - the index of the function being invoked
6740 * IN totalFunc - the total number of functions in this interface
6742 * IN queueTime - the amount of time this function waited for a thread
6744 * IN execTime - the amount of time this function invocation took to execute
* IN bytesSent - the number of bytes sent by this invocation
* IN bytesRcvd - the number of bytes received by this invocation
6750 * IN isServer - if true, this invocation was made to a server
6757 void rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
6758 afs_uint32 currentFunc, afs_uint32 totalFunc, struct clock *queueTime,
6759 struct clock *execTime, afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd, int isServer)
6762 MUTEX_ENTER(&rx_rpc_stats);
6763 MUTEX_ENTER(&peer->peer_lock);
6765 if (rxi_monitor_peerStats) {
6766 rxi_AddRpcStat(&peer->rpcStats,
6778 &rxi_rpc_peer_stat_cnt);
6781 if (rxi_monitor_processStats) {
6782 rxi_AddRpcStat(&processStats,
6794 &rxi_rpc_process_stat_cnt);
6797 MUTEX_EXIT(&peer->peer_lock);
6798 MUTEX_EXIT(&rx_rpc_stats);
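/* Hedged sketch of the call an RPC dispatcher might make when one call
 * completes (the interface id, function index, and bookkeeping shown
 * here are illustrative; rxgen-generated stubs do the real version): */
#if 0
static void example_record_rpc(struct rx_call *call,
			       struct clock *queueTime,
			       struct clock *execTime,
			       afs_hyper_t *bytesSent,
			       afs_hyper_t *bytesRcvd)
{
    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
			     12345 /* hypothetical interface id */,
			     0 /* currentFunc */, 1 /* totalFunc */,
			     queueTime, execTime,
			     bytesSent, bytesRcvd, 1 /* isServer */);
}
#endif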
6803 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6807 * IN callerVersion - the rpc stat version of the caller.
6809 * IN count - the number of entries to marshall.
6811 * IN stats - pointer to stats to be marshalled.
6813 * OUT ptr - Where to store the marshalled data.
6819 void rx_MarshallProcessRPCStats(afs_uint32 callerVersion,
6820 int count, rx_function_entry_v1_t *stats, afs_uint32 **ptrP)
* We only support the first version.
*/
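/*
 * For reference, each v1 entry below marshals to 28 afs_uint32 words:
 * six identification fields, three 64-bit counters split into hi/lo
 * halves, and eight struct clock values written as sec/usec pairs.
 */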
6828 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6829 *(ptr++) = stats->remote_peer;
6830 *(ptr++) = stats->remote_port;
6831 *(ptr++) = stats->remote_is_server;
6832 *(ptr++) = stats->interfaceId;
6833 *(ptr++) = stats->func_total;
6834 *(ptr++) = stats->func_index;
6835 *(ptr++) = hgethi(stats->invocations);
6836 *(ptr++) = hgetlo(stats->invocations);
6837 *(ptr++) = hgethi(stats->bytes_sent);
6838 *(ptr++) = hgetlo(stats->bytes_sent);
6839 *(ptr++) = hgethi(stats->bytes_rcvd);
6840 *(ptr++) = hgetlo(stats->bytes_rcvd);
6841 *(ptr++) = stats->queue_time_sum.sec;
6842 *(ptr++) = stats->queue_time_sum.usec;
6843 *(ptr++) = stats->queue_time_sum_sqr.sec;
6844 *(ptr++) = stats->queue_time_sum_sqr.usec;
6845 *(ptr++) = stats->queue_time_min.sec;
6846 *(ptr++) = stats->queue_time_min.usec;
6847 *(ptr++) = stats->queue_time_max.sec;
6848 *(ptr++) = stats->queue_time_max.usec;
6849 *(ptr++) = stats->execution_time_sum.sec;
6850 *(ptr++) = stats->execution_time_sum.usec;
6851 *(ptr++) = stats->execution_time_sum_sqr.sec;
6852 *(ptr++) = stats->execution_time_sum_sqr.usec;
6853 *(ptr++) = stats->execution_time_min.sec;
6854 *(ptr++) = stats->execution_time_min.usec;
6855 *(ptr++) = stats->execution_time_max.sec;
6856 *(ptr++) = stats->execution_time_max.usec;
6862 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6867 * IN callerVersion - the rpc stat version of the caller
6869 * OUT myVersion - the rpc stat version of this function
6871 * OUT clock_sec - local time seconds
6873 * OUT clock_usec - local time microseconds
6875 * OUT allocSize - the number of bytes allocated to contain stats
* OUT statCount - the number of stats retrieved from this process.
6879 * OUT stats - the actual stats retrieved from this process.
* Returns a completion code. If successful, *stats will be non-NULL.
6886 int rx_RetrieveProcessRPCStats(afs_uint32 callerVersion,
6887 afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
6888 size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
6898 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6901 * Check to see if stats are enabled
6904 MUTEX_ENTER(&rx_rpc_stats);
6905 if (!rxi_monitor_processStats) {
6906 MUTEX_EXIT(&rx_rpc_stats);
6910 clock_GetTime(&now);
6911 *clock_sec = now.sec;
6912 *clock_usec = now.usec;
6915 * Allocate the space based upon the caller version
6917 * If the client is at an older version than we are,
6918 * we return the statistic data in the older data format, but
6919 * we still return our version number so the client knows we
6920 * are maintaining more data than it can retrieve.
6923 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6924 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6925 *statCount = rxi_rpc_process_stat_cnt;
6928 * This can't happen yet, but in the future version changes
6929 * can be handled by adding additional code here
6933 if (space > (size_t) 0) {
6935 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6938 rx_interface_stat_p rpc_stat, nrpc_stat;
6941 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6942 rx_interface_stat)) {
6944 * Copy the data based upon the caller version
6946 rx_MarshallProcessRPCStats(callerVersion,
6947 rpc_stat->stats[0].func_total,
6948 rpc_stat->stats, &ptr);
6954 MUTEX_EXIT(&rx_rpc_stats);
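/* Hedged sketch of the enable/retrieve/free lifecycle for process
 * stats (the decoding step is left as a comment; see the marshalling
 * layout noted above): */
#if 0
static void example_snapshot_process_stats(void)
{
    afs_uint32 myVersion, sec, usec, statCount;
    size_t allocSize;
    afs_uint32 *stats;

    rx_enableProcessRPCStats();
    /* ... run some RPCs ... */
    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVersion,
			       &sec, &usec, &allocSize, &statCount,
			       &stats);
    if (stats != NULL) {
	/* decode statCount v1 entries from stats[] */
	rx_FreeRPCStats(stats, allocSize);
    }
}
#endif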
6959 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6963 * IN callerVersion - the rpc stat version of the caller
6965 * OUT myVersion - the rpc stat version of this function
6967 * OUT clock_sec - local time seconds
6969 * OUT clock_usec - local time microseconds
6971 * OUT allocSize - the number of bytes allocated to contain stats
6973 * OUT statCount - the number of stats retrieved from the individual
6976 * OUT stats - the actual stats retrieved from the individual peer structures.
* Returns a completion code. If successful, *stats will be non-NULL.
6983 int rx_RetrievePeerRPCStats(afs_uint32 callerVersion,
6984 afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
6985 size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
6995 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6998 * Check to see if stats are enabled
7001 MUTEX_ENTER(&rx_rpc_stats);
7002 if (!rxi_monitor_peerStats) {
7003 MUTEX_EXIT(&rx_rpc_stats);
7007 clock_GetTime(&now);
7008 *clock_sec = now.sec;
7009 *clock_usec = now.usec;
7012 * Allocate the space based upon the caller version
7014 * If the client is at an older version than we are,
7015 * we return the statistic data in the older data format, but
7016 * we still return our version number so the client knows we
7017 * are maintaining more data than it can retrieve.
7020 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
7021 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
7022 *statCount = rxi_rpc_peer_stat_cnt;
7025 * This can't happen yet, but in the future version changes
7026 * can be handled by adding additional code here
7030 if (space > (size_t) 0) {
7032 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7035 rx_interface_stat_p rpc_stat, nrpc_stat;
7038 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7039 rx_interface_stat)) {
7041 * We have to fix the offset of rpc_stat since we are
7042 * keeping this structure on two rx_queues. The rx_queue
7043 * package assumes that the rx_queue member is the first
7044 * member of the structure. That is, rx_queue assumes that
7045 * any one item is only on one queue at a time. We are
7046 * breaking that assumption and so we have to do a little
7047 * math to fix our pointers.
7050 fix_offset = (char *) rpc_stat;
7051 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7052 rpc_stat = (rx_interface_stat_p) fix_offset;
7055 * Copy the data based upon the caller version
7057 rx_MarshallProcessRPCStats(callerVersion,
7058 rpc_stat->stats[0].func_total,
7059 rpc_stat->stats, &ptr);
7065 MUTEX_EXIT(&rx_rpc_stats);
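/* The pointer arithmetic above is the classic "container_of" idiom.
 * In isolation (sketch with a hypothetical two-queue structure): */
#if 0
struct example_item {
    struct rx_queue main_q;	/* rx_queue expects this to be first */
    struct rx_queue alt_q;	/* second queue the item also lives on */
};
/* recover the enclosing item from a pointer to its alt_q member */
#define EXAMPLE_ITEM_OF(qp) \
    ((struct example_item *)((char *)(qp) - \
			     offsetof(struct example_item, alt_q)))
#endif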
7070 * rx_FreeRPCStats - free memory allocated by
7071 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7075 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7076 * rx_RetrievePeerRPCStats
7078 * IN allocSize - the number of bytes in stats.
7085 void rx_FreeRPCStats(afs_uint32 *stats, size_t allocSize)
7087 rxi_Free(stats, allocSize);
7091 * rx_queryProcessRPCStats - see if process rpc stat collection is
7092 * currently enabled.
* Returns 0 if stats are not enabled; != 0 otherwise.
7101 int rx_queryProcessRPCStats(void)
7104 MUTEX_ENTER(&rx_rpc_stats);
7105 rc = rxi_monitor_processStats;
7106 MUTEX_EXIT(&rx_rpc_stats);
7111 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
* Returns 0 if stats are not enabled; != 0 otherwise.
7120 int rx_queryPeerRPCStats(void)
7123 MUTEX_ENTER(&rx_rpc_stats);
7124 rc = rxi_monitor_peerStats;
7125 MUTEX_EXIT(&rx_rpc_stats);
7130 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7139 void rx_enableProcessRPCStats(void)
7141 MUTEX_ENTER(&rx_rpc_stats);
7142 rx_enable_stats = 1;
7143 rxi_monitor_processStats = 1;
7144 MUTEX_EXIT(&rx_rpc_stats);
7148 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7157 void rx_enablePeerRPCStats(void)
7159 MUTEX_ENTER(&rx_rpc_stats);
7160 rx_enable_stats = 1;
7161 rxi_monitor_peerStats = 1;
7162 MUTEX_EXIT(&rx_rpc_stats);
7166 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7175 void rx_disableProcessRPCStats(void)
7177 rx_interface_stat_p rpc_stat, nrpc_stat;
7180 MUTEX_ENTER(&rx_rpc_stats);
* Turn off process statistics and, if peer stats is also off, turn
* off the rx statistics collection as a whole.
*/
7187 rxi_monitor_processStats = 0;
7188 if (rxi_monitor_peerStats == 0) {
7189 rx_enable_stats = 0;
7192 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7193 unsigned int num_funcs = 0;
7194 if (!rpc_stat) break;
7195 queue_Remove(rpc_stat);
7196 num_funcs = rpc_stat->stats[0].func_total;
7197 space = sizeof(rx_interface_stat_t) +
7198 rpc_stat->stats[0].func_total *
7199 sizeof(rx_function_entry_v1_t);
7201 rxi_Free(rpc_stat, space);
7202 rxi_rpc_process_stat_cnt -= num_funcs;
7204 MUTEX_EXIT(&rx_rpc_stats);
7208 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7217 void rx_disablePeerRPCStats(void)
7219 struct rx_peer **peer_ptr, **peer_end;
7222 MUTEX_ENTER(&rx_rpc_stats);
* Turn off peer statistics and, if process stats is also off, turn
* off the rx statistics collection as a whole.
*/
7229 rxi_monitor_peerStats = 0;
7230 if (rxi_monitor_processStats == 0) {
7231 rx_enable_stats = 0;
7234 MUTEX_ENTER(&rx_peerHashTable_lock);
7235 for (peer_ptr = &rx_peerHashTable[0],
7236 peer_end = &rx_peerHashTable[rx_hashTableSize];
7237 peer_ptr < peer_end; peer_ptr++) {
7238 struct rx_peer *peer, *next, *prev;
7239 for (prev = peer = *peer_ptr; peer; peer = next) {
7241 code = MUTEX_TRYENTER(&peer->peer_lock);
7243 rx_interface_stat_p rpc_stat, nrpc_stat;
7245 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7246 rx_interface_stat)) {
7247 unsigned int num_funcs = 0;
7248 if (!rpc_stat) break;
7249 queue_Remove(&rpc_stat->queue_header);
7250 queue_Remove(&rpc_stat->all_peers);
7251 num_funcs = rpc_stat->stats[0].func_total;
7252 space = sizeof(rx_interface_stat_t) +
7253 rpc_stat->stats[0].func_total *
7254 sizeof(rx_function_entry_v1_t);
7256 rxi_Free(rpc_stat, space);
7257 rxi_rpc_peer_stat_cnt -= num_funcs;
7259 MUTEX_EXIT(&peer->peer_lock);
7260 if (prev == *peer_ptr) {
7272 MUTEX_EXIT(&rx_peerHashTable_lock);
7273 MUTEX_EXIT(&rx_rpc_stats);
* rx_clearProcessRPCStats - clear the contents of the rpc stats according
* to the flag.
7282 * IN clearFlag - flag indicating which stats to clear
7289 void rx_clearProcessRPCStats(afs_uint32 clearFlag)
7291 rx_interface_stat_p rpc_stat, nrpc_stat;
7293 MUTEX_ENTER(&rx_rpc_stats);
7295 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7296 unsigned int num_funcs = 0, i;
7297 num_funcs = rpc_stat->stats[0].func_total;
7298 for(i=0;i<num_funcs;i++) {
7299 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7300 hzero(rpc_stat->stats[i].invocations);
7302 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7303 hzero(rpc_stat->stats[i].bytes_sent);
7305 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7306 hzero(rpc_stat->stats[i].bytes_rcvd);
7308 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7309 rpc_stat->stats[i].queue_time_sum.sec = 0;
7310 rpc_stat->stats[i].queue_time_sum.usec = 0;
7312 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7313 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7314 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7316 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7317 rpc_stat->stats[i].queue_time_min.sec = 9999999;
rpc_stat->stats[i].queue_time_min.usec = 9999999;
7320 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7321 rpc_stat->stats[i].queue_time_max.sec = 0;
7322 rpc_stat->stats[i].queue_time_max.usec = 0;
7324 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7325 rpc_stat->stats[i].execution_time_sum.sec = 0;
7326 rpc_stat->stats[i].execution_time_sum.usec = 0;
7328 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7329 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7330 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7332 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7333 rpc_stat->stats[i].execution_time_min.sec = 9999999;
rpc_stat->stats[i].execution_time_min.usec = 9999999;
7336 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7337 rpc_stat->stats[i].execution_time_max.sec = 0;
7338 rpc_stat->stats[i].execution_time_max.usec = 0;
7343 MUTEX_EXIT(&rx_rpc_stats);
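/* Hedged usage: reset only the timing aggregates while keeping the
 * invocation and byte counters, using the flag bits handled above: */
#if 0
static void example_reset_timing(void)
{
    rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM |
			    AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE |
			    AFS_RX_STATS_CLEAR_EXEC_TIME_SUM |
			    AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE);
}
#endif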
* rx_clearPeerRPCStats - clear the contents of the rpc stats according
* to the flag.
7352 * IN clearFlag - flag indicating which stats to clear
7359 void rx_clearPeerRPCStats(afs_uint32 clearFlag)
7361 rx_interface_stat_p rpc_stat, nrpc_stat;
7363 MUTEX_ENTER(&rx_rpc_stats);
7365 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7366 unsigned int num_funcs = 0, i;
7369 * We have to fix the offset of rpc_stat since we are
7370 * keeping this structure on two rx_queues. The rx_queue
7371 * package assumes that the rx_queue member is the first
7372 * member of the structure. That is, rx_queue assumes that
7373 * any one item is only on one queue at a time. We are
7374 * breaking that assumption and so we have to do a little
7375 * math to fix our pointers.
7378 fix_offset = (char *) rpc_stat;
7379 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7380 rpc_stat = (rx_interface_stat_p) fix_offset;
7382 num_funcs = rpc_stat->stats[0].func_total;
7383 for(i=0;i<num_funcs;i++) {
7384 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7385 hzero(rpc_stat->stats[i].invocations);
7387 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7388 hzero(rpc_stat->stats[i].bytes_sent);
7390 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7391 hzero(rpc_stat->stats[i].bytes_rcvd);
7393 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7394 rpc_stat->stats[i].queue_time_sum.sec = 0;
7395 rpc_stat->stats[i].queue_time_sum.usec = 0;
7397 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7398 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7399 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7401 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7402 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7403 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7405 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7406 rpc_stat->stats[i].queue_time_max.sec = 0;
7407 rpc_stat->stats[i].queue_time_max.usec = 0;
7409 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7410 rpc_stat->stats[i].execution_time_sum.sec = 0;
7411 rpc_stat->stats[i].execution_time_sum.usec = 0;
7413 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7414 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7415 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7417 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7418 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7419 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7421 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7422 rpc_stat->stats[i].execution_time_max.sec = 0;
7423 rpc_stat->stats[i].execution_time_max.usec = 0;
7428 MUTEX_EXIT(&rx_rpc_stats);
7432 * rxi_rxstat_userok points to a routine that returns 1 if the caller
* is authorized to enable/disable/clear RX statistics.
*/
7435 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7437 void rx_SetRxStatUserOk(int (*proc)(struct rx_call *call))
7439 rxi_rxstat_userok = proc;
7442 int rx_RxStatUserOk(struct rx_call *call)
7444 if (!rxi_rxstat_userok)
7446 return rxi_rxstat_userok(call);
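/* Hedged sketch of a server installing an authorization hook for the
 * rxstat interface (the identity check is a placeholder; real servers
 * verify the caller's credentials here): */
#if 0
static int example_rxstat_userok(struct rx_call *call)
{
    return 0;	/* deny by default; return 1 for authorized callers */
}

static void example_install_hook(void)
{
    rx_SetRxStatUserOk(example_rxstat_userok);
}
#endif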