/*
 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */
/* RX: Extended Remote Procedure Call */

#include <afsconfig.h>
#include "afs/param.h"
#include <afs/param.h>

#include "afs/sysincludes.h"
#include "afsincludes.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "netinet/in.h"
#include "afs/afs_args.h"
#include "afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#undef RXDEBUG /* turn off debugging */
#endif
#if defined(AFS_SGI_ENV)
#include "sys/debug.h"
#endif /* AFS_ALPHA_ENV */
#include "afs/sysincludes.h"
#include "afsincludes.h"
#include "rx_kmutex.h"
#include "rx_kernel.h"
#include "rx_globals.h"
#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
#define AFSOP_STOP_AFS 211        /* Stop AFS process */
#define AFSOP_STOP_BKG 212        /* Stop BKG process */

extern afs_int32 afs_termState;
#ifdef AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include <afs/rxgen_consts.h>
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

/* Local static routines */
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
#ifdef RX_ENABLE_LOCKS
static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
#endif /* RX_ENABLE_LOCKS */

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#endif

#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables.
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
	assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread has to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
 */
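/* Editor's sketch (not original source) of the handshake just described,
 * using the SET/CLEAR_CALL_QUEUE_LOCK macros defined further below; the
 * queue and lock names here are placeholders:
 *
 *	MUTEX_ENTER(&some_queue_lock);
 *	MUTEX_ENTER(&call->lock);
 *	SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
 *	queue_Append(&some_queue, call);
 *	MUTEX_EXIT(&call->lock);
 *	MUTEX_EXIT(&some_queue_lock);
 *
 * and on removal, once the call lock has been obtained:
 *
 *	MUTEX_ENTER(&call->lock);
 *	queue_Remove(call);
 *	CLEAR_CALL_QUEUE_LOCK(call);
 *	MUTEX_EXIT(&call->lock);
 */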
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
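/* Editor's sketch (an assumption; rxi_FindConnection itself is not shown in
 * this excerpt): the fast path presumably looks something like
 *
 *	conn = rxLastConn;
 *	if (conn && conn->cid == (cid & RX_CIDMASK)
 *	    && conn->peer->host == host && conn->peer->port == port)
 *	    return conn;	(cache hit; skip the hash-table search)
 *
 * falling back to the rx_connHashTable lookup on a miss and refreshing
 * rxLastConn with the result. */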
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of these
 * tiers:
 *
 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * These are independent of each other:
 *	rx_freeCallQueue_lock
 *	serverQueueEntry->lock
 *
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer->lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 *	field at the same time.
 *
 * Do we need a lock to protect the peer field in the conn structure?
 *      conn->peer was previously a constant for all intents and so has no
 *      lock protecting this field. The multihomed client delta introduced
 *      a RX code change: change the peer field in the connection structure
 *      to that remote interface from which the last packet for this
 *      connection was sent out. This may become an issue if further changes
 *      are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
struct rx_serverQueueEntry *rx_waitForPacket = 0;

/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (afs_uint32 epoch)
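{
    /* Body reconstructed by the editor from context; the LOCK_EPOCH /
     * UNLOCK_EPOCH macros above exist solely to serialize this store
     * to rx_epoch. */
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}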
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
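/* Editor's usage sketch (illustrative, not part of this file): a client
 * that does not care which UDP port it gets may call
 *
 *	if (rx_Init(0) != 0) exit(1);
 *
 * while a server typically passes its well-known port, e.g.
 * rx_Init(htons(7000)); that port then becomes the default for services
 * installed later with rx_NewService. */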
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;
    struct timeval tv;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	return tmp_status; /* Already started; return previous error code. */
    }
#ifdef AFS_NT40_ENV
    if (afs_winsockInit() < 0)
	return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	return RX_ADDRINUSE;
    }
#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT, 0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;	/* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
    memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *)); /* XXXXX */
    memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif
#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    {
	struct sockaddr_in addr;
	int addrlen = sizeof(addr);
	if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	    return -1;
	}
	rx_port = addr.sin_port;
    }
#endif
    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);	/* Start time of this package, rxkad
				 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */
    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(register struct rx_service *aservice)
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}
static void ReturnToServerPool(register struct rx_service *aservice)
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs)
	rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}

#else /* RX_ENABLE_LOCKS */
static int QuotaOK(register struct rx_service *aservice)
{
    int rc = 0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
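/* Editor's worked example of the quota arithmetic above (numbers invented):
 * for a service with minProcs = 2, maxProcs = 5 and nRequestsRunning = 3,
 * the call is neither under its min quota nor over its max; with
 * rxi_availProcs = 4 and rxi_minDeficit = 1 the test 4 > 1 succeeds, so the
 * thread may be used: even after this request starts, enough idle threads
 * remain for every service to reach its guaranteed minimum.  When
 * rxi_availProcs == rxi_minDeficit, the request is refused instead. */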
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(int nExistingProcs)
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;
    int diff;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i < nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(int donateMe)
{
    register struct rx_service *service;
    register int i;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();
    if (donateMe) {
#ifndef AFS_NT40_ENV
	char name[32];
	pid_t pid;
#ifdef AFS_PTHREAD_ENV
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */

	sprintf(name, "srv_%d", ++nProcs);
	if (registerProgram)
	    (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
	rx_ServerProc(); /* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *rx_NewConnection(register afs_uint32 shost,
	u_short sport, u_short sservice,
	register struct rx_securityClass *securityObject, int serviceSecurityIndex)
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
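    /* Editor's note: rx_nextCid advances in steps of RX_MAXCALLS, so every
     * connection owns a block of RX_MAXCALLS consecutive call ids; call
     * channel i of this connection appears on the wire as cid | i. */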
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = NULL;
    conn->delayedAbortEvent = NULL;
    conn->abortCount = 0;
    conn->error = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
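/* Editor's worked example: rx_SetConnDeadTime(conn, 12) yields
 * secondsUntilDead = 12 and secondsUntilPing = 2, so about six keepalive
 * pings fit in one dead-time interval and several may be lost before the
 * connection is declared dead; a request of 3 seconds is clamped up to the
 * floor of 6. */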
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(struct rx_connection *conn)
{
    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now. rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	int i;
	for (i = 0; i < conn->nSpecific; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;
    conn->nSpecific = 0;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(register struct rx_connection *conn)
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    struct rx_peer *peer = conn->peer;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    if (call->state == RX_STATE_PRECALL ||
			call->state == RX_STATE_ACTIVE) {
			rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
		    } else {
			rxi_AckAll(NULL, call, 0);
		    }
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	} else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }
    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
					   conn->epoch, conn->type)];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if (rxLastConn == conn)
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent)
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    if (conn->checkReachEvent)
	rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up. This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(register struct rx_connection *conn)
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(register struct rx_connection *conn)
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf(("rx_NewCall(conn %x)\n", conn));
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);

    /*
     * Check if there are others waiting for a new call.
     * If so, let them go first to avoid starving them.
     * This is a fairly simple scheme, and might not be
     * a complete solution for large numbers of waiters.
     */
    if (conn->makeCallWaiters) {
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }
    for (;;) {
	for (i = 0; i < RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    } else {
		call = rxi_NewCall(conn, i);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);

	conn->makeCallWaiters++;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
	conn->makeCallWaiters--;
    }

    /*
     * Wake up anyone else who might be giving us a chance to
     * run (see code above that avoids resource starvation).
     */
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&conn->conn_call_cv);
#else
    osi_rxWakeup(conn);
#endif
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int rxi_HasActiveCalls(register struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i])) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		return 1;
	    }
	}
    }
    return 0;
}
int rxi_GetCallNumberVector(register struct rx_connection *aconn,
	register afs_int32 *aint32s)
{
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}
int rxi_SetCallNumberVector(register struct rx_connection *aconn,
	register afs_int32 *aint32s)
{
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on failure.
    char *serviceName;	 Name for identification purposes (e.g. the
			 service name might be used for probing for
			 statistics) */
struct rx_service *rx_NewService(u_short port, u_short serviceId,
	char *serviceName,
	struct rx_securityClass **securityObjects,
	int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }
    tservice = rxi_AllocService();

    for (i = 0; i < RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    service->checkReach = 0;
	    rx_services[i] = service;	/* not visible until now */
	    return service;
	}
    }
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
/* Generic request processing loop. This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on. If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = (struct rx_call *) 0;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting (typically smooth shutdown) then do not
	 * allow any new calls.
	 */
	if (rx_tranquil && (call != NULL)) {
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	}

	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs(void)
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
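	/* Editor's note: the free list threads its links through the entries
	 * themselves -- the first word of each rx_serverQueueEntry holds the
	 * pointer to the next free entry, recovered by the cast above. */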
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/*
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * future options:
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}
	if (call) {
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);
	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call *) 0;
	    } else {
		call->flags &= (~RX_CALL_WAIT_PROC);
		MUTEX_ENTER(&rx_stats_mutex);
		rx_nWaiting--;
		MUTEX_EXIT(&rx_stats_mutex);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		MUTEX_EXIT(&rx_serverPool_lock);
		break;
	    }
	}
	else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */

	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef KERNEL
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *) 0;
		}
#endif
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);
    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
	    }
	    if (call) {
		break;
	    }
	}
    }
    if (call) {
	queue_Remove(call);
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	/* MUTEX_EXIT(&call->lock); */
    }
    else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
#ifdef KERNEL
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		return (struct rx_call *) 0;
	    }
#endif
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call
 * or the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(register struct rx_call *call,
	register VOID (*proc)(register struct rx_call *call,
	    register struct multi_handle *mh, register int index),
	register VOID *handle, register VOID *arg)
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
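/* Editor's usage sketch per the NOTE above (the handler name and arguments
 * are hypothetical): a multi rx style client arms the procedure once,
 * immediately after creating the call,
 *
 *	call = rx_NewCall(conn);
 *	rx_SetArrivalProc(call, MyArrivalProc, (VOID *)mh, (VOID *)index);
 *
 * and relies on it firing at most once -- on first arrival, on error, or
 * on completion. */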
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));
    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else { /* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}

	/* If we had an outstanding delayed ack, be nice to the server
	 * and force-send it now.
	 */
	if (call->delayedAckEvent) {
	    rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
	    call->delayedAckEvent = NULL;
	    rxi_SendDelayedAck(NULL, call, NULL);
	}

	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call. The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure. We don't want to signal until
	 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and nFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    } else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }

    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize(void)
{
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void rxi_PacketsUnWait(void)
{
    if (!rx_waitingForPackets) {
	return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return; /* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(register osi_socket socket,
	register u_short serviceId)
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. Returns the call with mutex locked. */
struct rx_call *rxi_NewCall(register struct rx_connection *conn,
	register int channel)
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    } else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);
	CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
	CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
	CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nCallStructs++;
	MUTEX_EXIT(&rx_stats_mutex);
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
	queue_Init(&call->iovq);
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;
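    /* Editor's example: a never-used channel starts its first call as call
     * number 1.  If the channel's previous call was number 41, the call
     * structure may have been freed and reallocated meanwhile, but
     * conn->callNumber[channel] still reads 41, so rx_NewCall's increment
     * starts the next call as 42. */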
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 * haveCTLock set if called from rxi_ReapConnections
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(register struct rx_call *call)
#endif /* RX_ENABLE_LOCKS */
2034 register int channel = call->channel;
2035 register struct rx_connection *conn = call->conn;
2038 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2039 (*call->callNumber)++;
2040 rxi_ResetCall(call, 0);
2041 call->conn->call[channel] = (struct rx_call *) 0;
2043 MUTEX_ENTER(&rx_freeCallQueue_lock);
2044 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2045 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2046 /* A call may be free even though its transmit queue is still in use.
2047 * Since we search the call list from head to tail, put busy calls at
2048 * the head of the list, and idle calls at the tail.
2050 if (call->flags & RX_CALL_TQ_BUSY)
2051 queue_Prepend(&rx_freeCallQueue, call);
2053 queue_Append(&rx_freeCallQueue, call);
2054 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2055 queue_Append(&rx_freeCallQueue, call);
2056 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2057 MUTEX_ENTER(&rx_stats_mutex);
2058 rx_stats.nFreeCallStructs++;
2059 MUTEX_EXIT(&rx_stats_mutex);
2061 MUTEX_EXIT(&rx_freeCallQueue_lock);
2063 /* Destroy the connection if it was previously slated for
2064 * destruction, i.e. the Rx client code previously called
2065 * rx_DestroyConnection (client connections), or
2066 * rxi_ReapConnections called the same routine (server
2067 * connections). Only do this, however, if there are no
2068 * outstanding calls. Note that for fine grain locking, there appears
2069 * to be a deadlock in that rxi_FreeCall has a call locked and
2070 * DestroyConnectionNoLock locks each call in the conn. But note a
2071 * few lines up where we have removed this call from the conn.
2072 * If someone else destroys a connection, they either have no
2073 * call lock held or are going through this section of code.
2075 if (conn->flags & RX_CONN_DESTROY_ME) {
2076 MUTEX_ENTER(&conn->conn_data_lock);
2078 MUTEX_EXIT(&conn->conn_data_lock);
2079 #ifdef RX_ENABLE_LOCKS
2081 rxi_DestroyConnectionNoLock(conn);
2083 rxi_DestroyConnection(conn);
2084 #else /* RX_ENABLE_LOCKS */
2085 rxi_DestroyConnection(conn);
2086 #endif /* RX_ENABLE_LOCKS */
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
char *rxi_Alloc(register size_t size)
{
    register char *p;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation.
     */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
	AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt++; rxi_Allocsize += size;
    MUTEX_EXIT(&rx_stats_mutex);
#if	(defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ) {
	p = (char *) osi_AllocMediumSpace(size);
    } else
	p = (char *) osi_AllocSmall(size, 1);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
	AFS_GUNLOCK();
#endif
#else
    p = (char *) osi_Alloc(size);
#endif
    if (!p) osi_Panic("rxi_Alloc error");
    memset(p, 0, size);
    return p;
}
void rxi_Free(void *addr, register size_t size)
{
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation.
     */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
	AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt--; rxi_Allocsize -= size;
    MUTEX_EXIT(&rx_stats_mutex);
#if	(defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ)
	osi_FreeMediumSpace(addr);
    else
	osi_FreeSmall(addr);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
	AFS_GUNLOCK();
#endif
#else
    osi_Free(addr, size);
#endif
}
/* Find the peer process represented by the supplied (host,port)
 * combination.  If there is no appropriate active peer structure, a
 * new one will be allocated and initialized
 * The origPeer, if set, is a pointer to a peer structure on which the
 * refcount will be decremented. This is used to replace the peer
 * structure hanging off a connection structure */
struct rx_peer *rxi_FindPeer(register afs_uint32 host,
	register u_short port, struct rx_peer *origPeer, int create)
{
    register struct rx_peer *pp;
    int hashIndex;
    hashIndex = PEER_HASH(host, port);
    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
	if ((pp->host == host) && (pp->port == port)) break;
    }
    if (!pp) {
	if (create) {
	    pp = rxi_AllocPeer(); /* This bzero's *pp */
	    pp->host = host;      /* set here or in InitPeerParams is zero */
	    pp->port = port;
	    MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
	    queue_Init(&pp->congestionQueue);
	    queue_Init(&pp->rpcStats);
	    pp->next = rx_peerHashTable[hashIndex];
	    rx_peerHashTable[hashIndex] = pp;
	    rxi_InitPeerParams(pp);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.nPeerStructs++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    if (pp && create) {
	pp->refCount++;
    }
    if (origPeer)
	origPeer->refCount--;
    MUTEX_EXIT(&rx_peerHashTable_lock);
    return pp;
}
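/* Usage sketch (hypothetical host/port values): when a connection's traffic
 * starts arriving from a different interface, the peer hanging off the
 * connection can be swapped in one call:
 *
 *     peer = rxi_FindPeer(newHost, newPort, conn->peer, 1);
 *
 * which takes a reference on the peer for (newHost, newPort), creating it
 * if needed, and drops one reference on the old conn->peer, all under
 * rx_peerHashTable_lock. */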
/* Find the connection at (host, port) started at epoch, and with the
 * given connection id.  Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired.  In both cases, (host, port) specify the
 * peer's (host, port) pair.  Client connections are not made
 * automatically by this routine.  The parameter socket gives the
 * socket descriptor on which the packet was received.  This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId).  Finally, the securityIndex
 * parameter must match the existing index for the connection.  If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service */
struct rx_connection *rxi_FindConnection(osi_socket socket,
	register afs_int32 host, register u_short port, u_short serviceId,
	afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
{
    int hashindex, flag;
    register struct rx_connection *conn;
    struct rx_peer *peer;
    hashindex = CONN_HASH(host, port, cid, epoch, type);
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxLastConn ? (conn = rxLastConn, flag = 0) :
		 (conn = rx_connHashTable[hashindex], flag = 1);
    for ( ; conn; ) {
      if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
	  && (epoch == conn->epoch)) {
	register struct rx_peer *pp = conn->peer;
	if (securityIndex != conn->securityIndex) {
	    /* this isn't supposed to happen, but someone could forge a packet
	       like this, and there seems to be some CM bug that makes this
	       happen from time to time -- in which case, the fileserver
	       asserts. */
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    return (struct rx_connection *) 0;
	}
	/* epoch's high order bits mean route for security reasons only on
	 * the cid, not the host and port fields.
	 */
	if (conn->epoch & 0x80000000) break;
	if (((type == RX_CLIENT_CONNECTION)
	     || (pp->host == host)) && (pp->port == port))
	    break;
      }
      if (!flag)
      {
	/* the connection rxLastConn that was used the last time is not the
	** one we are looking for now. Hence, start searching in the hash */
	flag = 1;
	conn = rx_connHashTable[hashindex];
      }
      else
	conn = conn->next;
    }
    if (!conn) {
	struct rx_service *service;
	if (type == RX_CLIENT_CONNECTION) {
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    return (struct rx_connection *) 0;
	}
	service = rxi_FindService(socket, serviceId);
	if (!service || (securityIndex >= service->nSecurityObjects)
	    || (service->securityObjects[securityIndex] == 0)) {
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    return (struct rx_connection *) 0;
	}
	conn = rxi_AllocConnection(); /* This bzero's the connection */
	MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
		   MUTEX_DEFAULT, 0);
	MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
		   MUTEX_DEFAULT, 0);
	CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
	conn->next = rx_connHashTable[hashindex];
	rx_connHashTable[hashindex] = conn;
	peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
	conn->type = RX_SERVER_CONNECTION;
	conn->lastSendTime = clock_Sec();   /* don't GC immediately */
	conn->epoch = epoch;
	conn->cid = cid & RX_CIDMASK;
	/* conn->serial = conn->lastSerial = 0; */
	/* conn->timeout = 0; */
	conn->ackRate = RX_FAST_ACK_RATE;
	conn->service = service;
	conn->serviceId = serviceId;
	conn->securityIndex = securityIndex;
	conn->securityObject = service->securityObjects[securityIndex];
	conn->nSpecific = 0;
	conn->specific = NULL;
	rx_SetConnDeadTime(conn, service->connDeadTime);
	/* Notify security object of the new connection */
	RXS_NewConnection(conn->securityObject, conn);
	/* XXXX Connection timeout? */
	if (service->newConnProc) (*service->newConnProc)(conn);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nServerConns++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
    else
    {
    /* Ensure that the peer structure is set up in such a way that
    ** replies in this connection go back to that remote interface
    ** from which the last packet was sent out. If this packet's
    ** source IP address does not match the peer struct for this conn,
    ** then drop the refCount on conn->peer and get a new peer structure.
    ** We can check the host,port field in the peer structure without the
    ** rx_peerHashTable_lock because the peer structure has its refCount
    ** incremented and the only time the host,port in the peer struct gets
    ** updated is when the peer structure is created.
    */
	if (conn->peer->host == host)
	    peer = conn->peer; /* no change to the peer structure */
	else
	    peer = rxi_FindPeer(host, port, conn->peer, 1);
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount++;
    MUTEX_EXIT(&conn->conn_data_lock);

    rxLastConn = conn;	/* store this connection as the last conn used */
    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
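/* Typical use (see rxi_ReceivePacket below): the packet demultiplexer maps
 * each incoming packet onto a connection with
 *
 *     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
 *                               np->header.cid, np->header.epoch, type,
 *                               np->header.securityIndex);
 *
 * and is then responsible for dropping the reference this call took
 * (conn->refCount--) once it is done with the connection. */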
/* There are two packet tracing routines available for testing and monitoring
 * Rx.  One is called just after every packet is received and the other is
 * called just before every packet is sent.  Received packets have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address.  Both can be modified.  The return value, if
 * non-zero, indicates that the packet should be dropped.  */

int (*rx_justReceived)() = 0;
int (*rx_almostSent)() = 0;
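/* Example (hypothetical debugging hook, not part of the library proper): a
 * tracer that drops every packet from one suspect address while testing.
 * A non-zero return tells the listener to discard the packet; the tracer
 * may also rewrite the address before returning zero.
 *
 *     static int dropFromSuspect(struct rx_packet *p,
 *                                struct sockaddr_in *addr)
 *     {
 *         if (addr->sin_addr.s_addr == htonl(0x0a000001))
 *             return 1;
 *         return 0;
 *     }
 *
 *     rx_justReceived = dropFromSuspect;
 */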
/* A packet has been received off the interface.  Np is the packet, socket is
 * the socket number it was received from (useful in determining which service
 * this packet corresponds to), and (host, port) reflect the host,port of the
 * sender.  This call returns the packet to the caller if it is finished with
 * it, rather than de-allocating it, just as a small performance hack */

struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
	osi_socket socket, afs_uint32 host, u_short port,
	int *tnop, struct rx_call **newcallp)
{
    register struct rx_call *call;
    register struct rx_connection *conn;
    int channel;
    afs_uint32 currentCallNumber;
    int type;
    int skew;
#ifdef RXDEBUG
    char *packetType;
#endif
    struct rx_packet *tnp;

#ifdef RXDEBUG
/* We don't print out the packet until now because (1) the time may not be
 * accurate enough until now in the lwp implementation (rx_Listener only gets
 * the time after the packet is read) and (2) from a protocol point of view,
 * this is the first time the packet has been seen */
    packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
	? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
    dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
	 np->header.serial, packetType, host, port, np->header.serviceId,
	 np->header.epoch, np->header.cid, np->header.callNumber,
	 np->header.seq, np->header.flags, np));
#endif

    if (np->header.type == RX_PACKET_TYPE_VERSION) {
	return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
    }

    if (np->header.type == RX_PACKET_TYPE_DEBUG) {
	return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
    }
#ifdef RXDEBUG
    /* If an input tracer function is defined, call it with the packet and
     * network address.  Note this function may modify its arguments. */
    if (rx_justReceived) {
	struct sockaddr_in addr;
	int drop;
	addr.sin_family = AF_INET;
	addr.sin_port = port;
	addr.sin_addr.s_addr = host;
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
	addr.sin_len = sizeof(addr);
#endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
	drop = (*rx_justReceived) (np, &addr);
	/* drop packet if return value is non-zero */
	if (drop) return np;
	port = addr.sin_port;		/* in case fcn changed addr */
	host = addr.sin_addr.s_addr;
    }
#endif

    /* If packet was not sent by the client, then *we* must be the client */
    type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
	? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

    /* Find the connection (or fabricate one, if we're the server & if
     * necessary) associated with this packet */
    conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
			      np->header.cid, np->header.epoch, type,
			      np->header.securityIndex);

    if (!conn) {
	/* If no connection found or fabricated, just ignore the packet.
	 * (An argument could be made for sending an abort packet for
	 * the conn) */
	return np;
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->maxSerial < np->header.serial)
	conn->maxSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);

    /* If the connection is in an error state, send an abort packet and ignore
     * the incoming packet */
    if (conn->error) {
	/* Don't respond to an abort packet--we don't want loops! */
	MUTEX_ENTER(&conn->conn_data_lock);
	if (np->header.type != RX_PACKET_TYPE_ABORT)
	    np = rxi_SendConnectionAbort(conn, np, 1, 0);
	conn->refCount--;
	MUTEX_EXIT(&conn->conn_data_lock);
	return np;
    }

    /* Check for connection-only requests (i.e. not call specific). */
    if (np->header.callNumber == 0) {
	switch (np->header.type) {
	    case RX_PACKET_TYPE_ABORT:
		/* What if the supplied error is zero? */
		rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return np;
	    case RX_PACKET_TYPE_CHALLENGE:
		tnp = rxi_ReceiveChallengePacket(conn, np, 1);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tnp;
	    case RX_PACKET_TYPE_RESPONSE:
		tnp = rxi_ReceiveResponsePacket(conn, np, 1);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tnp;
	    case RX_PACKET_TYPE_PARAMS:
	    case RX_PACKET_TYPE_PARAMS+1:
	    case RX_PACKET_TYPE_PARAMS+2:
		/* ignore these packet types for now */
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return np;

	    default:
		/* Should not reach here, unless the peer is broken: send an
		 * abort packet */
		rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
		MUTEX_ENTER(&conn->conn_data_lock);
		tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tnp;
	}
    }

    channel = np->header.cid & RX_CHANNELMASK;
    call = conn->call[channel];
#ifdef	RX_ENABLE_LOCKS
    if (call)
	MUTEX_ENTER(&call->lock);
    /* Test to see if call struct is still attached to conn. */
    if (call != conn->call[channel]) {
	if (call)
	    MUTEX_EXIT(&call->lock);
	if (type == RX_SERVER_CONNECTION) {
	    call = conn->call[channel];
	    /* If we started with no call attached and there is one now,
	     * another thread is also running this routine and has gotten
	     * the connection channel. We should drop this packet in the tests
	     * below. If there was a call on this connection and it's now
	     * gone, then we'll be making a new call below.
	     * If there was previously a call and it's now different then
	     * the old call was freed and another thread running this routine
	     * has created a call on this channel. One of these two threads
	     * has a packet for the old call and the code below handles those
	     * cases.
	     */
	    if (call)
		MUTEX_ENTER(&call->lock);
	}
	else {
	    /* This packet can't be for this call. If the new call address is
	     * 0 then no call is running on this channel. If there is a call
	     * then, since this is a client connection we're getting data for
	     * it must be for the previous call.
	     */
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.spuriousPacketsRead++;
	    MUTEX_EXIT(&rx_stats_mutex);
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
    }
#endif
    currentCallNumber = conn->callNumber[channel];

    if (type == RX_SERVER_CONNECTION) { /* We're the server */
	if (np->header.callNumber < currentCallNumber) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.spuriousPacketsRead++;
	    MUTEX_EXIT(&rx_stats_mutex);
#ifdef	RX_ENABLE_LOCKS
	    if (call)
		MUTEX_EXIT(&call->lock);
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
	if (!call) {
	    MUTEX_ENTER(&conn->conn_call_lock);
	    call = rxi_NewCall(conn, channel);
	    MUTEX_EXIT(&conn->conn_call_lock);
	    *call->callNumber = np->header.callNumber;
	    call->state = RX_STATE_PRECALL;
	    clock_GetTime(&call->queueTime);
	    hzero(call->bytesSent);
	    hzero(call->bytesRcvd);
	    rxi_KeepAliveOn(call);
	}
	else if (np->header.callNumber != currentCallNumber) {
	    /* Wait until the transmit queue is idle before deciding
	     * whether to reset the current call. Chances are that the
	     * call will be in either DALLY or HOLD state once the TQ_BUSY
	     * flag is cleared.
	     */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	    while ((call->state == RX_STATE_ACTIVE) &&
		   (call->flags & RX_CALL_TQ_BUSY)) {
		call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
		CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
		osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	    }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	    /* If the new call cannot be taken right now send a busy and set
	     * the error condition in this call, so that it terminates as
	     * quickly as possible */
	    if (call->state == RX_STATE_ACTIVE) {
		struct rx_packet *tp;

		rxi_CallError(call, RX_CALL_DEAD);
		tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
		MUTEX_EXIT(&call->lock);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tp;
	    }
	    rxi_ResetCall(call, 0);
	    *call->callNumber = np->header.callNumber;
	    call->state = RX_STATE_PRECALL;
	    clock_GetTime(&call->queueTime);
	    hzero(call->bytesSent);
	    hzero(call->bytesRcvd);
	    /*
	     * If the number of queued calls exceeds the overload
	     * threshold then abort this call.
	     */
	    if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
		struct rx_packet *tp;

		rxi_CallError(call, rx_BusyError);
		tp = rxi_SendCallAbort(call, np, 1, 0);
		MUTEX_EXIT(&call->lock);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tp;
	    }
	    rxi_KeepAliveOn(call);
	}
	else {
	    /* Continuing call; do nothing here. */
	}
    } else { /* we're the client */
	/* Ignore all incoming acknowledgements for calls in DALLY state */
	if (call && (call->state == RX_STATE_DALLY)
	    && (np->header.type == RX_PACKET_TYPE_ACK)) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.ignorePacketDally++;
	    MUTEX_EXIT(&rx_stats_mutex);
#ifdef	RX_ENABLE_LOCKS
	    if (call) {
		MUTEX_EXIT(&call->lock);
	    }
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}

	/* Ignore anything that's not relevant to the current call.  If there
	 * isn't a current call, then no packet is relevant. */
	if (!call || (np->header.callNumber != currentCallNumber)) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.spuriousPacketsRead++;
	    MUTEX_EXIT(&rx_stats_mutex);
#ifdef	RX_ENABLE_LOCKS
	    if (call) {
		MUTEX_EXIT(&call->lock);
	    }
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
	/* If the service security object index stamped in the packet does not
	 * match the connection's security index, ignore the packet */
	if (np->header.securityIndex != conn->securityIndex) {
#ifdef	RX_ENABLE_LOCKS
	    MUTEX_EXIT(&call->lock);
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}

	/* If we're receiving the response, then all transmit packets are
	 * implicitly acknowledged.  Get rid of them. */
	if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
	    /* XXX Hack. Because we must release the global rx lock when
	     * sending packets (osi_NetSend) we drop all acks while we're
	     * traversing the tq in rxi_Start sending packets out because
	     * packets may move to the freePacketQueue as result of being here!
	     * So we drop these packets until we're safely out of the
	     * traversing. Really ugly!
	     * For fine grain RX locking, we set the acked field in the
	     * packets and let rxi_Start remove them from the transmit queue.
	     */
	    if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef	RX_ENABLE_LOCKS
		rxi_SetAcksInTransmitQueue(call);
#else
		conn->refCount--;
		return np;		/* xmitting; drop packet */
#endif
	    }
	    else {
		rxi_ClearTransmitQueue(call, 0);
	    }
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
	    rxi_ClearTransmitQueue(call, 0);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	} else {
	    if (np->header.type == RX_PACKET_TYPE_ACK) {
		/* now check to see if this is an ack packet acknowledging that the
		 * server actually *lost* some hard-acked data.  If this happens we
		 * ignore this packet, as it may indicate that the server restarted in
		 * the middle of a call.  It is also possible that this is an old ack
		 * packet.  We don't abort the connection in this case, because this
		 * *might* just be an old ack packet.  The right way to detect a server
		 * restart in the midst of a call is to notice that the server epoch
		 * changed, btw.  */
		/* XXX I'm not sure this is exactly right, since tfirst **IS**
		 * XXX unacknowledged.  I think that this is off-by-one, but
		 * XXX I don't dare change it just yet, since it will
		 * XXX interact badly with the server-restart detection
		 * XXX code in receiveackpacket.  */
		if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
		    MUTEX_ENTER(&rx_stats_mutex);
		    rx_stats.spuriousPacketsRead++;
		    MUTEX_EXIT(&rx_stats_mutex);
		    MUTEX_EXIT(&call->lock);
		    MUTEX_ENTER(&conn->conn_data_lock);
		    conn->refCount--;
		    MUTEX_EXIT(&conn->conn_data_lock);
		    return np;
		}
	    }
	} /* else not a data packet */
    }

    osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
    /* Set remote user defined status from packet */
    call->remoteStatus = np->header.userStatus;

    /* Note the gap between the expected next packet and the actual
     * packet that arrived, when the new packet has a smaller serial number
     * than expected.  Rioses frequently reorder packets all by themselves,
     * so this will be quite important with very large window sizes.
     * Skew is checked against 0 here to avoid any dependence on the type of
     * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
     * true!
     * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
     * see CalculateRoundTripTime for an example of how to keep smoothed values.
     * I think using a beta of 1/8 is probably appropriate.  93.04.21
     */
    MUTEX_ENTER(&conn->conn_data_lock);
    skew = conn->lastSerial - np->header.serial;
    conn->lastSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);
    if (skew > 0) {
	register struct rx_peer *peer;
	peer = conn->peer;
	if (skew > peer->inPacketSkew) {
	    dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
	    peer->inPacketSkew = skew;
	}
    }

    /* Now do packet type-specific processing */
    switch (np->header.type) {
	case RX_PACKET_TYPE_DATA:
	    np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
				       tnop, newcallp);
	    break;
	case RX_PACKET_TYPE_ACK:
	    /* Respond immediately to ack packets requesting acknowledgement
	     * (ping packets) */
	    if (np->header.flags & RX_REQUEST_ACK) {
		if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
		else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
	    }
	    np = rxi_ReceiveAckPacket(call, np, 1);
	    break;
	case RX_PACKET_TYPE_ABORT:
	    /* An abort packet: reset the connection, passing the error up to
	     * the user */
	    /* What if error is zero? */
	    rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
	    break;
	case RX_PACKET_TYPE_BUSY:
	    /* XXXX */
	    break;
	case RX_PACKET_TYPE_ACKALL:
	    /* All packets acknowledged, so we can drop all packets previously
	     * readied for sending */
#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
	    /* XXX Hack. Because we can't release the global rx lock when
	     * sending packets (osi_NetSend) we drop all ack pkts while we're
	     * traversing the tq in rxi_Start sending packets out because
	     * packets may move to the freePacketQueue as result of being
	     * here! So we drop these packets until we're safely out of the
	     * traversing. Really ugly!
	     * For fine grain RX locking, we set the acked field in the packets
	     * and let rxi_Start remove the packets from the transmit queue.
	     */
	    if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef	RX_ENABLE_LOCKS
		rxi_SetAcksInTransmitQueue(call);
		break;
#else /* RX_ENABLE_LOCKS */
		conn->refCount--;
		return np;		/* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
	    }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	    rxi_ClearTransmitQueue(call, 0);
	    break;
	default:
	    /* Should not reach here, unless the peer is broken: send an abort
	     * packet */
	    rxi_CallError(call, RX_PROTOCOL_ERROR);
	    np = rxi_SendCallAbort(call, np, 1, 0);
	    break;
    };
    /* Note when this last legitimate packet was received, for keep-alive
     * processing.  Note, we delay getting the time until now in the hope that
     * the packet will be delivered to the user before any get time is required
     * (if not, then the time won't actually be re-evaluated here). */
    call->lastReceiveTime = clock_Sec();
    MUTEX_EXIT(&call->lock);
    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount--;
    MUTEX_EXIT(&conn->conn_data_lock);
    return np;
}
/* return true if this is an "interesting" connection from the point of view
    of someone trying to debug the system */
int rxi_IsConnInteresting(struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
	return 1;
    for (i = 0; i < RX_MAXCALLS; i++) {
	tcall = aconn->call[i];
	if (tcall) {
	    if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
		return 1;
	    if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
		return 1;
	}
    }
    return 0;
}
/* if this is one of the last few packets AND it wouldn't be used by the
   receiving call to immediately satisfy a read request, then drop it on
   the floor, since accepting it might prevent a lock-holding thread from
   making progress in its reading. If a call has been cleared while in
   the precall state then ignore all subsequent packets until the call
   is assigned to a thread. */

#ifdef KERNEL
static int TooLow(struct rx_packet *ap, struct rx_call *acall)
{
    int rc = 0;
    MUTEX_ENTER(&rx_stats_mutex);
    if (((ap->header.seq != 1) &&
	 (acall->flags & RX_CALL_CLEARED) &&
	 (acall->state == RX_STATE_PRECALL)) ||
	((rx_nFreePackets < rxi_dataQuota+2) &&
	 !((ap->header.seq < acall->rnext+rx_initSendWindow)
	   && (acall->flags & RX_CALL_READER_WAIT)))) {
	rc = 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return rc;
}
#endif /* KERNEL */
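/* Worked example (illustrative numbers): with rxi_dataQuota = 100 and
 * rx_nFreePackets = 101 the pool is inside the "last few packets" zone
 * (101 < 100+2), so an arriving packet is dropped unless it could be read
 * immediately, i.e. a reader is blocked in RX_CALL_READER_WAIT and the
 * sequence number is below acall->rnext + rx_initSendWindow. */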
static void rxi_CheckReachEvent(struct rxevent *event,
	struct rx_connection *conn, struct rx_call *acall)
{
    struct rx_call *call = acall;
    struct clock when;
    int i, waiting;

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->checkReachEvent = NULL;
    waiting = conn->flags & RX_CONN_ATTACHWAIT;
    if (event) conn->refCount--;
    MUTEX_EXIT(&conn->conn_data_lock);

    if (waiting) {
	if (!call) {
	    MUTEX_ENTER(&conn->conn_call_lock);
	    MUTEX_ENTER(&conn->conn_data_lock);
	    for (i = 0; i < RX_MAXCALLS; i++) {
		struct rx_call *tc = conn->call[i];
		if (tc && tc->state == RX_STATE_PRECALL) {
		    call = tc;
		    break;
		}
	    }
	    if (!call)
		/* Indicate that rxi_CheckReachEvent is no longer running by
		 * clearing the flag.  Must be atomic under conn_data_lock to
		 * avoid a new call slipping by: rxi_CheckConnReach holds
		 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
		 */
		conn->flags &= ~RX_CONN_ATTACHWAIT;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    MUTEX_EXIT(&conn->conn_call_lock);
	}

	if (call) {
	    if (call != acall) MUTEX_ENTER(&call->lock);
	    rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
	    if (call != acall) MUTEX_EXIT(&call->lock);

	    clock_GetTime(&when);
	    when.sec += RX_CHECKREACH_TIMEOUT;
	    MUTEX_ENTER(&conn->conn_data_lock);
	    if (!conn->checkReachEvent) {
		conn->refCount++;
		conn->checkReachEvent =
		    rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
	    }
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
    }
}
static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
{
    struct rx_service *service = conn->service;
    struct rx_peer *peer = conn->peer;
    afs_uint32 now, lastReach;

    if (service->checkReach == 0)
	return 0;

    now = clock_Sec();
    MUTEX_ENTER(&peer->peer_lock);
    lastReach = peer->lastReachTime;
    MUTEX_EXIT(&peer->peer_lock);
    if (now - lastReach < RX_CHECKREACH_TTL)
	return 0;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->flags & RX_CONN_ATTACHWAIT) {
	MUTEX_EXIT(&conn->conn_data_lock);
	return 1;
    }
    conn->flags |= RX_CONN_ATTACHWAIT;
    MUTEX_EXIT(&conn->conn_data_lock);
    if (!conn->checkReachEvent)
	rxi_CheckReachEvent(NULL, conn, call);

    return 1;
}
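/* Sketch of the reachability handshake implemented above: the first PRECALL
 * from an unproven peer sets RX_CONN_ATTACHWAIT and rxi_CheckReachEvent
 * sends an RX_ACK_PING, re-posting itself every RX_CHECKREACH_TIMEOUT
 * seconds until the peer answers.  The RX_ACK_PING_RESPONSE is handled in
 * rxi_UpdatePeerReach (below), which records peer->lastReachTime and
 * attaches the waiting calls; for the next RX_CHECKREACH_TTL seconds this
 * check then short-circuits to "reachable". */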
/* try to attach call, if authentication is complete */
static void TryAttach(register struct rx_call *acall,
	register osi_socket socket, register int *tnop,
	register struct rx_call **newcallp, int reachOverride)
{
    struct rx_connection *conn = acall->conn;

    if (conn->type == RX_SERVER_CONNECTION && acall->state == RX_STATE_PRECALL) {
	/* Don't attach until we have any req'd. authentication. */
	if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
	    if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
		rxi_AttachServerProc(acall, socket, tnop, newcallp);
		/* Note:  this does not necessarily succeed; there
		 * may not be any proc available
		 */
	}
	else {
	    rxi_ChallengeOn(acall->conn);
	}
    }
}
/* A data packet has been received off the interface.  This packet is
 * appropriate to the call (the call is in the right state, etc.).  This
 * routine can return a packet to the caller, for re-use */

struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
	register struct rx_packet *np, int istack, osi_socket socket,
	afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
{
    int ackNeeded = 0;
    int newPackets = 0;
    int didHardAck = 0;
    int haveLast = 0;
    afs_uint32 seq, serial, flags;
    int isFirst;
    struct rx_packet *tnp;
    struct clock when;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.dataPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);

#ifdef KERNEL
    /* If there are no packet buffers, drop this new packet, unless we can find
     * packet buffers from inactive calls */
    if (!call->error &&
	(rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
	MUTEX_ENTER(&rx_freePktQ_lock);
	rxi_NeedMorePackets = TRUE;
	MUTEX_EXIT(&rx_freePktQ_lock);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.noPacketBuffersOnRead++;
	MUTEX_EXIT(&rx_stats_mutex);
	call->rprev = np->header.serial;
	rxi_calltrace(RX_TRACE_DROP, call);
	dpf (("packet %x dropped on receipt - quota problems", np));
	if (rxi_doreclaim)
	    rxi_ClearReceiveQueue(call);
	clock_GetTime(&when);
	clock_Add(&when, &rx_softAckDelay);
	if (!call->delayedAckEvent ||
	    clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
	    rxevent_Cancel(call->delayedAckEvent, call,
			   RX_CALL_REFCOUNT_DELAY);
	    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
	    call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
						 call, 0);
	}
	/* we've damaged this call already, might as well do it in. */
	return np;
    }
#endif /* KERNEL */

    /*
     * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
     * packet is one of several packets transmitted as a single
     * datagram. Do not send any soft or hard acks until all packets
     * in a jumbogram have been processed. Send negative acks right away.
     */
    for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
	/* tnp is non-null when there are more packets in the
	 * current jumbo gram */
	if (tnp) {
	    if (np)
		rxi_FreePacket(np);
	    np = tnp;
	}

	seq = np->header.seq;
	serial = np->header.serial;
	flags = np->header.flags;

	/* If the call is in an error state, send an abort message */
	if (call->error)
	    return rxi_SendCallAbort(call, np, istack, 0);

	/* The RX_JUMBO_PACKET is set in all but the last packet in each
	 * AFS 3.5 jumbogram. */
	if (flags & RX_JUMBO_PACKET) {
	    tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
	} else {
	    tnp = NULL;
	}

	if (np->header.spare != 0) {
	    MUTEX_ENTER(&call->conn->conn_data_lock);
	    call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
	    MUTEX_EXIT(&call->conn->conn_data_lock);
	}

	/* The usual case is that this is the expected next packet */
	if (seq == call->rnext) {

	    /* Check to make sure it is not a duplicate of one already queued */
	    if (queue_IsNotEmpty(&call->rq)
		&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.dupPacketsRead++;
		MUTEX_EXIT(&rx_stats_mutex);
		dpf (("packet %x dropped on receipt - duplicate", np));
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		np = rxi_SendAck(call, np, seq, serial,
				 flags, RX_ACK_DUPLICATE, istack);
		ackNeeded = 0;
		call->rprev = seq;
		continue;
	    }

	    /* It's the next packet. Stick it on the receive queue
	     * for this call. Set newPackets to make sure we wake
	     * the reader once all packets have been processed */
	    queue_Prepend(&call->rq, np);
	    call->nSoftAcks++;
	    np = NULL; /* We can't use this anymore */
	    newPackets = 1;

	    /* If an ack is requested then set a flag to make sure we
	     * send an acknowledgement for this packet */
	    if (flags & RX_REQUEST_ACK) {
		ackNeeded = 1;
	    }

	    /* Keep track of whether we have received the last packet */
	    if (flags & RX_LAST_PACKET) {
		call->flags |= RX_CALL_HAVE_LAST;
		haveLast = 1;
	    }

	    /* Check whether we have all of the packets for this call */
	    if (call->flags & RX_CALL_HAVE_LAST) {
		afs_uint32 tseq;	/* temporary sequence number */
		struct rx_packet *tp;	/* Temporary packet pointer */
		struct rx_packet *nxp;	/* Next pointer, for queue_Scan */

		for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
		    if (tseq != tp->header.seq)
			break;
		    if (tp->header.flags & RX_LAST_PACKET) {
			call->flags |= RX_CALL_RECEIVE_DONE;
			break;
		    }
		    tseq++;
		}
	    }

	    /* Provide asynchronous notification for those who want it
	     * (e.g. multi rx) */
	    if (call->arrivalProc) {
		(*call->arrivalProc)(call, call->arrivalProcHandle,
				     (int) call->arrivalProcArg);
		call->arrivalProc = (VOID (*)()) 0;
	    }

	    /* Update last packet received */
	    call->rprev = seq;

	    /* If there is no server process serving this call, grab
	     * one, if available. We only need to do this once. If a
	     * server thread is available, this thread becomes a server
	     * thread and the server thread becomes a listener thread. */
	    if (isFirst) {
		TryAttach(call, socket, tnop, newcallp, 0);
	    }
	}
	/* This is not the expected next packet. */
	else {
	    /* Determine whether this is a new or old packet, and if it's
	     * a new one, whether it fits into the current receive window.
	     * Also figure out whether the packet was delivered in sequence.
	     * We use the prev variable to determine whether the new packet
	     * is the successor of its immediate predecessor in the
	     * receive queue, and the missing flag to determine whether
	     * any of this packets predecessors are missing.  */

	    afs_uint32 prev;		/* "Previous packet" sequence number */
	    struct rx_packet *tp;	/* Temporary packet pointer */
	    struct rx_packet *nxp;	/* Next pointer, for queue_Scan */
	    int missing;		/* Are any predecessors missing? */

	    /* If the new packet's sequence number has been sent to the
	     * application already, then this is a duplicate */
	    if (seq < call->rnext) {
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.dupPacketsRead++;
		MUTEX_EXIT(&rx_stats_mutex);
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		np = rxi_SendAck(call, np, seq, serial,
				 flags, RX_ACK_DUPLICATE, istack);
		ackNeeded = 0;
		call->rprev = seq;
		continue;
	    }

	    /* If the sequence number is greater than what can be
	     * accommodated by the current window, then send a negative
	     * acknowledge and drop the packet */
	    if ((call->rnext + call->rwind) <= seq) {
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		np = rxi_SendAck(call, np, seq, serial,
				 flags, RX_ACK_EXCEEDS_WINDOW, istack);
		ackNeeded = 0;
		call->rprev = seq;
		continue;
	    }

	    /* Look for the packet in the queue of old received packets */
	    for (prev = call->rnext - 1, missing = 0,
		 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
		/*Check for duplicate packet */
		if (seq == tp->header.seq) {
		    MUTEX_ENTER(&rx_stats_mutex);
		    rx_stats.dupPacketsRead++;
		    MUTEX_EXIT(&rx_stats_mutex);
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    np = rxi_SendAck(call, np, seq, serial,
				     flags, RX_ACK_DUPLICATE, istack);
		    ackNeeded = 0;
		    call->rprev = seq;
		    goto nextloop;
		}
		/* If we find a higher sequence packet, break out and
		 * insert the new packet here. */
		if (seq < tp->header.seq) break;
		/* Check for missing packet */
		if (tp->header.seq != prev+1) {
		    missing = 1;
		}

		prev = tp->header.seq;
	    }

	    /* Keep track of whether we have received the last packet. */
	    if (flags & RX_LAST_PACKET) {
		call->flags |= RX_CALL_HAVE_LAST;
	    }

	    /* It's within the window: add it to the receive queue.
	     * tp is left by the previous loop either pointing at the
	     * packet before which to insert the new packet, or at the
	     * queue head if the queue is empty or the packet should be
	     * appended. */
	    queue_InsertBefore(tp, np);
	    call->nSoftAcks++;
	    np = NULL;

	    /* Check whether we have all of the packets for this call */
	    if ((call->flags & RX_CALL_HAVE_LAST)
		&& !(call->flags & RX_CALL_RECEIVE_DONE)) {
		afs_uint32 tseq;	/* temporary sequence number */

		for (tseq = call->rnext,
		     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
		    if (tseq != tp->header.seq)
			break;
		    if (tp->header.flags & RX_LAST_PACKET) {
			call->flags |= RX_CALL_RECEIVE_DONE;
			break;
		    }
		    tseq++;
		}
	    }

	    /* We need to send an ack if the packet is out of sequence,
	     * or if an ack was requested by the peer. */
	    if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
		ackNeeded = 1;
	    }

	    /* Acknowledge the last packet for each call */
	    if (flags & RX_LAST_PACKET) {
		haveLast = 1;
	    }

	    call->rprev = seq;
	}
nextloop:;
    }

    if (newPackets) {
	/*
	 * If the receiver is waiting for an iovec, fill the iovec
	 * using the data from the receive queue */
	if (call->flags & RX_CALL_IOVEC_WAIT) {
	    didHardAck = rxi_FillReadVec(call, seq, serial, flags);
	    /* the call may have been aborted */
	    if (call->error) {
		return NULL;
	    }
	    if (didHardAck) {
		ackNeeded = 0;
	    }
	}

	/* Wakeup the reader if any */
	if ((call->flags & RX_CALL_READER_WAIT) &&
	    (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
	     (call->iovNext >= call->iovMax) ||
	     (call->flags & RX_CALL_RECEIVE_DONE))) {
	    call->flags &= ~RX_CALL_READER_WAIT;
#ifdef	RX_ENABLE_LOCKS
	    CV_BROADCAST(&call->cv_rq);
#else
	    osi_rxWakeup(&call->rq);
#endif
	}
    }

    /*
     * Send an ack when requested by the peer, or once every
     * rxi_SoftAckRate packets until the last packet has been
     * received. Always send a soft ack for the last packet in
     * the server's reply. */
    if (ackNeeded) {
	rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
	np = rxi_SendAck(call, np, seq, serial, flags,
			 RX_ACK_REQUESTED, istack);
    } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
	rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
	np = rxi_SendAck(call, np, seq, serial, flags,
			 RX_ACK_IDLE, istack);
    } else if (call->nSoftAcks) {
	clock_GetTime(&when);
	if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
	    clock_Add(&when, &rx_lastAckDelay);
	} else {
	    clock_Add(&when, &rx_softAckDelay);
	}
	if (!call->delayedAckEvent ||
	    clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
	    rxevent_Cancel(call->delayedAckEvent, call,
			   RX_CALL_REFCOUNT_DELAY);
	    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
	    call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
						 call, 0);
	}
    } else if (call->flags & RX_CALL_RECEIVE_DONE) {
	rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
    }

    return np;
}
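/* Ack policy illustration (informal summary of the branch above): an ack is
 * sent at once when the sender requested one (RX_ACK_REQUESTED) or when a
 * run of more than rxi_SoftAckRate packets is pending (RX_ACK_IDLE);
 * otherwise a delayed-ack event is scheduled rx_softAckDelay in the future,
 * or rx_lastAckDelay when the last packet of a server reply has arrived. */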
#ifdef ADAPT_WINDOW
static void rxi_ComputeRate();
#endif
static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
{
    struct rx_peer *peer = conn->peer;

    MUTEX_ENTER(&peer->peer_lock);
    peer->lastReachTime = clock_Sec();
    MUTEX_EXIT(&peer->peer_lock);

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->flags & RX_CONN_ATTACHWAIT) {
	int i;

	conn->flags &= ~RX_CONN_ATTACHWAIT;
	MUTEX_EXIT(&conn->conn_data_lock);

	for (i = 0; i < RX_MAXCALLS; i++) {
	    struct rx_call *call = conn->call[i];
	    if (call) {
		if (call != acall) MUTEX_ENTER(&call->lock);
		TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
		if (call != acall) MUTEX_EXIT(&call->lock);
	    }
	}
    } else
	MUTEX_EXIT(&conn->conn_data_lock);
}
/* The real smarts of the whole thing.  */
struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
	struct rx_packet *np, int istack)
{
    struct rx_ackPacket *ap;
    int nAcks;
    register struct rx_packet *tp;
    register struct rx_packet *nxp;	/* Next packet pointer for queue_Scan */
    register struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;
    afs_uint32 first;
    afs_uint32 serial;
    /* because there are CM's that are bogus, sending weird values for this. */
    afs_uint32 skew = 0;
    int nbytes;
    int missing;
    int acked;
    int nNacked = 0;
    int newAckCount = 0;
    u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
    int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.ackPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
    ap = (struct rx_ackPacket *) rx_DataOf(np);
    nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
    if (nbytes < 0)
	return np;  /* truncated ack packet */

    /* depends on ack packet struct */
    nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
    first = ntohl(ap->firstPacket);
    serial = ntohl(ap->serial);
    /* temporarily disabled -- needs to degrade over time
     * skew = ntohs(ap->maxSkew); */

    /* Ignore ack packets received out of order */
    if (first < call->tfirst) {
	return np;
    }

    if (np->header.flags & RX_SLOW_START_OK) {
	call->flags |= RX_CALL_SLOW_START_OK;
    }

    if (ap->reason == RX_ACK_PING_RESPONSE)
	rxi_UpdatePeerReach(conn, call);

#ifdef RXDEBUG
    if (rx_Log) {
	fprintf(rx_Log,
	      "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
	      ap->reason, ntohl(ap->previousPacket),
	      (unsigned int) np->header.seq, (unsigned int) serial,
	      (unsigned int) skew, ntohl(ap->firstPacket));
	if (nAcks) {
	    int offset;
	    for (offset = 0; offset < nAcks; offset++)
		putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
	}
	putc('\n', rx_Log);
    }
#endif

    /* if a server connection has been re-created, it doesn't remember what
	serial # it was up to.  An ack will tell us, since the serial field
	contains the largest serial received by the other side */
    MUTEX_ENTER(&conn->conn_data_lock);
    if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
	conn->serial = serial+1;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Update the outgoing packet skew value to the latest value of
     * the peer's incoming packet skew value.  The ack packet, of
     * course, could arrive out of order, but that won't affect things
     * much */
    MUTEX_ENTER(&peer->peer_lock);
    peer->outPacketSkew = skew;

    /* Check for packets that no longer need to be transmitted, and
     * discard them.  This only applies to packets positively
     * acknowledged as having been sent to the peer's upper level.
     * All other packets must be retained.  So only packets with
     * sequence numbers < ap->firstPacket are candidates. */
    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
	if (tp->header.seq >= first) break;
	call->tfirst = tp->header.seq + 1;
	if (tp->header.serial == serial) {
	    /* Use RTT if not delayed by client. */
	    if (ap->reason != RX_ACK_DELAY)
		rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
	else if (tp->firstSerial == serial) {
	    /* Use RTT if not delayed by client. */
	    if (ap->reason != RX_ACK_DELAY)
		rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
	/* XXX Hack. Because we have to release the global rx lock when sending
	 * packets (osi_NetSend) we drop all acks while we're traversing the tq
	 * in rxi_Start sending packets out because packets may move to the
	 * freePacketQueue as result of being here! So we drop these packets until
	 * we're safely out of the traversing. Really ugly!
	 * To make it even uglier, if we're using fine grain locking, we can
	 * set the ack bits in the packets and have rxi_Start remove the packets
	 * when it's done transmitting.
	 */
	if (!(tp->flags & RX_PKTFLAG_ACKED)) {
	    newAckCount++;
	}
	if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
	    tp->flags |= RX_PKTFLAG_ACKED;
	    call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
	    break;
#endif /* RX_ENABLE_LOCKS */
	} else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	{
	    queue_Remove(tp);
	    rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
	}
    }

#ifdef ADAPT_WINDOW
    /* Give rate detector a chance to respond to ping requests */
    if (ap->reason == RX_ACK_PING_RESPONSE) {
	rxi_ComputeRate(peer, call, 0, np, ap->reason);
    }
#endif

    /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */

    /* Now go through explicit acks/nacks and record the results in
     * the waiting packets.  These are packets that can't be released
     * yet, even with a positive acknowledge.  This positive
     * acknowledge only means the packet has been received by the
     * peer, not that it will be retained long enough to be sent to
     * the peer's upper level.  In addition, reset the transmit timers
     * of any missing packets (those packets that must be missing
     * because this packet was out of sequence) */

    call->nSoftAcked = 0;
    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
	/* Update round trip time if the ack was stimulated on receipt
	 * of this packet */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
	if (tp->header.seq >= first) {
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	if (tp->header.serial == serial) {
	    /* Use RTT if not delayed by client. */
	    if (ap->reason != RX_ACK_DELAY)
		rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
	else if ((tp->firstSerial == serial)) {
	    /* Use RTT if not delayed by client. */
	    if (ap->reason != RX_ACK_DELAY)
		rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
	}
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

	/* Set the acknowledge flag per packet based on the
	 * information in the ack packet. An acknowledged packet can
	 * be downgraded when the server has discarded a packet it
	 * soft-acked previously, or when an ack packet is received
	 * out of sequence. */
	if (tp->header.seq < first) {
	    /* Implicit ack information */
	    if (!(tp->flags & RX_PKTFLAG_ACKED)) {
		newAckCount++;
	    }
	    tp->flags |= RX_PKTFLAG_ACKED;
	}
	else if (tp->header.seq < first + nAcks) {
	    /* Explicit ack information:  set it in the packet appropriately */
	    if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
		if (!(tp->flags & RX_PKTFLAG_ACKED)) {
		    newAckCount++;
		    tp->flags |= RX_PKTFLAG_ACKED;
		}
		if (missing) {
		    nNacked++;
		} else {
		    call->nSoftAcked++;
		}
	    } else {
		tp->flags &= ~RX_PKTFLAG_ACKED;
		missing = 1;
	    }
	}
	else {
	    tp->flags &= ~RX_PKTFLAG_ACKED;
	    missing = 1;
	}

	/* If packet isn't yet acked, and it has been transmitted at least
	 * once, reset retransmit time using latest timeout
	 * ie, this should readjust the retransmit timer for all outstanding
	 * packets...  So we don't just retransmit when we should know better*/

	if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
	    tp->retryTime = tp->timeSent;
	    clock_Add(&tp->retryTime, &peer->timeout);
	    /* shift by eight because one quarter-sec ~ 256 milliseconds */
	    clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
	}
    }

    /* If the window has been extended by this acknowledge packet,
     * then wakeup a sender waiting in alloc for window space, or try
     * sending packets now, if he's been sitting on packets due to
     * lack of window space */
    if (call->tnext < (call->tfirst + call->twind)) {
#ifdef	RX_ENABLE_LOCKS
	CV_SIGNAL(&call->cv_twind);
#else
	if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
	    call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
	    osi_rxWakeup(&call->twind);
	}
#endif
	if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
	    call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
	}
    }

    /* if the ack packet has a receivelen field hanging off it,
     * update our state */
    if (np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
	afs_uint32 tSize;

	/* If the ack packet has a "recommended" size that is less than
	 * what I am using now, reduce my size to match */
	rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
		      sizeof(afs_int32), &tSize);
	tSize = (afs_uint32) ntohl(tSize);
	peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));

	/* Get the maximum packet size to send to this peer */
	rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
		      &tSize);
	tSize = (afs_uint32)ntohl(tSize);
	tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
	tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);

	/* sanity check - peer might have restarted with different params.
	 * If peer says "send less", dammit, send less...  Peer should never
	 * be unable to accept packets of the size that prior AFS versions would
	 * send without asking.  */
	if (peer->maxMTU != tSize) {
	    peer->maxMTU = tSize;
	    peer->MTU = MIN(tSize, peer->MTU);
	    call->MTU = MIN(call->MTU, tSize);
	}

	if (np->length == rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32)) {
	    /* AFS 3.4a */
	    rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
			  sizeof(afs_int32), &tSize);
	    tSize = (afs_uint32) ntohl(tSize);	/* peer's receive window, if it's */
	    if (tSize < call->twind) {		/* smaller than our send */
		call->twind = tSize;		/* window, we must send less... */
		call->ssthresh = MIN(call->twind, call->ssthresh);
	    }

	    /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
	     * network MTU confused with the loopback MTU. Calculate the
	     * maximum MTU here for use in the slow start code below.
	     */
	    maxMTU = peer->maxMTU;
	    /* Did peer restart with older RX version? */
	    if (peer->maxDgramPackets > 1) {
		peer->maxDgramPackets = 1;
	    }
	} else if (np->length >= rx_AckDataSize(ap->nAcks) + 4*sizeof(afs_int32)) {
	    /* AFS 3.5 */
	    rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
			  sizeof(afs_int32), &tSize);
	    tSize = (afs_uint32) ntohl(tSize);
	    /*
	     * As of AFS 3.5 we set the send window to match the receive window.
	     */
	    if (tSize < call->twind) {
		call->twind = tSize;
		call->ssthresh = MIN(call->twind, call->ssthresh);
	    } else if (tSize > call->twind) {
		call->twind = tSize;
	    }

	    /*
	     * As of AFS 3.5, a jumbogram is more than one fixed size
	     * packet transmitted in a single UDP datagram. If the remote
	     * MTU is smaller than our local MTU then never send a datagram
	     * larger than the natural MTU.
	     */
	    rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
			  sizeof(afs_int32), &tSize);
	    maxDgramPackets = (afs_uint32) ntohl(tSize);
	    maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
	    maxDgramPackets = MIN(maxDgramPackets,
				  (int)(peer->ifDgramPackets));
	    maxDgramPackets = MIN(maxDgramPackets, tSize);
	    if (maxDgramPackets > 1) {
		peer->maxDgramPackets = maxDgramPackets;
		call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
	    } else {
		peer->maxDgramPackets = 1;
		call->MTU = peer->natMTU;
	    }
	} else if (peer->maxDgramPackets > 1) {
	    /* Restarted with lower version of RX */
	    peer->maxDgramPackets = 1;
	}
    } else if (peer->maxDgramPackets > 1 ||
	       peer->maxMTU != OLD_MAX_PACKET_SIZE) {
	/* Restarted with lower version of RX */
	peer->maxMTU = OLD_MAX_PACKET_SIZE;
	peer->natMTU = OLD_MAX_PACKET_SIZE;
	peer->MTU = OLD_MAX_PACKET_SIZE;
	peer->maxDgramPackets = 1;
	peer->nDgramPackets = 1;
	call->MTU = OLD_MAX_PACKET_SIZE;
    }

    if (nNacked) {
	/*
	 * Calculate how many datagrams were successfully received after
	 * the first missing packet and adjust the negative ack counter
	 * accordingly.
	 */
	call->nAcks = 0;
	call->nNacks++;
	nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
	if (call->nNacks < nNacked) {
	    call->nNacks = nNacked;
	}
    } else {
	if (newAckCount) {
	    call->nAcks++;
	}
	call->nNacks = 0;
    }

    if (call->flags & RX_CALL_FAST_RECOVER) {
	if (nNacked) {
	    call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
	} else {
	    call->flags &= ~RX_CALL_FAST_RECOVER;
	    call->cwind = call->nextCwind;
	    call->nextCwind = 0;
	    call->nAcks = 0;
	}
	call->nCwindAcks = 0;
    }
    else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
	/* Three negative acks in a row trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	MUTEX_EXIT(&peer->peer_lock);
	if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
	    /* someone else is waiting to start recovery */
	    return np;
	}
	call->flags |= RX_CALL_FAST_RECOVER_WAIT;
	while (call->flags & RX_CALL_TQ_BUSY) {
	    call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	    CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	    osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	}
	MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
	call->flags |= RX_CALL_FAST_RECOVER;
	call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
	call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
			  rx_maxSendWindow);
	call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
	call->nextCwind = call->ssthresh;
	call->nAcks = 0;
	call->nNacks = 0;
	peer->MTU = call->MTU;
	peer->cwind = call->nextCwind;
	peer->nDgramPackets = call->nDgramPackets;
	peer->congestSeq++;
	call->congestSeq = peer->congestSeq;
	/* Reset the resend times on the packets that were nacked
	 * so we will retransmit as soon as the window permits*/
	for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
	    if (acked) {
		if (!(tp->flags & RX_PKTFLAG_ACKED)) {
		    clock_Zero(&tp->retryTime);
		}
	    } else if (tp->flags & RX_PKTFLAG_ACKED) {
		acked = 1;
	    }
	}
    } else {
	/* If cwind is smaller than ssthresh, then increase
	 * the window one packet for each ack we receive (exponential
	 * growth).
	 * If cwind is greater than or equal to ssthresh then increase
	 * the congestion window by one packet for each cwind acks we
	 * receive (linear growth).  */
	if (call->cwind < call->ssthresh) {
	    call->cwind = MIN((int)call->ssthresh,
			      (int)(call->cwind + newAckCount));
	    call->nCwindAcks = 0;
	} else {
	    call->nCwindAcks += newAckCount;
	    if (call->nCwindAcks >= call->cwind) {
		call->nCwindAcks = 0;
		call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
	    }
	}
	/*
	 * If we have received several acknowledgements in a row then
	 * it is time to increase the size of our datagrams
	 */
	if ((int)call->nAcks > rx_nDgramThreshold) {
	    if (peer->maxDgramPackets > 1) {
		if (call->nDgramPackets < peer->maxDgramPackets) {
		    call->nDgramPackets++;
		}
		call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
	    } else if (call->MTU < peer->maxMTU) {
		call->MTU += peer->natMTU;
		call->MTU = MIN(call->MTU, peer->maxMTU);
	    }
	    call->nAcks = 0;
	}
    }

    MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */

    /* Servers need to hold the call until all response packets have
     * been acknowledged. Soft acks are good enough since clients
     * are not allowed to clear their receive queues. */
    if (call->state == RX_STATE_HOLD &&
	call->tfirst + call->nSoftAcked >= call->tnext) {
	call->state = RX_STATE_DALLY;
	rxi_ClearTransmitQueue(call, 0);
    } else if (!queue_IsEmpty(&call->tq)) {
	rxi_Start(0, call, istack);
    }
    return np;
}
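/* Congestion window arithmetic (worked example, illustrative numbers): with
 * ssthresh = 16 and cwind = 4, an ack that newly acknowledges 3 packets
 * grows cwind to MIN(16, 4+3) = 7 (exponential, slow start).  At or above
 * ssthresh, growth is linear: nCwindAcks accumulates and cwind gains one
 * packet per cwind acks.  rx_nackThreshold nacked packets in a row halve
 * the effective window: ssthresh becomes MAX(4, MIN(cwind, twind)) >> 1 and
 * cwind restarts at ssthresh + rx_nackThreshold, capped by
 * rx_maxSendWindow. */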
3786 /* Received a response to a challenge packet */
3787 struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn,
3788 register struct rx_packet *np, int istack)
3792 /* Ignore the packet if we're the client */
3793 if (conn->type == RX_CLIENT_CONNECTION) return np;
3795 /* If already authenticated, ignore the packet (it's probably a retry) */
3796 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3799 /* Otherwise, have the security object evaluate the response packet */
3800 error = RXS_CheckResponse(conn->securityObject, conn, np);
3802 /* If the response is invalid, reset the connection, sending
3803 * an abort to the peer */
3807 rxi_ConnectionError(conn, error);
3808 MUTEX_ENTER(&conn->conn_data_lock);
3809 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3810 MUTEX_EXIT(&conn->conn_data_lock);
3814 /* If the response is valid, any calls waiting to attach
3815 * servers can now do so */
3818 for (i=0; i<RX_MAXCALLS; i++) {
3819 struct rx_call *call = conn->call[i];
3821 MUTEX_ENTER(&call->lock);
3822 if (call->state == RX_STATE_PRECALL)
3823 rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
3824 MUTEX_EXIT(&call->lock);
3828 /* Update the peer reachability information, just in case
3829 * some calls went into attach-wait while we were waiting
3830 * for authentication. */
3832 rxi_UpdatePeerReach(conn, NULL);
3837 /* A client has received an authentication challenge: the security
3838 * object is asked to cough up a respectable response packet to send
3839 * back to the server. The server is responsible for retrying the
3840 * challenge if it fails to get a response. */
3842 struct rx_packet *rxi_ReceiveChallengePacket(register struct rx_connection *conn,
3843 register struct rx_packet *np, int istack)
3847 /* Ignore the challenge if we're the server */
3848 if (conn->type == RX_SERVER_CONNECTION) return np;
3850 /* Ignore the challenge if the connection is otherwise idle; someone's
3851 * trying to use us as an oracle. */
3852 if (!rxi_HasActiveCalls(conn)) return np;
3854 /* Send the security object the challenge packet. It is expected to fill
3855 * in the response. */
3856 error = RXS_GetResponse(conn->securityObject, conn, np);
3858 /* If the security object is unable to return a valid response, reset the
3859 * connection and send an abort to the peer. Otherwise send the response
3860 * packet to the peer connection. */
3862 rxi_ConnectionError(conn, error);
3863 MUTEX_ENTER(&conn->conn_data_lock);
3864 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3865 MUTEX_EXIT(&conn->conn_data_lock);
3868 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3869 RX_PACKET_TYPE_RESPONSE, NULL, -1, istack);
3875 /* Find an available server process to service the current request in
3876 * the given call structure. If one isn't available, queue up this
3877 * call so it eventually gets one */
3878 void rxi_AttachServerProc(register struct rx_call *call,
3879 register osi_socket socket, register int *tnop, register struct rx_call **newcallp)
3881 register struct rx_serverQueueEntry *sq;
3882 register struct rx_service *service = call->conn->service;
3883 #ifdef RX_ENABLE_LOCKS
3884 register int haveQuota = 0;
3885 #endif /* RX_ENABLE_LOCKS */
3886 /* May already be attached */
3887 if (call->state == RX_STATE_ACTIVE) return;
3889 MUTEX_ENTER(&rx_serverPool_lock);
3890 #ifdef RX_ENABLE_LOCKS
3891 while(rxi_ServerThreadSelectingCall) {
3892 MUTEX_EXIT(&call->lock);
3893 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3894 MUTEX_EXIT(&rx_serverPool_lock);
3895 MUTEX_ENTER(&call->lock);
3896 MUTEX_ENTER(&rx_serverPool_lock);
3897 /* Call may have been attached */
3898 if (call->state == RX_STATE_ACTIVE) return;
3901 haveQuota = QuotaOK(service);
3902 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3903 /* If there are no processes available to service this call,
3904 * put the call on the incoming call queue (unless it's
3905 * already on the queue). */
3908 ReturnToServerPool(service);
3909 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3910 call->flags |= RX_CALL_WAIT_PROC;
3911 MUTEX_ENTER(&rx_stats_mutex);
3913 MUTEX_EXIT(&rx_stats_mutex);
3914 rxi_calltrace(RX_CALL_ARRIVAL, call);
3915 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3916 queue_Append(&rx_incomingCallQueue, call);
3919 #else /* RX_ENABLE_LOCKS */
3920 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3921 /* If there are no processes available to service this call,
3922 * put the call on the incoming call queue (unless it's
3923 * already on the queue). */
3925 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3926 call->flags |= RX_CALL_WAIT_PROC;
3928 rxi_calltrace(RX_CALL_ARRIVAL, call);
3929 queue_Append(&rx_incomingCallQueue, call);
3932 #endif /* RX_ENABLE_LOCKS */
3934 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3936 /* If hot threads are enabled, and both newcallp and sq->socketp
3937 * are non-null, then this thread will process the call, and the
3938 * idle server thread will start listening on this thread's socket. */
3941 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3944 *sq->socketp = socket;
3945 clock_GetTime(&call->startTime);
3946 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3950 if (call->flags & RX_CALL_WAIT_PROC) {
3951 /* Conservative: I don't think this should happen */
3952 call->flags &= ~RX_CALL_WAIT_PROC;
3953 MUTEX_ENTER(&rx_stats_mutex);
3955 MUTEX_EXIT(&rx_stats_mutex);
3958 call->state = RX_STATE_ACTIVE;
3959 call->mode = RX_MODE_RECEIVING;
3960 if (call->flags & RX_CALL_CLEARED) {
3961 /* send an ack now to start the packet flow up again */
3962 call->flags &= ~RX_CALL_CLEARED;
3963 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3965 #ifdef RX_ENABLE_LOCKS
3968 service->nRequestsRunning++;
3969 if (service->nRequestsRunning <= service->minProcs)
3975 MUTEX_EXIT(&rx_serverPool_lock);
3978 /* Delay the sending of an acknowledge event for a short while, while
3979 * a new call is being prepared (in the case of a client) or a reply
3980 * is being prepared (in the case of a server). Rather than sending
3981 * an ack packet, an ACKALL packet is sent. */
3982 void rxi_AckAll(struct rxevent *event, register struct rx_call *call, char *dummy)
3984 #ifdef RX_ENABLE_LOCKS
3986 MUTEX_ENTER(&call->lock);
3987 call->delayedAckEvent = NULL;
3988 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3990 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3991 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3993 MUTEX_EXIT(&call->lock);
3994 #else /* RX_ENABLE_LOCKS */
3995 if (event) call->delayedAckEvent = NULL;
3996 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3997 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3998 #endif /* RX_ENABLE_LOCKS */
4001 void rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call, char *dummy)
4003 #ifdef RX_ENABLE_LOCKS
4005 MUTEX_ENTER(&call->lock);
4006 if (event == call->delayedAckEvent)
4007 call->delayedAckEvent = NULL;
4008 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
4010 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4012 MUTEX_EXIT(&call->lock);
4013 #else /* RX_ENABLE_LOCKS */
4014 if (event) call->delayedAckEvent = NULL;
4015 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4016 #endif /* RX_ENABLE_LOCKS */
4020 #ifdef RX_ENABLE_LOCKS
4021 /* Set ack in all packets in transmit queue. rxi_Start will deal with
4022 * clearing them out. */
4024 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call)
4026 register struct rx_packet *p, *tp;
4029 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4032 p->flags |= RX_PKTFLAG_ACKED;
4036 call->flags |= RX_CALL_TQ_CLEARME;
4037 call->flags |= RX_CALL_TQ_SOME_ACKED;
4040 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4041 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4042 call->tfirst = call->tnext;
4043 call->nSoftAcked = 0;
4045 if (call->flags & RX_CALL_FAST_RECOVER) {
4046 call->flags &= ~RX_CALL_FAST_RECOVER;
4047 call->cwind = call->nextCwind;
4048 call->nextCwind = 0;
4051 CV_SIGNAL(&call->cv_twind);
4053 #endif /* RX_ENABLE_LOCKS */
4055 /* Clear out the transmit queue for the current call (all packets have
4056 * been received by peer) */
4057 void rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
4059 register struct rx_packet *p, *tp;
4061 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4062 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4064 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4067 p->flags |= RX_PKTFLAG_ACKED;
4071 call->flags |= RX_CALL_TQ_CLEARME;
4072 call->flags |= RX_CALL_TQ_SOME_ACKED;
4075 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4076 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4082 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4083 call->flags &= ~RX_CALL_TQ_CLEARME;
4085 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4087 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4088 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4089 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4090 call->nSoftAcked = 0;
4092 if (call->flags & RX_CALL_FAST_RECOVER) {
4093 call->flags &= ~RX_CALL_FAST_RECOVER;
4094 call->cwind = call->nextCwind;
4097 #ifdef RX_ENABLE_LOCKS
4098 CV_SIGNAL(&call->cv_twind);
4100 osi_rxWakeup(&call->twind);
4104 void rxi_ClearReceiveQueue(register struct rx_call *call)
4106 register struct rx_packet *p, *tp;
4107 if (queue_IsNotEmpty(&call->rq)) {
4108 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4113 rx_packetReclaims++;
4115 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4117 if (call->state == RX_STATE_PRECALL) {
4118 call->flags |= RX_CALL_CLEARED;
4122 /* Send an abort packet for the specified call */
4123 struct rx_packet *rxi_SendCallAbort(register struct rx_call *call,
4124 struct rx_packet *packet, int istack, int force)
4132 /* Clients should never delay abort messages */
4133 if (rx_IsClientConn(call->conn))
4136 if (call->abortCode != call->error) {
4137 call->abortCode = call->error;
4138 call->abortCount = 0;
4141 if (force || rxi_callAbortThreshhold == 0 ||
4142 call->abortCount < rxi_callAbortThreshhold) {
4143 if (call->delayedAbortEvent) {
4144 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4146 error = htonl(call->error);
4148 packet = rxi_SendSpecial(call, call->conn, packet,
4149 RX_PACKET_TYPE_ABORT, (char *)&error,
4150 sizeof(error), istack);
4151 } else if (!call->delayedAbortEvent) {
4152 clock_GetTime(&when);
4153 clock_Addmsec(&when, rxi_callAbortDelay);
4154 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4155 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4161 /* Send an abort packet for the specified connection. Packet is an
4162 * optional pointer to a packet that can be used to send the abort.
4163 * Once the number of abort messages reaches the threshold, an
4164 * event is scheduled to send the abort. Setting the force flag
4165 * overrides sending delayed abort messages.
4167 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4168 * to send the abort packet. */
4170 struct rx_packet *rxi_SendConnectionAbort(register struct rx_connection *conn,
4171 struct rx_packet *packet, int istack, int force)
4179 /* Clients should never delay abort messages */
4180 if (rx_IsClientConn(conn))
4183 if (force || rxi_connAbortThreshhold == 0 ||
4184 conn->abortCount < rxi_connAbortThreshhold) {
4185 if (conn->delayedAbortEvent) {
4186 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4188 error = htonl(conn->error);
4190 MUTEX_EXIT(&conn->conn_data_lock);
4191 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4192 RX_PACKET_TYPE_ABORT, (char *)&error,
4193 sizeof(error), istack);
4194 MUTEX_ENTER(&conn->conn_data_lock);
4195 } else if (!conn->delayedAbortEvent) {
4196 clock_GetTime(&when);
4197 clock_Addmsec(&when, rxi_connAbortDelay);
4198 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4204 /* Associate an error with all of the calls owned by a connection. Called
4205 * with error non-zero. This is only for really fatal things, like
4206 * bad authentication responses. The connection itself is set in
4207 * error at this point, so that future packets received will be discarded. */
4209 void rxi_ConnectionError(register struct rx_connection *conn,
4210 register afs_int32 error)
4214 MUTEX_ENTER(&conn->conn_data_lock);
4215 if (conn->challengeEvent)
4216 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4217 if (conn->checkReachEvent) {
4218 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4219 conn->checkReachEvent = 0;
4220 conn->flags &= ~RX_CONN_ATTACHWAIT;
4223 MUTEX_EXIT(&conn->conn_data_lock);
4224 for (i=0; i<RX_MAXCALLS; i++) {
4225 struct rx_call *call = conn->call[i];
4227 MUTEX_ENTER(&call->lock);
4228 rxi_CallError(call, error);
4229 MUTEX_EXIT(&call->lock);
4232 conn->error = error;
4233 MUTEX_ENTER(&rx_stats_mutex);
4234 rx_stats.fatalErrors++;
4235 MUTEX_EXIT(&rx_stats_mutex);
4239 void rxi_CallError(register struct rx_call *call, afs_int32 error)
4241 if (call->error) error = call->error;
4242 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4243 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4244 rxi_ResetCall(call, 0);
4247 rxi_ResetCall(call, 0);
4249 call->error = error;
4250 call->mode = RX_MODE_ERROR;
4253 /* Reset various fields in a call structure, and wakeup waiting
4254 * processes. Some fields aren't changed: state & mode are not
4255 * touched (these must be set by the caller), and bufptr, nLeft, and
4256 * nFree are not reset, since these fields are manipulated by
4257 * unprotected macros, and may only be reset by non-interrupting code. */
4260 /* this code requires that call->conn be set properly as a pre-condition. */
4261 #endif /* ADAPT_WINDOW */
4263 void rxi_ResetCall(register struct rx_call *call, register int newcall)
4266 register struct rx_peer *peer;
4267 struct rx_packet *packet;
4269 /* Notify anyone who is waiting for asynchronous packet arrival */
4270 if (call->arrivalProc) {
4271 (*call->arrivalProc)(call, call->arrivalProcHandle, (int) call->arrivalProcArg);
4272 call->arrivalProc = (VOID (*)()) 0;
4275 if (call->delayedAbortEvent) {
4276 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4277 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4279 rxi_SendCallAbort(call, packet, 0, 1);
4280 rxi_FreePacket(packet);
4285 * Update the peer with the congestion information in this call
4286 * so other calls on this connection can pick up where this call
4287 * left off. If the congestion sequence numbers don't match then
4288 * another call experienced a retransmission.
4290 peer = call->conn->peer;
4291 MUTEX_ENTER(&peer->peer_lock);
4293 if (call->congestSeq == peer->congestSeq) {
4294 peer->cwind = MAX(peer->cwind, call->cwind);
4295 peer->MTU = MAX(peer->MTU, call->MTU);
4296 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4299 call->abortCode = 0;
4300 call->abortCount = 0;
4302 if (peer->maxDgramPackets > 1) {
4303 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4305 call->MTU = peer->MTU;
4307 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4308 call->ssthresh = rx_maxSendWindow;
4309 call->nDgramPackets = peer->nDgramPackets;
4310 call->congestSeq = peer->congestSeq;
4311 MUTEX_EXIT(&peer->peer_lock);
4313 flags = call->flags;
4314 rxi_ClearReceiveQueue(call);
4315 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4316 if (call->flags & RX_CALL_TQ_BUSY) {
4317 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4318 call->flags |= (flags & RX_CALL_TQ_WAIT);
4320 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4322 rxi_ClearTransmitQueue(call, 0);
4323 queue_Init(&call->tq);
4326 queue_Init(&call->rq);
4328 call->rwind = rx_initReceiveWindow;
4329 call->twind = rx_initSendWindow;
4330 call->nSoftAcked = 0;
4331 call->nextCwind = 0;
4334 call->nCwindAcks = 0;
4335 call->nSoftAcks = 0;
4336 call->nHardAcks = 0;
4338 call->tfirst = call->rnext = call->tnext = 1;
4340 call->lastAcked = 0;
4341 call->localStatus = call->remoteStatus = 0;
4343 if (flags & RX_CALL_READER_WAIT) {
4344 #ifdef RX_ENABLE_LOCKS
4345 CV_BROADCAST(&call->cv_rq);
4347 osi_rxWakeup(&call->rq);
4350 if (flags & RX_CALL_WAIT_PACKETS) {
4351 MUTEX_ENTER(&rx_freePktQ_lock);
4352 rxi_PacketsUnWait(); /* XXX */
4353 MUTEX_EXIT(&rx_freePktQ_lock);
4356 #ifdef RX_ENABLE_LOCKS
4357 CV_SIGNAL(&call->cv_twind);
4359 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4360 osi_rxWakeup(&call->twind);
4363 #ifdef RX_ENABLE_LOCKS
4364 /* The following ensures that we don't mess with any queue while some
4365 * other thread might also be doing so. The call_queue_lock field
4366 * is only modified under the call lock. If the call is in the process
4367 * of being removed from a queue, the call is not locked until
4368 * the queue lock is dropped, and only then is the call_queue_lock field
4369 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4370 * Note that any other routine which removes a call from a queue has to
4371 * obtain the queue lock before examining the queue and removing the call. */
4373 if (call->call_queue_lock) {
4374 MUTEX_ENTER(call->call_queue_lock);
4375 if (queue_IsOnQueue(call)) {
4377 if (flags & RX_CALL_WAIT_PROC) {
4378 MUTEX_ENTER(&rx_stats_mutex);
4380 MUTEX_EXIT(&rx_stats_mutex);
4383 MUTEX_EXIT(call->call_queue_lock);
4384 CLEAR_CALL_QUEUE_LOCK(call);
4386 #else /* RX_ENABLE_LOCKS */
4387 if (queue_IsOnQueue(call)) {
4389 if (flags & RX_CALL_WAIT_PROC)
4392 #endif /* RX_ENABLE_LOCKS */
4394 rxi_KeepAliveOff(call);
4395 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4398 /* Send an acknowledge for the indicated packet (seq,serial) of the
4399 * indicated call, for the indicated reason (reason). This
4400 * acknowledge will specifically acknowledge receiving the packet, and
4401 * will also specify which other packets for this call have been
4402 * received. This routine returns the packet that was used to the
4403 * caller. The caller is responsible for freeing it or re-using it.
4404 * This acknowledgement also returns the highest sequence number
4405 * actually read out by the higher level to the sender; the sender
4406 * promises to keep around packets that have not been read by the
4407 * higher level yet (unless, of course, the sender decides to abort
4408 * the call altogether). Any of p, seq, serial, pflags, or reason may
4409 * be set to zero without ill effect. That is, if they are zero, they
4410 * will not convey any information.
4411 * NOW there is a trailer field, after the ack where it will safely be
4412 * ignored by mundanes, which indicates the maximum size packet this
4413 * host can swallow. */
4415 /* optionalPacket - use to send the ack (or null)
4416 * seq - sequence number of the packet we are acking
4417 * serial - serial number of the packet
4418 * pflags - flags field from the packet header
4419 * reason - reason an acknowledge was prompted */
4422 struct rx_packet *rxi_SendAck(register struct rx_call *call,
4423 register struct rx_packet *optionalPacket, int seq, int serial,
4424 int pflags, int reason, int istack)
4426 struct rx_ackPacket *ap;
4427 register struct rx_packet *rqp;
4428 register struct rx_packet *nxp; /* For queue_Scan */
4429 register struct rx_packet *p;
4434 /* Open the receive window once a thread starts reading packets */
4436 if (call->rnext > 1) {
4437 call->rwind = rx_maxReceiveWindow;
4440 call->nHardAcks = 0;
4441 call->nSoftAcks = 0;
4442 if (call->rnext > call->lastAcked)
4443 call->lastAcked = call->rnext;
4447 rx_computelen(p, p->length); /* reset length, you never know */
4448 } /* where that's been... */
4450 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4451 /* We won't send the ack, but don't panic. */
4452 return optionalPacket;
4455 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4457 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4458 if (!optionalPacket) rxi_FreePacket(p);
4459 return optionalPacket;
4461 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4462 if (rx_Contiguous(p)<templ) {
4463 if (!optionalPacket) rxi_FreePacket(p);
4464 return optionalPacket;
4466 } /* MTUXXX failing to send an ack is very serious. We should */
4467 /* try as hard as possible to send even a partial ack; it's */
4468 /* better than nothing. */
4470 ap = (struct rx_ackPacket *) rx_DataOf(p);
4471 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4472 ap->reason = reason;
4474 /* The skew computation used to be bogus, I think it's better now. */
4475 /* We should start paying attention to skew. XXX */
4476 ap->serial = htonl(call->conn->maxSerial);
4477 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4479 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4480 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4482 /* No fear of running out of room in the ack packet here, because there can be at most
4483 * one window full of unacknowledged packets. The window size must be constrained
4484 * to be less than the maximum ack size, of course. Also, an ack should always
4485 * fit into a single packet -- it should not ever be fragmented. */
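/* Sketch of the encoding the loop below produces (hypothetical call
 * state): with rnext = 10 and packets 10, 11 and 13 in the receive
 * queue, acks[] = { ACK, ACK, NACK, ACK } and firstPacket = 10; the
 * NACK at offset 2 marks the hole left by missing packet 12. */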
4486 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4487 if (!rqp || !call->rq.next
4488 || (rqp->header.seq > (call->rnext + call->rwind))) {
4489 if (!optionalPacket) rxi_FreePacket(p);
4490 rxi_CallError(call, RX_CALL_DEAD);
4491 return optionalPacket;
4494 while (rqp->header.seq > call->rnext + offset)
4495 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4496 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4498 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4499 if (!optionalPacket) rxi_FreePacket(p);
4500 rxi_CallError(call, RX_CALL_DEAD);
4501 return optionalPacket;
4506 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4508 /* these are new for AFS 3.3 */
4509 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4510 templ = htonl(templ);
4511 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4512 templ = htonl(call->conn->peer->ifMTU);
4513 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4515 /* new for AFS 3.4 */
4516 templ = htonl(call->rwind);
4517 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4519 /* new for AFS 3.5 */
4520 templ = htonl(call->conn->peer->ifDgramPackets);
4521 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
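/* To summarize, the four trailer words just written are, in order (each
 * an afs_int32 in network byte order): the maximum packet size we will
 * accept, our interface MTU, our receive window, and our preferred
 * datagram packet count. As noted above, peers that predate these
 * extensions simply ignore the trailing bytes. */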
4523 p->header.serviceId = call->conn->serviceId;
4524 p->header.cid = (call->conn->cid | call->channel);
4525 p->header.callNumber = *call->callNumber;
4526 p->header.seq = seq;
4527 p->header.securityIndex = call->conn->securityIndex;
4528 p->header.epoch = call->conn->epoch;
4529 p->header.type = RX_PACKET_TYPE_ACK;
4530 p->header.flags = RX_SLOW_START_OK;
4531 if (reason == RX_ACK_PING) {
4532 p->header.flags |= RX_REQUEST_ACK;
4534 clock_GetTime(&call->pingRequestTime);
4537 if (call->conn->type == RX_CLIENT_CONNECTION)
4538 p->header.flags |= RX_CLIENT_INITIATED;
4542 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4543 ap->reason, ntohl(ap->previousPacket),
4544 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4546 for (offset = 0; offset < ap->nAcks; offset++)
4547 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4554 register int i, nbytes = p->length;
4556 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4557 if (nbytes <= p->wirevec[i].iov_len) {
4558 register int savelen, saven;
4560 savelen = p->wirevec[i].iov_len;
4562 p->wirevec[i].iov_len = nbytes;
4564 rxi_Send(call, p, istack);
4565 p->wirevec[i].iov_len = savelen;
4569 else nbytes -= p->wirevec[i].iov_len;
4572 MUTEX_ENTER(&rx_stats_mutex);
4573 rx_stats.ackPacketsSent++;
4574 MUTEX_EXIT(&rx_stats_mutex);
4575 if (!optionalPacket) rxi_FreePacket(p);
4576 return optionalPacket; /* Return packet for re-use by caller */
4579 /* Send all of the packets in the list in a single datagram */
4580 static void rxi_SendList(struct rx_call *call, struct rx_packet **list,
4581 int len, int istack, int moreFlag, struct clock *now,
4582 struct clock *retryTime, int resending)
4587 struct rx_connection *conn = call->conn;
4588 struct rx_peer *peer = conn->peer;
4590 MUTEX_ENTER(&peer->peer_lock);
4592 if (resending) peer->reSends += len;
4593 MUTEX_ENTER(&rx_stats_mutex);
4594 rx_stats.dataPacketsSent += len;
4595 MUTEX_EXIT(&rx_stats_mutex);
4596 MUTEX_EXIT(&peer->peer_lock);
4598 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4602 /* Set the packet flags and schedule the resend events */
4603 /* Only request an ack for the last packet in the list */
4604 for (i = 0 ; i < len ; i++) {
4605 list[i]->retryTime = *retryTime;
4606 if (list[i]->header.serial) {
4607 /* Exponentially backoff retry times */
4608 if (list[i]->backoff < MAXBACKOFF) {
4609 /* so it can't stay == 0 */
4610 list[i]->backoff = (list[i]->backoff << 1) +1;
4612 else list[i]->backoff++;
4613 clock_Addmsec(&(list[i]->retryTime),
4614 ((afs_uint32) list[i]->backoff) << 8);
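/* Illustrative backoff progression: backoff moves 0 -> 1 -> 3 -> 7 ->
 * 15 on successive retransmissions, and each retry is pushed out by
 * backoff << 8 ms, i.e. roughly 0.26 s, 0.77 s, 1.8 s, 3.8 s, until
 * backoff reaches MAXBACKOFF. */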
4617 /* Wait a little extra for the ack on the last packet */
4618 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4619 clock_Addmsec(&(list[i]->retryTime), 400);
4622 /* Record the time sent */
4623 list[i]->timeSent = *now;
4625 /* Ask for an ack on retransmitted packets, on every other packet
4626 * if the peer doesn't support slow start. Ask for an ack on every
4627 * packet until the congestion window reaches the ack rate. */
4628 if (list[i]->header.serial) {
4630 MUTEX_ENTER(&rx_stats_mutex);
4631 rx_stats.dataPacketsReSent++;
4632 MUTEX_EXIT(&rx_stats_mutex);
4634 /* improved RTO calculation- not Karn */
4635 list[i]->firstSent = *now;
4637 && (call->cwind <= (u_short)(conn->ackRate+1)
4638 || (!(call->flags & RX_CALL_SLOW_START_OK)
4639 && (list[i]->header.seq & 1)))) {
4644 MUTEX_ENTER(&peer->peer_lock);
4646 if (resending) peer->reSends++;
4647 MUTEX_ENTER(&rx_stats_mutex);
4648 rx_stats.dataPacketsSent++;
4649 MUTEX_EXIT(&rx_stats_mutex);
4650 MUTEX_EXIT(&peer->peer_lock);
4652 /* Tag this packet as not being the last in this group,
4653 * for the receiver's benefit */
4654 if (i < len-1 || moreFlag) {
4655 list[i]->header.flags |= RX_MORE_PACKETS;
4658 /* Install the new retransmit time for the packet, and
4659 * record the time sent */
4660 list[i]->timeSent = *now;
4664 list[len-1]->header.flags |= RX_REQUEST_ACK;
4667 /* Since we're about to send a data packet to the peer, it's
4668 * safe to nuke any scheduled end-of-packets ack */
4669 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4671 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4672 MUTEX_EXIT(&call->lock);
4674 rxi_SendPacketList(conn, list, len, istack);
4676 rxi_SendPacket(conn, list[0], istack);
4678 MUTEX_ENTER(&call->lock);
4679 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4681 /* Update last send time for this call (for keep-alive
4682 * processing), and for the connection (so that we can discover
4683 * idle connections) */
4684 conn->lastSendTime = call->lastSendTime = clock_Sec();
4687 /* When sending packets we need to follow these rules:
4688 * 1. Never send more than maxDgramPackets in a jumbogram.
4689 * 2. Never send a packet with more than two iovecs in a jumbogram.
4690 * 3. Never send a retransmitted packet in a jumbogram.
4691 * 4. Never send more than cwind/4 packets in a jumbogram
4692 * We always keep the last list we should have sent so we
4693 * can set the RX_MORE_PACKETS flags correctly. */
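/* Illustration of the rules above (hypothetical values): with
 * peer->maxDgramPackets = 4 and cwind = 16, at most MIN(4, 16/4) = 4
 * packets are packed into one jumbogram, and any packet that already
 * carries a serial number (a retransmission) or has a non-jumbo
 * length forces the list built so far to be flushed. */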
4695 static void rxi_SendXmitList(struct rx_call *call, struct rx_packet **list,
4696 int len, int istack, struct clock *now, struct clock *retryTime,
4699 int i, cnt, lastCnt = 0;
4700 struct rx_packet **listP, **lastP = 0;
4701 struct rx_peer *peer = call->conn->peer;
4702 int morePackets = 0;
4704 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4705 /* Does the current packet force us to flush the current list? */
4707 && (list[i]->header.serial
4708 || (list[i]->flags & RX_PKTFLAG_ACKED)
4709 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4711 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4712 /* Stop sending if the call entered an error state, or if
4713 * we entered congestion recovery mode */
4714 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4722 /* Add the current packet to the list if it hasn't been acked.
4723 * Otherwise adjust the list pointer to skip the current packet. */
4724 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4726 /* Do we need to flush the list? */
4727 if (cnt >= (int)peer->maxDgramPackets
4728 || cnt >= (int)call->nDgramPackets
4729 || cnt >= (int)call->cwind
4730 || list[i]->header.serial
4731 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4733 rxi_SendList(call, lastP, lastCnt, istack, 1,
4734 now, retryTime, resending);
4735 /* Stop sending if the call entered an error state, or if
4736 * we entered congestion recovery mode */
4737 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4747 osi_Panic("rxi_SendList error");
4753 /* Send the whole list when the call is in receive mode, when
4754 * the call is in eof mode, when we are in fast recovery mode,
4755 * or when we have the last packet */
4756 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4757 || call->mode == RX_MODE_RECEIVING
4758 || call->mode == RX_MODE_EOF
4759 || (call->flags & RX_CALL_FAST_RECOVER)) {
4760 /* Check for the case where the current list contains
4761 * an acked packet. Since we always send retransmissions
4762 * in a separate packet, we only need to check the first
4763 * packet in the list */
4764 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4768 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4769 now, retryTime, resending);
4770 /* Stop sending if the call entered an error state, or if
4771 * we entered congestion recovery mode */
4772 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4776 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4778 } else if (lastCnt > 0) {
4779 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4783 #ifdef RX_ENABLE_LOCKS
4784 /* Call rxi_Start, below, but with the call lock held. */
4785 void rxi_StartUnlocked(struct rxevent *event, register struct rx_call *call,
4788 MUTEX_ENTER(&call->lock);
4789 rxi_Start(event, call, istack);
4790 MUTEX_EXIT(&call->lock);
4792 #endif /* RX_ENABLE_LOCKS */
4794 /* This routine is called when new packets are readied for
4795 * transmission and when retransmission may be necessary, or when the
4796 * transmission window or burst count are favourable. This should be
4797 * better optimized for new packets, the usual case, now that we've
4798 * got rid of queues of send packets. XXXXXXXXXXX */
4799 void rxi_Start(struct rxevent *event, register struct rx_call *call,
4802 struct rx_packet *p;
4803 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4804 struct rx_peer *peer = call->conn->peer;
4805 struct clock now, retryTime;
4809 struct rx_packet **xmitList;
4812 /* If rxi_Start is being called as a result of a resend event,
4813 * then make sure that the event pointer is removed from the call
4814 * structure, since there is no longer a per-call retransmission event pending. */
4816 if (event && event == call->resendEvent) {
4817 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4818 call->resendEvent = NULL;
4820 if (queue_IsEmpty(&call->tq)) {
4824 /* Timeouts trigger congestion recovery */
4825 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4826 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4827 /* someone else is waiting to start recovery */
4830 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4831 while (call->flags & RX_CALL_TQ_BUSY) {
4832 call->flags |= RX_CALL_TQ_WAIT;
4833 #ifdef RX_ENABLE_LOCKS
4834 CV_WAIT(&call->cv_tq, &call->lock);
4835 #else /* RX_ENABLE_LOCKS */
4836 osi_rxSleep(&call->tq);
4837 #endif /* RX_ENABLE_LOCKS */
4839 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4840 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4841 call->flags |= RX_CALL_FAST_RECOVER;
4842 if (peer->maxDgramPackets > 1) {
4843 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4845 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4847 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4848 call->nDgramPackets = 1;
4850 call->nextCwind = 1;
4853 MUTEX_ENTER(&peer->peer_lock);
4854 peer->MTU = call->MTU;
4855 peer->cwind = call->cwind;
4856 peer->nDgramPackets = 1;
4858 call->congestSeq = peer->congestSeq;
4859 MUTEX_EXIT(&peer->peer_lock);
4860 /* Clear retry times on packets. Otherwise, it's possible for
4861 * some packets in the queue to force resends at rates faster
4862 * than recovery rates. */
4864 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4865 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4866 clock_Zero(&p->retryTime);
4871 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4872 MUTEX_ENTER(&rx_stats_mutex);
4873 rx_tq_debug.rxi_start_in_error ++;
4874 MUTEX_EXIT(&rx_stats_mutex);
4879 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4880 /* Get clock to compute the re-transmit time for any packets
4881 * in this burst. Note, if we back off, it's reasonable to
4882 * back off all of the packets in the same manner, even if
4883 * some of them have been retransmitted more times than more
4884 * recent additions */
4885 clock_GetTime(&now);
4886 retryTime = now; /* initialize before use */
4887 MUTEX_ENTER(&peer->peer_lock);
4888 clock_Add(&retryTime, &peer->timeout);
4889 MUTEX_EXIT(&peer->peer_lock);
4891 /* Send (or resend) any packets that need it, subject to
4892 * window restrictions and congestion burst control
4893 * restrictions. Ask for an ack on the last packet sent in
4894 * this burst. For now, we're relying upon the window being
4895 * considerably bigger than the largest number of packets that
4896 * are typically sent at once by one initial call to
4897 * rxi_Start. This is probably bogus (perhaps we should ask
4898 * for an ack when we're half way through the current
4899 * window?). Also, for non file transfer applications, this
4900 * may end up asking for an ack for every packet. Bogus. XXXX
4903 * But check whether we're here recursively, and let the other guy do the work. */
4906 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4907 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4908 call->flags |= RX_CALL_TQ_BUSY;
4910 call->flags &= ~RX_CALL_NEED_START;
4911 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4913 maxXmitPackets = MIN(call->twind, call->cwind);
4914 xmitList = (struct rx_packet **)
4915 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4916 if (xmitList == NULL)
4917 osi_Panic("rxi_Start, failed to allocate xmit list");
4918 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4919 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4920 /* We shouldn't be sending packets if a thread is waiting
4921 * to initiate congestion recovery */
4924 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4925 /* Only send one packet during fast recovery */
4928 if ((p->flags & RX_PKTFLAG_FREE) ||
4929 (!queue_IsEnd(&call->tq, nxp)
4930 && (nxp->flags & RX_PKTFLAG_FREE)) ||
4931 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4932 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4933 osi_Panic("rxi_Start: xmit queue clobbered");
4935 if (p->flags & RX_PKTFLAG_ACKED) {
4936 MUTEX_ENTER(&rx_stats_mutex);
4937 rx_stats.ignoreAckedPacket++;
4938 MUTEX_EXIT(&rx_stats_mutex);
4939 continue; /* Ignore this packet if it has been acknowledged */
4942 /* Turn off all flags except these ones, which are the same
4943 * on each transmission */
4944 p->header.flags &= RX_PRESET_FLAGS;
4946 if (p->header.seq >= call->tfirst +
4947 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4948 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4949 /* Note: if we're waiting for more window space, we can
4950 * still send retransmits; hence we don't return here, but
4951 * break out to schedule a retransmit event */
4952 dpf(("call %d waiting for window", *(call->callNumber)));
4956 /* Transmit the packet if it needs to be sent. */
4957 if (!clock_Lt(&now, &p->retryTime)) {
4958 if (nXmitPackets == maxXmitPackets) {
4959 osi_Panic("rxi_Start: xmit list overflowed");
4961 xmitList[nXmitPackets++] = p;
4965 /* xmitList now holds pointers to all of the packets that are
4966 * ready to send. Now we loop to send the packets */
4967 if (nXmitPackets > 0) {
4968 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4969 &now, &retryTime, resending);
4971 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4973 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4975 /* TQ references no longer protected by this flag; they must remain
4976 * protected by the global lock. */
4978 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4979 call->flags &= ~RX_CALL_TQ_BUSY;
4980 if (call->flags & RX_CALL_TQ_WAIT) {
4981 call->flags &= ~RX_CALL_TQ_WAIT;
4982 #ifdef RX_ENABLE_LOCKS
4983 CV_BROADCAST(&call->cv_tq);
4984 #else /* RX_ENABLE_LOCKS */
4985 osi_rxWakeup(&call->tq);
4986 #endif /* RX_ENABLE_LOCKS */
4991 /* We went into the error state while sending packets. Now is
4992 * the time to reset the call. This will also inform the using
4993 * process that the call is in an error state. */
4995 MUTEX_ENTER(&rx_stats_mutex);
4996 rx_tq_debug.rxi_start_aborted ++;
4997 MUTEX_EXIT(&rx_stats_mutex);
4998 call->flags &= ~RX_CALL_TQ_BUSY;
4999 if (call->flags & RX_CALL_TQ_WAIT) {
5000 call->flags &= ~RX_CALL_TQ_WAIT;
5001 #ifdef RX_ENABLE_LOCKS
5002 CV_BROADCAST(&call->cv_tq);
5003 #else /* RX_ENABLE_LOCKS */
5004 osi_rxWakeup(&call->tq);
5005 #endif /* RX_ENABLE_LOCKS */
5007 rxi_CallError(call, call->error);
5010 #ifdef RX_ENABLE_LOCKS
5011 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
5012 register int missing;
5013 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
5014 /* Some packets have received acks. If they all have, we can clear
5015 * the transmit queue.
5017 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5018 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
5026 call->flags |= RX_CALL_TQ_CLEARME;
5028 #endif /* RX_ENABLE_LOCKS */
5029 /* Don't bother doing retransmits if the TQ is cleared. */
5030 if (call->flags & RX_CALL_TQ_CLEARME) {
5031 rxi_ClearTransmitQueue(call, 1);
5033 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5036 /* Always post a resend event, if there is anything in the
5037 * queue, and resend is possible. There should be at least
5038 * one unacknowledged packet in the queue ... otherwise none
5039 * of these packets should be on the queue in the first place. */
5041 if (call->resendEvent) {
5042 /* Cancel the existing event and post a new one */
5043 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5046 /* The retry time is the retry time on the first unacknowledged
5047 * packet inside the current window */
5048 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5049 /* Don't set timers for packets outside the window */
5050 if (p->header.seq >= call->tfirst + call->twind) {
5054 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
5056 retryTime = p->retryTime;
5061 /* Post a new event to re-run rxi_Start when retries may be needed */
5062 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
5063 #ifdef RX_ENABLE_LOCKS
5064 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
5065 call->resendEvent = rxevent_Post(&retryTime,
5067 (void *)call, (void *)istack);
5068 #else /* RX_ENABLE_LOCKS */
5069 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5070 (void *)call, (void *)istack);
5071 #endif /* RX_ENABLE_LOCKS */
5074 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5075 } while (call->flags & RX_CALL_NEED_START);
5077 /* TQ references no longer protected by this flag; they must remain
5078 * protected by the global lock. */
5080 call->flags &= ~RX_CALL_TQ_BUSY;
5081 if (call->flags & RX_CALL_TQ_WAIT) {
5082 call->flags &= ~RX_CALL_TQ_WAIT;
5083 #ifdef RX_ENABLE_LOCKS
5084 CV_BROADCAST(&call->cv_tq);
5085 #else /* RX_ENABLE_LOCKS */
5086 osi_rxWakeup(&call->tq);
5087 #endif /* RX_ENABLE_LOCKS */
5090 call->flags |= RX_CALL_NEED_START;
5092 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5094 if (call->resendEvent) {
5095 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5100 /* Also adjusts the keep alive parameters for the call, to reflect
5101 * that we have just sent a packet (so keep alives aren't sent immediately) */
5103 void rxi_Send(register struct rx_call *call, register struct rx_packet *p,
5106 register struct rx_connection *conn = call->conn;
5108 /* Stamp each packet with the user supplied status */
5109 p->header.userStatus = call->localStatus;
5111 /* Allow the security object controlling this call's security to
5112 * make any last-minute changes to the packet */
5113 RXS_SendPacket(conn->securityObject, call, p);
5115 /* Since we're about to send SOME sort of packet to the peer, it's
5116 * safe to nuke any scheduled end-of-packets ack */
5117 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5119 /* Actually send the packet, filling in more connection-specific fields */
5120 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5121 MUTEX_EXIT(&call->lock);
5122 rxi_SendPacket(conn, p, istack);
5123 MUTEX_ENTER(&call->lock);
5124 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5126 /* Update last send time for this call (for keep-alive
5127 * processing), and for the connection (so that we can discover
5128 * idle connections) */
5129 conn->lastSendTime = call->lastSendTime = clock_Sec();
5133 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5134 * that things are fine. Also called periodically to guarantee that nothing
5135 * falls through the cracks (e.g. (error + dally) connections have keepalive
5136 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5138 * haveCTLock Set if calling from rxi_ReapConnections */
5140 #ifdef RX_ENABLE_LOCKS
5141 int rxi_CheckCall(register struct rx_call *call, int haveCTLock)
5142 #else /* RX_ENABLE_LOCKS */
5143 int rxi_CheckCall(register struct rx_call *call)
5144 #endif /* RX_ENABLE_LOCKS */
5146 register struct rx_connection *conn = call->conn;
5147 register struct rx_service *tservice;
5149 afs_uint32 deadTime;
5151 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5152 if (call->flags & RX_CALL_TQ_BUSY) {
5153 /* Call is active and will be reset by rxi_Start if it's
5154 * in an error state.
5159 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5160 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5161 ((afs_uint32)conn->peer->rtt >> 3) +
5162 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
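/* Worked example (illustrative numbers): with secondsUntilDead = 12, a
 * smoothed rtt of 100 ms (rtt = 800, scaled by 8) and a mean deviation
 * of 25 ms (rtt_dev = 100, scaled by 4), deadTime =
 * (12288 + 100 + 200 + 1023) >> 10 = 13 seconds. */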
5164 /* These are computed to the second (+- 1 second). But that's
5165 * good enough for these values, which should be a significant
5166 * number of seconds. */
5167 if (now > (call->lastReceiveTime + deadTime)) {
5168 if (call->state == RX_STATE_ACTIVE) {
5169 rxi_CallError(call, RX_CALL_DEAD);
5173 #ifdef RX_ENABLE_LOCKS
5174 /* Cancel pending events */
5175 rxevent_Cancel(call->delayedAckEvent, call,
5176 RX_CALL_REFCOUNT_DELAY);
5177 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5178 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5179 if (call->refCount == 0) {
5180 rxi_FreeCall(call, haveCTLock);
5184 #else /* RX_ENABLE_LOCKS */
5187 #endif /* RX_ENABLE_LOCKS */
5189 /* Non-active calls are destroyed if they are not responding
5190 * to pings; active calls are simply flagged in error, so the
5191 * attached process can die reasonably gracefully. */
5193 /* see if we have a non-activity timeout */
5194 tservice = conn->service;
5195 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5196 && tservice->idleDeadTime
5197 && ((call->startWait + tservice->idleDeadTime) < now)) {
5198 if (call->state == RX_STATE_ACTIVE) {
5199 rxi_CallError(call, RX_CALL_TIMEOUT);
5203 /* see if we have a hard timeout */
5204 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5205 if (call->state == RX_STATE_ACTIVE)
5206 rxi_CallError(call, RX_CALL_TIMEOUT);
5213 /* When a call is in progress, this routine is called occasionally to
5214 * make sure that some traffic has arrived from (or been sent to) the peer.
5215 * If nothing has arrived in a reasonable amount of time, the call is
5216 * declared dead; if nothing has been sent for a while, we send a
5217 * keep-alive packet (if we're actually trying to keep the call alive) */
5219 void rxi_KeepAliveEvent(struct rxevent *event, register struct rx_call *call,
5222 struct rx_connection *conn;
5225 MUTEX_ENTER(&call->lock);
5226 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5227 if (event == call->keepAliveEvent)
5228 call->keepAliveEvent = NULL;
5231 #ifdef RX_ENABLE_LOCKS
5232 if(rxi_CheckCall(call, 0)) {
5233 MUTEX_EXIT(&call->lock);
5236 #else /* RX_ENABLE_LOCKS */
5237 if (rxi_CheckCall(call)) return;
5238 #endif /* RX_ENABLE_LOCKS */
5240 /* Don't try to keep alive dallying calls */
5241 if (call->state == RX_STATE_DALLY) {
5242 MUTEX_EXIT(&call->lock);
5247 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5248 /* Don't try to send keepalives if there is unacknowledged data */
5249 /* the rexmit code should be good enough, this little hack
5250 * doesn't quite work XXX */
5251 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5253 rxi_ScheduleKeepAliveEvent(call);
5254 MUTEX_EXIT(&call->lock);
5258 void rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
5260 if (!call->keepAliveEvent) {
5262 clock_GetTime(&when);
5263 when.sec += call->conn->secondsUntilPing;
5264 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5265 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5269 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5270 void rxi_KeepAliveOn(register struct rx_call *call)
5272 /* Pretend last packet received was received now--i.e. if another
5273 * packet isn't received within the keep alive time, then the call
5274 * will die. Initialize last send time to the current time--even
5275 * if a packet hasn't been sent yet. This will guarantee that a
5276 * keep-alive is sent within the ping time */
5277 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5278 rxi_ScheduleKeepAliveEvent(call);
5281 /* This routine is called to send connection abort messages
5282 * that have been delayed to throttle looping clients. */
5283 void rxi_SendDelayedConnAbort(struct rxevent *event, register struct rx_connection *conn,
5287 struct rx_packet *packet;
5289 MUTEX_ENTER(&conn->conn_data_lock);
5290 conn->delayedAbortEvent = NULL;
5291 error = htonl(conn->error);
5293 MUTEX_EXIT(&conn->conn_data_lock);
5294 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5296 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5297 RX_PACKET_TYPE_ABORT, (char *)&error,
5299 rxi_FreePacket(packet);
5303 /* This routine is called to send call abort messages
5304 * that have been delayed to throttle looping clients. */
5305 void rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
5309 struct rx_packet *packet;
5311 MUTEX_ENTER(&call->lock);
5312 call->delayedAbortEvent = NULL;
5313 error = htonl(call->error);
5315 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5317 packet = rxi_SendSpecial(call, call->conn, packet,
5318 RX_PACKET_TYPE_ABORT, (char *)&error,
5320 rxi_FreePacket(packet);
5322 MUTEX_EXIT(&call->lock);
5325 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5326 * seconds) to ask the client to authenticate itself. The routine
5327 * issues a challenge to the client, which is obtained from the
5328 * security object associated with the connection */
5329 void rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
5332 int tries = (int) atries;
5333 conn->challengeEvent = NULL;
5334 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5335 register struct rx_packet *packet;
5339 /* We've failed to authenticate for too long.
5340 * Reset any calls waiting for authentication;
5341 * they are all in RX_STATE_PRECALL. */
5345 MUTEX_ENTER(&conn->conn_call_lock);
5346 for (i=0; i<RX_MAXCALLS; i++) {
5347 struct rx_call *call = conn->call[i];
5349 MUTEX_ENTER(&call->lock);
5350 if (call->state == RX_STATE_PRECALL) {
5351 rxi_CallError(call, RX_CALL_DEAD);
5352 rxi_SendCallAbort(call, NULL, 0, 0);
5354 MUTEX_EXIT(&call->lock);
5357 MUTEX_EXIT(&conn->conn_call_lock);
5361 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5363 /* If there's no packet available, do this later. */
5364 RXS_GetChallenge(conn->securityObject, conn, packet);
5365 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5366 RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
5367 rxi_FreePacket(packet);
5369 clock_GetTime(&when);
5370 when.sec += RX_CHALLENGE_TIMEOUT;
5371 conn->challengeEvent =
5372 rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
5376 /* Call this routine to start requesting the client to authenticate
5377 * itself. This will continue until authentication is established,
5378 * the call times out, or an invalid response is returned. The
5379 * security object associated with the connection is asked to create
5380 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5381 * defined earlier. */
5382 void rxi_ChallengeOn(register struct rx_connection *conn)
5384 if (!conn->challengeEvent) {
5385 RXS_CreateChallenge(conn->securityObject, conn);
5386 rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
5391 /* Compute round trip time of the packet provided, in *rttp. */
5394 /* rxi_ComputeRoundTripTime is called with peer locked. */
5395 /* sentp and/or peer may be null */
5396 void rxi_ComputeRoundTripTime(register struct rx_packet *p,
5397 register struct clock *sentp, register struct rx_peer *peer)
5399 struct clock thisRtt, *rttp = &thisRtt;
5401 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5402 /* making year 2038 bugs to get this running now - stroucki */
5403 struct timeval temptime;
5405 register int rtt_timeout;
5407 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5408 /* yet again. This was the worst Heisenbug of the port - stroucki */
5409 clock_GetTime(&temptime);
5410 rttp->sec=(afs_int32)temptime.tv_sec;
5411 rttp->usec=(afs_int32)temptime.tv_usec;
5413 clock_GetTime(rttp);
5415 if (clock_Lt(rttp, sentp)) {
5417 return; /* somebody set the clock back, don't count this time. */
5419 clock_Sub(rttp, sentp);
5420 MUTEX_ENTER(&rx_stats_mutex);
5421 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5422 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5423 if (rttp->sec > 60) {
5424 MUTEX_EXIT(&rx_stats_mutex);
5425 return; /* somebody set the clock ahead */
5427 rx_stats.maxRtt = *rttp;
5429 clock_Add(&rx_stats.totalRtt, rttp);
5430 rx_stats.nRttSamples++;
5431 MUTEX_EXIT(&rx_stats_mutex);
5433 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5435 /* Apply VanJacobson round-trip estimations */
5440 /* srtt (peer->rtt) is in units of one-eighth-milliseconds.
5441 * srtt is stored as fixed point with 3 bits after the binary
5442 * point (i.e., scaled by 8). The following magic is
5443 * equivalent to the smoothing algorithm in rfc793 with an
5444 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5445 * srtt*8 = srtt*8 + rtt - srtt
5446 * srtt = srtt + rtt/8 - srtt/8 */
5449 delta = MSEC(rttp) - (peer->rtt >> 3);
5453 /* We accumulate a smoothed rtt variance (actually, a smoothed
5454 * mean difference), then set the retransmit timer to smoothed
5455 * rtt + 4 times the smoothed variance (was 2x in van's original
5456 * paper, but 4x works better for me, and apparently for him as well.
5458 * rttvar is stored as
5459 * fixed point with 2 bits after the binary point (scaled by
5460 * 4). The following is equivalent to rfc793 smoothing with
5461 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5462 * replaces rfc793's wired-in beta.
5463 * dev*4 = dev*4 + (|actual - expected| - dev) */
5469 delta -= (peer->rtt_dev >> 2);
5470 peer->rtt_dev += delta;
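/* Worked example of the smoothing above (illustrative numbers): with a
 * smoothed rtt of 100 ms (peer->rtt = 800) and a new 120 ms sample,
 * delta = 120 - 100 = 20; srtt moves one eighth of the way toward the
 * sample (to 102.5 ms), and rtt_dev absorbs (|delta| - dev) / 4 in the
 * same fixed-point fashion. */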
5473 /* I don't have a stored RTT so I start with this value. Since I'm
5474 * probably just starting a call, and will be pushing more data down
5475 * this, I expect congestion to increase rapidly. So I fudge a
5476 * little, and I set deviance to half the rtt. In practice,
5477 * deviance tends to approach something a little less than
5478 * half the smoothed rtt. */
5479 peer->rtt = (MSEC(rttp) << 3) + 8;
5480 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5482 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5483 * the other of these connections is usually in a user process, and can
5484 * be switched and/or swapped out. So on fast, reliable networks, the
5485 * timeout would otherwise be too short. */
5487 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5488 clock_Zero(&(peer->timeout));
5489 clock_Addmsec(&(peer->timeout), rtt_timeout);
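/* Worked example (illustrative numbers): a smoothed rtt of 100 ms
 * (peer->rtt = 800) and a 25 ms mean deviation (peer->rtt_dev = 100)
 * give rtt_timeout = 100 + 100 + 350 = 550 ms in peer->timeout. */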
5491 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5492 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5493 (peer->timeout.sec),(peer->timeout.usec)) );
5497 /* Find all server connections that have not been active for a long time, and toss them. */
5499 void rxi_ReapConnections(void)
5502 clock_GetTime(&now);
5504 /* Find server connection structures that haven't been used for
5505 * greater than rx_idleConnectionTime */
5506 { struct rx_connection **conn_ptr, **conn_end;
5507 int i, havecalls = 0;
5508 MUTEX_ENTER(&rx_connHashTable_lock);
5509 for (conn_ptr = &rx_connHashTable[0],
5510 conn_end = &rx_connHashTable[rx_hashTableSize];
5511 conn_ptr < conn_end; conn_ptr++) {
5512 struct rx_connection *conn, *next;
5513 struct rx_call *call;
5517 for (conn = *conn_ptr; conn; conn = next) {
5518 /* XXX -- Shouldn't the connection be locked? */
5521 for(i=0;i<RX_MAXCALLS;i++) {
5522 call = conn->call[i];
5525 MUTEX_ENTER(&call->lock);
5526 #ifdef RX_ENABLE_LOCKS
5527 result = rxi_CheckCall(call, 1);
5528 #else /* RX_ENABLE_LOCKS */
5529 result = rxi_CheckCall(call);
5530 #endif /* RX_ENABLE_LOCKS */
5531 MUTEX_EXIT(&call->lock);
5533 /* If CheckCall freed the call, it might
5534 * have destroyed the connection as well,
5535 * which screws up the linked lists. */
5541 if (conn->type == RX_SERVER_CONNECTION) {
5542 /* This only actually destroys the connection if
5543 * there are no outstanding calls */
5544 MUTEX_ENTER(&conn->conn_data_lock);
5545 if (!havecalls && !conn->refCount &&
5546 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5547 conn->refCount++; /* it will be decr in rx_DestroyConn */
5548 MUTEX_EXIT(&conn->conn_data_lock);
5549 #ifdef RX_ENABLE_LOCKS
5550 rxi_DestroyConnectionNoLock(conn);
5551 #else /* RX_ENABLE_LOCKS */
5552 rxi_DestroyConnection(conn);
5553 #endif /* RX_ENABLE_LOCKS */
5555 #ifdef RX_ENABLE_LOCKS
5557 MUTEX_EXIT(&conn->conn_data_lock);
5559 #endif /* RX_ENABLE_LOCKS */
5563 #ifdef RX_ENABLE_LOCKS
5564 while (rx_connCleanup_list) {
5565 struct rx_connection *conn;
5566 conn = rx_connCleanup_list;
5567 rx_connCleanup_list = rx_connCleanup_list->next;
5568 MUTEX_EXIT(&rx_connHashTable_lock);
5569 rxi_CleanupConnection(conn);
5570 MUTEX_ENTER(&rx_connHashTable_lock);
5572 MUTEX_EXIT(&rx_connHashTable_lock);
5573 #endif /* RX_ENABLE_LOCKS */
5576 /* Find any peer structures that haven't been used (haven't had an
5577 * associated connection) for greater than rx_idlePeerTime */
5578 { struct rx_peer **peer_ptr, **peer_end;
5580 MUTEX_ENTER(&rx_rpc_stats);
5581 MUTEX_ENTER(&rx_peerHashTable_lock);
5582 for (peer_ptr = &rx_peerHashTable[0],
5583 peer_end = &rx_peerHashTable[rx_hashTableSize];
5584 peer_ptr < peer_end; peer_ptr++) {
5585 struct rx_peer *peer, *next, *prev;
5586 for (prev = peer = *peer_ptr; peer; peer = next) {
5588 code = MUTEX_TRYENTER(&peer->peer_lock);
5589 if ((code) && (peer->refCount == 0)
5590 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5591 rx_interface_stat_p rpc_stat, nrpc_stat;
5593 MUTEX_EXIT(&peer->peer_lock);
5594 MUTEX_DESTROY(&peer->peer_lock);
5595 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5596 rx_interface_stat)) {
5597 unsigned int num_funcs;
5598 if (!rpc_stat) break;
5599 queue_Remove(&rpc_stat->queue_header);
5600 queue_Remove(&rpc_stat->all_peers);
5601 num_funcs = rpc_stat->stats[0].func_total;
5602 space = sizeof(rx_interface_stat_t) +
5603 rpc_stat->stats[0].func_total *
5604 sizeof(rx_function_entry_v1_t);
5606 rxi_Free(rpc_stat, space);
5607 rxi_rpc_peer_stat_cnt -= num_funcs;
5610 MUTEX_ENTER(&rx_stats_mutex);
5611 rx_stats.nPeerStructs--;
5612 MUTEX_EXIT(&rx_stats_mutex);
5613 if (prev == *peer_ptr) {
5622 MUTEX_EXIT(&peer->peer_lock);
5628 MUTEX_EXIT(&rx_peerHashTable_lock);
5629 MUTEX_EXIT(&rx_rpc_stats);
5632 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5633 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5634 GC, just below. Really, we shouldn't have to keep moving packets from
5635 one place to another, but instead ought to always know if we can
5636 afford to hold onto a packet in its particular use. */
5637 MUTEX_ENTER(&rx_freePktQ_lock);
5638 if (rx_waitingForPackets) {
5639 rx_waitingForPackets = 0;
5640 #ifdef RX_ENABLE_LOCKS
5641 CV_BROADCAST(&rx_waitingForPackets_cv);
5643 osi_rxWakeup(&rx_waitingForPackets);
5646 MUTEX_EXIT(&rx_freePktQ_lock);
5648 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5649 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5653 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5654 * rx.h is sort of strange, this is better. This is called with a security
5655 * object before it is discarded. Each connection using a security object has
5656 * its own refcount to the object so it won't actually be freed until the last
5657 * connection is destroyed.
5659 * This is the only rxs module call. A hold could also be written but no one
5662 int rxs_Release (struct rx_securityClass *aobj)
5664 return RXS_Close (aobj);
5668 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5669 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5670 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5671 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
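/*
 * For scale (assuming the usual 28-byte Rx header and 28 bytes of IP+UDP
 * headers): RXRATE_PKT_OH comes to 56 bytes, a small packet is that plus
 * an ack body, and a "large" packet is 256 bytes more than a small one.
 */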
5673 /* Adjust our estimate of the transmission rate to this peer, given
5674 * that the packet p was just acked. We can adjust peer->timeout and
5675 * call->twind. Pragmatically, this is called
5676 * only with packets of maximal length.
5677 * Called with peer and call locked.
5680 static void rxi_ComputeRate(register struct rx_peer *peer,
5681 register struct rx_call *call, struct rx_packet *p,
5682 struct rx_packet *ackp, u_char ackReason)
5684 afs_int32 xferSize, xferMs;
5685 register afs_int32 minTime;
5688 /* Count down packets */
5689 if (peer->rateFlag > 0) peer->rateFlag--;
5690 /* Do nothing until we're enabled */
5691 if (peer->rateFlag != 0) return;
5692 if (!call->conn) return;
5694 /* Count only when the ack seems legitimate */
5695 switch (ackReason) {
5696 case RX_ACK_REQUESTED:
5697 xferSize = p->length + RX_HEADER_SIZE +
5698 call->conn->securityMaxTrailerSize;
5702 case RX_ACK_PING_RESPONSE:
5703 if (p) /* want the response to ping-request, not data send */
5705 clock_GetTime(&newTO);
5706 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5707 clock_Sub(&newTO, &call->pingRequestTime);
5708 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5712 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5719 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5720 ntohl(peer->host), ntohs(peer->port),
5721 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5722 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5725 /* Track only packets that are big enough. */
5726 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5730 /* absorb RTT data (in milliseconds) for these big packets */
5731 if (peer->smRtt == 0) {
5732 peer->smRtt = xferMs;
5734 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5735 if (!peer->smRtt) peer->smRtt = 1;
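    /*
     * The update above is an exponentially weighted moving average with a
     * gain of 1/16: smRtt <- (15*smRtt + sample + 4) / 16. The +4 offsets
     * truncation in the integer divide, and the floor of 1 keeps a measured
     * RTT from ever smoothing down to zero.
     */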
5738 if (peer->countDown) {
5742 peer->countDown = 10; /* recalculate only every so often */
5744 /* In practice, we can measure only the RTT for full packets,
5745 * because of the way Rx acks the data that it receives. (If it's
5746 * smaller than a full packet, it often gets implicitly acked
5747 * either by the call response (from a server) or by the next call
5748 * (from a client), and either case confuses transmission times
5749 * with processing times.) Therefore, replace the above
5750 * more-sophisticated processing with a simpler version, where the
5751 * smoothed RTT is kept for full-size packets, and the time to
5752 * transmit a windowful of full-size packets is simply RTT *
5753 * windowSize. Again, we take two steps:
 *  - ensure the timeout is large enough for a single packet's RTT;
 *  - ensure that the window is small enough to fit in the desired
 *    timeout. */
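  /*
   * A worked example with illustrative numbers: if smRtt is 100 ms, the
   * timeout computed below becomes at least roughly 0.100 s. If the
   * established timeout has meanwhile grown to 2 s, then
   * minTime = 2000 ms / (2 * 100 ms) = 10, so the transmit window is
   * sized to about ten full-size packets per timeout interval, clamped
   * to [1, rx_Window].
   */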
5757 /* First, the timeout check. */
5758 minTime = peer->smRtt;
5759 /* Get a reasonable estimate for a timeout period */
5761 newTO.sec = minTime / 1000;
5762 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5764 /* Increase the timeout period so that we can always do at least
5765 * one packet exchange */
5766 if (clock_Gt(&newTO, &peer->timeout)) {
5768 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5769 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5770 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5773 peer->timeout = newTO;
5776 /* Now, get an estimate for the transmit window size. */
5777 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5778 /* Now, convert to the number of full packets that could fit in a
5779 * reasonable fraction of that interval */
5780 minTime /= (peer->smRtt << 1);
5781 xferSize = minTime; /* (make a copy) */
5783 /* Now clamp the size to reasonable bounds. */
5784 if (minTime <= 1) minTime = 1;
5785 else if (minTime > rx_Window) minTime = rx_Window;
5786 /* if (minTime != peer->maxWindow) {
5787 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5788 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5789 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5791 peer->maxWindow = minTime;
5792 elide... call->twind = minTime;
5796 /* Cut back on the peer timeout if it had earlier grown unreasonably.
 * Discern this by calculating the timeout necessary for rx_Window
 * packets. */
5799 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5800 /* calculate estimate for transmission interval in milliseconds */
5801 minTime = rx_Window * peer->smRtt;
5802 if (minTime < 1000) {
5803 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5804 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5805 peer->timeout.usec, peer->smRtt,
5808 newTO.sec = 0; /* cut back on timeout by half a second */
5809 newTO.usec = 500000;
5810 clock_Sub(&peer->timeout, &newTO);
5815 } /* end of rxi_ComputeRate */
5816 #endif /* ADAPT_WINDOW */
5824 /* Don't call this debugging routine directly; use dpf */
5826 rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int a9, int a10,
5827 int a11, int a12, int a13, int a14, int a15)
5830 clock_GetTime(&now);
5831 fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5832 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
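/*
 * The routine above is reached through the dpf macro, which takes a
 * doubly parenthesized argument list so that a whole printf-style call
 * can pass through a single macro argument (and compile away entirely
 * when RXDEBUG is off). An illustrative call:
 *
 *     dpf(("resending packet %d on call %lx\n", p->header.seq, call));
 *
 * Up to fifteen int-sized arguments are supported, per the signature
 * above.
 */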
5839 * This function is used to process the rx_stats structure that is local
5840 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug). Therefore, it needs to do minimal version
 * checking. */
5844 void rx_PrintTheseStats (FILE *file, struct rx_stats *s, int size,
5845 afs_int32 freePackets, char version)
5849 if (size != sizeof(struct rx_stats)) {
5851 "Unexpected size of stats structure: was %d, expected %d\n",
	    size, (int) sizeof(struct rx_stats));
5856 "rx stats: free packets %d, allocs %d, ",
5860 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5862 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5863 s->receivePktAllocFailures,
5864 s->receiveCbufPktAllocFailures,
5865 s->sendPktAllocFailures,
5866 s->sendCbufPktAllocFailures,
5867 s->specialPktAllocFailures);
5870 "alloc-failures(rcv %d,send %d,ack %d)\n",
5871 s->receivePktAllocFailures,
5872 s->sendPktAllocFailures,
5873 s->specialPktAllocFailures);
5878 "bogusReads %d (last from host %x), "
5884 s->bogusPacketOnRead,
5887 s->noPacketBuffersOnRead,
5891 fprintf(file, " packets read: ");
5892 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5898 fprintf(file, "\n");
5901 " other read counters: data %d, "
5909 s->spuriousPacketsRead,
5910 s->ignorePacketDally);
5912 fprintf(file, " packets sent: ");
5913 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5919 fprintf(file, "\n");
5922 " other send counters: ack %d, "
5923 "data %d (not resends), "
5926 "acked&ignored %d\n",
5929 s->dataPacketsReSent,
5930 s->dataPacketsPushed,
5931 s->ignoreAckedPacket);
5934 " \t(these should be small) sendFailed %d, "
5937 (int) s->fatalErrors);
5939 if (s->nRttSamples) {
5941 " Average rtt is %0.3f, with %d samples\n",
5942 clock_Float(&s->totalRtt)/s->nRttSamples,
5946 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5947 clock_Float(&s->minRtt),
5948 clock_Float(&s->maxRtt));
5952 " %d server connections, "
5953 "%d client connections, "
5956 "%d free call structs\n",
5961 s->nFreeCallStructs);
5963 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5965 " %d clock updates\n",
5971 /* for backward compatibility */
5972 void rx_PrintStats(FILE *file)
5974 MUTEX_ENTER(&rx_stats_mutex);
5975 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5976 MUTEX_EXIT(&rx_stats_mutex);
5979 void rx_PrintPeerStats(FILE *file, struct rx_peer *peer)
5984 "burst wait %u.%d.\n",
5987 (int) peer->burstSize,
5988 (int) peer->burstWait.sec,
5989 (int) peer->burstWait.usec);
5993 "retry time %u.%06d, "
5997 (int) peer->timeout.sec,
5998 (int) peer->timeout.usec,
6004 "max in packet skew %d, "
6005 "max out packet skew %d\n",
6007 (int) peer->inPacketSkew,
6008 (int) peer->outPacketSkew);
6011 #ifdef AFS_PTHREAD_ENV
6013 * This mutex protects the following static variables:
6017 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
6018 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
6020 #define LOCK_RX_DEBUG
6021 #define UNLOCK_RX_DEBUG
6022 #endif /* AFS_PTHREAD_ENV */
6024 static int MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr,
6025 afs_uint16 remotePort, u_char type, void *inputData, size_t inputLength,
6026 void *outputData, size_t outputLength)
6028 static afs_int32 counter = 100;
6030 struct rx_header theader;
6032 register afs_int32 code;
6034 struct sockaddr_in taddr, faddr;
6039 endTime = time(0) + 20; /* try for 20 seconds */
6043 tp = &tbuffer[sizeof(struct rx_header)];
6044 taddr.sin_family = AF_INET;
6045 taddr.sin_port = remotePort;
6046 taddr.sin_addr.s_addr = remoteAddr;
6047 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
6048 taddr.sin_len = sizeof(struct sockaddr_in);
6051 memset(&theader, 0, sizeof(theader));
6052 theader.epoch = htonl(999);
6054 theader.callNumber = htonl(counter);
6057 theader.type = type;
6058 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
6059 theader.serviceId = 0;
6061 memcpy(tbuffer, &theader, sizeof(theader));
6062 memcpy(tp, inputData, inputLength);
6063 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
6064 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
6066 /* see if there's a packet available */
6068 FD_SET(socket, &imask);
6071 code = select(socket+1, &imask, 0, 0, &tv);
6073 /* now receive a packet */
6074 faddrLen = sizeof(struct sockaddr_in);
6075 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6076 (struct sockaddr *) &faddr, &faddrLen);
6078 memcpy(&theader, tbuffer, sizeof(struct rx_header));
6079 if (counter == ntohl(theader.callNumber)) break;
6082 /* see if we've timed out */
6083 if (endTime < time(0)) return -1;
6085 code -= sizeof(struct rx_header);
6086 if (code > outputLength) code = outputLength;
6087 memcpy(outputData, tp, code);
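/*
 * To summarize the exchange above: the caller builds a bare rx packet
 * header (epoch 999, a fresh callNumber, the requested debug packet
 * type) followed by the request body, sends it as one UDP datagram,
 * matches the reply by the echoed callNumber, and copies everything
 * past the header out as the response.
 */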
6091 afs_int32 rx_GetServerDebug(osi_socket socket, afs_uint32 remoteAddr,
6092 afs_uint16 remotePort, struct rx_debugStats *stat, afs_uint32 *supportedValues)
6094 struct rx_debugIn in;
6097 *supportedValues = 0;
6098 in.type = htonl(RX_DEBUGI_GETSTATS);
6101 rc = MakeDebugCall(socket,
6104 RX_PACKET_TYPE_DEBUG,
 * If the call was successful, fix up the version and indicate
6112 * what contents of the stat structure are valid.
6113 * Also do net to host conversion of fields here.
6117 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6118 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6120 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6121 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6123 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6124 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6126 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6127 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6129 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6130 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6132 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6133 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6135 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6136 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6139 stat->nFreePackets = ntohl(stat->nFreePackets);
6140 stat->packetReclaims = ntohl(stat->packetReclaims);
6141 stat->callsExecuted = ntohl(stat->callsExecuted);
6142 stat->nWaiting = ntohl(stat->nWaiting);
6143 stat->idleThreads = ntohl(stat->idleThreads);
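/*
 * A minimal caller sketch (hypothetical: sock is an open UDP socket and
 * error handling is omitted; the address and port are in network byte
 * order, as MakeDebugCall expects):
 *
 *     struct rx_debugStats tstats;
 *     afs_uint32 supported;
 *     afs_int32 rc;
 *
 *     rc = rx_GetServerDebug(sock, htonl(0x7f000001), htons(7000),
 *                            &tstats, &supported);
 *     if (rc >= 0 && (supported & RX_SERVER_DEBUG_WAITER_CNT))
 *         printf("%d calls waiting for a thread\n", (int) tstats.nWaiting);
 */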
6149 afs_int32 rx_GetServerStats(osi_socket socket, afs_uint32 remoteAddr,
6150 afs_uint16 remotePort, struct rx_stats *stat, afs_uint32 *supportedValues)
6152 struct rx_debugIn in;
6153 afs_int32 *lp = (afs_int32 *) stat;
6158 * supportedValues is currently unused, but added to allow future
6159 * versioning of this function.
6162 *supportedValues = 0;
6163 in.type = htonl(RX_DEBUGI_RXSTATS);
6165 memset(stat, 0, sizeof(*stat));
6167 rc = MakeDebugCall(socket,
6170 RX_PACKET_TYPE_DEBUG,
6179 * Do net to host conversion here
6182 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6190 afs_int32 rx_GetServerVersion(osi_socket socket, afs_uint32 remoteAddr,
6191 afs_uint16 remotePort, size_t version_length, char *version)
6194 return MakeDebugCall(socket,
6197 RX_PACKET_TYPE_VERSION,
6204 afs_int32 rx_GetServerConnections(osi_socket socket, afs_uint32 remoteAddr,
6205 afs_uint16 remotePort, afs_int32 *nextConnection, int allConnections,
6206 afs_uint32 debugSupportedValues, struct rx_debugConn *conn, afs_uint32 *supportedValues)
6208 struct rx_debugIn in;
6213 * supportedValues is currently unused, but added to allow future
6214 * versioning of this function.
6217 *supportedValues = 0;
6218 if (allConnections) {
6219 in.type = htonl(RX_DEBUGI_GETALLCONN);
6221 in.type = htonl(RX_DEBUGI_GETCONN);
6223 in.index = htonl(*nextConnection);
6224 memset(conn, 0, sizeof(*conn));
6226 rc = MakeDebugCall(socket,
6229 RX_PACKET_TYPE_DEBUG,
6236 *nextConnection += 1;
6239 * Convert old connection format to new structure.
6242 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6243 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6244 #define MOVEvL(a) (conn->a = vL->a)
6246 /* any old or unrecognized version... */
6247 for (i=0;i<RX_MAXCALLS;i++) {
6248 MOVEvL(callState[i]);
6249 MOVEvL(callMode[i]);
6250 MOVEvL(callFlags[i]);
6251 MOVEvL(callOther[i]);
6253 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6254 MOVEvL(secStats.type);
6255 MOVEvL(secStats.level);
6256 MOVEvL(secStats.flags);
6257 MOVEvL(secStats.expires);
6258 MOVEvL(secStats.packetsReceived);
6259 MOVEvL(secStats.packetsSent);
6260 MOVEvL(secStats.bytesReceived);
6261 MOVEvL(secStats.bytesSent);
6266 * Do net to host conversion here
6268 * I don't convert host or port since we are most likely
6269 * going to want these in NBO.
6271 conn->cid = ntohl(conn->cid);
6272 conn->serial = ntohl(conn->serial);
6273 for(i=0;i<RX_MAXCALLS;i++) {
6274 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6276 conn->error = ntohl(conn->error);
6277 conn->secStats.flags = ntohl(conn->secStats.flags);
6278 conn->secStats.expires = ntohl(conn->secStats.expires);
6279 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6280 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6281 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6282 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6283 conn->epoch = ntohl(conn->epoch);
6284 conn->natMTU = ntohl(conn->natMTU);
6290 afs_int32 rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
6291 afs_int32 *nextPeer, afs_uint32 debugSupportedValues, struct rx_debugPeer *peer,
6292 afs_uint32 *supportedValues)
6294 struct rx_debugIn in;
6298 * supportedValues is currently unused, but added to allow future
6299 * versioning of this function.
6302 *supportedValues = 0;
6303 in.type = htonl(RX_DEBUGI_GETPEER);
6304 in.index = htonl(*nextPeer);
6305 memset(peer, 0, sizeof(*peer));
6307 rc = MakeDebugCall(socket,
6310 RX_PACKET_TYPE_DEBUG,
6320 * Do net to host conversion here
6322 * I don't convert host or port since we are most likely
6323 * going to want these in NBO.
6325 peer->ifMTU = ntohs(peer->ifMTU);
6326 peer->idleWhen = ntohl(peer->idleWhen);
6327 peer->refCount = ntohs(peer->refCount);
6328 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6329 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6330 peer->rtt = ntohl(peer->rtt);
6331 peer->rtt_dev = ntohl(peer->rtt_dev);
6332 peer->timeout.sec = ntohl(peer->timeout.sec);
6333 peer->timeout.usec = ntohl(peer->timeout.usec);
6334 peer->nSent = ntohl(peer->nSent);
6335 peer->reSends = ntohl(peer->reSends);
6336 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6337 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6338 peer->rateFlag = ntohl(peer->rateFlag);
6339 peer->natMTU = ntohs(peer->natMTU);
6340 peer->maxMTU = ntohs(peer->maxMTU);
6341 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6342 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6343 peer->MTU = ntohs(peer->MTU);
6344 peer->cwind = ntohs(peer->cwind);
6345 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6346 peer->congestSeq = ntohs(peer->congestSeq);
6347 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6348 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6349 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6350 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6355 #endif /* RXDEBUG */
6357 void shutdown_rx(void)
6359 struct rx_serverQueueEntry *np;
6361 register struct rx_call *call;
6362 register struct rx_serverQueueEntry *sq;
    if (rxinit_status == 1) {
	return;			/* Already shut down. */
6372 #ifndef AFS_PTHREAD_ENV
6373 FD_ZERO(&rx_selectMask);
6374 #endif /* AFS_PTHREAD_ENV */
6375 rxi_dataQuota = RX_MAX_QUOTA;
6376 #ifndef AFS_PTHREAD_ENV
6378 #endif /* AFS_PTHREAD_ENV */
6381 #ifndef AFS_PTHREAD_ENV
6382 #ifndef AFS_USE_GETTIMEOFDAY
6384 #endif /* AFS_USE_GETTIMEOFDAY */
6385 #endif /* AFS_PTHREAD_ENV */
6387 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6388 call = queue_First(&rx_freeCallQueue, rx_call);
6390 rxi_Free(call, sizeof(struct rx_call));
6393 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6394 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6400 struct rx_peer **peer_ptr, **peer_end;
6401 for (peer_ptr = &rx_peerHashTable[0],
6402 peer_end = &rx_peerHashTable[rx_hashTableSize];
6403 peer_ptr < peer_end; peer_ptr++) {
6404 struct rx_peer *peer, *next;
6405 for (peer = *peer_ptr; peer; peer = next) {
6406 rx_interface_stat_p rpc_stat, nrpc_stat;
6408 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6409 rx_interface_stat)) {
6410 unsigned int num_funcs;
6411 if (!rpc_stat) break;
6412 queue_Remove(&rpc_stat->queue_header);
6413 queue_Remove(&rpc_stat->all_peers);
6414 num_funcs = rpc_stat->stats[0].func_total;
6415 space = sizeof(rx_interface_stat_t) +
6416 rpc_stat->stats[0].func_total *
6417 sizeof(rx_function_entry_v1_t);
6419 rxi_Free(rpc_stat, space);
6420 MUTEX_ENTER(&rx_rpc_stats);
6421 rxi_rpc_peer_stat_cnt -= num_funcs;
6422 MUTEX_EXIT(&rx_rpc_stats);
6426 MUTEX_ENTER(&rx_stats_mutex);
6427 rx_stats.nPeerStructs--;
6428 MUTEX_EXIT(&rx_stats_mutex);
6432 for (i = 0; i<RX_MAX_SERVICES; i++) {
6434 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6436 for (i = 0; i < rx_hashTableSize; i++) {
6437 register struct rx_connection *tc, *ntc;
6438 MUTEX_ENTER(&rx_connHashTable_lock);
6439 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6441 for (j = 0; j < RX_MAXCALLS; j++) {
6443 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6446 rxi_Free(tc, sizeof(*tc));
6448 MUTEX_EXIT(&rx_connHashTable_lock);
6451 MUTEX_ENTER(&freeSQEList_lock);
6453 while ((np = rx_FreeSQEList)) {
6454 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6455 MUTEX_DESTROY(&np->lock);
6456 rxi_Free(np, sizeof(*np));
6459 MUTEX_EXIT(&freeSQEList_lock);
6460 MUTEX_DESTROY(&freeSQEList_lock);
6461 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6462 MUTEX_DESTROY(&rx_connHashTable_lock);
6463 MUTEX_DESTROY(&rx_peerHashTable_lock);
6464 MUTEX_DESTROY(&rx_serverPool_lock);
6466 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6467 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6469 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6470 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6472 rxi_FreeAllPackets();
6474 MUTEX_ENTER(&rx_stats_mutex);
6475 rxi_dataQuota = RX_MAX_QUOTA;
6476 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6477 MUTEX_EXIT(&rx_stats_mutex);
6483 #ifdef RX_ENABLE_LOCKS
6484 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6486 if (!MUTEX_ISMINE(lockaddr))
6487 osi_Panic("Lock not held: %s", msg);
6489 #endif /* RX_ENABLE_LOCKS */
6494 * Routines to implement connection specific data.
6497 int rx_KeyCreate(rx_destructor_t rtn)
6500 MUTEX_ENTER(&rxi_keyCreate_lock);
6501 key = rxi_keyCreate_counter++;
6502 rxi_keyCreate_destructor = (rx_destructor_t *)
6503 realloc((void *)rxi_keyCreate_destructor,
6504 (key+1) * sizeof(rx_destructor_t));
6505 rxi_keyCreate_destructor[key] = rtn;
6506 MUTEX_EXIT(&rxi_keyCreate_lock);
6510 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6513 MUTEX_ENTER(&conn->conn_data_lock);
6514 if (!conn->specific) {
6515 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6516 for (i = 0 ; i < key ; i++)
6517 conn->specific[i] = NULL;
6518 conn->nSpecific = key+1;
6519 conn->specific[key] = ptr;
6520 } else if (key >= conn->nSpecific) {
6521 conn->specific = (void **)
6522 realloc(conn->specific,(key+1)*sizeof(void *));
6523 for (i = conn->nSpecific ; i < key ; i++)
6524 conn->specific[i] = NULL;
6525 conn->nSpecific = key+1;
6526 conn->specific[key] = ptr;
6528 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6529 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6530 conn->specific[key] = ptr;
6532 MUTEX_EXIT(&conn->conn_data_lock);
6535 void *rx_GetSpecific(struct rx_connection *conn, int key)
6538 MUTEX_ENTER(&conn->conn_data_lock);
6539 if (key >= conn->nSpecific)
6542 ptr = conn->specific[key];
6543 MUTEX_EXIT(&conn->conn_data_lock);
6547 #endif /* !KERNEL */
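/*
 * Usage sketch for the connection-specific data routines above (the
 * destructor and state here are hypothetical):
 *
 *     static void free_my_state(void *p) { free(p); }
 *     ...
 *     int key = rx_KeyCreate(free_my_state);
 *     rx_SetSpecific(conn, key, my_state);
 *     ...
 *     struct my_state *s = rx_GetSpecific(conn, key);
 *
 * Overwriting an existing value runs the key's destructor on the old
 * value; keys are process-global, so create each key only once.
 */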
6550 * processStats is a queue used to store the statistics for the local
6551 * process. Its contents are similar to the contents of the rpcStats
6552 * queue on a rx_peer structure, but the actual data stored within
6553 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset), unlike the per-peer structures,
 * which can come and go based upon the lifetime of the peer.
6558 static struct rx_queue processStats = {&processStats,&processStats};
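/*
 * (The self-referential initializer above is how an empty rx_queue is
 * spelled: a queue is a circular doubly linked list, and an empty queue
 * is one whose prev and next pointers point at its own head.)
 */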
6561 * peerStats is a queue used to store the statistics for all peer structs.
6562 * Its contents are the union of all the peer rpcStats queues.
6565 static struct rx_queue peerStats = {&peerStats,&peerStats};
6568 * rxi_monitor_processStats is used to turn process wide stat collection
6572 static int rxi_monitor_processStats = 0;
6575 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
6578 static int rxi_monitor_peerStats = 0;
6581 * rxi_AddRpcStat - given all of the information for a particular rpc
6582 * call, create (if needed) and update the stat totals for the rpc.
6586 * IN stats - the queue of stats that will be updated with the new value
6588 * IN rxInterface - a unique number that identifies the rpc interface
6590 * IN currentFunc - the index of the function being invoked
6592 * IN totalFunc - the total number of functions in this interface
6594 * IN queueTime - the amount of time this function waited for a thread
6596 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
6602 * IN isServer - if true, this invocation was made to a server
6604 * IN remoteHost - the ip address of the remote host
6606 * IN remotePort - the port of the remote host
6608 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6610 * INOUT counter - if a new stats structure is allocated, the counter will
6611 * be updated with the new number of allocated stat structures
6618 static int rxi_AddRpcStat(
6619 struct rx_queue *stats,
6620 afs_uint32 rxInterface,
6621 afs_uint32 currentFunc,
6622 afs_uint32 totalFunc,
6623 struct clock *queueTime,
6624 struct clock *execTime,
6625 afs_hyper_t *bytesSent,
6626 afs_hyper_t *bytesRcvd,
6628 afs_uint32 remoteHost,
6629 afs_uint32 remotePort,
6631 unsigned int *counter)
6634 rx_interface_stat_p rpc_stat, nrpc_stat;
6637 * See if there's already a structure for this interface
6640 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6641 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6642 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6646 * Didn't find a match so allocate a new structure and add it to the
6650 if (queue_IsEnd(stats, rpc_stat) ||
6651 (rpc_stat == NULL) ||
6652 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6653 (rpc_stat->stats[0].remote_is_server != isServer)) {
6657 space = sizeof(rx_interface_stat_t) + totalFunc *
6658 sizeof(rx_function_entry_v1_t);
6660 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6661 if (rpc_stat == NULL) {
6665 *counter += totalFunc;
6666 for(i=0;i<totalFunc;i++) {
6667 rpc_stat->stats[i].remote_peer = remoteHost;
6668 rpc_stat->stats[i].remote_port = remotePort;
6669 rpc_stat->stats[i].remote_is_server = isServer;
6670 rpc_stat->stats[i].interfaceId = rxInterface;
6671 rpc_stat->stats[i].func_total = totalFunc;
6672 rpc_stat->stats[i].func_index = i;
6673 hzero(rpc_stat->stats[i].invocations);
6674 hzero(rpc_stat->stats[i].bytes_sent);
6675 hzero(rpc_stat->stats[i].bytes_rcvd);
6676 rpc_stat->stats[i].queue_time_sum.sec = 0;
6677 rpc_stat->stats[i].queue_time_sum.usec = 0;
6678 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6679 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6680 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6681 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6682 rpc_stat->stats[i].queue_time_max.sec = 0;
6683 rpc_stat->stats[i].queue_time_max.usec = 0;
6684 rpc_stat->stats[i].execution_time_sum.sec = 0;
6685 rpc_stat->stats[i].execution_time_sum.usec = 0;
6686 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6687 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6688 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6689 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6690 rpc_stat->stats[i].execution_time_max.sec = 0;
6691 rpc_stat->stats[i].execution_time_max.usec = 0;
6693 queue_Prepend(stats, rpc_stat);
6694 if (addToPeerList) {
6695 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6700 * Increment the stats for this function
6703 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6704 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6705 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6706 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6707 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6708 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6709 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6711 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6712 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6714 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6715 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6716 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6717 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6719 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6720 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6728 * rx_IncrementTimeAndCount - increment the times and count for a particular
6733 * IN peer - the peer who invoked the rpc
6735 * IN rxInterface - a unique number that identifies the rpc interface
6737 * IN currentFunc - the index of the function being invoked
6739 * IN totalFunc - the total number of functions in this interface
6741 * IN queueTime - the amount of time this function waited for a thread
6743 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
6749 * IN isServer - if true, this invocation was made to a server
6756 void rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
6757 afs_uint32 currentFunc, afs_uint32 totalFunc, struct clock *queueTime,
6758 struct clock *execTime, afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd, int isServer)
6761 MUTEX_ENTER(&rx_rpc_stats);
6762 MUTEX_ENTER(&peer->peer_lock);
6764 if (rxi_monitor_peerStats) {
6765 rxi_AddRpcStat(&peer->rpcStats,
6777 &rxi_rpc_peer_stat_cnt);
6780 if (rxi_monitor_processStats) {
6781 rxi_AddRpcStat(&processStats,
6793 &rxi_rpc_process_stat_cnt);
6796 MUTEX_EXIT(&peer->peer_lock);
6797 MUTEX_EXIT(&rx_rpc_stats);
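/*
 * A sketch of the expected caller; this is roughly what rxgen-generated
 * server stubs do (the interface id, opcode index, and opcode count
 * below are illustrative):
 *
 *     struct clock queueTime, execTime;
 *     afs_hyper_t bytesSent, bytesRcvd;
 *     ...measure the times and count the bytes...
 *     rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
 *                              MY_INTERFACE_ID, opcodeIndex, N_OPCODES,
 *                              &queueTime, &execTime,
 *                              &bytesSent, &bytesRcvd, 1);
 */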
6802 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6806 * IN callerVersion - the rpc stat version of the caller.
6808 * IN count - the number of entries to marshall.
6810 * IN stats - pointer to stats to be marshalled.
6812 * OUT ptr - Where to store the marshalled data.
6818 void rx_MarshallProcessRPCStats(afs_uint32 callerVersion,
6819 int count, rx_function_entry_v1_t *stats, afs_uint32 **ptrP)
6825 * We only support the first version
6827 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6828 *(ptr++) = stats->remote_peer;
6829 *(ptr++) = stats->remote_port;
6830 *(ptr++) = stats->remote_is_server;
6831 *(ptr++) = stats->interfaceId;
6832 *(ptr++) = stats->func_total;
6833 *(ptr++) = stats->func_index;
6834 *(ptr++) = hgethi(stats->invocations);
6835 *(ptr++) = hgetlo(stats->invocations);
6836 *(ptr++) = hgethi(stats->bytes_sent);
6837 *(ptr++) = hgetlo(stats->bytes_sent);
6838 *(ptr++) = hgethi(stats->bytes_rcvd);
6839 *(ptr++) = hgetlo(stats->bytes_rcvd);
6840 *(ptr++) = stats->queue_time_sum.sec;
6841 *(ptr++) = stats->queue_time_sum.usec;
6842 *(ptr++) = stats->queue_time_sum_sqr.sec;
6843 *(ptr++) = stats->queue_time_sum_sqr.usec;
6844 *(ptr++) = stats->queue_time_min.sec;
6845 *(ptr++) = stats->queue_time_min.usec;
6846 *(ptr++) = stats->queue_time_max.sec;
6847 *(ptr++) = stats->queue_time_max.usec;
6848 *(ptr++) = stats->execution_time_sum.sec;
6849 *(ptr++) = stats->execution_time_sum.usec;
6850 *(ptr++) = stats->execution_time_sum_sqr.sec;
6851 *(ptr++) = stats->execution_time_sum_sqr.usec;
6852 *(ptr++) = stats->execution_time_min.sec;
6853 *(ptr++) = stats->execution_time_min.usec;
6854 *(ptr++) = stats->execution_time_max.sec;
6855 *(ptr++) = stats->execution_time_max.usec;
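/*
 * Each rx_function_entry_v1_t therefore flattens to a fixed run of 28
 * afs_uint32 words: six identification words, three 64-bit counters
 * split high/low, and eight clock values split sec/usec. Decoders can
 * walk the marshalled buffer with no additional framing.
 */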
6861 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6866 * IN callerVersion - the rpc stat version of the caller
6868 * OUT myVersion - the rpc stat version of this function
6870 * OUT clock_sec - local time seconds
6872 * OUT clock_usec - local time microseconds
6874 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from this process.
6878 * OUT stats - the actual stats retrieved from this process.
 * If successful, *stats will be non-NULL.
6885 int rx_RetrieveProcessRPCStats(afs_uint32 callerVersion,
6886 afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
6887 size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
6897 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6900 * Check to see if stats are enabled
6903 MUTEX_ENTER(&rx_rpc_stats);
6904 if (!rxi_monitor_processStats) {
6905 MUTEX_EXIT(&rx_rpc_stats);
6909 clock_GetTime(&now);
6910 *clock_sec = now.sec;
6911 *clock_usec = now.usec;
6914 * Allocate the space based upon the caller version
6916 * If the client is at an older version than we are,
6917 * we return the statistic data in the older data format, but
6918 * we still return our version number so the client knows we
6919 * are maintaining more data than it can retrieve.
6922 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6923 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6924 *statCount = rxi_rpc_process_stat_cnt;
6927 * This can't happen yet, but in the future version changes
6928 * can be handled by adding additional code here
6932 if (space > (size_t) 0) {
6934 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6937 rx_interface_stat_p rpc_stat, nrpc_stat;
6940 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6941 rx_interface_stat)) {
6943 * Copy the data based upon the caller version
6945 rx_MarshallProcessRPCStats(callerVersion,
6946 rpc_stat->stats[0].func_total,
6947 rpc_stat->stats, &ptr);
6953 MUTEX_EXIT(&rx_rpc_stats);
6958 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6962 * IN callerVersion - the rpc stat version of the caller
6964 * OUT myVersion - the rpc stat version of this function
6966 * OUT clock_sec - local time seconds
6968 * OUT clock_usec - local time microseconds
6970 * OUT allocSize - the number of bytes allocated to contain stats
6972 * OUT statCount - the number of stats retrieved from the individual
6975 * OUT stats - the actual stats retrieved from the individual peer structures.
 * If successful, *stats will be non-NULL.
6982 int rx_RetrievePeerRPCStats(afs_uint32 callerVersion,
6983 afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
6984 size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
6994 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6997 * Check to see if stats are enabled
7000 MUTEX_ENTER(&rx_rpc_stats);
7001 if (!rxi_monitor_peerStats) {
7002 MUTEX_EXIT(&rx_rpc_stats);
7006 clock_GetTime(&now);
7007 *clock_sec = now.sec;
7008 *clock_usec = now.usec;
7011 * Allocate the space based upon the caller version
7013 * If the client is at an older version than we are,
7014 * we return the statistic data in the older data format, but
7015 * we still return our version number so the client knows we
7016 * are maintaining more data than it can retrieve.
7019 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
7020 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
7021 *statCount = rxi_rpc_peer_stat_cnt;
7024 * This can't happen yet, but in the future version changes
7025 * can be handled by adding additional code here
7029 if (space > (size_t) 0) {
7031 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7034 rx_interface_stat_p rpc_stat, nrpc_stat;
7037 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7038 rx_interface_stat)) {
7040 * We have to fix the offset of rpc_stat since we are
7041 * keeping this structure on two rx_queues. The rx_queue
7042 * package assumes that the rx_queue member is the first
7043 * member of the structure. That is, rx_queue assumes that
7044 * any one item is only on one queue at a time. We are
7045 * breaking that assumption and so we have to do a little
7046 * math to fix our pointers.
7049 fix_offset = (char *) rpc_stat;
7050 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7051 rpc_stat = (rx_interface_stat_p) fix_offset;
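	    /*
	     * This is the classic container_of idiom: given a pointer to the
	     * all_peers member embedded in an rx_interface_stat_t, subtracting
	     * offsetof(rx_interface_stat_t, all_peers) recovers the containing
	     * structure. A generic sketch of the same idea:
	     *
	     *     #define CONTAINER_OF(ptr, type, member) \
	     *         ((type *)((char *)(ptr) - offsetof(type, member)))
	     */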
7054 * Copy the data based upon the caller version
7056 rx_MarshallProcessRPCStats(callerVersion,
7057 rpc_stat->stats[0].func_total,
7058 rpc_stat->stats, &ptr);
7064 MUTEX_EXIT(&rx_rpc_stats);
7069 * rx_FreeRPCStats - free memory allocated by
7070 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7074 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7075 * rx_RetrievePeerRPCStats
7077 * IN allocSize - the number of bytes in stats.
7084 void rx_FreeRPCStats(afs_uint32 *stats, size_t allocSize)
7086 rxi_Free(stats, allocSize);
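/*
 * A typical retrieval sequence (a sketch; error handling omitted):
 *
 *     afs_uint32 version, sec, usec, count, *stats;
 *     size_t size;
 *
 *     rx_enableProcessRPCStats();
 *     ...let the process run...
 *     rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &version,
 *                                &sec, &usec, &size, &count, &stats);
 *     ...unmarshall count entries from stats...
 *     rx_FreeRPCStats(stats, size);
 */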
7090 * rx_queryProcessRPCStats - see if process rpc stat collection is
7091 * currently enabled.
 * Returns 0 if stats are not enabled, != 0 otherwise
7100 int rx_queryProcessRPCStats(void)
7103 MUTEX_ENTER(&rx_rpc_stats);
7104 rc = rxi_monitor_processStats;
7105 MUTEX_EXIT(&rx_rpc_stats);
7110 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 * Returns 0 if stats are not enabled, != 0 otherwise
7119 int rx_queryPeerRPCStats(void)
7122 MUTEX_ENTER(&rx_rpc_stats);
7123 rc = rxi_monitor_peerStats;
7124 MUTEX_EXIT(&rx_rpc_stats);
7129 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7138 void rx_enableProcessRPCStats(void)
7140 MUTEX_ENTER(&rx_rpc_stats);
7141 rx_enable_stats = 1;
7142 rxi_monitor_processStats = 1;
7143 MUTEX_EXIT(&rx_rpc_stats);
7147 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7156 void rx_enablePeerRPCStats(void)
7158 MUTEX_ENTER(&rx_rpc_stats);
7159 rx_enable_stats = 1;
7160 rxi_monitor_peerStats = 1;
7161 MUTEX_EXIT(&rx_rpc_stats);
7165 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7174 void rx_disableProcessRPCStats(void)
7176 rx_interface_stat_p rpc_stat, nrpc_stat;
7179 MUTEX_ENTER(&rx_rpc_stats);
 * Turn off process statistics and, if peer stats is also off, turn
 * statistics collection off entirely.
 */
7186 rxi_monitor_processStats = 0;
7187 if (rxi_monitor_peerStats == 0) {
7188 rx_enable_stats = 0;
7191 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7192 unsigned int num_funcs = 0;
7193 if (!rpc_stat) break;
7194 queue_Remove(rpc_stat);
7195 num_funcs = rpc_stat->stats[0].func_total;
7196 space = sizeof(rx_interface_stat_t) +
7197 rpc_stat->stats[0].func_total *
7198 sizeof(rx_function_entry_v1_t);
7200 rxi_Free(rpc_stat, space);
7201 rxi_rpc_process_stat_cnt -= num_funcs;
7203 MUTEX_EXIT(&rx_rpc_stats);
7207 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7216 void rx_disablePeerRPCStats(void)
7218 struct rx_peer **peer_ptr, **peer_end;
7221 MUTEX_ENTER(&rx_rpc_stats);
 * Turn off peer statistics and, if process stats is also off, turn
 * statistics collection off entirely.
 */
7228 rxi_monitor_peerStats = 0;
7229 if (rxi_monitor_processStats == 0) {
7230 rx_enable_stats = 0;
7233 MUTEX_ENTER(&rx_peerHashTable_lock);
7234 for (peer_ptr = &rx_peerHashTable[0],
7235 peer_end = &rx_peerHashTable[rx_hashTableSize];
7236 peer_ptr < peer_end; peer_ptr++) {
7237 struct rx_peer *peer, *next, *prev;
7238 for (prev = peer = *peer_ptr; peer; peer = next) {
7240 code = MUTEX_TRYENTER(&peer->peer_lock);
7242 rx_interface_stat_p rpc_stat, nrpc_stat;
7244 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7245 rx_interface_stat)) {
7246 unsigned int num_funcs = 0;
7247 if (!rpc_stat) break;
7248 queue_Remove(&rpc_stat->queue_header);
7249 queue_Remove(&rpc_stat->all_peers);
7250 num_funcs = rpc_stat->stats[0].func_total;
7251 space = sizeof(rx_interface_stat_t) +
7252 rpc_stat->stats[0].func_total *
7253 sizeof(rx_function_entry_v1_t);
7255 rxi_Free(rpc_stat, space);
7256 rxi_rpc_peer_stat_cnt -= num_funcs;
7258 MUTEX_EXIT(&peer->peer_lock);
7259 if (prev == *peer_ptr) {
7271 MUTEX_EXIT(&rx_peerHashTable_lock);
7272 MUTEX_EXIT(&rx_rpc_stats);
7276 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7281 * IN clearFlag - flag indicating which stats to clear
7288 void rx_clearProcessRPCStats(afs_uint32 clearFlag)
7290 rx_interface_stat_p rpc_stat, nrpc_stat;
7292 MUTEX_ENTER(&rx_rpc_stats);
7294 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7295 unsigned int num_funcs = 0, i;
7296 num_funcs = rpc_stat->stats[0].func_total;
7297 for(i=0;i<num_funcs;i++) {
7298 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7299 hzero(rpc_stat->stats[i].invocations);
7301 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7302 hzero(rpc_stat->stats[i].bytes_sent);
7304 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7305 hzero(rpc_stat->stats[i].bytes_rcvd);
7307 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7308 rpc_stat->stats[i].queue_time_sum.sec = 0;
7309 rpc_stat->stats[i].queue_time_sum.usec = 0;
7311 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7312 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7313 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7315 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7316 rpc_stat->stats[i].queue_time_min.sec = 9999999;
	    rpc_stat->stats[i].queue_time_min.usec = 9999999;
7319 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7320 rpc_stat->stats[i].queue_time_max.sec = 0;
7321 rpc_stat->stats[i].queue_time_max.usec = 0;
7323 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7324 rpc_stat->stats[i].execution_time_sum.sec = 0;
7325 rpc_stat->stats[i].execution_time_sum.usec = 0;
7327 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7328 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7329 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7331 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7332 rpc_stat->stats[i].execution_time_min.sec = 9999999;
	    rpc_stat->stats[i].execution_time_min.usec = 9999999;
7335 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7336 rpc_stat->stats[i].execution_time_max.sec = 0;
7337 rpc_stat->stats[i].execution_time_max.usec = 0;
7342 MUTEX_EXIT(&rx_rpc_stats);
7346 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7351 * IN clearFlag - flag indicating which stats to clear
7358 void rx_clearPeerRPCStats(afs_uint32 clearFlag)
7360 rx_interface_stat_p rpc_stat, nrpc_stat;
7362 MUTEX_ENTER(&rx_rpc_stats);
7364 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7365 unsigned int num_funcs = 0, i;
7368 * We have to fix the offset of rpc_stat since we are
7369 * keeping this structure on two rx_queues. The rx_queue
7370 * package assumes that the rx_queue member is the first
7371 * member of the structure. That is, rx_queue assumes that
7372 * any one item is only on one queue at a time. We are
7373 * breaking that assumption and so we have to do a little
7374 * math to fix our pointers.
7377 fix_offset = (char *) rpc_stat;
7378 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7379 rpc_stat = (rx_interface_stat_p) fix_offset;
7381 num_funcs = rpc_stat->stats[0].func_total;
7382 for(i=0;i<num_funcs;i++) {
7383 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7384 hzero(rpc_stat->stats[i].invocations);
7386 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7387 hzero(rpc_stat->stats[i].bytes_sent);
7389 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7390 hzero(rpc_stat->stats[i].bytes_rcvd);
7392 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7393 rpc_stat->stats[i].queue_time_sum.sec = 0;
7394 rpc_stat->stats[i].queue_time_sum.usec = 0;
7396 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7397 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7398 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7400 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7401 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7402 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7404 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7405 rpc_stat->stats[i].queue_time_max.sec = 0;
7406 rpc_stat->stats[i].queue_time_max.usec = 0;
7408 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7409 rpc_stat->stats[i].execution_time_sum.sec = 0;
7410 rpc_stat->stats[i].execution_time_sum.usec = 0;
7412 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7413 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7414 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7416 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7417 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7418 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7420 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7421 rpc_stat->stats[i].execution_time_max.sec = 0;
7422 rpc_stat->stats[i].execution_time_max.usec = 0;
7427 MUTEX_EXIT(&rx_rpc_stats);
7431 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7432 * is authorized to enable/disable/clear RX statistics.
7434 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7436 void rx_SetRxStatUserOk(int (*proc)(struct rx_call *call))
7438 rxi_rxstat_userok = proc;
7441 int rx_RxStatUserOk(struct rx_call *call)
7443 if (!rxi_rxstat_userok)
7445 return rxi_rxstat_userok(call);
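/*
 * Example hook (hypothetical): a server that only lets privileged
 * callers manipulate its rx statistics might register something like
 *
 *     static int stats_userok(struct rx_call *call)
 *     {
 *         return code_that_checks_the_caller(call);
 *     }
 *
 *     rx_SetRxStatUserOk(stats_userok);
 *
 * With no hook registered, rx_RxStatUserOk refuses everyone.
 */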