/****************************************************************************
 * Copyright IBM Corporation 1988, 1989 - All Rights Reserved               *
 *                                                                          *
 * Permission to use, copy, modify, and distribute this software and its    *
 * documentation for any purpose and without fee is hereby granted,         *
 * provided that the above copyright notice appear in all copies and        *
 * that both that copyright notice and this permission notice appear in     *
 * supporting documentation, and that the name of IBM not be used in        *
 * advertising or publicity pertaining to distribution of the software      *
 * without specific, written prior permission.                              *
 *                                                                          *
 * IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY      *
 * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER  *
 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING   *
 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.    *
 ****************************************************************************/
/* RX: Extended Remote Procedure Call */
#ifdef KERNEL
#include "../afs/param.h"
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#ifdef AFS_OSF_ENV
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#endif /* AFS_LINUX20_ENV */
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#endif
#ifdef RXDEBUG
#undef RXDEBUG      /* turn off debugging */
#endif /* RXDEBUG */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_SGI_ENV */
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK 210   /* Stop CALLBACK process */
#define AFSOP_STOP_AFS        211   /* Stop AFS process */
#define AFSOP_STOP_BKG        212   /* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#ifdef AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
#else /* KERNEL */
# include <afs/param.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
#endif /* KERNEL */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;    /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;
/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h>     /* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
#include <assert.h>

/*
 * Use procedural initialization of mutexes/condition variables.
 */

extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;
static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
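/*
 * Editor's note (added comment): rxi_InitPthread below creates every rx
 * mutex and condition variable exactly once.  It is only ever invoked
 * through pthread_once() via the INIT_PTHREAD_LOCKS macro defined after it.
 */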
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxi_connCacheMutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&epoch_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_event_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&des_init_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&des_random_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&osi_malloc_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&event_handler_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&listener_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_if_init_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_if_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_random_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_stats_mutex, (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t*)0) == 0);

    assert(pthread_cond_init(&rx_event_handler_cond, (const pthread_condattr_t*)0) == 0);
    assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t*)0) == 0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
    assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
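/*
 * Illustrative sketch (editor's addition, guarded out of the build): every
 * exported rx entry point is expected to start with INIT_PTHREAD_LOCKS so
 * that rxi_InitPthread runs exactly once, whichever thread gets there
 * first.  The function name below is hypothetical.
 */
#ifdef RX_EXAMPLE_SKETCHES
static void example_entry_point(void) {
    INIT_PTHREAD_LOCKS  /* under AFS_PTHREAD_ENV: assert(pthread_once(...)==0); */
    /* ... the body may now rely on epoch_mutex, rx_init_mutex, etc. ... */
}
#endif /* RX_EXAMPLE_SKETCHES */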
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else /* AFS_PTHREAD_ENV */
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
extern void rxi_DeleteCachedConnections(void);

/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to happen while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 * (A sketch of this check-and-reserve pattern appears after
 * ReturnToServerPool below.)
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.  (See the illustrative sketch after the
 * CLEAR_CALL_QUEUE_LOCK macro below.)
 */
extern void rxi_Delay(int);

static int rxi_ServerThreadSelectingCall;

#ifdef RX_ENABLE_LOCKS
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */

/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * levels:
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other:
 *      rx_freeCallQueue_lock
 *      rx_connHashTable_lock
 *      rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *      peer_lock - locks peer data fields.
 *      conn_data_lock - ensures that no more than one thread updates a conn
 *      data field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and purposes, and so
 * had no lock protecting this field.  The multihomed client delta introduced
 * an RX code change: the peer field in the connection structure now names
 * the remote interface from which the last packet for this connection was
 * sent.  This may become an issue if further changes are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
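/*
 * Illustrative sketch (editor's addition, guarded out of the build): the
 * enqueue side of the protocol described above.  Whoever puts a call on a
 * queue records that queue's lock in the call while holding it; whoever
 * removes the call clears the pointer, so rxi_ResetCall can tell which
 * lock, if any, it must take first.  This mirrors what rxi_FreeCall does
 * with rx_freeCallQueue further down in this file.
 */
#ifdef RX_EXAMPLE_SKETCHES
static void example_queue_call(struct rx_call *call) {
    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock); /* remember who holds us */
    queue_Append(&rx_freeCallQueue, call);
    MUTEX_EXIT(&rx_freeCallQueue_lock);
}
#endif /* RX_EXAMPLE_SKETCHES */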
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
void rxi_DestroyConnection();
void rxi_CleanupConnection();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
    afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
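/*
 * Illustrative usage (editor's addition, guarded out of the build): rxkad
 * would typically feed rx_SetEpoch a random value once, before creating
 * connections; the high bit mirrors what rx_Init does for kernel builds
 * below.  The parameter name is hypothetical.
 */
#ifdef RX_EXAMPLE_SKETCHES
static void example_set_epoch(afs_uint32 random_value) {
    rx_SetEpoch(random_value | 0x80000000);
}
#endif /* RX_EXAMPLE_SKETCHES */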
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    struct timeval tv;
    char *htable, *ptable;
    int tmp_status;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        UNLOCK_RX_INIT
        return tmp_status;      /* Already started; return previous error code. */
    }
#ifdef AFS_NT40_ENV
    if (afs_winsockInit() < 0)
        return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
        UNLOCK_RX_INIT
        return RX_ADDRINUSE;
    }
#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
               MUTEX_DEFAULT, 0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;            /* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
    htable = (char *)
        osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));        /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#else
    osi_GetTime(&tv);
#endif

    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);    /* Start time of this package, rxkad
                                 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;      /* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;      /* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;      /* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
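/*
 * Illustrative usage (editor's addition, guarded out of the build): a
 * client initializes rx exactly once before creating any connections.
 * Passing 0 lets the kernel choose the UDP port, as the comment above
 * rx_Init explains.
 */
#ifdef RX_EXAMPLE_SKETCHES
static int example_init(void) {
    if (rx_Init(0) != 0)
        return -1;      /* rx failed to initialize (e.g. no socket) */
    return 0;
}
#endif /* RX_EXAMPLE_SKETCHES */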
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
    register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
        return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (aservice->nRequestsRunning <= aservice->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        MUTEX_EXIT(&rx_stats_mutex);
        return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}
static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}

#else /* RX_ENABLE_LOCKS */
static QuotaOK(aservice)
    register struct rx_service *aservice; {
    int rc = 0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
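/*
 * Illustrative sketch (editor's addition, guarded out of the build): the
 * check-and-reserve pattern promised in the minProcs comment above.  Under
 * RX_ENABLE_LOCKS, QuotaOK reserves a slot as a side effect, so a caller
 * that decides not to use the slot must hand it back via
 * ReturnToServerPool(); both calls are made under rx_serverPool_lock.
 */
#if defined(RX_EXAMPLE_SKETCHES) && defined(RX_ENABLE_LOCKS)
static void example_reserve_and_return(struct rx_service *service) {
    MUTEX_ENTER(&rx_serverPool_lock);
    if (QuotaOK(service)) {             /* slot is now reserved */
        /* ... decide the call is not runnable after all ... */
        ReturnToServerPool(service);    /* give the reservation back */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
#endif /* RX_EXAMPLE_SKETCHES && RX_ENABLE_LOCKS */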
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        int diff;
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;  /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;   /* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
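/*
 * Worked example (editor's addition): with two services, minProcs of 2 and
 * 3 and maxProcs of 5 and 4, the loop above computes nProcs = (2+3) +
 * max(5-2, 4-3) = 8; if one proc already exists (nExistingProcs == 1),
 * seven new server procs are started.
 */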
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
         */
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) rx_ServerProc();      /* Never returns */
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;      /* Server host */
    u_short sport;                  /* Server port */
    u_short sservice;               /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    register afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++;   /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
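/*
 * Illustrative usage (editor's addition, guarded out of the build): an
 * unauthenticated (rxnull) client connection.  Host and port are in
 * network byte order; the port and service id here are example values
 * only.
 */
#ifdef RX_EXAMPLE_SKETCHES
static struct rx_connection *example_new_conn(afs_uint32 host) {
    struct rx_securityClass *so = rxnull_NewClientSecurityObject();
    return rx_NewConnection(host, htons(7000), 1 /* service id */,
                            so, 0 /* rxnull security index */);
}
#endif /* RX_EXAMPLE_SKETCHES */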
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
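/*
 * Worked example (editor's addition): rx_SetConnDeadTime(conn, 50) yields
 * secondsUntilDead = 50 and secondsUntilPing = 8, so roughly six keepalive
 * pings can be lost before the connection is declared dead; any value
 * below 6 is clamped up to 6.
 */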
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount++;
            MUTEX_EXIT(&rx_stats_mutex);
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
    else
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
        for (i = 0; i < conn->nSpecific; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        }
        free(conn->specific);
    }
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
        MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    register struct rx_peer *peer = conn->peer;
    struct rx_packet *packet;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        conn->refCount--;
    else {
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    if (conn->refCount > 0) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
        if (call) {
            havecalls = 1;
            if (conn->type == RX_CLIENT_CONNECTION) {
                MUTEX_ENTER(&call->lock);
                if (call->delayedAckEvent) {
                    /* Push the final acknowledgment out now--there
                     * won't be a subsequent call to acknowledge the
                     * last reply packets */
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    rxi_AckAll((struct rxevent *)0, call, 0);
                }
                MUTEX_EXIT(&call->lock);
            }
        }
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
        }
        else {
            /* Someone is accessing a packet right now. */
            havecalls = 1;
        }
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }

    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
        if (packet) {
            MUTEX_ENTER(&conn->conn_data_lock);
            rxi_SendConnectionAbort(conn, packet, 0, 1);
            MUTEX_EXIT(&conn->conn_data_lock);
            rxi_FreePacket(packet);
        }
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
                                             conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;
            break;
        }
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
        rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
        rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_NewCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_NewCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
        for (i=0; i<RX_MAXCALLS; i++) {
            call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_DALLY) {
                    rxi_ResetCall(call, 0);
                    (*call->callNumber)++;
                    break;
                }
                MUTEX_EXIT(&call->lock);
            }
            else {
                call = rxi_NewCall(conn, i);
                MUTEX_ENTER(&call->lock);
                break;
            }
        }
        if (i < RX_MAXCALLS) {
            break;
        }
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
        osi_rxSleep(conn);
#endif
    }

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
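/*
 * Illustrative usage (editor's addition, guarded out of the build): the
 * canonical client-side call sequence over an existing connection.  The
 * one-word request/reply layout is a made-up application protocol and
 * error handling is elided.
 */
#ifdef RX_EXAMPLE_SKETCHES
static afs_int32 example_client_call(struct rx_connection *conn) {
    struct rx_call *call = rx_NewCall(conn);    /* may wait for a free channel */
    afs_int32 request = 42, reply = 0;
    rx_Write(call, (char *)&request, sizeof(request));
    rx_Read(call, (char *)&reply, sizeof(reply));
    return rx_EndCall(call, 0);         /* returns the call's final error code */
}
#endif /* RX_EXAMPLE_SKETCHES */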
/* Return true if this connection has any active or precall calls */
rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
        if (tcall = aconn->call[i]) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {
                return 1;
            }
        }
    }
    return 0;
}

/* Copy the current call numbers for all channels into aint32s; dallying
 * channels report the next call number */
rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i]+1;
        else
            aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
        else
            aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on error. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
              nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;  /* Name for identification purposes (e.g. the
                         * service name might be used for probing for
                         * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
        (osi_Msg "rx_NewService: the service id for service %s must be non-zero.\n",
         serviceName);
        return 0;
    }
    if (port == 0) {
        if (rx_port == 0) {
            (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
            return 0;
        }
        port = rx_port;
        socket = rx_socket;
    }

    tservice = rxi_AllocService();
    for (i = 0; i<RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (service) {
            if (port == service->servicePort) {
                if (service->serviceId == serviceId) {
                    /* The identical service has already been
                     * installed; if the caller was intending to
                     * change the security classes used by this
                     * service, he/she loses. */
                    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
                    rxi_FreeService(tservice);
                    return service;
                }
                /* Different service, same port: re-use the socket
                 * which is bound to the same port */
                socket = service->socket;
            }
        } else {
            if (socket == OSI_NULLSOCKET) {
                /* If we don't already have a socket (from another
                 * service on same port) get a new one */
                socket = rxi_GetUDPSocket(port);
                if (socket == OSI_NULLSOCKET) {
                    rxi_FreeService(tservice);
                    return 0;
                }
            }
            service = tservice;
            service->socket = socket;
            service->servicePort = port;
            service->serviceId = serviceId;
            service->serviceName = serviceName;
            service->nSecurityObjects = nSecurityObjects;
            service->securityObjects = securityObjects;
            service->minProcs = 0;
            service->maxProcs = 1;
            service->idleDeadTime = 60;
            service->connDeadTime = rx_connDeadTime;
            service->executeRequestProc = serviceProc;
            rx_services[i] = service;   /* not visible until now */
            return service;
        }
    }
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
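/*
 * Illustrative usage (editor's addition, guarded out of the build):
 * exporting a service and donating the calling thread to the server pool.
 * The service id, name, and example_request_proc dispatcher are
 * hypothetical; rx must already have been initialized with a non-zero
 * port for port 0 to be usable here.
 */
#ifdef RX_EXAMPLE_SKETCHES
static afs_int32 example_request_proc();        /* application-defined dispatcher */
static void example_start_server(void) {
    struct rx_securityClass *so[1];
    struct rx_service *svc;
    so[0] = rxnull_NewServerSecurityObject();
    svc = rx_NewService(0, 1, "example", so, 1, example_request_proc);
    if (svc) {
        svc->minProcs = 2;
        svc->maxProcs = 5;
        rx_StartServer(1);      /* donateMe: this thread never returns */
    }
}
#endif /* RX_EXAMPLE_SKETCHES */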
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
    int threadID;
    struct rx_call *newcall;
    osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
        if (newcall) {
            call = newcall;
            newcall = (struct rx_call *) 0;
        } else {
            call = rx_GetCall(threadID, tservice, socketp);
            if (socketp && *socketp != OSI_NULLSOCKET) {
                /* We are now a listener thread */
                return;
            }
        }

        /* if the server is restarting (typically a smooth shutdown) then do not
         * allow any new calls.
         */
        if (rx_tranquil && (call != NULL)) {
            MUTEX_ENTER(&call->lock);

            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

            MUTEX_EXIT(&call->lock);
        }

#ifdef KERNEL
        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
            AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
            afs_termState = AFSOP_STOP_AFS;
            afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
            AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
            return;
        }
#endif

        tservice = call->conn->service;

        if (tservice->beforeProc) (*tservice->beforeProc)(call);

        code = call->conn->service->executeRequestProc(call);

        if (tservice->afterProc) (*tservice->afterProc)(call, code);

        rx_EndCall(call, code);
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_nCalls++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/* Meltdown notes:
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;

    MUTEX_ENTER(&freeSQEList_lock);

    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
        CV_INIT(&sq->cv, "server Queue cv", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
    }
    for (;;) {
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
            register struct rx_call *tcall, *ncall;
            choice2 = (struct rx_call *) 0;
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
            /* One thread will process calls FCFS (to prevent starvation),
             * while the other threads may run ahead looking for calls which
             * have all their input data available immediately.  This helps
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
                service = tcall->conn->service;
                if (!QuotaOK(service)) {
                    continue;
                }
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call.  If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1) {
                        if (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET)) {
                            call = tcall;
                        } else if (rxi_2dchoice && !choice2 &&
                                   !(tcall->flags & RX_CALL_CLEARED) &&
                                   (tcall->rprev > rxi_HardAckRate)) {
                            choice2 = tcall;
                        } else rxi_md2cnt++;
                    }
                }
                if (call) {
                    break;
                } else {
                    ReturnToServerPool(service);
                }
            }
        }

        if (call) {
            queue_Remove(call);
            rxi_ServerThreadSelectingCall = 1;
            MUTEX_EXIT(&rx_serverPool_lock);
            MUTEX_ENTER(&call->lock);
            MUTEX_ENTER(&rx_serverPool_lock);

            if (queue_IsEmpty(&call->rq) ||
                queue_First(&call->rq, rx_packet)->header.seq != 1)
                rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

            CLEAR_CALL_QUEUE_LOCK(call);
            if (call->error) {
                MUTEX_EXIT(&call->lock);
                ReturnToServerPool(service);
                rxi_ServerThreadSelectingCall = 0;
                CV_SIGNAL(&rx_serverPool_cv);
                call = (struct rx_call*)0;
                continue;
            }
            call->flags &= (~RX_CALL_WAIT_PROC);
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting--;
            MUTEX_EXIT(&rx_stats_mutex);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            MUTEX_EXIT(&rx_serverPool_lock);
            break;
        }
        else {
            /* If there are no eligible incoming calls, add this process
             * to the idle server queue, to wait for one */
            sq->newcall = 0;
            if (socketp) {
                *socketp = OSI_NULLSOCKET;
            }
            sq->socketp = socketp;
            queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
            rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
            do {
                CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef KERNEL
                if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                    MUTEX_EXIT(&rx_serverPool_lock);
                    return (struct rx_call *)0;
                }
#endif
            } while (!(call = sq->newcall) &&
                     !(socketp && *socketp != OSI_NULLSOCKET));
            MUTEX_EXIT(&rx_serverPool_lock);
            if (call) {
                MUTEX_ENTER(&call->lock);
            }
            break;
        }
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));

        CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
        MUTEX_EXIT(&call->lock);
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;

    MUTEX_ENTER(&freeSQEList_lock);

    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
        CV_INIT(&sq->cv, "server Queue cv", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)
            rxi_minDeficit++;
        rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *) 0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call.  If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                        call = tcall;
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                        choice2 = tcall;
                    } else rxi_md2cnt++;
                }
                if (call) break;
            }
        }
    }

    if (call) {
        queue_Remove(call);
        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (service->nRequestsRunning <= service->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        /* MUTEX_EXIT(&call->lock); */
    }
    else {
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
        if (socketp) {
            *socketp = OSI_NULLSOCKET;
        }
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
        do {
            osi_rxSleep(sq);
#ifdef KERNEL
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                return (struct rx_call *)0;
            }
#endif
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call
 * or the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register int arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;      /* Temporary packet pointer */
    register struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
         */
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        }
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        }
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
        } else {
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
        }
    }
    else { /* Client connection */
        char dummy;
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        }
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call.  The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure.  We don't want to signal until
         * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
         */
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#else
            osi_rxWakeup(conn);
#endif
        }
#ifdef RX_ENABLE_LOCKS
        else {
            MUTEX_EXIT(&conn->conn_data_lock);
        }
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *) 0;
        call->nLeft = call->nFree = call->curlen = 0;
    }
    else
        call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
        queue_Remove(tp);
        rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION)
        MUTEX_EXIT(&conn->conn_call_lock);

    /* Map errors to the local host's errno.h format. */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
        UNLOCK_RX_INIT
        return;         /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                next = conn->next;
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    conn->refCount++;
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
                }
            }
        }
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void
rxi_PacketsUnWait() {

    if (!rx_waitingForPackets) {
        return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return;         /* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
        if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
            return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;        /* Call pointer temp */
    register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
        if (!(cp->flags & RX_CALL_TQ_BUSY)) {
            call = cp;
            break;
        }
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        queue_Remove(call);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nFreeCallStructs--;
        MUTEX_EXIT(&rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Now, if TQ wasn't cleared earlier, do it now. */
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 0);
            queue_Init(&call->tq);
        }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Bind the call to its connection structure */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    else {
        call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
        MUTEX_ENTER(&call->lock);
        CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
        CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
        CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nCallStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
        /* Initialize once-only items */
        queue_Init(&call->tq);
        queue_Init(&call->rq);
        queue_Init(&call->iovq);
        /* Bind the call to its connection structure (prereq for reset) */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
       the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free queue.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock;	/* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
	(*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;
    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
	queue_Prepend(&rx_freeCallQueue, call);
    else
	queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);
    /* Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections).  Only do this, however, if there are no
     * outstanding calls.  Note that for fine grain locking, there appears
     * to be a deadlock in that rxi_FreeCall has a call locked and
     * DestroyConnectionNoLock locks each call in the conn.  But note a
     * few lines up where we have removed this call from the conn.
     * If someone else destroys a connection, they either have no
     * call lock held or are going through this section of code.
     */
    if (conn->flags & RX_CONN_DESTROY_ME) {
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->refCount++;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	if (haveCTLock)
	    rxi_DestroyConnectionNoLock(conn);
	else
	    rxi_DestroyConnection(conn);
#else /* RX_ENABLE_LOCKS */
	rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
    }
}
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;

char *rxi_Alloc(size)
    register size_t size;
{
    register char *p;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock.  See afs/osi.h for the lock
     * ordering. */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
	AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt++; rxi_Allocsize += size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ) {
	p = (char *) osi_AllocMediumSpace(size);
    } else
	p = (char *) osi_AllocSmall(size, 1);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
	AFS_GUNLOCK();
#endif
#else
    p = (char *) osi_Alloc(size);
#endif
    if (!p) osi_Panic("rxi_Alloc error");
    bzero(p, size);
    return p;
}
void rxi_Free(addr, size)
    void *addr;
    register size_t size;
{
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock.  See afs/osi.h for the lock
     * ordering. */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
	AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt--; rxi_Allocsize -= size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ)
	osi_FreeMediumSpace(addr);
    else
	osi_FreeSmall(addr);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
	AFS_GUNLOCK();
#endif
#else
    osi_Free(addr, size);
#endif
}
/* Find the peer process represented by the supplied (host,port)
 * combination.  If there is no appropriate active peer structure, a
 * new one will be allocated and initialized.
 * The origPeer, if set, is a pointer to a peer structure on which the
 * refcount will be decremented.  This is used to replace the peer
 * structure hanging off a connection structure */
struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
    register afs_uint32 host;
    register u_short port;
    struct rx_peer *origPeer;
    int create;
{
    register struct rx_peer *pp;
    int hashIndex;

    hashIndex = PEER_HASH(host, port);
    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
	if ((pp->host == host) && (pp->port == port)) break;
    }
    if (!pp) {
	if (create) {
	    pp = rxi_AllocPeer();	/* This bzero's *pp */
	    pp->host = host;	/* set here or in InitPeerParams is zero */
	    pp->port = port;
	    MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
	    queue_Init(&pp->congestionQueue);
	    queue_Init(&pp->rpcStats);
	    pp->next = rx_peerHashTable[hashIndex];
	    rx_peerHashTable[hashIndex] = pp;
	    rxi_InitPeerParams(pp);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.nPeerStructs++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    if (pp && create) {
	pp->refCount++;
    }
    if (origPeer)
	origPeer->refCount--;
    MUTEX_EXIT(&rx_peerHashTable_lock);
    return pp;
}
/* Find the connection at (host, port) started at epoch, and with the
 * given connection id.  Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired.  In both cases, (host, port) specify the
 * peer's (host, port) pair.  Client connections are not made
 * automatically by this routine.  The parameter socket gives the
 * socket descriptor on which the packet was received.  This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId).  Finally, the securityIndex
 * parameter must match the existing index for the connection.  If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service */
struct rx_connection *
rxi_FindConnection(socket, host, port, serviceId, cid,
		   epoch, type, securityIndex)
    osi_socket socket;
    register afs_int32 host;
    register u_short port;
    u_short serviceId;
    afs_uint32 cid;
    afs_uint32 epoch;
    int type;
    u_int securityIndex;
{
    int hashindex, flag;
    register struct rx_connection *conn;
    struct rx_peer *peer;

    hashindex = CONN_HASH(host, port, cid, epoch, type);
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxLastConn ? (conn = rxLastConn, flag = 0) :
		 (conn = rx_connHashTable[hashindex], flag = 1);
    for (; conn; ) {
	if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
	    && (epoch == conn->epoch)) {
	    register struct rx_peer *pp = conn->peer;
	    if (securityIndex != conn->securityIndex) {
		/* this isn't supposed to happen, but someone could forge a packet
		   like this, and there seems to be some CM bug that makes this
		   happen from time to time -- in which case, the fileserver
		   asserts. */
		MUTEX_EXIT(&rx_connHashTable_lock);
		return (struct rx_connection *) 0;
	    }
	    /* epoch's high order bits mean route for security reasons only on
	     * the cid, not the host and port fields.
	     */
	    if (conn->epoch & 0x80000000) break;
	    if (((type == RX_CLIENT_CONNECTION)
		 || (pp->host == host)) && (pp->port == port))
		break;
	}
	if (!flag) {
	    /* the connection rxLastConn that was used the last time is not the
	    ** one we are looking for now.  Hence, start searching in the hash */
	    flag = 1;
	    conn = rx_connHashTable[hashindex];
	} else
	    conn = conn->next;
    }
    if (!conn) {
	struct rx_service *service;
	if (type == RX_CLIENT_CONNECTION) {
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    return (struct rx_connection *) 0;
	}
	service = rxi_FindService(socket, serviceId);
	if (!service || (securityIndex >= service->nSecurityObjects)
	    || (service->securityObjects[securityIndex] == 0)) {
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    return (struct rx_connection *) 0;
	}
	conn = rxi_AllocConnection();	/* This bzero's the connection */
	MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
		   MUTEX_DEFAULT, 0);
	MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
		   MUTEX_DEFAULT, 0);
	CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
	conn->next = rx_connHashTable[hashindex];
	rx_connHashTable[hashindex] = conn;
	peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
	conn->type = RX_SERVER_CONNECTION;
	conn->lastSendTime = clock_Sec();	/* don't GC immediately */
	conn->epoch = epoch;
	conn->cid = cid & RX_CIDMASK;
	/* conn->serial = conn->lastSerial = 0; */
	/* conn->timeout = 0; */
	conn->ackRate = RX_FAST_ACK_RATE;
	conn->service = service;
	conn->serviceId = serviceId;
	conn->securityIndex = securityIndex;
	conn->securityObject = service->securityObjects[securityIndex];
	conn->nSpecific = 0;
	conn->specific = NULL;
	rx_SetConnDeadTime(conn, service->connDeadTime);
	/* Notify security object of the new connection */
	RXS_NewConnection(conn->securityObject, conn);
	/* XXXX Connection timeout? */
	if (service->newConnProc) (*service->newConnProc)(conn);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nServerConns++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
    else {
	/* Ensure that the peer structure is set up in such a way that
	** replies in this connection go back to that remote interface
	** from which the last packet was sent out.  In case this packet's
	** source IP address does not match the peer struct for this conn,
	** then drop the refCount on conn->peer and get a new peer structure.
	** We can check the host,port field in the peer structure without the
	** rx_peerHashTable_lock because the peer structure has its refCount
	** incremented and the only time the host,port in the peer struct gets
	** updated is when the peer structure is created.
	*/
	if (conn->peer->host == host)
	    peer = conn->peer;	/* no change to the peer structure */
	else
	    peer = rxi_FindPeer(host, port, conn->peer, 1);
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount++;
    MUTEX_EXIT(&conn->conn_data_lock);

    rxLastConn = conn;	/* store this connection as the last conn used */
    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
/* There are two packet tracing routines available for testing and monitoring
 * Rx.  One is called just after every packet is received and the other is
 * called just before every packet is sent.  Received packets have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address.  Both can be modified.  The return value, if
 * non-zero, indicates that the packet should be dropped. */
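/* For example, a test build might install a receive-side tracer like the
 * following sketch (the name "myPacketTracer" is illustrative only, not
 * part of Rx):
 *
 *	static int myPacketTracer(np, addr)
 *	    struct rx_packet *np;
 *	    struct sockaddr_in *addr;
 *	{
 *	    dpf(("packet type %d from host %x port %d\n",
 *		 np->header.type, addr->sin_addr.s_addr, addr->sin_port));
 *	    return 0;	(a zero return keeps the packet)
 *	}
 *
 * and then assign rx_justReceived = myPacketTracer; */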
int (*rx_justReceived)() = 0;
int (*rx_almostSent)() = 0;
2286 /* A packet has been received off the interface. Np is the packet, socket is
2287 * the socket number it was received from (useful in determining which service
2288 * this packet corresponds to), and (host, port) reflect the host,port of the
2289 * sender. This call returns the packet to the caller if it is finished with
2290 * it, rather than de-allocating it, just as a small performance hack */
2292 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2293 register struct rx_packet *np;
2298 struct rx_call **newcallp;
2300 register struct rx_call *call;
2301 register struct rx_connection *conn;
2303 afs_uint32 currentCallNumber;
2309 struct rx_packet *tnp;
#ifdef RXDEBUG
    /* We don't print out the packet until now because (1) the time may not be
     * accurate enough until now in the lwp implementation (rx_Listener only gets
     * the time after the packet is read) and (2) from a protocol point of view,
     * this is the first time the packet has been seen */
    packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
	? rx_packetTypes[np->header.type-1] : "*UNKNOWN*";
    dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
	 np->header.serial, packetType, host, port, np->header.serviceId,
	 np->header.epoch, np->header.cid, np->header.callNumber,
	 np->header.seq, np->header.flags, np));
#endif
    if (np->header.type == RX_PACKET_TYPE_VERSION) {
	return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
    }

    if (np->header.type == RX_PACKET_TYPE_DEBUG) {
	return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
    }

#ifdef RXDEBUG
    /* If an input tracer function is defined, call it with the packet and
     * network address.  Note this function may modify its arguments. */
    if (rx_justReceived) {
	struct sockaddr_in addr;
	int drop;
	addr.sin_family = AF_INET;
	addr.sin_port = port;
	addr.sin_addr.s_addr = host;
#if defined(AFS_OSF_ENV) && defined(_KERNEL)
	addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
	drop = (*rx_justReceived) (np, &addr);
	/* drop packet if return value is non-zero */
	if (drop) return np;
	port = addr.sin_port;	/* in case fcn changed addr */
	host = addr.sin_addr.s_addr;
    }
#endif /* RXDEBUG */

    /* If packet was not sent by the client, then *we* must be the client */
    type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
	? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

    /* Find the connection (or fabricate one, if we're the server & if
     * necessary) associated with this packet */
    conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
			      np->header.cid, np->header.epoch, type,
			      np->header.securityIndex);
    /* If no connection found or fabricated, just ignore the packet.
     * (An argument could be made for sending an abort packet for
     * the conn) */
    if (!conn) return np;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->maxSerial < np->header.serial)
	conn->maxSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);

    /* If the connection is in an error state, send an abort packet and ignore
     * the incoming packet */
    if (conn->error) {
	/* Don't respond to an abort packet--we don't want loops! */
	MUTEX_ENTER(&conn->conn_data_lock);
	if (np->header.type != RX_PACKET_TYPE_ABORT)
	    np = rxi_SendConnectionAbort(conn, np, 1, 0);
	conn->refCount--;
	MUTEX_EXIT(&conn->conn_data_lock);
	return np;
    }
    /* Check for connection-only requests (i.e. not call specific). */
    if (np->header.callNumber == 0) {
	switch (np->header.type) {
	case RX_PACKET_TYPE_ABORT:
	    /* What if the supplied error is zero? */
	    rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	case RX_PACKET_TYPE_CHALLENGE:
	    tnp = rxi_ReceiveChallengePacket(conn, np, 1);
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return tnp;
	case RX_PACKET_TYPE_RESPONSE:
	    tnp = rxi_ReceiveResponsePacket(conn, np, 1);
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return tnp;
	case RX_PACKET_TYPE_PARAMS:
	case RX_PACKET_TYPE_PARAMS+1:
	case RX_PACKET_TYPE_PARAMS+2:
	    /* ignore these packet types for now */
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;

	default:
	    /* Should not reach here, unless the peer is broken: send an
	     * abort packet */
	    rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
	    MUTEX_ENTER(&conn->conn_data_lock);
	    tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return tnp;
	}
    }
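    /* A connection carries up to RX_MAXCALLS simultaneous calls; the
     * low-order bits of the packet's cid select which call channel this
     * packet belongs to. */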
    channel = np->header.cid & RX_CHANNELMASK;
    call = conn->call[channel];
#ifdef RX_ENABLE_LOCKS
    if (call) {
	MUTEX_ENTER(&call->lock);
	/* Test to see if call struct is still attached to conn. */
	if (call != conn->call[channel]) {
	    MUTEX_EXIT(&call->lock);
	    if (type == RX_SERVER_CONNECTION) {
		call = conn->call[channel];
		/* If we started with no call attached and there is one now,
		 * another thread is also running this routine and has gotten
		 * the connection channel.  We should drop this packet in the tests
		 * below.  If there was a call on this connection and it's now
		 * gone, then we'll be making a new call below.
		 * If there was previously a call and it's now different then
		 * the old call was freed and another thread running this routine
		 * has created a call on this channel.  One of these two threads
		 * has a packet for the old call and the code below handles those
		 * cases.
		 */
		if (call)
		    MUTEX_ENTER(&call->lock);
	    }
	    else {
		/* This packet can't be for this call.  If the new call address is
		 * 0 then no call is running on this channel.  If there is a call
		 * then, since this is a client connection we're getting data for
		 * it must be for the previous call.
		 */
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.spuriousPacketsRead++;
		MUTEX_EXIT(&rx_stats_mutex);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return np;
	    }
	}
    }
#endif /* RX_ENABLE_LOCKS */
    currentCallNumber = conn->callNumber[channel];
    if (type == RX_SERVER_CONNECTION) {	/* We're the server */
	if (np->header.callNumber < currentCallNumber) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.spuriousPacketsRead++;
	    MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
	    if (call)
		MUTEX_EXIT(&call->lock);
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
	if (!call) {
	    call = rxi_NewCall(conn, channel);
	    MUTEX_ENTER(&call->lock);
	    *call->callNumber = np->header.callNumber;
	    call->state = RX_STATE_PRECALL;
	    clock_GetTime(&call->queueTime);
	    hzero(call->bytesSent);
	    hzero(call->bytesRcvd);
	    rxi_KeepAliveOn(call);
	}
	else if (np->header.callNumber != currentCallNumber) {
	    /* Wait until the transmit queue is idle before deciding
	     * whether to reset the current call.  Chances are that the
	     * call will be in either DALLY or HOLD state once the TQ_BUSY
	     * flag is cleared.
	     */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	    while ((call->state == RX_STATE_ACTIVE) &&
		   (call->flags & RX_CALL_TQ_BUSY)) {
		call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
		CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
		osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	    }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	    /* If the new call cannot be taken right now send a busy and set
	     * the error condition in this call, so that it terminates as
	     * quickly as possible */
	    if (call->state == RX_STATE_ACTIVE) {
		struct rx_packet *tp;

		rxi_CallError(call, RX_CALL_DEAD);
		tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
		MUTEX_EXIT(&call->lock);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tp;
	    }
	    rxi_ResetCall(call, 0);
	    *call->callNumber = np->header.callNumber;
	    call->state = RX_STATE_PRECALL;
	    clock_GetTime(&call->queueTime);
	    hzero(call->bytesSent);
	    hzero(call->bytesRcvd);
	    /*
	     * If the number of queued calls exceeds the overload
	     * threshold then abort this call.
	     */
	    if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
		struct rx_packet *tp;

		rxi_CallError(call, rx_BusyError);
		tp = rxi_SendCallAbort(call, np, 1, 0);
		MUTEX_EXIT(&call->lock);
		MUTEX_ENTER(&conn->conn_data_lock);
		conn->refCount--;
		MUTEX_EXIT(&conn->conn_data_lock);
		return tp;
	    }
	    rxi_KeepAliveOn(call);
	}
	else {
	    /* Continuing call; do nothing here. */
	}
    } else {	/* we're the client */
	/* Ignore all incoming acknowledgements for calls in DALLY state */
	if (call && (call->state == RX_STATE_DALLY)
	    && (np->header.type == RX_PACKET_TYPE_ACK)) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.ignorePacketDally++;
	    MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
	    if (call)
		MUTEX_EXIT(&call->lock);
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}

	/* Ignore anything that's not relevant to the current call.  If there
	 * isn't a current call, then no packet is relevant. */
	if (!call || (np->header.callNumber != currentCallNumber)) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.spuriousPacketsRead++;
	    MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
	    if (call)
		MUTEX_EXIT(&call->lock);
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
	/* If the service security object index stamped in the packet does not
	 * match the connection's security index, ignore the packet */
	if (np->header.securityIndex != conn->securityIndex) {
#ifdef RX_ENABLE_LOCKS
	    MUTEX_EXIT(&call->lock);
#endif
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
    }
    /* If we're receiving the response, then all transmit packets are
     * implicitly acknowledged.  Get rid of them. */
    if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* XXX Hack.  Because we must release the global rx lock when
	 * sending packets (osi_NetSend) we drop all acks while we're
	 * traversing the tq in rxi_Start sending packets out because
	 * packets may move to the freePacketQueue as result of being here!
	 * So we drop these packets until we're safely out of the
	 * traversing.  Really ugly!
	 * For fine grain RX locking, we set the acked field in the
	 * packets and let rxi_Start remove them from the transmit queue.
	 */
	if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
	    rxi_SetAcksInTransmitQueue(call);
#else
	    conn->refCount--;
	    return np;	/* xmitting; drop packet */
#endif
	}
	else {
	    rxi_ClearTransmitQueue(call, 0);
	}
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
	rxi_ClearTransmitQueue(call, 0);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    }
    else if (np->header.type == RX_PACKET_TYPE_ACK) {
	/* now check to see if this is an ack packet acknowledging that the
	 * server actually *lost* some hard-acked data.  If this happens we
	 * ignore this packet, as it may indicate that the server restarted in
	 * the middle of a call.  It is also possible that this is an old ack
	 * packet.  We don't abort the connection in this case, because this
	 * *might* just be an old ack packet.  The right way to detect a server
	 * restart in the midst of a call is to notice that the server epoch
	 * changed. */
	/* XXX I'm not sure this is exactly right, since tfirst **IS**
	 * XXX unacknowledged.  I think that this is off-by-one, but
	 * XXX I don't dare change it just yet, since it will
	 * XXX interact badly with the server-restart detection
	 * XXX code in receiveackpacket. */
	if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.spuriousPacketsRead++;
	    MUTEX_EXIT(&rx_stats_mutex);
	    MUTEX_EXIT(&call->lock);
	    MUTEX_ENTER(&conn->conn_data_lock);
	    conn->refCount--;
	    MUTEX_EXIT(&conn->conn_data_lock);
	    return np;
	}
    } /* else not a data packet */
    osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
    /* Set remote user defined status from packet */
    call->remoteStatus = np->header.userStatus;

    /* Note the gap between the expected next packet and the actual
     * packet that arrived, when the new packet has a smaller serial number
     * than expected.  Rioses frequently reorder packets all by themselves,
     * so this will be quite important with very large window sizes.
     * Skew is checked against 0 here to avoid any dependence on the type of
     * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
     * true!
     * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
     * see CalculateRoundTripTime for an example of how to keep smoothed values.
     * I think using a beta of 1/8 is probably appropriate.  93.04.21
     */
    MUTEX_ENTER(&conn->conn_data_lock);
    skew = conn->lastSerial - np->header.serial;
    conn->lastSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);
    if (skew > 0) {
	register struct rx_peer *peer;
	peer = conn->peer;
	if (skew > peer->inPacketSkew) {
	    dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
	    peer->inPacketSkew = skew;
	}
    }
    /* Now do packet type-specific processing */
    switch (np->header.type) {
    case RX_PACKET_TYPE_DATA:
	np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
				   tnop, newcallp);
	break;
    case RX_PACKET_TYPE_ACK:
	/* Respond immediately to ack packets requesting acknowledgement
	 * (ping packets) */
	if (np->header.flags & RX_REQUEST_ACK) {
	    if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
	    else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
	}
	np = rxi_ReceiveAckPacket(call, np, 1);
	break;
    case RX_PACKET_TYPE_ABORT:
	/* An abort packet: reset the connection, passing the error up to
	 * the user */
	/* What if error is zero? */
	rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
	break;
    case RX_PACKET_TYPE_BUSY:
	/* XXXX */
	break;
    case RX_PACKET_TYPE_ACKALL:
	/* All packets acknowledged, so we can drop all packets previously
	 * readied for sending */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* XXX Hack.  Because we can't release the global rx lock when
	 * sending packets (osi_NetSend) we drop all ack pkts while we're
	 * traversing the tq in rxi_Start sending packets out because
	 * packets may move to the freePacketQueue as result of being
	 * here! So we drop these packets until we're safely out of the
	 * traversing.  Really ugly!
	 * For fine grain RX locking, we set the acked field in the packets
	 * and let rxi_Start remove the packets from the transmit queue.
	 */
	if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
	    rxi_SetAcksInTransmitQueue(call);
	    break;
#else /* RX_ENABLE_LOCKS */
	    conn->refCount--;
	    return np;	/* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	rxi_ClearTransmitQueue(call, 0);
	break;
    default:
	/* Should not reach here, unless the peer is broken: send an abort
	 * packet */
	rxi_CallError(call, RX_PROTOCOL_ERROR);
	np = rxi_SendCallAbort(call, np, 1, 0);
	break;
    }
    /* Note when this last legitimate packet was received, for keep-alive
     * processing.  Note, we delay getting the time until now in the hope that
     * the packet will be delivered to the user before any get time is required
     * (if not, then the time won't actually be re-evaluated here). */
    call->lastReceiveTime = clock_Sec();
    MUTEX_EXIT(&call->lock);
    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount--;
    MUTEX_EXIT(&conn->conn_data_lock);
    return np;
}
/* return true if this is an "interesting" connection from the point of view
   of someone trying to debug the system */
int rxi_IsConnInteresting(struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
	return 1;
    for (i = 0; i < RX_MAXCALLS; i++) {
	tcall = aconn->call[i];
	if (tcall) {
	    if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
		return 1;
	    if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
		return 1;
	}
    }
    return 0;
}
/* if this is one of the last few packets AND it wouldn't be used by the
   receiving call to immediately satisfy a read request, then drop it on
   the floor, since accepting it might prevent a lock-holding thread from
   making progress in its reading.  If a call has been cleared while in
   the precall state then ignore all subsequent packets until the call
   is assigned to a thread. */

static TooLow(ap, acall)
    struct rx_call *acall;
    struct rx_packet *ap; {
    int rc = 0;
    MUTEX_ENTER(&rx_stats_mutex);
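    /* Drop if (1) this is a cleared precall call past its first packet, or
     * (2) the free packet pool is nearly exhausted and this packet would
     * not immediately unblock a waiting reader. */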
    if (((ap->header.seq != 1) &&
	 (acall->flags & RX_CALL_CLEARED) &&
	 (acall->state == RX_STATE_PRECALL)) ||
	((rx_nFreePackets < rxi_dataQuota+2) &&
	 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
	   && (acall->flags & RX_CALL_READER_WAIT)))) {
	rc = 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return rc;
}
/* try to attach call, if authentication is complete */
static void TryAttach(acall, socket, tnop, newcallp)
    register struct rx_call *acall;
    register osi_socket socket;
    register int *tnop;
    register struct rx_call **newcallp; {
    register struct rx_connection *conn;

    conn = acall->conn;
    if ((conn->type == RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
	/* Don't attach until we have any req'd. authentication. */
	if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
	    rxi_AttachServerProc(acall, socket, tnop, newcallp);
	    /* Note:  this does not necessarily succeed; there
	     * may not be any proc available */
	}
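	/* Not yet authenticated: kick off the security challenge exchange
	 * with the client so a later packet can attach this call. */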
	else {
	    rxi_ChallengeOn(acall->conn);
	}
    }
}
/* A data packet has been received off the interface.  This packet is
 * appropriate to the call (the call is in the right state, etc.).  This
 * routine can return a packet to the caller, for re-use */

struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
					port, tnop, newcallp)
    register struct rx_call *call;
    register struct rx_packet *np;
    int istack;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    int ackNeeded = 0;
    int newPackets = 0;
    int didHardAck = 0;
    int haveLast = 0;
    afs_uint32 seq, serial, flags;
    int isFirst;
    struct rx_packet *tnp;
    struct clock when;

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.dataPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
    /* If there are no packet buffers, drop this new packet, unless we can find
     * packet buffers from inactive calls */
    if (!call->error &&
	(rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
	MUTEX_ENTER(&rx_freePktQ_lock);
	rxi_NeedMorePackets = TRUE;
	MUTEX_EXIT(&rx_freePktQ_lock);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.noPacketBuffersOnRead++;
	MUTEX_EXIT(&rx_stats_mutex);
	call->rprev = np->header.serial;
	rxi_calltrace(RX_TRACE_DROP, call);
	dpf(("packet %x dropped on receipt - quota problems", np));
	rxi_ClearReceiveQueue(call);
	clock_GetTime(&when);
	clock_Add(&when, &rx_softAckDelay);
	if (!call->delayedAckEvent ||
	    clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
	    rxevent_Cancel(call->delayedAckEvent, call,
			   RX_CALL_REFCOUNT_DELAY);
	    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
	    call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
						 call, 0);
	}
	/* we've damaged this call already, might as well do it in. */
	return np;
    }
    /*
     * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
     * packet is one of several packets transmitted as a single
     * datagram.  Do not send any soft or hard acks until all packets
     * in a jumbogram have been processed.  Send negative acks right away.
     */
    for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
	/* tnp is non-null when there are more packets in the
	 * current jumbogram */
	if (tnp) {
	    if (np)
		rxi_FreePacket(np);
	    np = tnp;
	}

	seq = np->header.seq;
	serial = np->header.serial;
	flags = np->header.flags;

	/* If the call is in an error state, send an abort message */
	if (call->error)
	    return rxi_SendCallAbort(call, np, istack, 0);

	/* The RX_JUMBO_PACKET is set in all but the last packet in each
	 * AFS 3.5 jumbogram. */
	if (flags & RX_JUMBO_PACKET) {
	    tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
	} else {
	    tnp = NULL;
	}
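	/* A nonzero spare field means the peer is filling in the header
	 * checksum spare, so remember that this connection can use packet
	 * checksums from here on. */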
	if (np->header.spare != 0) {
	    MUTEX_ENTER(&call->conn->conn_data_lock);
	    call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
	    MUTEX_EXIT(&call->conn->conn_data_lock);
	}
	/* The usual case is that this is the expected next packet */
	if (seq == call->rnext) {

	    /* Check to make sure it is not a duplicate of one already queued */
	    if (queue_IsNotEmpty(&call->rq)
		&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.dupPacketsRead++;
		MUTEX_EXIT(&rx_stats_mutex);
		dpf(("packet %x dropped on receipt - duplicate", np));
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		np = rxi_SendAck(call, np, seq, serial,
				 flags, RX_ACK_DUPLICATE, istack);
		ackNeeded = 0;
		call->rprev = seq;
		continue;
	    }

	    /* It's the next packet.  Stick it on the receive queue
	     * for this call.  Set newPackets to make sure we wake
	     * the reader once all packets have been processed */
	    queue_Prepend(&call->rq, np);
	    call->nSoftAcks++;
	    np = NULL;	/* We can't use this anymore */
	    newPackets = 1;

	    /* If an ack is requested then set a flag to make sure we
	     * send an acknowledgement for this packet */
	    if (flags & RX_REQUEST_ACK) {
		ackNeeded = 1;
	    }

	    /* Keep track of whether we have received the last packet */
	    if (flags & RX_LAST_PACKET) {
		call->flags |= RX_CALL_HAVE_LAST;
		haveLast = 1;
	    }

	    /* Check whether we have all of the packets for this call */
	    if (call->flags & RX_CALL_HAVE_LAST) {
		afs_uint32 tseq;	/* temporary sequence number */
		struct rx_packet *tp;	/* Temporary packet pointer */
		struct rx_packet *nxp;	/* Next pointer, for queue_Scan */

		for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
		    if (tseq != tp->header.seq)
			break;
		    if (tp->header.flags & RX_LAST_PACKET) {
			call->flags |= RX_CALL_RECEIVE_DONE;
			break;
		    }
		    tseq++;
		}
	    }

	    /* Provide asynchronous notification for those who want it
	     * (e.g. multi rx) */
	    if (call->arrivalProc) {
		(*call->arrivalProc)(call, call->arrivalProcHandle,
				     call->arrivalProcArg);
		call->arrivalProc = (VOID (*)()) 0;
	    }

	    /* Update last packet received */
	    call->rprev = seq;

	    /* If there is no server process serving this call, grab
	     * one, if available.  We only need to do this once.  If a
	     * server thread is available, this thread becomes a server
	     * thread and the server thread becomes a listener thread. */
	    if (isFirst) {
		TryAttach(call, socket, tnop, newcallp);
	    }
	}
	/* This is not the expected next packet. */
	else {
	    /* Determine whether this is a new or old packet, and if it's
	     * a new one, whether it fits into the current receive window.
	     * Also figure out whether the packet was delivered in sequence.
	     * We use the prev variable to determine whether the new packet
	     * is the successor of its immediate predecessor in the
	     * receive queue, and the missing flag to determine whether
	     * any of this packet's predecessors are missing.  */

	    afs_uint32 prev;		/* "Previous packet" sequence number */
	    struct rx_packet *tp;	/* Temporary packet pointer */
	    struct rx_packet *nxp;	/* Next pointer, for queue_Scan */
	    int missing;		/* Are any predecessors missing? */

	    /* If the new packet's sequence number has been sent to the
	     * application already, then this is a duplicate */
	    if (seq < call->rnext) {
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.dupPacketsRead++;
		MUTEX_EXIT(&rx_stats_mutex);
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		np = rxi_SendAck(call, np, seq, serial,
				 flags, RX_ACK_DUPLICATE, istack);
		ackNeeded = 0;
		call->rprev = seq;
		continue;
	    }

	    /* If the sequence number is greater than what can be
	     * accommodated by the current window, then send a negative
	     * acknowledge and drop the packet */
	    if ((call->rnext + call->rwind) <= seq) {
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		np = rxi_SendAck(call, np, seq, serial,
				 flags, RX_ACK_EXCEEDS_WINDOW, istack);
		ackNeeded = 0;
		call->rprev = seq;
		continue;
	    }
	    /* Look for the packet in the queue of old received packets */
	    for (prev = call->rnext - 1, missing = 0,
		 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
		/* Check for duplicate packet */
		if (seq == tp->header.seq) {
		    MUTEX_ENTER(&rx_stats_mutex);
		    rx_stats.dupPacketsRead++;
		    MUTEX_EXIT(&rx_stats_mutex);
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    np = rxi_SendAck(call, np, seq, serial,
				     flags, RX_ACK_DUPLICATE, istack);
		    ackNeeded = 0;
		    call->rprev = seq;
		    goto nextloop;
		}
		/* If we find a higher sequence packet, break out and
		 * insert the new packet here. */
		if (seq < tp->header.seq) break;
		/* Check for missing packet */
		if (tp->header.seq != prev+1) {
		    missing = 1;
		}

		prev = tp->header.seq;
	    }

	    /* Keep track of whether we have received the last packet. */
	    if (flags & RX_LAST_PACKET) {
		call->flags |= RX_CALL_HAVE_LAST;
	    }

	    /* It's within the window: add it to the receive queue.
	     * tp is left by the previous loop either pointing at the
	     * packet before which to insert the new packet, or at the
	     * queue head if the queue is empty or the packet should be
	     * appended. */
	    queue_InsertBefore(tp, np);
	    call->nSoftAcks++;
	    np = NULL;

	    /* Check whether we have all of the packets for this call */
	    if ((call->flags & RX_CALL_HAVE_LAST)
		&& !(call->flags & RX_CALL_RECEIVE_DONE)) {
		afs_uint32 tseq;	/* temporary sequence number */

		for (tseq = call->rnext,
		     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
		    if (tseq != tp->header.seq)
			break;
		    if (tp->header.flags & RX_LAST_PACKET) {
			call->flags |= RX_CALL_RECEIVE_DONE;
			break;
		    }
		    tseq++;
		}
	    }

	    /* We need to send an ack if the packet is out of sequence,
	     * or if an ack was requested by the peer. */
	    if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
		ackNeeded = 1;
	    }

	    /* Acknowledge the last packet for each call */
	    if (flags & RX_LAST_PACKET) {
		haveLast = 1;
	    }

	    call->rprev = seq;
	}
nextloop:;
    }

    /*
     * If the receiver is waiting for an iovec, fill the iovec
     * using the data from the receive queue */
    if (call->flags & RX_CALL_IOVEC_WAIT) {
	didHardAck = rxi_FillReadVec(call, seq, serial, flags);
	/* the call may have been aborted */
	if (call->error) {
	    return NULL;
	}
	if (didHardAck) {
	    ackNeeded = 0;
	}
    }
    /* Wakeup the reader if any */
    if ((call->flags & RX_CALL_READER_WAIT) &&
	(!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
	 (call->iovNext >= call->iovMax) ||
	 (call->flags & RX_CALL_RECEIVE_DONE))) {
	call->flags &= ~RX_CALL_READER_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&call->cv_rq);
#else
	osi_rxWakeup(&call->rq);
#endif
    }

    /*
     * Send an ack when requested by the peer, or once every
     * rxi_SoftAckRate packets until the last packet has been
     * received.  Always send a soft ack for the last packet in
     * the server's reply. */
    if (ackNeeded) {
	rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
	np = rxi_SendAck(call, np, seq, serial, flags,
			 RX_ACK_REQUESTED, istack);
    } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
	rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
	np = rxi_SendAck(call, np, seq, serial, flags,
			 RX_ACK_DELAY, istack);
    } else if (call->nSoftAcks) {
	clock_GetTime(&when);
	if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
	    clock_Add(&when, &rx_lastAckDelay);
	} else {
	    clock_Add(&when, &rx_softAckDelay);
	}
	if (!call->delayedAckEvent ||
	    clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
	    rxevent_Cancel(call->delayedAckEvent, call,
			   RX_CALL_REFCOUNT_DELAY);
	    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
	    call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
						 call, 0);
	}
    } else if (call->flags & RX_CALL_RECEIVE_DONE) {
	rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
    }

    return np;
}
#ifdef ADAPT_WINDOW
static void rxi_ComputeRate();
#endif

/* The real smarts of the whole thing: process an incoming ack packet. */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
    register struct rx_call *call;
    struct rx_packet *np;
    int istack;
{
    struct rx_ackPacket *ap;
    int nAcks;
    register struct rx_packet *tp;
    register struct rx_packet *nxp;	/* Next packet pointer for queue_Scan */
    register struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;
    afs_uint32 first;
    afs_uint32 serial;
    /* because there are CM's that are bogus, sending weird values for this. */
    afs_uint32 skew = 0;
    int needRxStart = 0;
    int nbytes;
    int missing;
    int acked;
    int nNacked = 0;
    int newAckCount = 0;
    u_short maxMTU = 0;		/* Set if peer supports AFS 3.4a jumbo datagrams */
    int maxDgramPackets = 0;	/* Set if peer supports AFS 3.5 jumbo datagrams */

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.ackPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
    ap = (struct rx_ackPacket *) rx_DataOf(np);
    nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
    if (nbytes < 0)
	return np;	/* truncated ack packet */

    /* depends on ack packet struct */
    nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
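    /* nAcks is now clamped to the bytes actually present in the packet, so
     * a corrupt or truncated ack cannot walk us off the end of the acks
     * array below. */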
    first = ntohl(ap->firstPacket);
    serial = ntohl(ap->serial);
    /* temporarily disabled -- needs to degrade over time
       skew = ntohs(ap->maxSkew); */

    /* Ignore ack packets received out of order */
    if (first < call->tfirst) {
	return np;
    }

    if (np->header.flags & RX_SLOW_START_OK) {
	call->flags |= RX_CALL_SLOW_START_OK;
    }

#ifdef RXDEBUG
    if (rx_Log) {
	fprintf(rx_Log,
		"RACK: reason %x previous %u seq %u serial %u skew %d first %u",
		ap->reason, ntohl(ap->previousPacket), np->header.seq, serial,
		skew, ntohl(ap->firstPacket));
	if (nAcks) {
	    int offset;
	    for (offset = 0; offset < nAcks; offset++)
		putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', rx_Log);
	}
	putc('\n', rx_Log);
    }
#endif

    /* if a server connection has been re-created, it doesn't remember what
       serial # it was up to.  An ack will tell us, since the serial field
       contains the largest serial received by the other side */
    MUTEX_ENTER(&conn->conn_data_lock);
    if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
	conn->serial = serial + 1;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Update the outgoing packet skew value to the latest value of
     * the peer's incoming packet skew value.  The ack packet, of
     * course, could arrive out of order, but that won't affect things
     * much */
    MUTEX_ENTER(&peer->peer_lock);
    peer->outPacketSkew = skew;

    /* Check for packets that no longer need to be transmitted, and
     * discard them.  This only applies to packets positively
     * acknowledged as having been sent to the peer's upper level.
     * All other packets must be retained.  So only packets with
     * sequence numbers < ap->firstPacket are candidates. */
    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
	if (tp->header.seq >= first) break;
	call->tfirst = tp->header.seq + 1;
	if (tp->header.serial == serial) {
	    rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
	else if (tp->firstSerial == serial) {
	    rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* XXX Hack.  Because we have to release the global rx lock when sending
	 * packets (osi_NetSend) we drop all acks while we're traversing the tq
	 * in rxi_Start sending packets out because packets may move to the
	 * freePacketQueue as result of being here! So we drop these packets until
	 * we're safely out of the traversing.  Really ugly!
	 * To make it even uglier, if we're using fine grain locking, we can
	 * set the ack bits in the packets and have rxi_Start remove the packets
	 * when it's done transmitting.
	 */
	if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
	    tp->acked = 1;
	    call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
	    break;
#endif /* RX_ENABLE_LOCKS */
	} else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	{
	    queue_Remove(tp);
	    rxi_FreePacket(tp);	/* rxi_FreePacket mustn't wake up anyone, preemptively. */
	}
    }

#ifdef ADAPT_WINDOW
    /* Give rate detector a chance to respond to ping requests */
    if (ap->reason == RX_ACK_PING_RESPONSE) {
	rxi_ComputeRate(peer, call, 0, np, ap->reason);
    }
#endif
    /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */

    /* Now go through explicit acks/nacks and record the results in
     * the waiting packets.  These are packets that can't be released
     * yet, even with a positive acknowledge.  This positive
     * acknowledge only means the packet has been received by the
     * peer, not that it will be retained long enough to be sent to
     * the peer's upper level.  In addition, reset the transmit timers
     * of any missing packets (those packets that must be missing
     * because this packet was out of sequence) */

    call->nSoftAcked = 0;
    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
	/* Update round trip time if the ack was stimulated on receipt
	 * of this packet */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
	if (tp->header.seq >= first) {
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	    if (tp->header.serial == serial) {
		rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
		rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	    }
	    else if (tp->firstSerial == serial) {
		rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
		rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	    }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
	}
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

	/* Set the acknowledge flag per packet based on the
	 * information in the ack packet.  An acknowledged packet can
	 * be downgraded when the server has discarded a packet it
	 * soacked previously, or when an ack packet is received
	 * out of sequence. */
	if (tp->header.seq < first) {
	    /* Implicit ack information */
	    if (!tp->acked) {
		newAckCount++;
	    }
	    tp->acked = 1;
	}
	else if (tp->header.seq < first + nAcks) {
	    /* Explicit ack information:  set it in the packet appropriately */
	    if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
		if (!tp->acked) {
		    newAckCount++;
		    tp->acked = 1;
		}
		if (missing) {
		    nNacked++;
		} else {
		    call->nSoftAcked++;
		}
	    } else {
		tp->acked = 0;
		missing = 1;
	    }
	}
	else {
	    tp->acked = 0;
	    missing = 1;
	}

	/* If packet isn't yet acked, and it has been transmitted at least
	 * once, reset retransmit time using latest timeout
	 * ie, this should readjust the retransmit timer for all outstanding
	 * packets...  So we don't just retransmit when we should know better */
	if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
	    tp->retryTime = tp->timeSent;
	    clock_Add(&tp->retryTime, &peer->timeout);
	    /* shift by eight because one quarter-sec ~ 256 milliseconds */
	    clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
	}
    }
    /* If the window has been extended by this acknowledge packet,
     * then wakeup a sender waiting in alloc for window space, or try
     * sending packets now, if he's been sitting on packets due to
     * lack of window space */
    if (call->tnext < (call->tfirst + call->twind)) {
#ifdef RX_ENABLE_LOCKS
	CV_SIGNAL(&call->cv_twind);
#else
	if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
	    call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
	    osi_rxWakeup(&call->twind);
	}
#endif
	if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
	    call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
	}
    }
    /* if the ack packet has a receivelen field hanging off it,
     * update our state */
    if (np->length >= rx_AckDataSize(ap->nAcks) + sizeof(afs_int32)) {
	afs_uint32 tSize;

	/* If the ack packet has a "recommended" size that is less than
	 * what I am using now, reduce my size to match */
	rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
		      sizeof(afs_int32), &tSize);
	tSize = (afs_uint32) ntohl(tSize);
	peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));

	/* Get the maximum packet size to send to this peer */
	rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
		      &tSize);
	tSize = (afs_uint32) ntohl(tSize);
	tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
	tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);

	/* sanity check - peer might have restarted with different params.
	 * If peer says "send less", dammit, send less...  Peer should never
	 * be unable to accept packets of the size that prior AFS versions would
	 * send without asking. */
	if (peer->maxMTU != tSize) {
	    peer->maxMTU = tSize;
	    peer->MTU = MIN(tSize, peer->MTU);
	    call->MTU = MIN(call->MTU, tSize);
	}
	if (np->length == rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32)) {
	    /* AFS 3.4a */
	    rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
			  sizeof(afs_int32), &tSize);
	    tSize = (afs_uint32) ntohl(tSize);	/* peer's receive window, if it's */
	    if (tSize < call->twind) {		/* smaller than our send */
		call->twind = tSize;		/* window, we must send less... */
		call->ssthresh = MIN(call->twind, call->ssthresh);
	    }

	    /* Only send jumbograms to 3.4a fileservers.  3.3a RX gets the
	     * network MTU confused with the loopback MTU.  Calculate the
	     * maximum MTU here for use in the slow start code below.
	     */
	    maxMTU = peer->maxMTU;
	    /* Did peer restart with older RX version? */
	    if (peer->maxDgramPackets > 1) {
		peer->maxDgramPackets = 1;
	    }
	} else if (np->length >= rx_AckDataSize(ap->nAcks) + 4 * sizeof(afs_int32)) {
	    /* AFS 3.5 */
	    rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
			  sizeof(afs_int32), &tSize);
	    tSize = (afs_uint32) ntohl(tSize);
	    /*
	     * As of AFS 3.5 we set the send window to match the receive window.
	     */
	    if (tSize < call->twind) {
		call->twind = tSize;
		call->ssthresh = MIN(call->twind, call->ssthresh);
	    } else if (tSize > call->twind) {
		call->twind = tSize;
	    }

	    /*
	     * As of AFS 3.5, a jumbogram is more than one fixed size
	     * packet transmitted in a single UDP datagram.  If the remote
	     * MTU is smaller than our local MTU then never send a datagram
	     * larger than the natural MTU.
	     */
	    rx_packetread(np, rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32),
			  sizeof(afs_int32), &tSize);
	    maxDgramPackets = (afs_uint32) ntohl(tSize);
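	    /* The effective jumbogram size is the most conservative of what
	     * the peer advertises, our own configured limit, and what the
	     * local interface can carry. */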
	    maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
	    maxDgramPackets = MIN(maxDgramPackets,
				  (int)(peer->ifDgramPackets));
	    maxDgramPackets = MIN(maxDgramPackets, tSize);
	    if (maxDgramPackets > 1) {
		peer->maxDgramPackets = maxDgramPackets;
		call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
	    } else {
		peer->maxDgramPackets = 1;
		call->MTU = peer->natMTU;
	    }
	} else if (peer->maxDgramPackets > 1) {
	    /* Restarted with lower version of RX */
	    peer->maxDgramPackets = 1;
	}
    } else if (peer->maxDgramPackets > 1 ||
	       peer->maxMTU != OLD_MAX_PACKET_SIZE) {
	/* Restarted with lower version of RX */
	peer->maxMTU = OLD_MAX_PACKET_SIZE;
	peer->natMTU = OLD_MAX_PACKET_SIZE;
	peer->MTU = OLD_MAX_PACKET_SIZE;
	peer->maxDgramPackets = 1;
	peer->nDgramPackets = 1;
	call->MTU = OLD_MAX_PACKET_SIZE;
    }
    /*
     * Calculate how many datagrams were successfully received after
     * the first missing packet and adjust the negative ack counter
     * accordingly.
     */
    if (nNacked) {
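	/* Round the nacked-packet count up to whole datagrams: a datagram
	 * with any packet missing counts as a missed datagram. */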
	nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
	if (call->nNacks < nNacked) {
	    call->nNacks = nNacked;
	}
    }
    else {
	if (newAckCount) {
	    call->nAcks++;
	}
	call->nNacks = 0;
    }

    if (call->flags & RX_CALL_FAST_RECOVER) {
	if (nNacked) {
	    call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
	} else {
	    call->flags &= ~RX_CALL_FAST_RECOVER;
	    call->cwind = call->nextCwind;
	    call->nextCwind = 0;
	    call->nAcks = 0;
	}
	call->nCwindAcks = 0;
    }
    else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
	/* Three negative acks in a row trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	MUTEX_EXIT(&peer->peer_lock);
	if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
	    /* someone else is waiting to start recovery */
	    return np;
	}
	call->flags |= RX_CALL_FAST_RECOVER_WAIT;
	while (call->flags & RX_CALL_TQ_BUSY) {
	    call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	    CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	    osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	}
	MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
	call->flags |= RX_CALL_FAST_RECOVER;
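	/* Multiplicative decrease, in the style of TCP fast recovery:
	 * halve the effective window (never below 4 packets) and resume
	 * sending from the reduced threshold. */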
	call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
	call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
			  rx_maxSendWindow);
	call->nDgramPackets = MAX(2, (int)call->nDgramPackets) >> 1;
	call->nextCwind = call->ssthresh;
	call->nAcks = 0;
	call->nNacks = 0;
	peer->MTU = call->MTU;
	peer->cwind = call->nextCwind;
	peer->nDgramPackets = call->nDgramPackets;
	peer->congestSeq++;
	call->congestSeq = peer->congestSeq;
	/* Reset the resend times on the packets that were nacked
	 * so we will retransmit as soon as the window permits */
	for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
	    if (acked) {
		if (!tp->acked) {
		    clock_Zero(&tp->retryTime);
		}
	    } else if (tp->acked) {
		acked = 1;
	    }
	}
    }
    else {
	/* If cwind is smaller than ssthresh, then increase
	 * the window one packet for each ack we receive (exponential
	 * growth).
	 * If cwind is greater than or equal to ssthresh then increase
	 * the congestion window by one packet for each cwind acks we
	 * receive (linear growth). */
	if (call->cwind < call->ssthresh) {
	    call->cwind = MIN((int)call->ssthresh,
			      (int)(call->cwind + newAckCount));
	    call->nCwindAcks = 0;
	} else {
	    call->nCwindAcks += newAckCount;
	    if (call->nCwindAcks >= call->cwind) {
		call->nCwindAcks = 0;
		call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
	    }
	}
	/*
	 * If we have received several acknowledgements in a row then
	 * it is time to increase the size of our datagrams
	 */
	if ((int)call->nAcks > rx_nDgramThreshold) {
	    if (peer->maxDgramPackets > 1) {
		if (call->nDgramPackets < peer->maxDgramPackets) {
		    call->nDgramPackets++;
		}
		call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
	    } else if (call->MTU < peer->maxMTU) {
		call->MTU += peer->natMTU;
		call->MTU = MIN(call->MTU, peer->maxMTU);
	    }
	    call->nAcks = 0;
	}
    }

    MUTEX_EXIT(&peer->peer_lock);	/* rxi_Start will lock peer. */

    /* Servers need to hold the call until all response packets have
     * been acknowledged.  Soft acks are good enough since clients
     * are not allowed to clear their receive queues. */
    if (call->state == RX_STATE_HOLD &&
	call->tfirst + call->nSoftAcked >= call->tnext) {
	call->state = RX_STATE_DALLY;
	rxi_ClearTransmitQueue(call, 0);
    } else if (!queue_IsEmpty(&call->tq)) {
	rxi_Start(0, call, istack);
    }
    return np;
}
/* Received a response to a challenge packet */
struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the packet if we're the client */
    if (conn->type == RX_CLIENT_CONNECTION) return np;

    /* If already authenticated, ignore the packet (it's probably a retry) */
    if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
	return np;

    /* Otherwise, have the security object evaluate the response packet */
    error = RXS_CheckResponse(conn->securityObject, conn, np);
    if (error) {
	/* If the response is invalid, reset the connection, sending
	 * an abort to the peer */
	rxi_ConnectionError(conn, error);
	MUTEX_ENTER(&conn->conn_data_lock);
	np = rxi_SendConnectionAbort(conn, np, istack, 0);
	MUTEX_EXIT(&conn->conn_data_lock);
	return np;
    }
    else {
	/* If the response is valid, any calls waiting to attach
	 * servers can now do so */
	int i;
	for (i = 0; i < RX_MAXCALLS; i++) {
	    struct rx_call *call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_PRECALL)
		    rxi_AttachServerProc(call, -1, NULL, NULL);
		MUTEX_EXIT(&call->lock);
	    }
	}
    }
    return np;
}
/* A client has received an authentication challenge: the security
 * object is asked to cough up a respectable response packet to send
 * back to the server.  The server is responsible for retrying the
 * challenge if it fails to get a response. */

struct rx_packet *
rxi_ReceiveChallengePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the challenge if we're the server */
    if (conn->type == RX_SERVER_CONNECTION) return np;

    /* Ignore the challenge if the connection is otherwise idle; someone's
     * trying to use us as an oracle. */
    if (!rxi_HasActiveCalls(conn)) return np;

    /* Send the security object the challenge packet.  It is expected to fill
     * in the response. */
    error = RXS_GetResponse(conn->securityObject, conn, np);

    /* If the security object is unable to return a valid response, reset the
     * connection and send an abort to the peer.  Otherwise send the response
     * packet to the peer connection. */
    if (error) {
	rxi_ConnectionError(conn, error);
	MUTEX_ENTER(&conn->conn_data_lock);
	np = rxi_SendConnectionAbort(conn, np, istack, 0);
	MUTEX_EXIT(&conn->conn_data_lock);
    }
    else {
	np = rxi_SendSpecial((struct rx_call *)0, conn, np,
			     RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
    }
    return np;
}
/* Find an available server process to service the current request in
 * the given call structure.  If one isn't available, queue up this
 * call so it eventually gets one */
void rxi_AttachServerProc(call, socket, tnop, newcallp)
    register struct rx_call *call;
    register osi_socket socket;
    register int *tnop;
    register struct rx_call **newcallp;
{
    register struct rx_serverQueueEntry *sq;
    register struct rx_service *service = call->conn->service;
#ifdef RX_ENABLE_LOCKS
    register int haveQuota = 0;
#endif /* RX_ENABLE_LOCKS */
    /* May already be attached */
    if (call->state == RX_STATE_ACTIVE) return;

    MUTEX_ENTER(&rx_serverPool_lock);
#ifdef RX_ENABLE_LOCKS
    while (rxi_ServerThreadSelectingCall) {
	MUTEX_EXIT(&call->lock);
	CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
	MUTEX_EXIT(&rx_serverPool_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&rx_serverPool_lock);
	/* Call may have been attached */
	if (call->state == RX_STATE_ACTIVE) return;
    }

    haveQuota = QuotaOK(service);
    if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
	/* If there are no processes available to service this call,
	 * put the call on the incoming call queue (unless it's
	 * already on the queue).
	 */
	if (haveQuota)
	    ReturnToServerPool(service);
	if (!(call->flags & RX_CALL_WAIT_PROC)) {
	    call->flags |= RX_CALL_WAIT_PROC;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting++;
	    MUTEX_EXIT(&rx_stats_mutex);
	    rxi_calltrace(RX_CALL_ARRIVAL, call);
	    SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
	    queue_Append(&rx_incomingCallQueue, call);
	}
    }
#else /* RX_ENABLE_LOCKS */
    if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
	/* If there are no processes available to service this call,
	 * put the call on the incoming call queue (unless it's
	 * already on the queue).
	 */
	if (!(call->flags & RX_CALL_WAIT_PROC)) {
	    call->flags |= RX_CALL_WAIT_PROC;
	    rx_nWaiting++;
	    rxi_calltrace(RX_CALL_ARRIVAL, call);
	    queue_Append(&rx_incomingCallQueue, call);
	}
    }
#endif /* RX_ENABLE_LOCKS */
    else {
	sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);

	/* If hot threads are enabled, and both newcallp and sq->socketp
	 * are non-null, then this thread will process the call, and the
	 * idle server thread will start listening on this threads socket.
	 */
	queue_Remove(sq);
	if (rx_enable_hot_thread && newcallp && sq->socketp) {
	    *newcallp = call;
	    *tnop = sq->tno;
	    *sq->socketp = socket;
	    clock_GetTime(&call->startTime);
	    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	} else {
	    sq->newcall = call;
	}
	if (call->flags & RX_CALL_WAIT_PROC) {
	    /* Conservative:  I don't think this should happen */
	    call->flags &= ~RX_CALL_WAIT_PROC;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting--;
	    MUTEX_EXIT(&rx_stats_mutex);
	    queue_Remove(call);
	}
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;
	if (call->flags & RX_CALL_CLEARED) {
	    /* send an ack now to start the packet flow up again */
	    call->flags &= ~RX_CALL_CLEARED;
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
	}
#ifdef RX_ENABLE_LOCKS
	CV_SIGNAL(&sq->cv);
#else
	service->nRequestsRunning++;
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	osi_rxWakeup(sq);
#endif
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/* Delay the sending of an acknowledge event for a short while, while
 * a new call is being prepared (in the case of a client) or a reply
 * is being prepared (in the case of a server).  Rather than sending
 * an ack packet, an ACKALL packet is sent. */
void rxi_AckAll(event, call, dummy)
    struct rxevent *event;
    register struct rx_call *call;
    char *dummy;
{
#ifdef RX_ENABLE_LOCKS
    if (event) {
	MUTEX_ENTER(&call->lock);
	call->delayedAckEvent = (struct rxevent *) 0;
	CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
    }
    rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
		    RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
    if (event)
	MUTEX_EXIT(&call->lock);
#else /* RX_ENABLE_LOCKS */
    if (event) call->delayedAckEvent = (struct rxevent *) 0;
    rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
		    RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
#endif /* RX_ENABLE_LOCKS */
}
void rxi_SendDelayedAck(event, call, dummy)
    struct rxevent *event;
    register struct rx_call *call;
    char *dummy;
{
#ifdef RX_ENABLE_LOCKS
    if (event) {
	MUTEX_ENTER(&call->lock);
	if (event == call->delayedAckEvent)
	    call->delayedAckEvent = (struct rxevent *) 0;
	CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
    }
    (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
    if (event)
	MUTEX_EXIT(&call->lock);
#else /* RX_ENABLE_LOCKS */
    if (event) call->delayedAckEvent = (struct rxevent *) 0;
    (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
#endif /* RX_ENABLE_LOCKS */
}
#ifdef RX_ENABLE_LOCKS
/* Set ack in all packets in transmit queue. rxi_Start will deal with
 * clearing them out.
 */
static void rxi_SetAcksInTransmitQueue(call)
    register struct rx_call *call;
{
    register struct rx_packet *p, *tp;
    int someAcked = 0;

    for (queue_Scan(&call->tq, p, tp, rx_packet)) {
	if (!p)
	    break;
	p->acked = 1;
	someAcked = 1;
    }
    if (someAcked) {
	call->flags |= RX_CALL_TQ_CLEARME;
	call->flags |= RX_CALL_TQ_SOME_ACKED;
    }

    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
    call->tfirst = call->tnext;
    call->nSoftAcked = 0;

    if (call->flags & RX_CALL_FAST_RECOVER) {
	call->flags &= ~RX_CALL_FAST_RECOVER;
	call->cwind = call->nextCwind;
	call->nextCwind = 0;
    }

    CV_SIGNAL(&call->cv_twind);
}
#endif /* RX_ENABLE_LOCKS */
/* Clear out the transmit queue for the current call (all packets have
 * been received by peer) */
void rxi_ClearTransmitQueue(call, force)
    register struct rx_call *call;
    register int force;
{
    register struct rx_packet *p, *tp;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
	int someAcked = 0;
	for (queue_Scan(&call->tq, p, tp, rx_packet)) {
	    if (!p)
		break;
	    p->acked = 1;
	    someAcked = 1;
	}
	if (someAcked) {
	    call->flags |= RX_CALL_TQ_CLEARME;
	    call->flags |= RX_CALL_TQ_SOME_ACKED;
	}
    } else {
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	for (queue_Scan(&call->tq, p, tp, rx_packet)) {
	    if (!p)
		break;
	    queue_Remove(p);
	    rxi_FreePacket(p);
	}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	call->flags &= ~RX_CALL_TQ_CLEARME;
    }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
    call->tfirst = call->tnext;	/* implicitly acknowledge all data already sent */
    call->nSoftAcked = 0;

    if (call->flags & RX_CALL_FAST_RECOVER) {
	call->flags &= ~RX_CALL_FAST_RECOVER;
	call->cwind = call->nextCwind;
    }

#ifdef RX_ENABLE_LOCKS
    CV_SIGNAL(&call->cv_twind);
#else /* RX_ENABLE_LOCKS */
    osi_rxWakeup(&call->twind);
#endif /* RX_ENABLE_LOCKS */
}
void rxi_ClearReceiveQueue(call)
    register struct rx_call *call;
{
    register struct rx_packet *p, *tp;
    if (queue_IsNotEmpty(&call->rq)) {
	for (queue_Scan(&call->rq, p, tp, rx_packet)) {
	    if (!p)
		break;
	    queue_Remove(p);
	    rxi_FreePacket(p);
	    rx_packetReclaims++;
	}
	call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
    }
    if (call->state == RX_STATE_PRECALL) {
	call->flags |= RX_CALL_CLEARED;
    }
}
/* Send an abort packet for the specified call */
struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
    register struct rx_call *call;
    struct rx_packet *packet;
    int istack;
    int force;
{
    afs_int32 error;
    struct clock when;

    if (!call->error)
	return packet;

    /* Clients should never delay abort messages */
    if (rx_IsClientConn(call->conn))
	force = 1;

    if (call->abortCode != call->error) {
	call->abortCode = call->error;
	call->abortCount = 0;
    }

    if (force || rxi_callAbortThreshhold == 0 ||
	call->abortCount < rxi_callAbortThreshhold) {
	if (call->delayedAbortEvent) {
	    rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
	}
	error = htonl(call->error);
	call->abortCount++;
	packet = rxi_SendSpecial(call, call->conn, packet,
				 RX_PACKET_TYPE_ABORT, (char *)&error,
				 sizeof(error), istack);
    } else if (!call->delayedAbortEvent) {
	clock_GetTime(&when);
	clock_Addmsec(&when, rxi_callAbortDelay);
	CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
	call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
					       call, 0);
    }
    return packet;
}
/* Send an abort packet for the specified connection.  Packet is an
 * optional pointer to a packet that can be used to send the abort.
 * Once the number of abort messages reaches the threshold, an
 * event is scheduled to send the abort.  Setting the force flag
 * overrides sending delayed abort messages.
 *
 * NOTE: Called with conn_data_lock held.  conn_data_lock is dropped
 *       to send the abort packet.
 */
struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
    register struct rx_connection *conn;
    struct rx_packet *packet;
    int istack;
    int force;
{
    afs_int32 error;
    struct clock when;

    if (!conn->error)
	return packet;

    /* Clients should never delay abort messages */
    if (rx_IsClientConn(conn))
	force = 1;

    if (force || rxi_connAbortThreshhold == 0 ||
	conn->abortCount < rxi_connAbortThreshhold) {
	if (conn->delayedAbortEvent) {
	    rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
	}
	error = htonl(conn->error);
	conn->abortCount++;
	MUTEX_EXIT(&conn->conn_data_lock);
	packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
				 RX_PACKET_TYPE_ABORT, (char *)&error,
				 sizeof(error), istack);
	MUTEX_ENTER(&conn->conn_data_lock);
    } else if (!conn->delayedAbortEvent) {
	clock_GetTime(&when);
	clock_Addmsec(&when, rxi_connAbortDelay);
	conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
					       conn, 0);
    }
    return packet;
}
/* Associate an error with all of the calls owned by a connection.  Called
 * with error non-zero.  This is only for really fatal things, like
 * bad authentication responses.  The connection itself is set in
 * error at this point, so that future packets received will be
 * rejected. */
void rxi_ConnectionError(conn, error)
    register struct rx_connection *conn;
    register afs_int32 error;
{
    if (error) {
	register int i;
	if (conn->challengeEvent)
	    rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
	for (i=0; i<RX_MAXCALLS; i++) {
	    struct rx_call *call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		rxi_CallError(call, error);
		MUTEX_EXIT(&call->lock);
	    }
	}
	conn->error = error;
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.fatalErrors++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rxi_CallError(call, error)
    register struct rx_call *call;
    register afs_int32 error;
{
    if (call->error) error = call->error;
#ifdef RX_GLOBAL_RXLOCK_KERNEL
    if (!(call->flags & RX_CALL_TQ_BUSY)) {
	rxi_ResetCall(call, 0);
    }
#else /* RX_GLOBAL_RXLOCK_KERNEL */
    rxi_ResetCall(call, 0);
#endif /* RX_GLOBAL_RXLOCK_KERNEL */
    call->error = error;
    call->mode = RX_MODE_ERROR;
}
/* Reset various fields in a call structure, and wakeup waiting
 * processes.  Some fields aren't changed: state & mode are not
 * touched (these must be set by the caller), and bufptr, nLeft, and
 * nFree are not reset, since these fields are manipulated by
 * unprotected macros, and may only be reset by non-interrupting code.
 */
#ifdef ADAPT_WINDOW
/* this code requires that call->conn be set properly as a pre-condition. */
#endif /* ADAPT_WINDOW */

void rxi_ResetCall(call, newcall)
    register struct rx_call *call;
    register int newcall;
{
    register int flags;
    register struct rx_peer *peer;
    struct rx_packet *packet;

    /* Notify anyone who is waiting for asynchronous packet arrival */
    if (call->arrivalProc) {
	(*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
	call->arrivalProc = (VOID (*)()) 0;
    }

    if (call->delayedAbortEvent) {
	rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    rxi_SendCallAbort(call, packet, 0, 1);
	    rxi_FreePacket(packet);
	}
    }

    /*
     * Update the peer with the congestion information in this call
     * so other calls on this connection can pick up where this call
     * left off. If the congestion sequence numbers don't match then
     * another call experienced a retransmission.
     */
    peer = call->conn->peer;
    MUTEX_ENTER(&peer->peer_lock);
    if (!newcall) {
	if (call->congestSeq == peer->congestSeq) {
	    peer->cwind = MAX(peer->cwind, call->cwind);
	    peer->MTU = MAX(peer->MTU, call->MTU);
	    peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
	}
    } else {
	call->abortCode = 0;
	call->abortCount = 0;
    }
    if (peer->maxDgramPackets > 1) {
	call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
    } else {
	call->MTU = peer->MTU;
    }
    call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
    call->ssthresh = rx_maxSendWindow;
    call->nDgramPackets = peer->nDgramPackets;
    call->congestSeq = peer->congestSeq;
    MUTEX_EXIT(&peer->peer_lock);

    flags = call->flags;
    rxi_ClearReceiveQueue(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    if (call->flags & RX_CALL_TQ_BUSY) {
	call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
	call->flags |= (flags & RX_CALL_TQ_WAIT);
    } else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
	call->flags = 0;
    }
    queue_Init(&call->rq);
    call->error = 0;
    call->rwind = rx_initReceiveWindow;
    call->twind = rx_initSendWindow;
    call->nSoftAcked = 0;
    call->nextCwind = 0;
    call->nAcks = 0;
    call->nNacks = 0;
    call->nCwindAcks = 0;
    call->nSoftAcks = 0;
    call->nHardAcks = 0;

    call->tfirst = call->rnext = call->tnext = 1;
    call->rprev = 0;
    call->lastAcked = 0;
    call->localStatus = call->remoteStatus = 0;

    if (flags & RX_CALL_READER_WAIT) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&call->cv_rq);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(&call->rq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (flags & RX_CALL_WAIT_PACKETS) {
	MUTEX_ENTER(&rx_freePktQ_lock);
	rxi_PacketsUnWait();		/* XXX */
	MUTEX_EXIT(&rx_freePktQ_lock);
    }

#ifdef RX_ENABLE_LOCKS
    CV_SIGNAL(&call->cv_twind);
#else /* RX_ENABLE_LOCKS */
    if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
	osi_rxWakeup(&call->twind);
#endif /* RX_ENABLE_LOCKS */

#ifdef RX_ENABLE_LOCKS
    /* The following ensures that we don't mess with any queue while some
     * other thread might also be doing so. The call_queue_lock field is
     * only modified under the call lock. If the call is in the process
     * of being removed from a queue, the call is not locked until the
     * queue lock is dropped and only then is the call_queue_lock field
     * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
     * Note that any other routine which removes a call from a queue has to
     * obtain the queue lock before examining the queue and removing the call.
     */
    if (call->call_queue_lock) {
	MUTEX_ENTER(call->call_queue_lock);
	if (queue_IsOnQueue(call)) {
	    queue_Remove(call);
	    if (flags & RX_CALL_WAIT_PROC) {
		MUTEX_ENTER(&rx_stats_mutex);
		rx_nWaiting--;
		MUTEX_EXIT(&rx_stats_mutex);
	    }
	}
	MUTEX_EXIT(call->call_queue_lock);
	CLEAR_CALL_QUEUE_LOCK(call);
    }
#else /* RX_ENABLE_LOCKS */
    if (queue_IsOnQueue(call)) {
	queue_Remove(call);
	if (flags & RX_CALL_WAIT_PROC)
	    rx_nWaiting--;
    }
#endif /* RX_ENABLE_LOCKS */

    rxi_KeepAliveOff(call);
    rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
}
/* Send an acknowledge for the indicated packet (seq,serial) of the
 * indicated call, for the indicated reason (reason).  This
 * acknowledge will specifically acknowledge receiving the packet, and
 * will also specify which other packets for this call have been
 * received.  This routine returns to the caller the packet that was
 * used.  The caller is responsible for freeing it or re-using it.
 * This acknowledgement also returns the highest sequence number
 * actually read out by the higher level to the sender; the sender
 * promises to keep around packets that have not been read by the
 * higher level yet (unless, of course, the sender decides to abort
 * the call altogether).  Any of p, seq, serial, pflags, or reason may
 * be set to zero without ill effect.  That is, if they are zero, they
 * will not convey any information.
 * NOW there is a trailer field, after the ack where it will safely be
 * ignored by mundanes, which indicates the maximum size packet this
 * host can swallow. */
struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
    register struct rx_call *call;
    register struct rx_packet *optionalPacket; /* use to send ack (or null) */
    int seq;			/* Sequence number of the packet we are acking */
    int serial;			/* Serial number of the packet */
    int pflags;			/* Flags field from packet header */
    int reason;			/* Reason an acknowledge was prompted */
    int istack;
{
    struct rx_ackPacket *ap;
    register struct rx_packet *rqp;
    register struct rx_packet *nxp;  /* For queue_Scan */
    register struct rx_packet *p;
    u_char offset;
    afs_int32 templ;

    /*
     * Open the receive window once a thread starts reading packets
     */
    if (call->rnext > 1) {
	call->rwind = rx_maxReceiveWindow;
    }

    call->nHardAcks = 0;
    call->nSoftAcks = 0;
    if (call->rnext > call->lastAcked)
	call->lastAcked = call->rnext;
    p = optionalPacket;

    if (p) {
	rx_computelen(p, p->length);	/* reset length, you never know */
    }					/* where that's been...	     */
    else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
	/* We won't send the ack, but don't panic. */
	return optionalPacket;
    }

    templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
    if (templ > 0) {
	if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
	    if (!optionalPacket) rxi_FreePacket(p);
	    return optionalPacket;
	}
	templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
	if (rx_Contiguous(p)<templ) {
	    if (!optionalPacket) rxi_FreePacket(p);
	    return optionalPacket;
	}
    } /* MTUXXX failing to send an ack is very serious.  We should */
      /* try as hard as possible to send even a partial ack; it's */
      /* better than nothing. */

    ap = (struct rx_ackPacket *) rx_DataOf(p);
    ap->bufferSpace = htonl(0);	/* Something should go here, sometime */
    ap->reason = reason;

    /* The skew computation used to be bogus, I think it's better now. */
    /* We should start paying attention to skew.    XXX  */
    ap->serial = htonl(call->conn->maxSerial);
    ap->maxSkew = 0;	/* used to be peer->inPacketSkew */

    ap->firstPacket = htonl(call->rnext);    /* First packet not yet forwarded to reader */
    ap->previousPacket = htonl(call->rprev); /* Previous packet received */

    /* No fear of running out of ack packet here because there can only be at most
     * one window full of unacknowledged packets.  The window size must be constrained
     * to be less than the maximum ack size, of course.  Also, an ack should always
     * fit into a single packet -- it should not ever be fragmented.  */
    for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
	if (!rqp || !call->rq.next
	    || (rqp->header.seq > (call->rnext + call->rwind))) {
	    if (!optionalPacket) rxi_FreePacket(p);
	    rxi_CallError(call, RX_CALL_DEAD);
	    return optionalPacket;
	}

	while (rqp->header.seq > call->rnext + offset)
	    ap->acks[offset++] = RX_ACK_TYPE_NACK;
	ap->acks[offset++] = RX_ACK_TYPE_ACK;

	if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
	    if (!optionalPacket) rxi_FreePacket(p);
	    rxi_CallError(call, RX_CALL_DEAD);
	    return optionalPacket;
	}
    }

    ap->nAcks = offset;
    p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);

    /* these are new for AFS 3.3 */
    templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
    templ = htonl(templ);
    rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
    templ = htonl(call->conn->peer->ifMTU);
    rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);

    /* new for AFS 3.4 */
    templ = htonl(call->rwind);
    rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);

    /* new for AFS 3.5 */
    templ = htonl(call->conn->peer->ifDgramPackets);
    rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);

    p->header.serviceId = call->conn->serviceId;
    p->header.cid = (call->conn->cid | call->channel);
    p->header.callNumber = *call->callNumber;
    p->header.seq = seq;
    p->header.securityIndex = call->conn->securityIndex;
    p->header.epoch = call->conn->epoch;
    p->header.type = RX_PACKET_TYPE_ACK;
    p->header.flags = RX_SLOW_START_OK;
    if (reason == RX_ACK_PING) {
	p->header.flags |= RX_REQUEST_ACK;
#ifdef ADAPT_WINDOW
	clock_GetTime(&call->pingRequestTime);
#endif
    }
    if (call->conn->type == RX_CLIENT_CONNECTION)
	p->header.flags |= RX_CLIENT_INITIATED;

#ifdef RXDEBUG
    if (rx_Log) {
	fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
		ap->reason, ntohl(ap->previousPacket), p->header.seq,
		ntohl(ap->firstPacket));
	if (ap->nAcks) {
	    for (offset = 0; offset < ap->nAcks; offset++)
		putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
	}
	putc('\n', rx_Log);
    }
#endif

    {
	register int i, nbytes = p->length;

	for (i=1; i < p->niovecs; i++) {   /* vec 0 is ALWAYS header */
	    if (nbytes <= p->wirevec[i].iov_len) {
		register int savelen, saven;

		savelen = p->wirevec[i].iov_len;
		saven = p->niovecs;
		p->wirevec[i].iov_len = nbytes;
		p->niovecs = i+1;
		rxi_Send(call, p, istack);
		p->wirevec[i].iov_len = savelen;
		p->niovecs = saven;
		break;
	    }
	    else nbytes -= p->wirevec[i].iov_len;
	}
    }
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.ackPacketsSent++;
    MUTEX_EXIT(&rx_stats_mutex);
    if (!optionalPacket) rxi_FreePacket(p);
    return optionalPacket;	/* Return packet for re-use by caller */
}
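/* The four trailer words written above line up directly after the ack
 * array; a receiver can pull them back out with rx_packetread, the
 * counterpart of rx_packetwrite.  A minimal sketch follows (the struct
 * and helper names are hypothetical, and error handling is omitted): */
#if 0
struct ack_trailer {
    afs_int32 maxMTU;		/* AFS 3.3: adjusted maximum MTU */
    afs_int32 ifMTU;		/* AFS 3.3: sender's interface MTU */
    afs_int32 rwind;		/* AFS 3.4: sender's receive window */
    afs_int32 maxDgrams;	/* AFS 3.5: sender's ifDgramPackets */
};

static void read_ack_trailer(struct rx_packet *p, int nAcks, struct ack_trailer *t)
{
    afs_int32 w[4];
    int i, base = rx_AckDataSize(nAcks);
    for (i = 0; i < 4; i++)
	rx_packetread(p, base + i*sizeof(afs_int32), sizeof(afs_int32),
		      (char *)&w[i]);
    t->maxMTU = ntohl(w[0]);
    t->ifMTU = ntohl(w[1]);
    t->rwind = ntohl(w[2]);
    t->maxDgrams = ntohl(w[3]);
}
#endif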
/* Send all of the packets in the list in a single datagram */
static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime)
    struct rx_call *call;
    struct rx_packet **list;
    int len;
    int istack;
    int moreFlag;
    struct clock *now;
    struct clock *retryTime;
{
    int i;
    int requestAck = 0;
    int lastPacket = 0;
    struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;

    MUTEX_ENTER(&peer->peer_lock);
    peer->nSent += len;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.dataPacketsSent += len;
    MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&peer->peer_lock);

    if (list[len-1]->header.flags & RX_LAST_PACKET) {
	lastPacket = 1;
    }

    /* Set the packet flags and schedule the resend events */
    /* Only request an ack for the last packet in the list */
    for (i = 0 ; i < len ; i++) {
	list[i]->retryTime = *retryTime;
	if (list[i]->header.serial) {
	    /* Exponentially backoff retry times */
	    if (list[i]->backoff < MAXBACKOFF) {
		/* so it can't stay == 0 */
		list[i]->backoff = (list[i]->backoff << 1) +1;
	    }
	    else list[i]->backoff++;
	    clock_Addmsec(&(list[i]->retryTime),
			  ((afs_uint32) list[i]->backoff) << 8);
	}

	/* Wait a little extra for the ack on the last packet */
	if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
	    clock_Addmsec(&(list[i]->retryTime), 400);
	}

	/* Record the time sent */
	list[i]->timeSent = *now;

	/* Ask for an ack on retransmitted packets, on every other packet
	 * if the peer doesn't support slow start. Ask for an ack on every
	 * packet until the congestion window reaches the ack rate. */
	if (list[i]->header.serial) {
	    requestAck = 1;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_stats.dataPacketsReSent++;
	    MUTEX_EXIT(&rx_stats_mutex);
	} else {
	    /* improved RTO calculation- not Karn */
	    list[i]->firstSent = *now;
	    if (!lastPacket
		&& (call->cwind <= (u_short)(conn->ackRate+1)
		    || (!(call->flags & RX_CALL_SLOW_START_OK)
			&& (list[i]->header.seq & 1)))) {
		requestAck = 1;
	    }
	}

	MUTEX_ENTER(&peer->peer_lock);
	peer->nSent++;
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.dataPacketsSent++;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&peer->peer_lock);

	/* Tag this packet as not being the last in this group,
	 * for the receiver's benefit */
	if (i < len-1 || moreFlag) {
	    list[i]->header.flags |= RX_MORE_PACKETS;
	}

	/* Install the new retransmit time for the packet, and
	 * record the time sent */
	list[i]->timeSent = *now;
    }

    if (requestAck) {
	list[len-1]->header.flags |= RX_REQUEST_ACK;
    }

    /* Since we're about to send a data packet to the peer, it's
     * safe to nuke any scheduled end-of-packets ack */
    rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);

    CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
    MUTEX_EXIT(&call->lock);
    if (len > 1) {
	rxi_SendPacketList(conn, list, len, istack);
    } else {
	rxi_SendPacket(conn, list[0], istack);
    }
    MUTEX_ENTER(&call->lock);
    CALL_RELE(call, RX_CALL_REFCOUNT_SEND);

    /* Update last send time for this call (for keep-alive
     * processing), and for the connection (so that we can discover
     * idle connections) */
    conn->lastSendTime = call->lastSendTime = clock_Sec();
}
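/* For reference, the retry schedule produced by the backoff code above:
 * backoff grows 1, 3, 7, 15, ... until MAXBACKOFF, then linearly, and
 * each step is scaled by 256 ms (<< 8) before being added to retryTime.
 * A minimal sketch; the helper name (next_backoff_msec) is hypothetical
 * and the block is disabled: */
#if 0
static afs_uint32 next_backoff_msec(u_char *backoff)
{
    if (*backoff < MAXBACKOFF)
	*backoff = (*backoff << 1) + 1;	 /* 0 -> 1 -> 3 -> 7 -> 15 ... */
    else
	(*backoff)++;			 /* cap reached: grow linearly */
    return ((afs_uint32)*backoff) << 8;	 /* units of ~256 ms */
}
#endif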
/* When sending packets we need to follow these rules:
 * 1. Never send more than maxDgramPackets in a jumbogram.
 * 2. Never send a packet with more than two iovecs in a jumbogram.
 * 3. Never send a retransmitted packet in a jumbogram.
 * 4. Never send more than cwind/4 packets in a jumbogram.
 * We always keep the last list we should have sent so we
 * can set the RX_MORE_PACKETS flags correctly.
 * (A sketch of rules 1-4 as a single predicate follows this function.)
 */
static void rxi_SendXmitList(call, list, len, istack, now, retryTime)
    struct rx_call *call;
    struct rx_packet **list;
    int len;
    int istack;
    struct clock *now;
    struct clock *retryTime;
{
    int i, cnt, lastCnt = 0;
    struct rx_packet **listP, **lastP = 0;
    struct rx_peer *peer = call->conn->peer;
    int morePackets = 0;

    for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
	/* Does the current packet force us to flush the current list? */
	if (cnt > 0
	    && (list[i]->header.serial
		|| list[i]->acked
		|| list[i]->length > RX_JUMBOBUFFERSIZE)) {
	    if (lastCnt > 0) {
		rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime);
		/* If the call enters an error state stop sending, or if
		 * we entered congestion recovery mode, stop sending */
		if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
		    return;
	    }
	    lastP = listP;
	    lastCnt = cnt;
	    listP = &list[i];
	    cnt = 0;
	}
	/* Add the current packet to the list if it hasn't been acked.
	 * Otherwise adjust the list pointer to skip the current packet. */
	if (!list[i]->acked) {
	    cnt++;
	    /* Do we need to flush the list? */
	    if (cnt >= (int)peer->maxDgramPackets
		|| cnt >= (int)call->nDgramPackets
		|| cnt >= (int)call->cwind
		|| list[i]->header.serial
		|| list[i]->length != RX_JUMBOBUFFERSIZE) {
		if (lastCnt > 0) {
		    rxi_SendList(call, lastP, lastCnt, istack, 1,
				 now, retryTime);
		    /* If the call enters an error state stop sending, or if
		     * we entered congestion recovery mode, stop sending */
		    if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
			return;
		}
		lastP = listP;
		lastCnt = cnt;
		listP = &list[i+1];
		cnt = 0;
	    }
	} else {
	    if (cnt != 0) {
		osi_Panic("rxi_SendList error");
	    }
	    listP = &list[i+1];
	}
    }

    /* Send the whole list when the call is in receive mode, when
     * the call is in eof mode, when we are in fast recovery mode,
     * and when we have the last packet */
    if ((list[len-1]->header.flags & RX_LAST_PACKET)
	|| call->mode == RX_MODE_RECEIVING
	|| call->mode == RX_MODE_EOF
	|| (call->flags & RX_CALL_FAST_RECOVER)) {
	/* Check for the case where the current list contains
	 * an acked packet. Since we always send retransmissions
	 * in a separate packet, we only need to check the first
	 * packet in the list */
	if (cnt > 0 && !listP[0]->acked) {
	    morePackets = 1;
	}
	if (lastCnt > 0) {
	    rxi_SendList(call, lastP, lastCnt, istack, morePackets,
			 now, retryTime);
	    /* If the call enters an error state stop sending, or if
	     * we entered congestion recovery mode, stop sending */
	    if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
		return;
	}
	if (morePackets) {
	    rxi_SendList(call, listP, cnt, istack, 0, now, retryTime);
	}
    } else if (lastCnt > 0) {
	rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime);
    }
}
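/* The promised sketch of rules 1-4: given the packet about to be added
 * and the count already in the current datagram, decide whether the
 * jumbogram must be flushed first.  The helper name is hypothetical and
 * the block is disabled; it mirrors the tests in the loop above. */
#if 0
static int must_flush_jumbogram(struct rx_call *call, struct rx_packet *p, int cnt)
{
    struct rx_peer *peer = call->conn->peer;
    if (cnt >= (int)peer->maxDgramPackets) return 1;  /* rule 1: peer limit */
    if (cnt >= (int)call->nDgramPackets) return 1;    /* rule 1: per-call limit */
    if (cnt >= (int)call->cwind) return 1;	      /* rule 4: cwind limit */
    if (p->header.serial) return 1;		      /* rule 3: retransmission */
    if (p->length != RX_JUMBOBUFFERSIZE) return 1;    /* rule 2: only full jumbo buffers */
    return 0;
}
#endif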
#ifdef RX_ENABLE_LOCKS
/* Call rxi_Start, below, but with the call lock held. */
void rxi_StartUnlocked(event, call, istack)
    struct rxevent *event;
    register struct rx_call *call;
    int istack;
{
    MUTEX_ENTER(&call->lock);
    rxi_Start(event, call, istack);
    MUTEX_EXIT(&call->lock);
}
#endif /* RX_ENABLE_LOCKS */
/* This routine is called when new packets are readied for
 * transmission and when retransmission may be necessary, or when the
 * transmission window or burst count are favourable.  This should be
 * better optimized for new packets, the usual case, now that we've
 * got rid of queues of send packets. XXXXXXXXXXX */
void rxi_Start(event, call, istack)
    struct rxevent *event;
    register struct rx_call *call;
    int istack;
{
    struct rx_packet *p;
    register struct rx_packet *nxp;  /* Next pointer for queue_Scan */
    struct rx_peer *peer = call->conn->peer;
    struct clock now, retryTime;
    int haveEvent;
    int nXmitPackets;
    int maxXmitPackets;
    struct rx_packet **xmitList;

    /* If rxi_Start is being called as a result of a resend event,
     * then make sure that the event pointer is removed from the call
     * structure, since there is no longer a per-call retransmission
     * event pending. */
    if (event && event == call->resendEvent) {
	CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
	call->resendEvent = NULL;
	if (queue_IsEmpty(&call->tq)) {
	    /* Nothing to do */
	    return;
	}
	/* Timeouts trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
	    /* someone else is waiting to start recovery */
	    return;
	}
	call->flags |= RX_CALL_FAST_RECOVER_WAIT;
	while (call->flags & RX_CALL_TQ_BUSY) {
	    call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	    CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	    osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
	call->flags |= RX_CALL_FAST_RECOVER;
	if (peer->maxDgramPackets > 1) {
	    call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
	} else {
	    call->MTU = MIN(peer->natMTU, peer->maxMTU);
	}
	call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
	call->nDgramPackets = 1;
	call->cwind = 1;
	call->nextCwind = 1;
	call->nAcks = 0;
	call->nNacks = 0;
	MUTEX_ENTER(&peer->peer_lock);
	peer->MTU = call->MTU;
	peer->cwind = call->cwind;
	peer->nDgramPackets = 1;
	peer->congestSeq++;
	call->congestSeq = peer->congestSeq;
	MUTEX_EXIT(&peer->peer_lock);
	/* Clear retry times on packets. Otherwise, it's possible for
	 * some packets in the queue to force resends at rates faster
	 * than recovery rates.
	 */
	for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
	    if (!p->acked) {
		clock_Zero(&p->retryTime);
	    }
	}
    }
    if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	MUTEX_ENTER(&rx_stats_mutex);
	rx_tq_debug.rxi_start_in_error ++;
	MUTEX_EXIT(&rx_stats_mutex);
#endif
	return;
    }

    if (queue_IsNotEmpty(&call->tq)) {	/* If we have anything to send */
	/* Get clock to compute the re-transmit time for any packets
	 * in this burst.  Note, if we back off, it's reasonable to
	 * back off all of the packets in the same manner, even if
	 * some of them have been retransmitted more times than more
	 * recent additions */
	clock_GetTime(&now);
	retryTime = now;	/* initialize before use */
	MUTEX_ENTER(&peer->peer_lock);
	clock_Add(&retryTime, &peer->timeout);
	MUTEX_EXIT(&peer->peer_lock);

	/* Send (or resend) any packets that need it, subject to
	 * window restrictions and congestion burst control
	 * restrictions.  Ask for an ack on the last packet sent in
	 * this burst.  For now, we're relying upon the window being
	 * considerably bigger than the largest number of packets that
	 * are typically sent at once by one initial call to
	 * rxi_Start.  This is probably bogus (perhaps we should ask
	 * for an ack when we're half way through the current
	 * window?).  Also, for non file transfer applications, this
	 * may end up asking for an ack for every packet.  Bogus. XXXX
	 */
	/*
	 * But check whether we're here recursively, and let the other guy
	 * do the work.
	 */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	if (!(call->flags & RX_CALL_TQ_BUSY)) {
	    call->flags |= RX_CALL_TQ_BUSY;
	    do {
		call->flags &= ~RX_CALL_NEED_START;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

	nXmitPackets = 0;
	maxXmitPackets = MIN(call->twind, call->cwind);
	xmitList = (struct rx_packet **)
		   osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
	if (xmitList == NULL)
	    osi_Panic("rxi_Start, failed to allocate xmit list");
	for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
	    if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
		/* We shouldn't be sending packets if a thread is waiting
		 * to initiate congestion recovery */
		break;
	    }
	    if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
		/* Only send one packet during fast recovery */
		break;
	    }
	    if ((p->header.flags == RX_FREE_PACKET) ||
		(!queue_IsEnd(&call->tq, nxp)
		 && (nxp->header.flags == RX_FREE_PACKET)) ||
		(p == (struct rx_packet *)&rx_freePacketQueue) ||
		(nxp == (struct rx_packet *)&rx_freePacketQueue)) {
		osi_Panic("rxi_Start: xmit queue clobbered");
	    }
	    if (p->acked) {
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.ignoreAckedPacket++;
		MUTEX_EXIT(&rx_stats_mutex);
		continue;	/* Ignore this packet if it has been acknowledged */
	    }

	    /* Turn off all flags except these ones, which are the same
	     * on each transmission */
	    p->header.flags &= RX_PRESET_FLAGS;

	    if (p->header.seq >= call->tfirst +
		MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
		call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
		/* Note: if we're waiting for more window space, we can
		 * still send retransmits; hence we don't return here, but
		 * break out to schedule a retransmit event */
		dpf(("call %d waiting for window", *(call->callNumber)));
		break;
	    }

	    /* Transmit the packet if it needs to be sent. */
	    if (!clock_Lt(&now, &p->retryTime)) {
		if (nXmitPackets == maxXmitPackets) {
		    osi_Panic("rxi_Start: xmit list overflowed");
		}
		xmitList[nXmitPackets++] = p;
	    }
	}

	/* xmitList now holds pointers to all of the packets that are
	 * ready to send. Now we loop to send the packets */
	if (nXmitPackets > 0) {
	    rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
			     &now, &retryTime);
	}
	osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/*
	 * TQ references no longer protected by this flag; they must remain
	 * protected by the global lock.
	 */
	if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
	    call->flags &= ~RX_CALL_TQ_BUSY;
	    if (call->flags & RX_CALL_TQ_WAIT) {
		call->flags &= ~RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
		CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
		osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	    }
	    return;
	}
	if (call->error) {
	    /* We went into the error state while sending packets. Now is
	     * the time to reset the call. This will also inform the using
	     * process that the call is in an error state.
	     */
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_tq_debug.rxi_start_aborted ++;
	    MUTEX_EXIT(&rx_stats_mutex);
	    call->flags &= ~RX_CALL_TQ_BUSY;
	    if (call->flags & RX_CALL_TQ_WAIT) {
		call->flags &= ~RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
		CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
		osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	    }
	    rxi_CallError(call, call->error);
	    return;
	}
#ifdef RX_ENABLE_LOCKS
	if (call->flags & RX_CALL_TQ_SOME_ACKED) {
	    register int missing;
	    call->flags &= ~RX_CALL_TQ_SOME_ACKED;
	    /* Some packets have received acks. If they all have, we can clear
	     * the transmit queue.
	     */
	    for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
		if (p->header.seq < call->tfirst && p->acked) {
		    queue_Remove(p);
		    rxi_FreePacket(p);
		} else if (!p->acked) {
		    missing = 1;
		}
	    }
	    if (!missing)
		call->flags |= RX_CALL_TQ_CLEARME;
	}
#endif /* RX_ENABLE_LOCKS */
	/* Don't bother doing retransmits if the TQ is cleared. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 1);
	} else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	{
	    /* Always post a resend event, if there is anything in the
	     * queue, and resend is possible.  There should be at least
	     * one unacknowledged packet in the queue ... otherwise none
	     * of these packets should be on the queue in the first place.
	     */
	    if (call->resendEvent) {
		/* Cancel the existing event and post a new one */
		rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    }

	    /* The retry time is the retry time on the first unacknowledged
	     * packet inside the current window */
	    for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
		/* Don't set timers for packets outside the window */
		if (p->header.seq >= call->tfirst + call->twind) {
		    break;
		}

		if (!p->acked && !clock_IsZero(&p->retryTime)) {
		    haveEvent = 1;
		    retryTime = p->retryTime;
		    break;
		}
	    }

	    /* Post a new event to re-run rxi_Start when retries may be needed */
	    if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
#ifdef RX_ENABLE_LOCKS
		CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
		call->resendEvent = rxevent_Post(&retryTime,
						 rxi_StartUnlocked,
						 (char *)call, istack);
#else /* RX_ENABLE_LOCKS */
		call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
						 (char *)call, (void*)istack);
#endif /* RX_ENABLE_LOCKS */
	    }
	}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	    } while (call->flags & RX_CALL_NEED_START);
	    /*
	     * TQ references no longer protected by this flag; they must remain
	     * protected by the global lock.
	     */
	    call->flags &= ~RX_CALL_TQ_BUSY;
	    if (call->flags & RX_CALL_TQ_WAIT) {
		call->flags &= ~RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
		CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
		osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
	    }
	} else {
	    /* Another thread is busy with the transmit queue; come back later */
	    call->flags |= RX_CALL_NEED_START;
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    } else {
	if (call->resendEvent) {
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	}
    }
}
/* Also adjusts the keep alive parameters for the call, to reflect
 * that we have just sent a packet (so keep alives aren't sent
 * immediately) */
void rxi_Send(call, p, istack)
    register struct rx_call *call;
    register struct rx_packet *p;
    int istack;
{
    register struct rx_connection *conn = call->conn;

    /* Stamp each packet with the user supplied status */
    p->header.userStatus = call->localStatus;

    /* Allow the security object controlling this call's security to
     * make any last-minute changes to the packet */
    RXS_SendPacket(conn->securityObject, call, p);

    /* Since we're about to send SOME sort of packet to the peer, it's
     * safe to nuke any scheduled end-of-packets ack */
    rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);

    /* Actually send the packet, filling in more connection-specific fields */
    CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
    MUTEX_EXIT(&call->lock);
    rxi_SendPacket(conn, p, istack);
    MUTEX_ENTER(&call->lock);
    CALL_RELE(call, RX_CALL_REFCOUNT_SEND);

    /* Update last send time for this call (for keep-alive
     * processing), and for the connection (so that we can discover
     * idle connections) */
    conn->lastSendTime = call->lastSendTime = clock_Sec();
}
/* Check if a call needs to be destroyed.  Called by keep-alive code to ensure
 * that things are fine.  Also called periodically to guarantee that nothing
 * falls through the cracks (e.g. (error + dally) connections have keepalive
 * turned off).  Returns 0 if conn is well, -1 otherwise.  If otherwise, call
 * may be freed!
 */
#ifdef RX_ENABLE_LOCKS
int rxi_CheckCall(call, haveCTLock)
    int haveCTLock;		/* Set if calling from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
int rxi_CheckCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *tservice;
    afs_uint32 now;
    afs_uint32 deadTime;

#ifdef RX_GLOBAL_RXLOCK_KERNEL
    if (call->flags & RX_CALL_TQ_BUSY) {
	/* Call is active and will be reset by rxi_Start if it's
	 * in an error state.
	 */
	return 0;
    }
#endif
    /* dead time + RTT + 8*MDEV, rounded up to next second. */
    deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
		((afs_uint32)conn->peer->rtt >> 3) +
		((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
    now = clock_Sec();

    /* These are computed to the second (+- 1 second).  But that's
     * good enough for these values, which should be a significant
     * number of seconds. */
    if (now > (call->lastReceiveTime + deadTime)) {
	if (call->state == RX_STATE_ACTIVE) {
	    rxi_CallError(call, RX_CALL_DEAD);
	    return -1;
	} else {
#ifdef RX_ENABLE_LOCKS
	    /* Cancel pending events */
	    rxevent_Cancel(call->delayedAckEvent, call,
			   RX_CALL_REFCOUNT_DELAY);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	    if (call->refCount == 0) {
		rxi_FreeCall(call, haveCTLock);
		return -2;
	    }
	    return -1;
#else /* RX_ENABLE_LOCKS */
	    rxi_FreeCall(call);
	    return -2;
#endif /* RX_ENABLE_LOCKS */
	}
	/* Non-active calls are destroyed if they are not responding
	 * to pings; active calls are simply flagged in error, so the
	 * attached process can die reasonably gracefully. */
    }
    /* see if we have a non-activity timeout */
    tservice = conn->service;
    if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
	&& tservice->idleDeadTime
	&& ((call->startWait + tservice->idleDeadTime) < now)) {
	if (call->state == RX_STATE_ACTIVE) {
	    rxi_CallError(call, RX_CALL_TIMEOUT);
	    return -1;
	}
    }
    /* see if we have a hard timeout */
    if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
	if (call->state == RX_STATE_ACTIVE)
	    rxi_CallError(call, RX_CALL_TIMEOUT);
	return -1;
    }
    return 0;
}
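/* The deadTime expression above is fixed-point arithmetic: seconds are
 * scaled by 1024 (<< 10); rtt is kept in eighths of a millisecond, so
 * rtt >> 3 yields milliseconds (roughly 1/1024 of a second at this
 * scale); rtt_dev is kept scaled by 4, so rtt_dev << 1 is 8*MDEV in
 * milliseconds; and adding 1023 before the final >> 10 rounds up to the
 * next whole second.  A minimal standalone rendering (the helper name is
 * hypothetical, and the block is disabled): */
#if 0
static afs_uint32 dead_time_seconds(afs_uint32 secondsUntilDead,
				    afs_uint32 rtt, afs_uint32 rtt_dev)
{
    /* dead time + RTT + 8*MDEV, rounded up to the next second */
    return ((secondsUntilDead << 10) + (rtt >> 3) + (rtt_dev << 1) + 1023) >> 10;
}
#endif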
/* When a call is in progress, this routine is called occasionally to
 * make sure that some traffic has arrived (or been sent to) the peer.
 * If nothing has arrived in a reasonable amount of time, the call is
 * declared dead; if nothing has been sent for a while, we send a
 * keep-alive packet (if we're actually trying to keep the call alive)
 */
void rxi_KeepAliveEvent(event, call, dummy)
    struct rxevent *event;
    register struct rx_call *call;
    char *dummy;
{
    struct rx_connection *conn;
    afs_uint32 now;

    MUTEX_ENTER(&call->lock);
    CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
    if (event == call->keepAliveEvent)
	call->keepAliveEvent = (struct rxevent *) 0;
    now = clock_Sec();

#ifdef RX_ENABLE_LOCKS
    if (rxi_CheckCall(call, 0)) {
	MUTEX_EXIT(&call->lock);
	return;
    }
#else /* RX_ENABLE_LOCKS */
    if (rxi_CheckCall(call)) return;
#endif /* RX_ENABLE_LOCKS */

    /* Don't try to keep alive dallying calls */
    if (call->state == RX_STATE_DALLY) {
	MUTEX_EXIT(&call->lock);
	return;
    }

    conn = call->conn;
    if ((now - call->lastSendTime) > conn->secondsUntilPing) {
	/* Don't try to send keepalives if there is unacknowledged data */
	/* the rexmit code should be good enough, this little hack
	 * doesn't quite work XXX */
	(void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
    }
    rxi_ScheduleKeepAliveEvent(call);
    MUTEX_EXIT(&call->lock);
}

void rxi_ScheduleKeepAliveEvent(call)
    register struct rx_call *call;
{
    if (!call->keepAliveEvent) {
	struct clock when;
	clock_GetTime(&when);
	when.sec += call->conn->secondsUntilPing;
	CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
	call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
    }
}

/* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
void rxi_KeepAliveOn(call)
    register struct rx_call *call;
{
    /* Pretend last packet received was received now--i.e. if another
     * packet isn't received within the keep alive time, then the call
     * will die; Initialize last send time to the current time--even
     * if a packet hasn't been sent yet.  This will guarantee that a
     * keep-alive is sent within the ping time */
    call->lastReceiveTime = call->lastSendTime = clock_Sec();
    rxi_ScheduleKeepAliveEvent(call);
}
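/* The keep-alive policy above reduces to a single test: a ping is due
 * only when the call has sent nothing for secondsUntilPing seconds.
 * A one-line sketch (hypothetical helper name, disabled block): */
#if 0
static int keepalive_due(afs_uint32 now, afs_uint32 lastSendTime,
			 afs_uint32 secondsUntilPing)
{
    return (now - lastSendTime) > secondsUntilPing;
}
#endif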
/* This routine is called to send connection abort messages
 * that have been delayed to throttle looping clients. */
void rxi_SendDelayedConnAbort(event, conn, dummy)
    struct rxevent *event;
    register struct rx_connection *conn;
    char *dummy;
{
    afs_int32 error;
    struct rx_packet *packet;

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->delayedAbortEvent = (struct rxevent *) 0;
    error = htonl(conn->error);
    conn->abortCount++;
    MUTEX_EXIT(&conn->conn_data_lock);
    packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
    if (packet) {
	packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
				 RX_PACKET_TYPE_ABORT, (char *)&error,
				 sizeof(error), 0);
	rxi_FreePacket(packet);
    }
}

/* This routine is called to send call abort messages
 * that have been delayed to throttle looping clients. */
void rxi_SendDelayedCallAbort(event, call, dummy)
    struct rxevent *event;
    register struct rx_call *call;
    char *dummy;
{
    afs_int32 error;
    struct rx_packet *packet;

    MUTEX_ENTER(&call->lock);
    call->delayedAbortEvent = (struct rxevent *) 0;
    error = htonl(call->error);
    call->abortCount++;
    packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
    if (packet) {
	packet = rxi_SendSpecial(call, call->conn, packet,
				 RX_PACKET_TYPE_ABORT, (char *)&error,
				 sizeof(error), 0);
	rxi_FreePacket(packet);
    }
    MUTEX_EXIT(&call->lock);
}
/* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
 * seconds) to ask the client to authenticate itself.  The routine
 * issues a challenge to the client, which is obtained from the
 * security object associated with the connection */
void rxi_ChallengeEvent(event, conn, dummy)
    struct rxevent *event;
    register struct rx_connection *conn;
    char *dummy;
{
    conn->challengeEvent = (struct rxevent *) 0;
    if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
	register struct rx_packet *packet;
	struct clock when;
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    /* If there's no packet available, do this later. */
	    RXS_GetChallenge(conn->securityObject, conn, packet);
	    rxi_SendSpecial((struct rx_call *) 0, conn, packet,
			    RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
	    rxi_FreePacket(packet);
	}
	clock_GetTime(&when);
	when.sec += RX_CHALLENGE_TIMEOUT;
	conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
    }
}

/* Call this routine to start requesting the client to authenticate
 * itself.  This will continue until authentication is established,
 * the call times out, or an invalid response is returned.  The
 * security object associated with the connection is asked to create
 * the challenge at this time.  N.B.  rxi_ChallengeOff is a macro,
 * defined earlier. */
void rxi_ChallengeOn(conn)
    register struct rx_connection *conn;
{
    if (!conn->challengeEvent) {
	RXS_CreateChallenge(conn->securityObject, conn);
	rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
    }
}
/* Compute round trip time of the packet provided, in *rttp. */

/* rxi_ComputeRoundTripTime is called with peer locked. */
void rxi_ComputeRoundTripTime(p, sentp, peer)
    register struct clock *sentp;	/* may be null */
    register struct rx_peer *peer;	/* may be null */
    register struct rx_packet *p;
{
    struct clock thisRtt, *rttp = &thisRtt;

    register int rtt_timeout;
    static char id[]="@(#)adaptive RTO";

    clock_GetTime(rttp);
    if (clock_Lt(rttp, sentp)) {
	clock_Zero(rttp);
	return;		/* somebody set the clock back, don't count this time. */
    }
    clock_Sub(rttp, sentp);
    MUTEX_ENTER(&rx_stats_mutex);
    if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
    if (clock_Gt(rttp, &rx_stats.maxRtt)) {
	if (rttp->sec > 60) {
	    MUTEX_EXIT(&rx_stats_mutex);
	    return;	/* somebody set the clock ahead */
	}
	rx_stats.maxRtt = *rttp;
    }
    clock_Add(&rx_stats.totalRtt, rttp);
    rx_stats.nRttSamples++;
    MUTEX_EXIT(&rx_stats_mutex);

    /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */

    /* Apply VanJacobson round-trip estimations */
    if (peer->rtt) {
	register int delta;

	/*
	 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
	 * srtt is stored as fixed point with 3 bits after the binary
	 * point (i.e., scaled by 8). The following magic is
	 * equivalent to the smoothing algorithm in rfc793 with an
	 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
	 * srtt*8 = srtt*8 + rtt - srtt
	 * srtt = srtt + rtt/8 - srtt/8
	 */
	delta = MSEC(rttp) - (peer->rtt >> 3);
	peer->rtt += delta;

	/*
	 * We accumulate a smoothed rtt variance (actually, a smoothed
	 * mean difference), then set the retransmit timer to smoothed
	 * rtt + 4 times the smoothed variance (was 2x in van's original
	 * paper, but 4x works better for me, and apparently for him as
	 * well).
	 * rttvar is stored as
	 * fixed point with 2 bits after the binary point (scaled by
	 * 4).  The following is equivalent to rfc793 smoothing with
	 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4).  This
	 * replaces rfc793's wired-in beta.
	 * dev*4 = dev*4 + (|actual - expected| - dev)
	 */
	if (delta < 0)
	    delta = -delta;

	delta -= (peer->rtt_dev >> 2);
	peer->rtt_dev += delta;
    } else {
	/* I don't have a stored RTT so I start with this value.  Since I'm
	 * probably just starting a call, and will be pushing more data down
	 * this, I expect congestion to increase rapidly.  So I fudge a
	 * little, and I set deviance to half the rtt.  In practice,
	 * deviance tends to approach something a little less than
	 * half the smoothed rtt. */
	peer->rtt = (MSEC(rttp) << 3) + 8;
	peer->rtt_dev = peer->rtt >> 2;	/* rtt/2: they're scaled differently */
    }
    /* the timeout is RTT + 4*MDEV + 0.35 sec.  This is because one end or
     * the other of these connections is usually in a user process, and can
     * be switched and/or swapped out.  So on fast, reliable networks, the
     * timeout would otherwise be too short.
     */
    rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
    clock_Zero(&(peer->timeout));
    clock_Addmsec(&(peer->timeout), rtt_timeout);

    dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
	 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
	 (peer->timeout.sec),(peer->timeout.usec)) );
}
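/* The smoothing above in isolation: srtt is kept scaled by 8 and rttvar
 * by 4, so both updates stay in integer arithmetic, and the timeout
 * works out to srtt + 4*MDEV + 350 ms.  A minimal sketch (the helper
 * name is hypothetical, the sample is in milliseconds, and the block is
 * disabled): */
#if 0
static int vj_update(int *srtt8, int *rttvar4, int sample_ms)
{
    int delta = sample_ms - (*srtt8 >> 3);
    *srtt8 += delta;			/* srtt += delta/8, in fixed point */
    if (delta < 0)
	delta = -delta;
    delta -= (*rttvar4 >> 2);
    *rttvar4 += delta;			/* rttvar += (|delta| - rttvar)/4 */
    /* rttvar4 is 4*MDEV in ms, so this matches rtt_timeout above */
    return (*srtt8 >> 3) + *rttvar4 + 350;
}
#endif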
/* Find all server connections that have not been active for a long time, and
 * toss them */
void rxi_ReapConnections()
{
    struct clock now;
    clock_GetTime(&now);

    /* Find server connection structures that haven't been used for
     * greater than rx_idleConnectionTime */
    {	struct rx_connection **conn_ptr, **conn_end;
	int i, havecalls = 0;
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    struct rx_call *call;
	    int result;

	  rereap:
	    for (conn = *conn_ptr; conn; conn = next) {
		/* XXX -- Shouldn't the connection be locked? */
		next = conn->next;
		havecalls = 0;
		for (i=0;i<RX_MAXCALLS;i++) {
		    call = conn->call[i];
		    if (call) {
			havecalls = 1;
			MUTEX_ENTER(&call->lock);
#ifdef RX_ENABLE_LOCKS
			result = rxi_CheckCall(call, 1);
#else /* RX_ENABLE_LOCKS */
			result = rxi_CheckCall(call);
#endif /* RX_ENABLE_LOCKS */
			MUTEX_EXIT(&call->lock);
			if (result == -2) {
			    /* If CheckCall freed the call, it might
			     * have destroyed the connection as well,
			     * which screws up the linked lists.
			     */
			    goto rereap;
			}
		    }
		}
		if (conn->type == RX_SERVER_CONNECTION) {
		    /* This only actually destroys the connection if
		     * there are no outstanding calls */
		    MUTEX_ENTER(&conn->conn_data_lock);
		    if (!havecalls && !conn->refCount &&
			((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
			conn->refCount++;	/* it will be decr in rx_DestroyConn */
			MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
			rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
			rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		    }
#ifdef RX_ENABLE_LOCKS
		    else {
			MUTEX_EXIT(&conn->conn_data_lock);
		    }
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }

    /* Find any peer structures that haven't been used (haven't had an
     * associated connection) for greater than rx_idlePeerTime */
    {	struct rx_peer **peer_ptr, **peer_end;
	int code;
	MUTEX_ENTER(&rx_rpc_stats);
	MUTEX_ENTER(&rx_peerHashTable_lock);
	for (peer_ptr = &rx_peerHashTable[0],
	     peer_end = &rx_peerHashTable[rx_hashTableSize];
	     peer_ptr < peer_end; peer_ptr++) {
	    struct rx_peer *peer, *next, *prev;
	    for (prev = peer = *peer_ptr; peer; peer = next) {
		next = peer->next;
		code = MUTEX_TRYENTER(&peer->peer_lock);
		if ((code) && (peer->refCount == 0)
		    && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
		    rx_interface_stat_p rpc_stat, nrpc_stat;
		    size_t space;
		    MUTEX_EXIT(&peer->peer_lock);
		    MUTEX_DESTROY(&peer->peer_lock);
		    for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
				    rx_interface_stat)) {
			unsigned int num_funcs;
			if (!rpc_stat) break;
			queue_Remove(&rpc_stat->queue_header);
			queue_Remove(&rpc_stat->all_peers);
			num_funcs = rpc_stat->stats[0].func_total;
			space = sizeof(rx_interface_stat_t) +
				rpc_stat->stats[0].func_total *
				sizeof(rx_function_entry_v1_t);

			rxi_Free(rpc_stat, space);
			rxi_rpc_peer_stat_cnt -= num_funcs;
		    }
		    rxi_FreePeer(peer);
		    MUTEX_ENTER(&rx_stats_mutex);
		    rx_stats.nPeerStructs--;
		    MUTEX_EXIT(&rx_stats_mutex);
		    if (prev == *peer_ptr) {
			*peer_ptr = next;
			prev = next;
		    } else {
			prev->next = next;
		    }
		} else {
		    if (code) {
			MUTEX_EXIT(&peer->peer_lock);
		    }
		    prev = peer;
		}
	    }
	}
	MUTEX_EXIT(&rx_peerHashTable_lock);
	MUTEX_EXIT(&rx_rpc_stats);
    }

    /* THIS HACK IS A TEMPORARY HACK.  The idea is that the race condition in
     * rxi_AllocSendPacket, if it hits, will be handled at the next conn
     * GC, just below.  Really, we shouldn't have to keep moving packets from
     * one place to another, but instead ought to always know if we can
     * afford to hold onto a packet in its particular use. */
    MUTEX_ENTER(&rx_freePktQ_lock);
    if (rx_waitingForPackets) {
	rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&rx_waitingForPackets_cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(&rx_waitingForPackets);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_freePktQ_lock);

    now.sec += RX_REAP_TIME;	/* Check every RX_REAP_TIME seconds */
    rxevent_Post(&now, rxi_ReapConnections, 0, 0);
}
/* rxs_Release - This isn't strictly necessary but, since the macro name from
 * rx.h is sort of strange this is better.  This is called with a security
 * object before it is discarded.  Each connection using a security object has
 * its own refcount to the object so it won't actually be freed until the last
 * connection is destroyed.
 *
 * This is the only rxs module call.  A hold could also be written but no one
 * is using it. */

int rxs_Release (aobj)
    struct rx_securityClass *aobj;
{
    return RXS_Close (aobj);
}

#ifdef ADAPT_WINDOW
#define RXRATE_PKT_OH		(RX_HEADER_SIZE + RX_IPUDP_SIZE)
#define RXRATE_SMALL_PKT	(RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
#define RXRATE_AVG_SMALL_PKT	(RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
#define RXRATE_LARGE_PKT	(RXRATE_SMALL_PKT + 256)

/* Adjust our estimate of the transmission rate to this peer, given
 * that the packet p was just acked.  We can adjust peer->timeout and
 * call->twind.  Pragmatically, this is called
 * only with packets of maximal length.
 * Called with peer and call locked.
 */
static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
    register struct rx_peer *peer;
    register struct rx_call *call;
    struct rx_packet *p, *ackp;
    u_char ackReason;
{
    afs_int32 xferSize, xferMs;
    register afs_int32 minTime;
    struct clock newTO;

    /* Count down packets */
    if (peer->rateFlag > 0) peer->rateFlag--;

    /* Do nothing until we're enabled */
    if (peer->rateFlag != 0) return;
    if (!call->conn) return;

    /* Count only when the ack seems legitimate */
    switch (ackReason) {
    case RX_ACK_REQUESTED:
	xferSize = p->length + RX_HEADER_SIZE +
		   call->conn->securityMaxTrailerSize;
	xferMs = peer->rtt;
	break;

    case RX_ACK_PING_RESPONSE:
	if (p)	/* want the response to ping-request, not data send */
	    return;

	clock_GetTime(&newTO);
	if (clock_Gt(&newTO, &call->pingRequestTime)) {
	    clock_Sub(&newTO, &call->pingRequestTime);
	    xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
	} else {
	    return;
	}
	xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
	break;

    default:
	return;
    }

    dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
	 ntohl(peer->host), ntohs(peer->port),
	 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
	 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	 peer->ifMTU));

    /* Track only packets that are big enough. */
    if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
	peer->ifMTU)
	return;

    /* absorb RTT data (in milliseconds) for these big packets */
    if (peer->smRtt == 0) {
	peer->smRtt = xferMs;
    } else {
	peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
	if (!peer->smRtt) peer->smRtt = 1;
    }

    if (peer->countDown) {
	peer->countDown--;
	return;
    }
    peer->countDown = 10;	/* recalculate only every so often */

    /* In practice, we can measure only the RTT for full packets,
     * because of the way Rx acks the data that it receives.  (If it's
     * smaller than a full packet, it often gets implicitly acked
     * either by the call response (from a server) or by the next call
     * (from a client), and either case confuses transmission times
     * with processing times.)  Therefore, replace the above
     * more-sophisticated processing with a simpler version, where the
     * smoothed RTT is kept for full-size packets, and the time to
     * transmit a windowful of full-size packets is simply RTT *
     * windowSize.  Again, we take two steps:
     * - ensure the timeout is large enough for a single packet's RTT;
     * - ensure that the window is small enough to fit in the desired timeout. */

    /* First, the timeout check. */
    minTime = peer->smRtt;
    /* Get a reasonable estimate for a timeout period */
    minTime += minTime;
    newTO.sec = minTime / 1000;
    newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;

    /* Increase the timeout period so that we can always do at least
     * one packet exchange */
    if (clock_Gt(&newTO, &peer->timeout)) {

	dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
	     peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
	     peer->ifMTU));

	peer->timeout = newTO;
    }

    /* Now, get an estimate for the transmit window size. */
    minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
    /* Now, convert to the number of full packets that could fit in a
     * reasonable fraction of that interval */
    minTime /= (peer->smRtt << 1);
    xferSize = minTime;		/* (make a copy) */

    /* Now clamp the size to reasonable bounds. */
    if (minTime <= 1) minTime = 1;
    else if (minTime > rx_Window) minTime = rx_Window;
/*    if (minTime != peer->maxWindow) {
      dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
	     peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	     peer->ifMTU));
      peer->maxWindow = minTime;
	elide... call->twind = minTime;
    }
*/

    /* Cut back on the peer timeout if it had earlier grown unreasonably.
     * Discern this by calculating the timeout necessary for rx_Window
     * packets. */
    if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
	/* calculate estimate for transmission interval in milliseconds */
	minTime = rx_Window * peer->smRtt;
	if (minTime < 1000) {
	    dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
		 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
		 peer->timeout.usec, peer->smRtt,
		 peer->ifMTU));

	    newTO.sec = 0;	/* cut back on timeout by half a second */
	    newTO.usec = 500000;
	    clock_Sub(&peer->timeout, &newTO);
	}
    }

    return;
}  /* end of rxi_ComputeRate */
#endif /* ADAPT_WINDOW */
/* Don't call this debugging routine directly; use dpf */
void
rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
               a11, a12, a13, a14, a15)
    char *format;
{
    struct clock now;
    clock_GetTime(&now);
    fprintf(rx_Log, " %u.%.3u:", now.sec, now.usec/1000);
    fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
    fflush(rx_Log);
}
/*
 * This function is used to process the rx_stats structure that is local
 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug).  Therefore, it needs to do minimal version
 * checking.
 */
void rx_PrintTheseStats (file, s, size, freePackets, version)
    FILE *file;
    struct rx_stats *s;
    int size; /* some idea of version control */
    afs_int32 freePackets;
    char version;
{
    int i;

    if (size != sizeof(struct rx_stats)) {
        fprintf(file,
                "Unexpected size of stats structure: was %d, expected %d\n",
                size, (int) sizeof(struct rx_stats));
    }

    fprintf(file,
            "rx stats: free packets %d, "
            "allocs %d, ",
            freePackets,
            s->packetRequests);

    if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
        fprintf(file,
            "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
            s->receivePktAllocFailures,
            s->receiveCbufPktAllocFailures,
            s->sendPktAllocFailures,
            s->sendCbufPktAllocFailures,
            s->specialPktAllocFailures);
    } else {
        fprintf(file,
            "alloc-failures(rcv %d,send %d,ack %d)\n",
            s->receivePktAllocFailures,
            s->sendPktAllocFailures,
            s->specialPktAllocFailures);
    }

    fprintf(file,
            "   greedy %d, "
            "bogusReads %d (last from host %x), "
            "noPackets %d, "
            "noBuffers %d, "
            "selects %d, "
            "sendSelects %d\n",
            s->socketGreedy,
            s->bogusPacketOnRead,
            s->bogusHost,
            s->noPacketOnRead,
            s->noPacketBuffersOnRead,
            s->selects,
            s->sendSelects);

    fprintf(file, "   packets read: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
        fprintf(file,
                "%s %d ",
                rx_packetTypes[i],
                s->packetsRead[i]);
    }
    fprintf(file, "\n");

    fprintf(file,
            "   other read counters: data %d, "
            "ack %d, "
            "dup %d "
            "spurious %d "
            "dally %d\n",
            s->dataPacketsRead,
            s->ackPacketsRead,
            s->dupPacketsRead,
            s->spuriousPacketsRead,
            s->ignorePacketDally);

    fprintf(file, "   packets sent: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
        fprintf(file,
                "%s %d ",
                rx_packetTypes[i],
                s->packetsSent[i]);
    }
    fprintf(file, "\n");

    fprintf(file,
            "   other send counters: ack %d, "
            "data %d (not resends), "
            "resends %d, "
            "pushed %d, "
            "acked&ignored %d\n",
            s->ackPacketsSent,
            s->dataPacketsSent,
            s->dataPacketsReSent,
            s->dataPacketsPushed,
            s->ignoreAckedPacket);

    fprintf(file,
            "   \t(these should be small) sendFailed %d, "
            "fatalErrors %d\n",
            s->netSendFailures,
            (int) s->fatalErrors);

    if (s->nRttSamples) {
        fprintf(file,
                "   Average rtt is %0.3f, with %d samples\n",
                clock_Float(&s->totalRtt)/s->nRttSamples,
                s->nRttSamples);

        fprintf(file,
                "   Minimum rtt is %0.3f, maximum is %0.3f\n",
                clock_Float(&s->minRtt),
                clock_Float(&s->maxRtt));
    }

    fprintf(file,
            "   %d server connections, "
            "%d client connections, "
            "%d peer structs, "
            "%d call structs, "
            "%d free call structs\n",
            s->nServerConns,
            s->nClientConns,
            s->nPeerStructs,
            s->nCallStructs,
            s->nFreeCallStructs);

#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
    fprintf(file,
            "   %d clock updates\n",
            clock_nUpdates);
#endif
}

/* for backward compatibility */
void rx_PrintStats(file)
    FILE *file;
{
    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
}
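/* Usage sketch (illustrative, not part of the original file): once
 * rx_Init() has succeeded, the aggregate counters above can be dumped
 * to any stdio stream. */
#if 0
static void example_DumpStats(void)
{
    rx_PrintStats(stdout); /* process-wide totals, taken under rx_stats_mutex */
}
#endif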
void rx_PrintPeerStats(file, peer)
    FILE *file;
    struct rx_peer *peer;
{
    fprintf(file,
            "Peer %x.%d.  "
            "Burst size %d, "
            "burst wait %u.%d.\n",
            ntohl(peer->host),
            peer->port,
            peer->burstSize,
            peer->burstWait.sec,
            peer->burstWait.usec);

    fprintf(file,
            "   Rtt %d, "
            "retry time %u.%06d, "
            "total sent %d, "
            "resent %d\n",
            peer->rtt,
            peer->timeout.sec,
            peer->timeout.usec,
            peer->nSent,
            peer->reSends);

    fprintf(file,
            "   Packet size %d, "
            "max in packet skew %d, "
            "max out packet skew %d\n",
            peer->ifMTU,
            peer->inPacketSkew,
            peer->outPacketSkew);
}
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 * counter
 */
static pthread_mutex_t rx_debug_mutex;
#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(
    int socket,
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    u_char type,
    void *inputData,
    size_t inputLength,
    void *outputData,
    size_t outputLength
)
{
    static afs_int32 counter = 100;
    afs_int32 endTime;
    struct rx_header theader;
    char tbuffer[1500];
    register afs_int32 code;
    struct timeval tv;
    struct sockaddr_in taddr, faddr;
    int faddrLen;
    fd_set imask;
    register char *tp;

    endTime = time(0) + 20; /* try for 20 seconds */
    LOCK_RX_DEBUG
    counter++;
    UNLOCK_RX_DEBUG
    tp = &tbuffer[sizeof(struct rx_header)];
    taddr.sin_family = AF_INET;
    taddr.sin_port = remotePort;
    taddr.sin_addr.s_addr = remoteAddr;
    while (1) {
        memset(&theader, 0, sizeof(theader));
        theader.epoch = htonl(999);
        theader.cid = 0;
        theader.callNumber = htonl(counter);
        theader.seq = 0;
        theader.serial = 0;
        theader.type = type;
        theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
        theader.serviceId = 0;

        bcopy(&theader, tbuffer, sizeof(theader));
        bcopy(inputData, tp, inputLength);
        code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
                      (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

        /* see if there's a packet available */
        FD_ZERO(&imask);
        FD_SET(socket, &imask);
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        code = select(socket+1, &imask, 0, 0, &tv);
        if (code > 0) {
            /* now receive a packet */
            faddrLen = sizeof(struct sockaddr_in);
            code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
                            (struct sockaddr *) &faddr, &faddrLen);

            bcopy(tbuffer, &theader, sizeof(struct rx_header));
            if (counter == ntohl(theader.callNumber)) break;
        }

        /* see if we've timed out */
        if (endTime < time(0)) return -1;
    }
    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    bcopy(tp, outputData, code);
    return code;
}
afs_int32 rx_GetServerDebug(
    int socket,
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    struct rx_debugStats *stat,
    afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);
    in.index = 0;

    rc = MakeDebugCall(socket,
                       remoteAddr,
                       remotePort,
                       RX_PACKET_TYPE_DEBUG,
                       &in,
                       sizeof(in),
                       stat,
                       sizeof(*stat));

    /*
     * If the call was successful, fixup the version and indicate
     * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */

    if (rc >= 0) {
        if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
            *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
            *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
            *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
            *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
            *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
            *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
            *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
        }

        stat->nFreePackets = ntohl(stat->nFreePackets);
        stat->packetReclaims = ntohl(stat->packetReclaims);
        stat->callsExecuted = ntohl(stat->callsExecuted);
        stat->nWaiting = ntohl(stat->nWaiting);
        stat->idleThreads = ntohl(stat->idleThreads);
    }

    return rc;
}
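/* Usage sketch (illustrative; error handling mostly elided): this is the
 * pattern an rxdebug-style client follows.  The socket setup is an
 * assumption made for the example; host and port are in network order. */
#if 0
static void example_QueryDebugPort(afs_uint32 host, afs_uint16 port)
{
    struct rx_debugStats tstats;
    afs_uint32 supported;
    int sd = socket(AF_INET, SOCK_DGRAM, 0);

    if (sd < 0)
        return;
    if (rx_GetServerDebug(sd, host, port, &tstats, &supported) >= 0) {
        printf("version %d: free packets %d, calls executed %d\n",
               tstats.version, tstats.nFreePackets, tstats.callsExecuted);
    }
    close(sd);
}
#endif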
afs_int32 rx_GetServerStats(
    int socket,
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    struct rx_stats *stat,
    afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 *lp = (afs_int32 *) stat;
    int i;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_RXSTATS);
    in.index = 0;
    memset(stat, 0, sizeof(*stat));

    rc = MakeDebugCall(socket,
                       remoteAddr,
                       remotePort,
                       RX_PACKET_TYPE_DEBUG,
                       &in,
                       sizeof(in),
                       stat,
                       sizeof(*stat));

    if (rc >= 0) {

        /*
         * Do net to host conversion here
         */

        for (i = 0; i < sizeof(*stat)/sizeof(afs_int32); i++, lp++) {
            *lp = ntohl(*lp);
        }
    }

    return rc;
}
afs_int32 rx_GetServerVersion(
    int socket,
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    size_t version_length,
    char *version
)
{
    char a[1] = {0};
    return MakeDebugCall(socket,
                         remoteAddr,
                         remotePort,
                         RX_PACKET_TYPE_VERSION,
                         a,
                         1,
                         version,
                         version_length);
}
afs_int32 rx_GetServerConnections(
    int socket,
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    afs_int32 *nextConnection,
    int allConnections,
    afs_uint32 debugSupportedValues,
    struct rx_debugConn *conn,
    afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    if (allConnections) {
        in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
        in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket,
                       remoteAddr,
                       remotePort,
                       RX_PACKET_TYPE_DEBUG,
                       &in,
                       sizeof(in),
                       conn,
                       sizeof(*conn));

    if (rc >= 0) {
        *nextConnection += 1;

        /*
         * Convert old connection format to new structure.
         */

        if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
            struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
#define MOVEvL(a) (conn->a = vL->a)

            /* any old or unrecognized version... */
            for (i = 0; i < RX_MAXCALLS; i++) {
                MOVEvL(callState[i]);
                MOVEvL(callMode[i]);
                MOVEvL(callFlags[i]);
                MOVEvL(callOther[i]);
            }
            if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
                MOVEvL(secStats.type);
                MOVEvL(secStats.level);
                MOVEvL(secStats.flags);
                MOVEvL(secStats.expires);
                MOVEvL(secStats.packetsReceived);
                MOVEvL(secStats.packetsSent);
                MOVEvL(secStats.bytesReceived);
                MOVEvL(secStats.bytesSent);
            }
        }

        /*
         * Do net to host conversion here
         * NOTE:
         * I don't convert host or port since we are most likely
         * going to want these in NBO.
         */
        conn->cid = ntohl(conn->cid);
        conn->serial = ntohl(conn->serial);
        for (i = 0; i < RX_MAXCALLS; i++) {
            conn->callNumber[i] = ntohl(conn->callNumber[i]);
        }
        conn->error = ntohl(conn->error);
        conn->secStats.flags = ntohl(conn->secStats.flags);
        conn->secStats.expires = ntohl(conn->secStats.expires);
        conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
        conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
        conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
        conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
        conn->epoch = ntohl(conn->epoch);
        conn->natMTU = ntohl(conn->natMTU);
    }

    return rc;
}
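/* Usage sketch (illustrative): *nextConnection acts as a cursor that this
 * routine advances on every successful call, so enumerating a server's
 * connection table is a simple loop.  The fixed iteration cap and the
 * end-of-table handling below are assumptions for the sketch; a real
 * client (such as rxdebug) also recognizes a sentinel "no more entries"
 * record. */
#if 0
static void example_ListConnections(int sd, afs_uint32 host, afs_uint16 port,
                                    afs_uint32 debugSupported)
{
    struct rx_debugConn tconn;
    afs_uint32 supported;
    afs_int32 cursor = 0;
    int n;

    for (n = 0; n < 100; n++) { /* arbitrary cap for the example */
        if (rx_GetServerConnections(sd, host, port, &cursor, 1,
                                    debugSupported, &tconn, &supported) < 0)
            break;
        printf("conn cid %x serial %d\n", tconn.cid, tconn.serial);
    }
}
#endif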
afs_int32 rx_GetServerPeers(
    int socket,
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    afs_int32 *nextPeer,
    afs_uint32 debugSupportedValues,
    struct rx_debugPeer *peer,
    afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket,
                       remoteAddr,
                       remotePort,
                       RX_PACKET_TYPE_DEBUG,
                       &in,
                       sizeof(in),
                       peer,
                       sizeof(*peer));

    if (rc >= 0) {
        *nextPeer += 1;

        /*
         * Do net to host conversion here
         * NOTE:
         * I don't convert host or port since we are most likely
         * going to want these in NBO.
         */
        peer->ifMTU = ntohs(peer->ifMTU);
        peer->idleWhen = ntohl(peer->idleWhen);
        peer->refCount = ntohs(peer->refCount);
        peer->burstWait.sec = ntohl(peer->burstWait.sec);
        peer->burstWait.usec = ntohl(peer->burstWait.usec);
        peer->rtt = ntohl(peer->rtt);
        peer->rtt_dev = ntohl(peer->rtt_dev);
        peer->timeout.sec = ntohl(peer->timeout.sec);
        peer->timeout.usec = ntohl(peer->timeout.usec);
        peer->nSent = ntohl(peer->nSent);
        peer->reSends = ntohl(peer->reSends);
        peer->inPacketSkew = ntohl(peer->inPacketSkew);
        peer->outPacketSkew = ntohl(peer->outPacketSkew);
        peer->rateFlag = ntohl(peer->rateFlag);
        peer->natMTU = ntohs(peer->natMTU);
        peer->maxMTU = ntohs(peer->maxMTU);
        peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
        peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
        peer->MTU = ntohs(peer->MTU);
        peer->cwind = ntohs(peer->cwind);
        peer->nDgramPackets = ntohs(peer->nDgramPackets);
        peer->congestSeq = ntohs(peer->congestSeq);
        peer->bytesSent.high = ntohl(peer->bytesSent.high);
        peer->bytesSent.low = ntohl(peer->bytesSent.low);
        peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
        peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
    }

    return rc;
}
#endif /* RXDEBUG */

void shutdown_rx(void)
{
    struct rx_serverQueueEntry *np;
    register int i, j;

    LOCK_RX_INIT
    if (rxinit_status == 1) {
        UNLOCK_RX_INIT
        return; /* Already shutdown. */
    }

    {
        struct rx_peer **peer_ptr, **peer_end;
        for (peer_ptr = &rx_peerHashTable[0],
             peer_end = &rx_peerHashTable[rx_hashTableSize];
             peer_ptr < peer_end; peer_ptr++) {
            struct rx_peer *peer, *next;
            for (peer = *peer_ptr; peer; peer = next) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                size_t space;
                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    MUTEX_ENTER(&rx_rpc_stats);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                    MUTEX_EXIT(&rx_rpc_stats);
                }
                next = peer->next;
                rxi_FreePeer(peer);
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.nPeerStructs--;
                MUTEX_EXIT(&rx_stats_mutex);
            }
        }
    }
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        if (rx_services[i])
            rxi_Free(rx_services[i], sizeof(*rx_services[i]));
    }
    for (i = 0; i < rx_hashTableSize; i++) {
        register struct rx_connection *tc, *ntc;
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (tc = rx_connHashTable[i]; tc; tc = ntc) {
            ntc = tc->next;
            for (j = 0; j < RX_MAXCALLS; j++) {
                if (tc->call[j]) {
                    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
                }
            }
            rxi_Free(tc, sizeof(*tc));
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
    }

    MUTEX_ENTER(&freeSQEList_lock);

    while ((np = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
        MUTEX_DESTROY(&np->lock);
        rxi_Free(np, sizeof(*np));
    }

    MUTEX_EXIT(&freeSQEList_lock);
    MUTEX_DESTROY(&freeSQEList_lock);
    MUTEX_DESTROY(&rx_freeCallQueue_lock);
    MUTEX_DESTROY(&rx_connHashTable_lock);
    MUTEX_DESTROY(&rx_peerHashTable_lock);
    MUTEX_DESTROY(&rx_serverPool_lock);

    osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));

    UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));

    rxi_FreeAllPackets();

    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota = RX_MAX_QUOTA;
    rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);

    rxinit_status = 1;
    UNLOCK_RX_INIT
}
#ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
{
    if (!MUTEX_ISMINE(lockaddr))
        osi_Panic("Lock not held: %s", msg);
}
#endif /* RX_ENABLE_LOCKS */
#ifndef KERNEL

/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)
{
    int key;
    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
                               realloc((void *)rxi_keyCreate_destructor,
                                       (key+1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);
    return key;
}

void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
{
    int i;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->specific) {
        conn->specific = (void **)malloc((key+1)*sizeof(void *));
        for (i = 0; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key+1;
        conn->specific[key] = ptr;
    } else if (key >= conn->nSpecific) {
        conn->specific = (void **)
                         realloc(conn->specific, (key+1)*sizeof(void *));
        for (i = conn->nSpecific; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key+1;
        conn->specific[key] = ptr;
    } else {
        if (conn->specific[key] && rxi_keyCreate_destructor[key])
            (*rxi_keyCreate_destructor[key])(conn->specific[key]);
        conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
}

void *rx_GetSpecific(struct rx_connection *conn, int key)
{
    void *ptr;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
        ptr = NULL;
    else
        ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;
}
#endif /* !KERNEL */
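/* Usage sketch (illustrative): attach per-connection state under a key
 * created once at startup.  As rx_SetSpecific shows above, the registered
 * destructor runs whenever a slot is overwritten; the struct and helper
 * names here are invented for the example. */
#if 0
struct example_state { int calls_seen; };

static void example_DestroyState(void *ptr)
{
    free(ptr);
}

static void example_UseSpecific(struct rx_connection *conn)
{
    static int key = -1;
    struct example_state *st;

    if (key < 0)
        key = rx_KeyCreate(example_DestroyState);

    st = (struct example_state *) malloc(sizeof(*st));
    st->calls_seen = 0;
    rx_SetSpecific(conn, key, st);

    st = (struct example_state *) rx_GetSpecific(conn, key);
}
#endif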
/*
 * processStats is a queue used to store the statistics for the local
 * process.  Its contents are similar to the contents of the rpcStats
 * queue on a rx_peer structure, but the actual data stored within
 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset) - unlike the per peer structures
 * which can come and go based upon the peer lifetime.
 */

static struct rx_queue processStats = {&processStats, &processStats};

/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats, &peerStats};
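/* (Note: initializing prev and next to the queue's own address is how the
 * rx_queue package represents an empty circular queue.) */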
/*
 * rxi_monitor_processStats is used to turn process wide stat collection
 * on and off.
 */

static int rxi_monitor_processStats = 0;

/*
 * rxi_monitor_peerStats is used to turn per peer stat collection on and off.
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
 * call, create (if needed) and update the stat totals for the rpc.
 *
 * PARAMETERS
 *
 * IN stats - the queue of stats that will be updated with the new value
 * IN rxInterface - a unique number that identifies the rpc interface
 * IN currentFunc - the index of the function being invoked
 * IN totalFunc - the total number of functions in this interface
 * IN queueTime - the amount of time this function waited for a thread
 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 * IN bytesRcvd - the number of bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 * IN remoteHost - the ip address of the remote host
 * IN remotePort - the port of the remote host
 * IN addToPeerList - if != 0, add newly created stat to the global peer list
 *
 * INOUT counter - if a new stats structure is allocated, the counter will
 * be updated with the new number of allocated stat structures
 *
 * RETURN CODES
 *
 * Returns 0 on success; non-zero if the stat structure could not be
 * allocated.
 */

static int rxi_AddRpcStat(
    struct rx_queue *stats,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,
    int isServer,
    afs_uint32 remoteHost,
    afs_uint32 remotePort,
    int addToPeerList,
    unsigned int *counter)
{
    int rc = 0;
    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */

    for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
            (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }

    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */

    if ((rpc_stat == NULL) ||
        (rpc_stat->stats[0].interfaceId != rxInterface) ||
        (rpc_stat->stats[0].remote_is_server != isServer)) {
        int i;
        size_t space;

        space = sizeof(rx_interface_stat_t) + totalFunc *
                sizeof(rx_function_entry_v1_t);

        rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
        if (rpc_stat == NULL) {
            rc = 1;
            goto fail;
        }
        *counter += totalFunc;
        for (i = 0; i < totalFunc; i++) {
            rpc_stat->stats[i].remote_peer = remoteHost;
            rpc_stat->stats[i].remote_port = remotePort;
            rpc_stat->stats[i].remote_is_server = isServer;
            rpc_stat->stats[i].interfaceId = rxInterface;
            rpc_stat->stats[i].func_total = totalFunc;
            rpc_stat->stats[i].func_index = i;
            hzero(rpc_stat->stats[i].invocations);
            hzero(rpc_stat->stats[i].bytes_sent);
            hzero(rpc_stat->stats[i].bytes_rcvd);
            rpc_stat->stats[i].queue_time_sum.sec = 0;
            rpc_stat->stats[i].queue_time_sum.usec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].queue_time_min.sec = 9999999;
            rpc_stat->stats[i].queue_time_min.usec = 9999999;
            rpc_stat->stats[i].queue_time_max.sec = 0;
            rpc_stat->stats[i].queue_time_max.usec = 0;
            rpc_stat->stats[i].execution_time_sum.sec = 0;
            rpc_stat->stats[i].execution_time_sum.usec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].execution_time_min.sec = 9999999;
            rpc_stat->stats[i].execution_time_min.usec = 9999999;
            rpc_stat->stats[i].execution_time_max.sec = 0;
            rpc_stat->stats[i].execution_time_max.usec = 0;
        }
        queue_Prepend(stats, rpc_stat);
        if (addToPeerList) {
            queue_Prepend(&peerStats, &rpc_stat->all_peers);
        }
    }

    /*
     * Increment the stats for this function
     */

    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
    hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
    hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
    clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
    if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
        rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
    }
    if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
        rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
    }
    clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum, execTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr, execTime);
    if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
        rpc_stat->stats[currentFunc].execution_time_min = *execTime;
    }
    if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
        rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }

fail:
    return rc;
}
/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
 * rpc function.
 *
 * PARAMETERS
 *
 * IN peer - the peer who invoked the rpc
 * IN rxInterface - a unique number that identifies the rpc interface
 * IN currentFunc - the index of the function being invoked
 * IN totalFunc - the total number of functions in this interface
 * IN queueTime - the amount of time this function waited for a thread
 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 * IN bytesRcvd - the number of bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_IncrementTimeAndCount(
    struct rx_peer *peer,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,
    int isServer)
{
    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
        rxi_AddRpcStat(&peer->rpcStats,
                       rxInterface,
                       currentFunc,
                       totalFunc,
                       queueTime,
                       execTime,
                       bytesSent,
                       bytesRcvd,
                       isServer,
                       peer->host,
                       peer->port,
                       1,
                       &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
        rxi_AddRpcStat(&processStats,
                       rxInterface,
                       currentFunc,
                       totalFunc,
                       queueTime,
                       execTime,
                       bytesSent,
                       bytesRcvd,
                       isServer,
                       0xffffffff,
                       0xffffffff,
                       0,
                       &rxi_rpc_process_stat_cnt);
    }

    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
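/* Usage sketch (illustrative): a generated client stub that has gathered
 * the queue/execute clocks for a call it made to a server might record
 * them as below.  The interface id, function indices and byte counts are
 * invented for the example. */
#if 0
static void example_RecordCallStats(struct rx_call *call,
                                    struct clock *queueTime,
                                    struct clock *execTime)
{
    afs_hyper_t bytesSent, bytesRcvd;

    hzero(bytesSent);
    hadd32(bytesSent, 4096); /* bytes written to the call */
    hzero(bytesRcvd);
    hadd32(bytesRcvd, 128);  /* bytes read from the call */

    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
                             42 /* example interface id */,
                             3  /* function index */,
                             10 /* functions in the interface */,
                             queueTime, execTime,
                             &bytesSent, &bytesRcvd,
                             1 /* the remote peer is a server */);
}
#endif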
/*
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller.
 * IN count - the number of entries to marshall.
 * IN stats - pointer to stats to be marshalled.
 * OUT ptr - Where to store the marshalled data.
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_MarshallProcessRPCStats(
    afs_uint32 callerVersion,
    int count,
    rx_function_entry_v1_t *stats,
    afs_uint32 **ptrP)
{
    int i;
    afs_uint32 *ptr;

    /*
     * We only support the first version
     */
    for (ptr = *ptrP, i = 0; i < count; i++, stats++) {
        *(ptr++) = stats->remote_peer;
        *(ptr++) = stats->remote_port;
        *(ptr++) = stats->remote_is_server;
        *(ptr++) = stats->interfaceId;
        *(ptr++) = stats->func_total;
        *(ptr++) = stats->func_index;
        *(ptr++) = hgethi(stats->invocations);
        *(ptr++) = hgetlo(stats->invocations);
        *(ptr++) = hgethi(stats->bytes_sent);
        *(ptr++) = hgetlo(stats->bytes_sent);
        *(ptr++) = hgethi(stats->bytes_rcvd);
        *(ptr++) = hgetlo(stats->bytes_rcvd);
        *(ptr++) = stats->queue_time_sum.sec;
        *(ptr++) = stats->queue_time_sum.usec;
        *(ptr++) = stats->queue_time_sum_sqr.sec;
        *(ptr++) = stats->queue_time_sum_sqr.usec;
        *(ptr++) = stats->queue_time_min.sec;
        *(ptr++) = stats->queue_time_min.usec;
        *(ptr++) = stats->queue_time_max.sec;
        *(ptr++) = stats->queue_time_max.usec;
        *(ptr++) = stats->execution_time_sum.sec;
        *(ptr++) = stats->execution_time_sum.usec;
        *(ptr++) = stats->execution_time_sum_sqr.sec;
        *(ptr++) = stats->execution_time_sum_sqr.usec;
        *(ptr++) = stats->execution_time_min.sec;
        *(ptr++) = stats->execution_time_min.usec;
        *(ptr++) = stats->execution_time_max.sec;
        *(ptr++) = stats->execution_time_max.usec;
    }
    *ptrP = ptr;
}
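/* (By this count, each v1 entry marshalls to 28 afs_uint32 words: six id
 * fields, three hyper counters split into hi/lo halves, and sixteen
 * clock sec/usec halves.  An unmarshaller must consume them in the same
 * order.) */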
/*
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * the local process.
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from this process.
 * OUT stats - the actual stats retrieved from this process.
 *
 * RETURN CODES
 *
 * Returns 0 on success; an error code otherwise.  If successful,
 * *stats will be non-NULL.
 */

int rx_RetrieveProcessRPCStats(
    afs_uint32 callerVersion,
    afs_uint32 *myVersion,
    afs_uint32 *clock_sec,
    afs_uint32 *clock_usec,
    size_t *allocSize,
    afs_uint32 *statCount,
    afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_processStats) {
        MUTEX_EXIT(&rx_rpc_stats);
        return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_process_stat_cnt;
    } else {
        /*
         * This can't happen yet, but in the future version changes
         * can be handled by adding additional code here
         */
    }

    if (space > (size_t) 0) {
        *allocSize = space;
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        if (ptr != NULL) {
            register struct rx_peer *pp;
            rx_interface_stat_p rpc_stat, nrpc_stat;

            for (queue_Scan(&processStats, rpc_stat, nrpc_stat,
                            rx_interface_stat)) {
                /*
                 * Copy the data based upon the caller version
                 */
                rx_MarshallProcessRPCStats(callerVersion,
                                           rpc_stat->stats[0].func_total,
                                           rpc_stat->stats, &ptr);
            }
        } else {
            rc = ENOMEM;
        }
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
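/* Usage sketch (illustrative): retrieve, consume and release the process
 * stats.  Only the rx_* calls are part of the API above; the printing is
 * invented for the example. */
#if 0
static void example_DumpProcessRPCStats(void)
{
    afs_uint32 myVersion, sec, usec, statCount;
    afs_uint32 *stats = NULL;
    size_t allocSize;

    if (rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVersion,
                                   &sec, &usec, &allocSize, &statCount,
                                   &stats) == 0 && stats != NULL) {
        printf("%u stat entries in %lu bytes\n",
               statCount, (unsigned long) allocSize);
        rx_FreeRPCStats(stats, allocSize);
    }
}
#endif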
/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 * of the local process.
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from the individual
 * peer structures.
 * OUT stats - the actual stats retrieved from the individual peer structures.
 *
 * RETURN CODES
 *
 * Returns 0 on success; an error code otherwise.  If successful,
 * *stats will be non-NULL.
 */

int rx_RetrievePeerRPCStats(
    afs_uint32 callerVersion,
    afs_uint32 *myVersion,
    afs_uint32 *clock_sec,
    afs_uint32 *clock_usec,
    size_t *allocSize,
    afs_uint32 *statCount,
    afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_peerStats) {
        MUTEX_EXIT(&rx_rpc_stats);
        return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_peer_stat_cnt;
    } else {
        /*
         * This can't happen yet, but in the future version changes
         * can be handled by adding additional code here
         */
    }

    if (space > (size_t) 0) {
        *allocSize = space;
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        if (ptr != NULL) {
            rx_interface_stat_p rpc_stat, nrpc_stat;
            char *fix_offset;

            for (queue_Scan(&peerStats, rpc_stat, nrpc_stat,
                            rx_interface_stat)) {
                /*
                 * We have to fix the offset of rpc_stat since we are
                 * keeping this structure on two rx_queues.  The rx_queue
                 * package assumes that the rx_queue member is the first
                 * member of the structure.  That is, rx_queue assumes that
                 * any one item is only on one queue at a time.  We are
                 * breaking that assumption and so we have to do a little
                 * math to fix our pointers.
                 */

                fix_offset = (char *) rpc_stat;
                fix_offset -= offsetof(rx_interface_stat_t, all_peers);
                rpc_stat = (rx_interface_stat_p) fix_offset;
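                /* (This is the classic container_of idiom: all_peers is
                 * not the first member of rx_interface_stat_t, so step
                 * back from the embedded queue entry to the enclosing
                 * structure before dereferencing the stats.) */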
                /*
                 * Copy the data based upon the caller version
                 */
                rx_MarshallProcessRPCStats(callerVersion,
                                           rpc_stat->stats[0].func_total,
                                           rpc_stat->stats, &ptr);
            }
        } else {
            rc = ENOMEM;
        }
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_FreeRPCStats - free memory allocated by
 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
 *
 * PARAMETERS
 *
 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
 * rx_RetrievePeerRPCStats
 * IN allocSize - the number of bytes in stats.
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_FreeRPCStats(
    afs_uint32 *stats,
    size_t allocSize)
{
    rxi_Free(stats, allocSize);
}
/*
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise.
 */

int rx_queryProcessRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}

/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise.
 */

int rx_queryPeerRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enableProcessRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enablePeerRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
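/* Usage sketch (illustrative): a monitoring-minded application typically
 * turns both collectors on right after rx_Init() and then polls the
 * retrieval functions above. */
#if 0
static void example_EnableAllRPCStats(void)
{
    rx_enableProcessRPCStats(); /* lifetime totals for this process */
    rx_enablePeerRPCStats();    /* per-peer breakdown; costs more memory */
}
#endif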
/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disableProcessRPCStats()
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and if peer stats is also off, turn
     * off global gathering as well.
     */

    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
        rx_enable_stats = 0;
    }

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0;
        if (!rpc_stat) break;
        queue_Remove(rpc_stat);
        num_funcs = rpc_stat->stats[0].func_total;
        space = sizeof(rx_interface_stat_t) +
                rpc_stat->stats[0].func_total *
                sizeof(rx_function_entry_v1_t);

        rxi_Free(rpc_stat, space);
        rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disablePeerRPCStats()
{
    struct rx_peer **peer_ptr, **peer_end;
    int code;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and if process stats is also off, turn
     * off global gathering as well.
     */

    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
        rx_enable_stats = 0;
    }

    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (peer_ptr = &rx_peerHashTable[0],
         peer_end = &rx_peerHashTable[rx_hashTableSize];
         peer_ptr < peer_end; peer_ptr++) {
        struct rx_peer *peer, *next, *prev;
        for (prev = peer = *peer_ptr; peer; peer = next) {
            next = peer->next;
            code = MUTEX_TRYENTER(&peer->peer_lock);
            if (code) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                size_t space;
                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs = 0;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                }
                MUTEX_EXIT(&peer->peer_lock);
                if (prev == *peer_ptr) {
                    prev = peer;
                }
            }
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flags passed in.
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearProcessRPCStats(
    afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;
        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
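/* Usage sketch (illustrative): the clear flags form a bitmask, so related
 * counters can be reset together. */
#if 0
static void example_ResetCounters(void)
{
    rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_INVOCATIONS |
                            AFS_RX_STATS_CLEAR_BYTES_SENT |
                            AFS_RX_STATS_CLEAR_BYTES_RCVD);
}
#endif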
/*
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flags passed in.
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearPeerRPCStats(
    afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    char *fix_offset;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;

        /*
         * We have to fix the offset of rpc_stat since we are
         * keeping this structure on two rx_queues.  The rx_queue
         * package assumes that the rx_queue member is the first
         * member of the structure.  That is, rx_queue assumes that
         * any one item is only on one queue at a time.  We are
         * breaking that assumption and so we have to do a little
         * math to fix our pointers.
         */

        fix_offset = (char *) rpc_stat;
        fix_offset -= offsetof(rx_interface_stat_t, all_peers);
        rpc_stat = (rx_interface_stat_p) fix_offset;

        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;

void rx_SetRxStatUserOk(
    int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}

int rx_RxStatUserOk(
    struct rx_call *call)
{
    if (!rxi_rxstat_userok)
        return 0;
    return rxi_rxstat_userok(call);
}
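/* Usage sketch (illustrative): register a policy callback once at startup.
 * A real callback would inspect the call's connection and credentials;
 * the permissive one below is only an example. */
#if 0
static int example_StatsUserOk(struct rx_call *call)
{
    return 1; /* allow everyone; real code should check authorization */
}

static void example_InstallStatsPolicy(void)
{
    rx_SetRxStatUserOk(example_StatsUserOk);
}
#endif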