 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
/* RX:  Extended Remote Procedure Call */

#include <afsconfig.h>
#include "../afs/param.h"
#include <afs/param.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define	AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define	AFSOP_STOP_AFS		211	/* Stop AFS process */
#define	AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;

#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
	assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.  A sketch of the pattern follows this comment.
 */
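/* A minimal sketch of that queue-lock handshake (illustrative only, not
 * code from this file; "some_queue" and "some_queue_lock" are hypothetical):
 *
 *	MUTEX_ENTER(&call->lock);
 *	MUTEX_ENTER(&some_queue_lock);
 *	queue_Append(&some_queue, call);
 *	SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
 *	MUTEX_EXIT(&some_queue_lock);
 *	MUTEX_EXIT(&call->lock);
 *
 * Later, rxi_ResetCall takes call->lock, reads call->call_queue_lock,
 * acquires that queue lock, removes the call, and then clears the pointer
 * with CLEAR_CALL_QUEUE_LOCK(call). */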
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif

/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * lock types:
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *	rx_freeCallQueue_lock
 *	rx_connHashTable_lock
 *	rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *	peer_lock - locks peer data fields.
 *	conn_data_lock - ensures that no more than one thread updates a conn
 *		data field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 *      conn->peer was previously a constant for all intents and so has no
 *      lock protecting this field.  The multihomed client delta introduced
 *      an RX code change: change the peer field in the connection structure
 *      to that remote interface from which the last packet for this
 *      connection was sent out.  This may become an issue if further changes
 *      are made.
 */
#define	SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define	CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */

#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#endif /* AFS_PTHREAD_ENV */
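/* Illustrative use of rx_SetEpoch (a sketch, not code from this file):
 * a security layer can seed the epoch once, mirroring what rx_Init does
 * below, so that rxnull and rxkad connections end up sharing it:
 *
 *	rx_SetEpoch(clock_Sec() | 0x80000000);
 */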
void rx_SetEpoch (epoch)
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;
    struct timeval tv;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	return tmp_status;	/* Already started; return previous error code. */
    }

#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
	return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	return -1;
    }

#ifdef	RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT,0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));	/* XXXXX */
    memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));		/* XXXXX */
    memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    {
	struct sockaddr_in addr;
	int addrlen = sizeof(addr);
	if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	    return -1;
	}
	rx_port = addr.sin_port;
    }
#endif

    rx_stats.minRtt.sec = 9999999;
#ifdef	KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);	/* Start time of this package, rxkad
				 * will provide a randomer value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */

    tmp_status = rxinit_status = 0;
    return tmp_status;
}
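/* Typical client-side startup (an illustrative sketch, not code from this
 * file).  Passing 0 lets the kernel choose the UDP port, as noted above:
 *
 *	if (rx_Init(0) != 0) {
 *	    fprintf(stderr, "rx_Init failed\n");
 *	    exit(1);
 *	}
 */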
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's OK while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
    register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}

static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}

#else /* RX_ENABLE_LOCKS */
static int QuotaOK(aservice)
    register struct rx_service *aservice; {
    int rc=0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
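/* Sketch of the intended reserve/release pairing under RX_ENABLE_LOCKS
 * (illustrative only; rx_GetCall below is the real caller):
 *
 *	MUTEX_ENTER(&rx_serverPool_lock);
 *	if (QuotaOK(service)) {
 *	    ...dispatch one call for this service...
 *	    ReturnToServerPool(service);    (when the thread gives up the slot)
 *	}
 *	MUTEX_EXIT(&rx_serverPool_lock);
 */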
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	int diff;
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool. */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i, nProcs=0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

#ifdef AFS_PTHREAD_ENV
    pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
    LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */

    sprintf(name,"srv_%d", ++nProcs);
    if (registerProgram)
	(*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
    rx_ServerProc();	/* Never returns */
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	    /* Server host */
    u_short sport;		    /* Server port */
    u_short sservice;		    /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef	RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++;	/* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
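/* Illustrative client usage (a sketch, not code from this file; the address,
 * port, and service id are arbitrary examples, and "secobj" stands for any
 * security object, e.g. one from rxnull_NewClientSecurityObject()):
 *
 *	struct rx_connection *conn;
 *	conn = rx_NewConnection(htonl(0x7f000001), htons(1234), 4, secobj, 0);
 */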
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
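/* Worked example: with the default rx_connDeadTime of 12 (set in rx_Init
 * above), secondsUntilDead = MAX(12, 6) = 12 and secondsUntilPing = 12/6 = 2,
 * so several 2-second keepalive intervals can be missed before the
 * 12-second dead time expires. */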
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	for (i = 0; i < conn->nSpecific; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    struct rx_peer *peer = conn->peer;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    if (call->state == RX_STATE_PRECALL ||
			call->state == RX_STATE_ACTIVE) {
			rxi_SendDelayedAck(call->delayedAckEvent, call, 0);
		    } else {
			rxi_AckAll((struct rxevent *)0, call, 0);
		    }
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	} else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
					     conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_NewCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_NewCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
	for (i=0; i<RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    } else {
		call = rxi_NewCall(conn, i);
		MUTEX_ENTER(&call->lock);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef	RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);

#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i])) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		return 1;
	    }
	}
    }
    return 0;
}

int rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

int rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * if the service could not be installed. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
	      nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;	/* Name for identification purposes (e.g. the
			 * service name might be used for probing for
			 * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
    }

    tservice = rxi_AllocService();

    for (i = 0; i<RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    rx_services[i] = service;	/* not visible until now */
	    return service;
	}
    }
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
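/* Illustrative server setup (a sketch, not code from this file; the port,
 * service id, name, and DemoRequestProc are hypothetical placeholders):
 *
 *	struct rx_securityClass *sc[1];
 *	sc[0] = rxnull_NewServerSecurityObject();
 *	rx_Init(htons(1234));
 *	rx_NewService(0, 7, "demo", sc, 1, DemoRequestProc);
 *	rx_StartServer(1);	(donates this thread; never returns)
 */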
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
    int threadID;
    struct rx_call *newcall;
    osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	call = rx_GetCall(threadID, tservice, socketp);
	if (socketp && *socketp != OSI_NULLSOCKET) {
	    /* We are now a listener thread */
	    return;
	}

	/* if server is restarting (typically smooth shutdown) then do not
	 * allow any new calls.
	 */
	if ( rx_tranquil && (call != NULL) ) {
	    MUTEX_ENTER(&call->lock);
	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
	    MUTEX_EXIT(&call->lock);
	}

	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/*
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * future options:
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }

    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next ) {
		    /* If we're thread 0, then we'll just use
		     * this call.  If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}

	if (call) {
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);

	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call*)0;
		continue;
	    }
	    call->flags &= (~RX_CALL_WAIT_PROC);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting--;
	    MUTEX_EXIT(&rx_stats_mutex);
	    rxi_ServerThreadSelectingCall = 0;
	    CV_SIGNAL(&rx_serverPool_cv);
	    MUTEX_EXIT(&rx_serverPool_lock);
	    break;
	} else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */

	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next ) {
		    /* If we're thread 0, then we'll just use
		     * this call.  If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
	    }
	    if (call) {
		break;
	    }
	}
    }

    if (call) {
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	/* MUTEX_EXIT(&call->lock); */
    } else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		return (struct rx_call *)0;
	    }
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;	/* Temporary packet pointer */
    register struct rx_packet *nxp;	/* Next packet pointer, for queue_Scan */
    afs_int32 error;
    char dummy;

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else { /* Client connection */
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call.  The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure.  We don't want to signal until
	 * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    } else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }
    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
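/* Illustrative client call lifecycle (a sketch, not code from this file;
 * "conn" comes from rx_NewConnection above and opcode/buf are hypothetical):
 *
 *	struct rx_call *call;
 *	afs_int32 code, opcode = 1;
 *	char buf[128];
 *
 *	call = rx_NewCall(conn);
 *	rx_Write(call, (char *)&opcode, sizeof(opcode));
 *	rx_Read(call, buf, sizeof(buf));
 *	code = rx_EndCall(call, 0);	(returns the conversation's error code)
 */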
#if	!defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
	return;
    }
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;	/* still over quota */
    }
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    } else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);
	CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
	CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
	CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nCallStructs++;
	MUTEX_EXIT(&rx_stats_mutex);
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
	queue_Init(&call->iovq);
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
       the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free queue.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock;	/* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
	(*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
	queue_Prepend(&rx_freeCallQueue, call);
    else
	queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);

    /* Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections).  Only do this, however, if there are no
     * outstanding calls.  Note that for fine grain locking, there appears
     * to be a deadlock in that rxi_FreeCall has a call locked and
     * DestroyConnectionNoLock locks each call in the conn.  But note a
     * few lines up where we have removed this call from the conn.
     * If someone else destroys a connection, they either have no
     * call lock held or are going through this section of code.
     */
    if (conn->flags & RX_CONN_DESTROY_ME) {
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->refCount++;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	if (haveCTLock)
	    rxi_DestroyConnectionNoLock(conn);
	else
	    rxi_DestroyConnection(conn);
#else /* RX_ENABLE_LOCKS */
	rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
    }
}
2081 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2082 char *rxi_Alloc(size)
2083 register size_t size;
2087 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2088 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2091 int glockOwner = ISAFS_GLOCK();
2095 MUTEX_ENTER(&rx_stats_mutex);
2096 rxi_Alloccnt++; rxi_Allocsize += size;
2097 MUTEX_EXIT(&rx_stats_mutex);
2098 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2099 if (size > AFS_SMALLOCSIZ) {
2100 p = (char *) osi_AllocMediumSpace(size);
2102 p = (char *) osi_AllocSmall(size, 1);
2103 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2108 p = (char *) osi_Alloc(size);
2110 if (!p) osi_Panic("rxi_Alloc error");
2115 void rxi_Free(addr, size)
2117 register size_t size;
2119 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2120 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2123 int glockOwner = ISAFS_GLOCK();
2127 MUTEX_ENTER(&rx_stats_mutex);
2128 rxi_Alloccnt--; rxi_Allocsize -= size;
2129 MUTEX_EXIT(&rx_stats_mutex);
2130 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2131 if (size > AFS_SMALLOCSIZ)
2132 osi_FreeMediumSpace(addr);
2134 osi_FreeSmall(addr);
2135 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2140 osi_Free(addr, size);
2144 /* Find the peer process represented by the supplied (host,port)
2145 * combination. If there is no appropriate active peer structure, a
2146 * new one will be allocated and initialized
2147 * The origPeer, if set, is a pointer to a peer structure on which the
2148 * refcount will be decremented. This is used to replace the peer
2149 * structure hanging off a connection structure */
2150 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2151 register afs_uint32 host;
2152 register u_short port;
2153 struct rx_peer *origPeer;
2156 register struct rx_peer *pp;
2158 hashIndex = PEER_HASH(host, port);
2159 MUTEX_ENTER(&rx_peerHashTable_lock);
2160 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2161 if ((pp->host == host) && (pp->port == port)) break;
2165 pp = rxi_AllocPeer(); /* This bzero's *pp */
2166 pp->host = host; /* must be set before rxi_InitPeerParams reads it; otherwise it is zero */
2168 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2169 queue_Init(&pp->congestionQueue);
2170 queue_Init(&pp->rpcStats);
2171 pp->next = rx_peerHashTable[hashIndex];
2172 rx_peerHashTable[hashIndex] = pp;
2173 rxi_InitPeerParams(pp);
2174 MUTEX_ENTER(&rx_stats_mutex);
2175 rx_stats.nPeerStructs++;
2176 MUTEX_EXIT(&rx_stats_mutex);
2183 origPeer->refCount--;
2184 MUTEX_EXIT(&rx_peerHashTable_lock);
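/* Editorial usage sketch, hedged: find-or-create a peer, handing off
 * the reference held through an old peer (this mirrors the conn->peer
 * swap in rxi_FindConnection below; rxi_FindPeer itself decrements
 * origPeer->refCount):
 *
 *   struct rx_peer *peer = rxi_FindPeer(host, port, (struct rx_peer *)0, 1);
 *   ...
 *   peer = rxi_FindPeer(newHost, port, peer, 1);  // old peer released
 */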
2189 /* Find the connection at (host, port) started at epoch, and with the
2190 * given connection id. Creates the server connection if necessary.
2191 * The type specifies whether a client connection or a server
2192 * connection is desired. In both cases, (host, port) specify the
2193 * peer's (host, port) pair. Client connections are not made
2194 * automatically by this routine. The parameter socket gives the
2195 * socket descriptor on which the packet was received. This is used,
2196 * in the case of server connections, to check that *new* connections
2197 * come via a valid (port, serviceId). Finally, the securityIndex
2198 * parameter must match the existing index for the connection. If a
2199 * server connection is created, it will be created using the supplied
2200 * index, if the index is valid for this service */
2201 struct rx_connection *
2202 rxi_FindConnection(socket, host, port, serviceId, cid,
2203 epoch, type, securityIndex)
2205 register afs_int32 host;
2206 register u_short port;
2211 u_int securityIndex;
2213 int hashindex, flag;
2214 register struct rx_connection *conn;
2215 struct rx_peer *peer;
2216 hashindex = CONN_HASH(host, port, cid, epoch, type);
2217 MUTEX_ENTER(&rx_connHashTable_lock);
2218 rxLastConn ? (conn = rxLastConn, flag = 0) :
2219 (conn = rx_connHashTable[hashindex], flag = 1);
2221 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2222 && (epoch == conn->epoch)) {
2223 register struct rx_peer *pp = conn->peer;
2224 if (securityIndex != conn->securityIndex) {
2225 /* this isn't supposed to happen, but someone could forge a packet
2226 like this, and there seems to be some CM bug that makes this
2227 happen from time to time -- in which case, the fileserver asserts. */
2229 MUTEX_EXIT(&rx_connHashTable_lock);
2230 return (struct rx_connection *) 0;
2232 /* if the epoch's high-order bit is set, route (for security reasons) only on
2233 * the cid, not on the host and port fields.
2235 if (conn->epoch & 0x80000000) break;
2236 if (((type == RX_CLIENT_CONNECTION)
2237 || (pp->host == host)) && (pp->port == port))
2242 /* the connection rxLastConn used last time is not the
2243 ** one we are looking for now; hence, start searching in the hash table */
2245 conn = rx_connHashTable[hashindex];
2251 struct rx_service *service;
2252 if (type == RX_CLIENT_CONNECTION) {
2253 MUTEX_EXIT(&rx_connHashTable_lock);
2254 return (struct rx_connection *) 0;
2256 service = rxi_FindService(socket, serviceId);
2257 if (!service || (securityIndex >= service->nSecurityObjects)
2258 || (service->securityObjects[securityIndex] == 0)) {
2259 MUTEX_EXIT(&rx_connHashTable_lock);
2260 return (struct rx_connection *) 0;
2262 conn = rxi_AllocConnection(); /* This bzero's the connection */
2263 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2265 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2267 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2268 conn->next = rx_connHashTable[hashindex];
2269 rx_connHashTable[hashindex] = conn;
2270 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2271 conn->type = RX_SERVER_CONNECTION;
2272 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2273 conn->epoch = epoch;
2274 conn->cid = cid & RX_CIDMASK;
2275 /* conn->serial = conn->lastSerial = 0; */
2276 /* conn->timeout = 0; */
2277 conn->ackRate = RX_FAST_ACK_RATE;
2278 conn->service = service;
2279 conn->serviceId = serviceId;
2280 conn->securityIndex = securityIndex;
2281 conn->securityObject = service->securityObjects[securityIndex];
2282 conn->nSpecific = 0;
2283 conn->specific = NULL;
2284 rx_SetConnDeadTime(conn, service->connDeadTime);
2285 /* Notify security object of the new connection */
2286 RXS_NewConnection(conn->securityObject, conn);
2287 /* XXXX Connection timeout? */
2288 if (service->newConnProc) (*service->newConnProc)(conn);
2289 MUTEX_ENTER(&rx_stats_mutex);
2290 rx_stats.nServerConns++;
2291 MUTEX_EXIT(&rx_stats_mutex);
2295 /* Ensure that the peer structure is set up in such a way that
2296 ** replies in this connection go back to that remote interface
2297 ** from which the last packet was sent out. If this packet's
2298 ** source IP address does not match the peer struct for this conn,
2299 ** then drop the refCount on conn->peer and get a new peer structure.
2300 ** We can check the host,port field in the peer structure without the
2301 ** rx_peerHashTable_lock because the peer structure has its refCount
2302 ** incremented and the only time the host,port in the peer struct gets
2303 ** updated is when the peer structure is created.
2305 if (conn->peer->host == host )
2306 peer = conn->peer; /* no change to the peer structure */
2308 peer = rxi_FindPeer(host, port, conn->peer, 1);
2311 MUTEX_ENTER(&conn->conn_data_lock);
2314 MUTEX_EXIT(&conn->conn_data_lock);
2316 rxLastConn = conn; /* store this connection as the last conn used */
2317 MUTEX_EXIT(&rx_connHashTable_lock);
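/* Lookup order above, in outline (editorial): first try the one-entry
 * cache rxLastConn; on a miss, walk the rx_connHashTable chain at
 * CONN_HASH(host, port, cid, epoch, type). A connection is fabricated
 * only for RX_SERVER_CONNECTION, and only when rxi_FindService finds a
 * service with a valid security object at securityIndex. */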
2321 /* There are two packet tracing routines available for testing and monitoring
2322 * Rx. One is called just after every packet is received and the other is
2323 * called just before every packet is sent. Received packets have had their
2324 * headers decoded, and packets to be sent have not yet had their headers
2325 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2326 * containing the network address. Both can be modified. The return value, if
2327 * non-zero, indicates that the packet should be dropped. */
2329 int (*rx_justReceived)() = 0;
2330 int (*rx_almostSent)() = 0;
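/* Editorial sketch of installing an input tracer; myTracer is
 * hypothetical. The hook receives the decoded packet and the sender's
 * address (both modifiable) and returns non-zero to drop the packet:
 *
 *   static int myTracer(struct rx_packet *p, struct sockaddr_in *addr)
 *   {
 *       return 0;   // keep every packet
 *   }
 *   ...
 *   rx_justReceived = myTracer;
 */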
2332 /* A packet has been received off the interface. Np is the packet, socket is
2333 * the socket number it was received from (useful in determining which service
2334 * this packet corresponds to), and (host, port) reflect the host,port of the
2335 * sender. This call returns the packet to the caller if it is finished with
2336 * it, rather than de-allocating it, just as a small performance hack */
2338 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2339 register struct rx_packet *np;
2344 struct rx_call **newcallp;
2346 register struct rx_call *call;
2347 register struct rx_connection *conn;
2349 afs_uint32 currentCallNumber;
2355 struct rx_packet *tnp;
2358 /* We don't print out the packet until now because (1) the time may not be
2359 * accurate enough until now in the lwp implementation (rx_Listener only gets
2360 * the time after the packet is read) and (2) from a protocol point of view,
2361 * this is the first time the packet has been seen */
2362 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2363 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2364 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2365 np->header.serial, packetType, host, port, np->header.serviceId,
2366 np->header.epoch, np->header.cid, np->header.callNumber,
2367 np->header.seq, np->header.flags, np));
2370 if (np->header.type == RX_PACKET_TYPE_VERSION) {
2371 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2374 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2375 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2378 /* If an input tracer function is defined, call it with the packet and
2379 * network address. Note this function may modify its arguments. */
2380 if (rx_justReceived) {
2381 struct sockaddr_in addr;
2383 addr.sin_family = AF_INET;
2384 addr.sin_port = port;
2385 addr.sin_addr.s_addr = host;
2386 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2387 addr.sin_len = sizeof(addr);
2388 #endif /* AFS_OSF_ENV */
2389 drop = (*rx_justReceived) (np, &addr);
2390 /* drop packet if return value is non-zero */
2391 if (drop) return np;
2392 port = addr.sin_port; /* in case fcn changed addr */
2393 host = addr.sin_addr.s_addr;
2397 /* If packet was not sent by the client, then *we* must be the client */
2398 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2399 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2401 /* Find the connection (or fabricate one, if we're the server & if
2402 * necessary) associated with this packet */
2403 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2404 np->header.cid, np->header.epoch, type,
2405 np->header.securityIndex);
2408 /* If no connection found or fabricated, just ignore the packet.
2409 * (An argument could be made for sending an abort packet for the connection.) */
2414 MUTEX_ENTER(&conn->conn_data_lock);
2415 if (conn->maxSerial < np->header.serial)
2416 conn->maxSerial = np->header.serial;
2417 MUTEX_EXIT(&conn->conn_data_lock);
2419 /* If the connection is in an error state, send an abort packet and ignore
2420 * the incoming packet */
2422 /* Don't respond to an abort packet--we don't want loops! */
2423 MUTEX_ENTER(&conn->conn_data_lock);
2424 if (np->header.type != RX_PACKET_TYPE_ABORT)
2425 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2427 MUTEX_EXIT(&conn->conn_data_lock);
2431 /* Check for connection-only requests (i.e. not call specific). */
2432 if (np->header.callNumber == 0) {
2433 switch (np->header.type) {
2434 case RX_PACKET_TYPE_ABORT:
2435 /* What if the supplied error is zero? */
2436 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2437 MUTEX_ENTER(&conn->conn_data_lock);
2439 MUTEX_EXIT(&conn->conn_data_lock);
2441 case RX_PACKET_TYPE_CHALLENGE:
2442 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2443 MUTEX_ENTER(&conn->conn_data_lock);
2445 MUTEX_EXIT(&conn->conn_data_lock);
2447 case RX_PACKET_TYPE_RESPONSE:
2448 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2449 MUTEX_ENTER(&conn->conn_data_lock);
2451 MUTEX_EXIT(&conn->conn_data_lock);
2453 case RX_PACKET_TYPE_PARAMS:
2454 case RX_PACKET_TYPE_PARAMS+1:
2455 case RX_PACKET_TYPE_PARAMS+2:
2456 /* ignore these packet types for now */
2457 MUTEX_ENTER(&conn->conn_data_lock);
2459 MUTEX_EXIT(&conn->conn_data_lock);
2464 /* Should not reach here, unless the peer is broken: send an abort packet */
2466 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2467 MUTEX_ENTER(&conn->conn_data_lock);
2468 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2470 MUTEX_EXIT(&conn->conn_data_lock);
2475 channel = np->header.cid & RX_CHANNELMASK;
2476 call = conn->call[channel];
2477 #ifdef RX_ENABLE_LOCKS
2479 MUTEX_ENTER(&call->lock);
2480 /* Test to see if call struct is still attached to conn. */
2481 if (call != conn->call[channel]) {
2483 MUTEX_EXIT(&call->lock);
2484 if (type == RX_SERVER_CONNECTION) {
2485 call = conn->call[channel];
2486 /* If we started with no call attached and there is one now,
2487 * another thread is also running this routine and has gotten
2488 * the connection channel. We should drop this packet in the tests
2489 * below. If there was a call on this connection and it's now
2490 * gone, then we'll be making a new call below.
2491 * If there was previously a call and it's now different then
2492 * the old call was freed and another thread running this routine
2493 * has created a call on this channel. One of these two threads
2494 * has a packet for the old call and the code below handles those cases. */
2498 MUTEX_ENTER(&call->lock);
2501 /* This packet can't be for this call. If the new call address is
2502 * 0 then no call is running on this channel. If there is a call
2503 * then, since this is a client connection we're getting data for,
2504 * it must be for the previous call. */
2506 MUTEX_ENTER(&rx_stats_mutex);
2507 rx_stats.spuriousPacketsRead++;
2508 MUTEX_EXIT(&rx_stats_mutex);
2509 MUTEX_ENTER(&conn->conn_data_lock);
2511 MUTEX_EXIT(&conn->conn_data_lock);
2516 currentCallNumber = conn->callNumber[channel];
2518 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2519 if (np->header.callNumber < currentCallNumber) {
2520 MUTEX_ENTER(&rx_stats_mutex);
2521 rx_stats.spuriousPacketsRead++;
2522 MUTEX_EXIT(&rx_stats_mutex);
2523 #ifdef RX_ENABLE_LOCKS
2525 MUTEX_EXIT(&call->lock);
2527 MUTEX_ENTER(&conn->conn_data_lock);
2529 MUTEX_EXIT(&conn->conn_data_lock);
2533 call = rxi_NewCall(conn, channel);
2534 MUTEX_ENTER(&call->lock);
2535 *call->callNumber = np->header.callNumber;
2536 call->state = RX_STATE_PRECALL;
2537 clock_GetTime(&call->queueTime);
2538 hzero(call->bytesSent);
2539 hzero(call->bytesRcvd);
2540 rxi_KeepAliveOn(call);
2542 else if (np->header.callNumber != currentCallNumber) {
2543 /* Wait until the transmit queue is idle before deciding
2544 * whether to reset the current call. Chances are that the
2545 * call will be in either DALLY or HOLD state once the TQ_BUSY flag is cleared. */
2548 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2549 while ((call->state == RX_STATE_ACTIVE) &&
2550 (call->flags & RX_CALL_TQ_BUSY)) {
2551 call->flags |= RX_CALL_TQ_WAIT;
2552 #ifdef RX_ENABLE_LOCKS
2553 CV_WAIT(&call->cv_tq, &call->lock);
2554 #else /* RX_ENABLE_LOCKS */
2555 osi_rxSleep(&call->tq);
2556 #endif /* RX_ENABLE_LOCKS */
2558 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2559 /* If the new call cannot be taken right now send a busy and set
2560 * the error condition in this call, so that it terminates as
2561 * quickly as possible */
2562 if (call->state == RX_STATE_ACTIVE) {
2563 struct rx_packet *tp;
2565 rxi_CallError(call, RX_CALL_DEAD);
2566 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2567 MUTEX_EXIT(&call->lock);
2568 MUTEX_ENTER(&conn->conn_data_lock);
2570 MUTEX_EXIT(&conn->conn_data_lock);
2573 rxi_ResetCall(call, 0);
2574 *call->callNumber = np->header.callNumber;
2575 call->state = RX_STATE_PRECALL;
2576 clock_GetTime(&call->queueTime);
2577 hzero(call->bytesSent);
2578 hzero(call->bytesRcvd);
2580 * If the number of queued calls exceeds the overload
2581 * threshold then abort this call.
2583 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2584 struct rx_packet *tp;
2586 rxi_CallError(call, rx_BusyError);
2587 tp = rxi_SendCallAbort(call, np, 1, 0);
2588 MUTEX_EXIT(&call->lock);
2589 MUTEX_ENTER(&conn->conn_data_lock);
2591 MUTEX_EXIT(&conn->conn_data_lock);
2594 rxi_KeepAliveOn(call);
2597 /* Continuing call; do nothing here. */
2599 } else { /* we're the client */
2600 /* Ignore all incoming acknowledgements for calls in DALLY state */
2601 if ( call && (call->state == RX_STATE_DALLY)
2602 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2603 MUTEX_ENTER(&rx_stats_mutex);
2604 rx_stats.ignorePacketDally++;
2605 MUTEX_EXIT(&rx_stats_mutex);
2606 #ifdef RX_ENABLE_LOCKS
2608 MUTEX_EXIT(&call->lock);
2611 MUTEX_ENTER(&conn->conn_data_lock);
2613 MUTEX_EXIT(&conn->conn_data_lock);
2617 /* Ignore anything that's not relevant to the current call. If there
2618 * isn't a current call, then no packet is relevant. */
2619 if (!call || (np->header.callNumber != currentCallNumber)) {
2620 MUTEX_ENTER(&rx_stats_mutex);
2621 rx_stats.spuriousPacketsRead++;
2622 MUTEX_EXIT(&rx_stats_mutex);
2623 #ifdef RX_ENABLE_LOCKS
2625 MUTEX_EXIT(&call->lock);
2628 MUTEX_ENTER(&conn->conn_data_lock);
2630 MUTEX_EXIT(&conn->conn_data_lock);
2633 /* If the service security object index stamped in the packet does not
2634 * match the connection's security index, ignore the packet */
2635 if (np->header.securityIndex != conn->securityIndex) {
2636 #ifdef RX_ENABLE_LOCKS
2637 MUTEX_EXIT(&call->lock);
2639 MUTEX_ENTER(&conn->conn_data_lock);
2641 MUTEX_EXIT(&conn->conn_data_lock);
2645 /* If we're receiving the response, then all transmit packets are
2646 * implicitly acknowledged. Get rid of them. */
2647 if (np->header.type == RX_PACKET_TYPE_DATA) {
2648 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2649 /* XXX Hack. Because we must release the global rx lock when
2650 * sending packets (osi_NetSend) we drop all acks while we're
2651 * traversing the tq in rxi_Start sending packets out because
2652 * packets may move to the freePacketQueue as a result of being here!
2653 * So we drop these packets until we're safely out of the
2654 * traversal. Really ugly!
2655 * For fine grain RX locking, we set the acked field in the
2656 * packets and let rxi_Start remove them from the transmit queue.
2658 if (call->flags & RX_CALL_TQ_BUSY) {
2659 #ifdef RX_ENABLE_LOCKS
2660 rxi_SetAcksInTransmitQueue(call);
2663 return np; /* xmitting; drop packet */
2667 rxi_ClearTransmitQueue(call, 0);
2669 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2670 rxi_ClearTransmitQueue(call, 0);
2671 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2673 if (np->header.type == RX_PACKET_TYPE_ACK) {
2674 /* now check to see if this is an ack packet acknowledging that the
2675 * server actually *lost* some hard-acked data. If this happens we
2676 * ignore this packet, as it may indicate that the server restarted in
2677 * the middle of a call. It is also possible that this is an old ack
2678 * packet. We don't abort the connection in this case, because this
2679 * *might* just be an old ack packet. The right way to detect a server
2680 * restart in the midst of a call is to notice that the server epoch
2682 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2683 * XXX unacknowledged. I think that this is off-by-one, but
2684 * XXX I don't dare change it just yet, since it will
2685 * XXX interact badly with the server-restart detection
2686 * XXX code in receiveackpacket. */
2687 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2688 MUTEX_ENTER(&rx_stats_mutex);
2689 rx_stats.spuriousPacketsRead++;
2690 MUTEX_EXIT(&rx_stats_mutex);
2691 MUTEX_EXIT(&call->lock);
2692 MUTEX_ENTER(&conn->conn_data_lock);
2694 MUTEX_EXIT(&conn->conn_data_lock);
2698 } /* else not a data packet */
2701 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2702 /* Set remote user defined status from packet */
2703 call->remoteStatus = np->header.userStatus;
2705 /* Note the gap between the expected next packet and the actual
2706 * packet that arrived, when the new packet has a smaller serial number
2707 * than expected. Rioses frequently reorder packets all by themselves,
2708 * so this will be quite important with very large window sizes.
2709 * Skew is checked against 0 here to avoid any dependence on the type of
2710 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always true!
2712 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2713 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2714 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2716 MUTEX_ENTER(&conn->conn_data_lock);
2717 skew = conn->lastSerial - np->header.serial;
2718 conn->lastSerial = np->header.serial;
2719 MUTEX_EXIT(&conn->conn_data_lock);
2721 register struct rx_peer *peer;
2723 if (skew > peer->inPacketSkew) {
2724 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2725 peer->inPacketSkew = skew;
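/* The MTUXXX note above suggests smoothing inPacketSkew instead of
 * keeping a running maximum; a minimal sketch with beta = 1/8
 * (editorial assumption, not the shipped behavior):
 *
 *   // smoothed += (sample - smoothed) / 8, in integer arithmetic
 *   peer->inPacketSkew += (skew - peer->inPacketSkew) / 8;
 */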
2729 /* Now do packet type-specific processing */
2730 switch (np->header.type) {
2731 case RX_PACKET_TYPE_DATA:
2732 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2735 case RX_PACKET_TYPE_ACK:
2736 /* Respond immediately to ack packets requesting acknowledgement
2738 if (np->header.flags & RX_REQUEST_ACK) {
2739 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2740 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2742 np = rxi_ReceiveAckPacket(call, np, 1);
2744 case RX_PACKET_TYPE_ABORT:
2745 /* An abort packet: reset the connection, passing the error up to
2747 /* What if error is zero? */
2748 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2750 case RX_PACKET_TYPE_BUSY:
2753 case RX_PACKET_TYPE_ACKALL:
2754 /* All packets acknowledged, so we can drop all packets previously
2755 * readied for sending */
2756 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2757 /* XXX Hack. Because we can't release the global rx lock when
2758 * sending packets (osi_NetSend) we drop all ack pkts while we're
2759 * traversing the tq in rxi_Start sending packets out because
2760 * packets may move to the freePacketQueue as a result of being
2761 * here! So we drop these packets until we're safely out of the
2762 * traversing. Really ugly!
2763 * For fine grain RX locking, we set the acked field in the packets
2764 * and let rxi_Start remove the packets from the transmit queue.
2766 if (call->flags & RX_CALL_TQ_BUSY) {
2767 #ifdef RX_ENABLE_LOCKS
2768 rxi_SetAcksInTransmitQueue(call);
2770 #else /* RX_ENABLE_LOCKS */
2772 return np; /* xmitting; drop packet */
2773 #endif /* RX_ENABLE_LOCKS */
2775 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2776 rxi_ClearTransmitQueue(call, 0);
2779 /* Should not reach here, unless the peer is broken: send an abort
2781 rxi_CallError(call, RX_PROTOCOL_ERROR);
2782 np = rxi_SendCallAbort(call, np, 1, 0);
2785 /* Note when this last legitimate packet was received, for keep-alive
2786 * processing. Note, we delay getting the time until now in the hope that
2787 * the packet will be delivered to the user before any get time is required
2788 * (if not, then the time won't actually be re-evaluated here). */
2789 call->lastReceiveTime = clock_Sec();
2790 MUTEX_EXIT(&call->lock);
2791 MUTEX_ENTER(&conn->conn_data_lock);
2793 MUTEX_EXIT(&conn->conn_data_lock);
2797 /* return true if this is an "interesting" connection from the point of view
2798 of someone trying to debug the system */
2799 int rxi_IsConnInteresting(struct rx_connection *aconn)
2802 register struct rx_call *tcall;
2804 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2806 for(i=0;i<RX_MAXCALLS;i++) {
2807 tcall = aconn->call[i];
2809 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2811 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2819 /* if this is one of the last few packets AND it wouldn't be used by the
2820 receiving call to immediately satisfy a read request, then drop it on
2821 the floor, since accepting it might prevent a lock-holding thread from
2822 making progress in its reading. If a call has been cleared while in
2823 the precall state then ignore all subsequent packets until the call
2824 is assigned to a thread. */
2826 static int TooLow(ap, acall)
2827 struct rx_call *acall;
2828 struct rx_packet *ap; {
2830 MUTEX_ENTER(&rx_stats_mutex);
2831 if (((ap->header.seq != 1) &&
2832 (acall->flags & RX_CALL_CLEARED) &&
2833 (acall->state == RX_STATE_PRECALL)) ||
2834 ((rx_nFreePackets < rxi_dataQuota+2) &&
2835 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2836 && (acall->flags & RX_CALL_READER_WAIT)))) {
2839 MUTEX_EXIT(&rx_stats_mutex);
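/* TooLow's drop predicate, unpacked (editorial summary): drop when
 * either (a) this isn't the first packet, the call was cleared, and it
 * is still in PRECALL; or (b) free packet buffers are nearly exhausted
 * (rx_nFreePackets < rxi_dataQuota + 2) and the packet would not let a
 * waiting reader make immediate progress within rx_initSendWindow. */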
2844 /* try to attach call, if authentication is complete */
2845 static void TryAttach(acall, socket, tnop, newcallp)
2846 register struct rx_call *acall;
2847 register osi_socket socket;
2849 register struct rx_call **newcallp; {
2850 register struct rx_connection *conn;
2852 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2853 /* Don't attach until we have the required authentication. */
2854 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2855 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2856 /* Note: this does not necessarily succeed; there
2857 may not be any proc available */
2860 rxi_ChallengeOn(acall->conn);
2865 /* A data packet has been received off the interface. This packet is
2866 * appropriate to the call (the call is in the right state, etc.). This
2867 * routine can return a packet to the caller, for re-use */
2869 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2870 port, tnop, newcallp)
2871 register struct rx_call *call;
2872 register struct rx_packet *np;
2878 struct rx_call **newcallp;
2884 afs_uint32 seq, serial, flags;
2886 struct rx_packet *tnp;
2888 MUTEX_ENTER(&rx_stats_mutex);
2889 rx_stats.dataPacketsRead++;
2890 MUTEX_EXIT(&rx_stats_mutex);
2893 /* If there are no packet buffers, drop this new packet, unless we can find
2894 * packet buffers from inactive calls */
2896 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2897 MUTEX_ENTER(&rx_freePktQ_lock);
2898 rxi_NeedMorePackets = TRUE;
2899 MUTEX_EXIT(&rx_freePktQ_lock);
2900 MUTEX_ENTER(&rx_stats_mutex);
2901 rx_stats.noPacketBuffersOnRead++;
2902 MUTEX_EXIT(&rx_stats_mutex);
2903 call->rprev = np->header.serial;
2904 rxi_calltrace(RX_TRACE_DROP, call);
2905 dpf (("packet %x dropped on receipt - quota problems", np));
2907 rxi_ClearReceiveQueue(call);
2908 clock_GetTime(&when);
2909 clock_Add(&when, &rx_softAckDelay);
2910 if (!call->delayedAckEvent ||
2911 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2912 rxevent_Cancel(call->delayedAckEvent, call,
2913 RX_CALL_REFCOUNT_DELAY);
2914 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2915 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2918 /* we've damaged this call already, might as well do it in. */
2924 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2925 * packet is one of several packets transmitted as a single
2926 * datagram. Do not send any soft or hard acks until all packets
2927 * in a jumbogram have been processed. Send negative acks right away.
2929 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2930 /* tnp is non-null when there are more packets in the
2931 * current jumbogram */
2938 seq = np->header.seq;
2939 serial = np->header.serial;
2940 flags = np->header.flags;
2942 /* If the call is in an error state, send an abort message */
2944 return rxi_SendCallAbort(call, np, istack, 0);
2946 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2947 * AFS 3.5 jumbogram. */
2948 if (flags & RX_JUMBO_PACKET) {
2949 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2954 if (np->header.spare != 0) {
2955 MUTEX_ENTER(&call->conn->conn_data_lock);
2956 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2957 MUTEX_EXIT(&call->conn->conn_data_lock);
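/* Jumbogram handling above, in outline (editorial): each constituent
 * packet except the last carries RX_JUMBO_PACKET, so
 * rxi_SplitJumboPacket peels one packet off the datagram and returns
 * the remainder in tnp; the enclosing loop therefore runs once per
 * constituent, and soft/hard acks are deferred until the whole
 * jumbogram has been processed. */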
2960 /* The usual case is that this is the expected next packet */
2961 if (seq == call->rnext) {
2963 /* Check to make sure it is not a duplicate of one already queued */
2964 if (queue_IsNotEmpty(&call->rq)
2965 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2966 MUTEX_ENTER(&rx_stats_mutex);
2967 rx_stats.dupPacketsRead++;
2968 MUTEX_EXIT(&rx_stats_mutex);
2969 dpf (("packet %x dropped on receipt - duplicate", np));
2970 rxevent_Cancel(call->delayedAckEvent, call,
2971 RX_CALL_REFCOUNT_DELAY);
2972 np = rxi_SendAck(call, np, seq, serial,
2973 flags, RX_ACK_DUPLICATE, istack);
2979 /* It's the next packet. Stick it on the receive queue
2980 * for this call. Set newPackets to make sure we wake
2981 * the reader once all packets have been processed */
2982 queue_Prepend(&call->rq, np);
2984 np = NULL; /* We can't use this anymore */
2987 /* If an ack is requested then set a flag to make sure we
2988 * send an acknowledgement for this packet */
2989 if (flags & RX_REQUEST_ACK) {
2993 /* Keep track of whether we have received the last packet */
2994 if (flags & RX_LAST_PACKET) {
2995 call->flags |= RX_CALL_HAVE_LAST;
2999 /* Check whether we have all of the packets for this call */
3000 if (call->flags & RX_CALL_HAVE_LAST) {
3001 afs_uint32 tseq; /* temporary sequence number */
3002 struct rx_packet *tp; /* Temporary packet pointer */
3003 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3005 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3006 if (tseq != tp->header.seq)
3008 if (tp->header.flags & RX_LAST_PACKET) {
3009 call->flags |= RX_CALL_RECEIVE_DONE;
3016 /* Provide asynchronous notification for those who want it
3017 * (e.g. multi rx) */
3018 if (call->arrivalProc) {
3019 (*call->arrivalProc)(call, call->arrivalProcHandle,
3020 call->arrivalProcArg);
3021 call->arrivalProc = (VOID (*)()) 0;
3024 /* Update last packet received */
3027 /* If there is no server process serving this call, grab
3028 * one, if available. We only need to do this once. If a
3029 * server thread is available, this thread becomes a server
3030 * thread and the server thread becomes a listener thread. */
3032 TryAttach(call, socket, tnop, newcallp);
3035 /* This is not the expected next packet. */
3037 /* Determine whether this is a new or old packet, and if it's
3038 * a new one, whether it fits into the current receive window.
3039 * Also figure out whether the packet was delivered in sequence.
3040 * We use the prev variable to determine whether the new packet
3041 * is the successor of its immediate predecessor in the
3042 * receive queue, and the missing flag to determine whether
3043 * any of this packet's predecessors are missing. */
3045 afs_uint32 prev; /* "Previous packet" sequence number */
3046 struct rx_packet *tp; /* Temporary packet pointer */
3047 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3048 int missing; /* Are any predecessors missing? */
3050 /* If the new packet's sequence number has been sent to the
3051 * application already, then this is a duplicate */
3052 if (seq < call->rnext) {
3053 MUTEX_ENTER(&rx_stats_mutex);
3054 rx_stats.dupPacketsRead++;
3055 MUTEX_EXIT(&rx_stats_mutex);
3056 rxevent_Cancel(call->delayedAckEvent, call,
3057 RX_CALL_REFCOUNT_DELAY);
3058 np = rxi_SendAck(call, np, seq, serial,
3059 flags, RX_ACK_DUPLICATE, istack);
3065 /* If the sequence number is greater than what can be
3066 * accommodated by the current window, then send a negative
3067 * acknowledge and drop the packet */
3068 if ((call->rnext + call->rwind) <= seq) {
3069 rxevent_Cancel(call->delayedAckEvent, call,
3070 RX_CALL_REFCOUNT_DELAY);
3071 np = rxi_SendAck(call, np, seq, serial,
3072 flags, RX_ACK_EXCEEDS_WINDOW, istack);
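/* Worked example of the window test above (editorial): with
 * call->rnext == 10 and call->rwind == 32, sequence numbers 10..41
 * are acceptable; seq == 42 makes (rnext + rwind) <= seq true and
 * draws an RX_ACK_EXCEEDS_WINDOW negative acknowledgement. */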
3078 /* Look for the packet in the queue of old received packets */
3079 for (prev = call->rnext - 1, missing = 0,
3080 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3081 /*Check for duplicate packet */
3082 if (seq == tp->header.seq) {
3083 MUTEX_ENTER(&rx_stats_mutex);
3084 rx_stats.dupPacketsRead++;
3085 MUTEX_EXIT(&rx_stats_mutex);
3086 rxevent_Cancel(call->delayedAckEvent, call,
3087 RX_CALL_REFCOUNT_DELAY);
3088 np = rxi_SendAck(call, np, seq, serial,
3089 flags, RX_ACK_DUPLICATE, istack);
3094 /* If we find a higher sequence packet, break out and
3095 * insert the new packet here. */
3096 if (seq < tp->header.seq) break;
3097 /* Check for missing packet */
3098 if (tp->header.seq != prev+1) {
3102 prev = tp->header.seq;
3105 /* Keep track of whether we have received the last packet. */
3106 if (flags & RX_LAST_PACKET) {
3107 call->flags |= RX_CALL_HAVE_LAST;
3110 /* It's within the window: add it to the receive queue.
3111 * tp is left by the previous loop either pointing at the
3112 * packet before which to insert the new packet, or at the
3113 * queue head if the queue is empty or the packet should be appended. */
3115 queue_InsertBefore(tp, np);
3119 /* Check whether we have all of the packets for this call */
3120 if ((call->flags & RX_CALL_HAVE_LAST)
3121 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3122 afs_uint32 tseq; /* temporary sequence number */
3124 for (tseq = call->rnext,
3125 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3126 if (tseq != tp->header.seq)
3128 if (tp->header.flags & RX_LAST_PACKET) {
3129 call->flags |= RX_CALL_RECEIVE_DONE;
3136 /* We need to send an ack if the packet is out of sequence,
3137 * or if an ack was requested by the peer. */
3138 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3142 /* Acknowledge the last packet for each call */
3143 if (flags & RX_LAST_PACKET) {
3154 * If the receiver is waiting for an iovec, fill the iovec
3155 * using the data from the receive queue */
3156 if (call->flags & RX_CALL_IOVEC_WAIT) {
3157 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3158 /* the call may have been aborted */
3167 /* Wakeup the reader if any */
3168 if ((call->flags & RX_CALL_READER_WAIT) &&
3169 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3170 (call->iovNext >= call->iovMax) ||
3171 (call->flags & RX_CALL_RECEIVE_DONE))) {
3172 call->flags &= ~RX_CALL_READER_WAIT;
3173 #ifdef RX_ENABLE_LOCKS
3174 CV_BROADCAST(&call->cv_rq);
3176 osi_rxWakeup(&call->rq);
3182 * Send an ack when requested by the peer, or once every
3183 * rxi_SoftAckRate packets until the last packet has been
3184 * received. Always send a soft ack for the last packet in
3185 * the server's reply. */
3187 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3188 np = rxi_SendAck(call, np, seq, serial, flags,
3189 RX_ACK_REQUESTED, istack);
3190 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3191 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3192 np = rxi_SendAck(call, np, seq, serial, flags,
3193 RX_ACK_IDLE, istack);
3194 } else if (call->nSoftAcks) {
3195 clock_GetTime(&when);
3196 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3197 clock_Add(&when, &rx_lastAckDelay);
3199 clock_Add(&when, &rx_softAckDelay);
3201 if (!call->delayedAckEvent ||
3202 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3203 rxevent_Cancel(call->delayedAckEvent, call,
3204 RX_CALL_REFCOUNT_DELAY);
3205 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3206 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3209 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3210 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
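/* Ack scheduling above, summarized (editorial):
 *   RX_REQUEST_ACK set            -> immediate RX_ACK_REQUESTED
 *   nSoftAcks > rxi_SoftAckRate   -> immediate RX_ACK_IDLE
 *   nSoftAcks > 0                 -> delayed ack event, due after
 *                                    rx_lastAckDelay for the last packet
 *                                    of a server reply, rx_softAckDelay
 *                                    otherwise
 *   RX_CALL_RECEIVE_DONE          -> cancel any pending delayed ack
 */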
3217 static void rxi_ComputeRate();
3220 /* The real smarts of the whole thing. */
3221 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3222 register struct rx_call *call;
3223 struct rx_packet *np;
3226 struct rx_ackPacket *ap;
3228 register struct rx_packet *tp;
3229 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3230 register struct rx_connection *conn = call->conn;
3231 struct rx_peer *peer = conn->peer;
3234 /* because there are CM's that are bogus, sending weird values for this. */
3235 afs_uint32 skew = 0;
3240 int newAckCount = 0;
3241 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3242 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3244 MUTEX_ENTER(&rx_stats_mutex);
3245 rx_stats.ackPacketsRead++;
3246 MUTEX_EXIT(&rx_stats_mutex);
3247 ap = (struct rx_ackPacket *) rx_DataOf(np);
3248 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3250 return np; /* truncated ack packet */
3252 /* depends on ack packet struct */
3253 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3254 first = ntohl(ap->firstPacket);
3255 serial = ntohl(ap->serial);
3256 /* temporarily disabled -- needs to degrade over time
3257 skew = ntohs(ap->maxSkew); */
3259 /* Ignore ack packets received out of order */
3260 if (first < call->tfirst) {
3264 if (np->header.flags & RX_SLOW_START_OK) {
3265 call->flags |= RX_CALL_SLOW_START_OK;
3271 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3272 ap->reason, ntohl(ap->previousPacket),
3273 (unsigned int) np->header.seq, (unsigned int) serial,
3274 (unsigned int) skew, ntohl(ap->firstPacket));
3277 for (offset = 0; offset < nAcks; offset++)
3278 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3284 /* if a server connection has been re-created, it doesn't remember what
3285 serial # it was up to. An ack will tell us, since the serial field
3286 contains the largest serial received by the other side */
3287 MUTEX_ENTER(&conn->conn_data_lock);
3288 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3289 conn->serial = serial+1;
3291 MUTEX_EXIT(&conn->conn_data_lock);
3293 /* Update the outgoing packet skew value to the latest value of
3294 * the peer's incoming packet skew value. The ack packet, of
3295 * course, could arrive out of order, but that won't affect things much. */
3297 MUTEX_ENTER(&peer->peer_lock);
3298 peer->outPacketSkew = skew;
3300 /* Check for packets that no longer need to be transmitted, and
3301 * discard them. This only applies to packets positively
3302 * acknowledged as having been sent to the peer's upper level.
3303 * All other packets must be retained. So only packets with
3304 * sequence numbers < ap->firstPacket are candidates. */
3305 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3306 if (tp->header.seq >= first) break;
3307 call->tfirst = tp->header.seq + 1;
3308 if (tp->header.serial == serial) {
3309 /* Use RTT if not delayed by client. */
3310 if (ap->reason != RX_ACK_DELAY)
3311 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3313 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3316 else if (tp->firstSerial == serial) {
3317 /* Use RTT if not delayed by client. */
3318 if (ap->reason != RX_ACK_DELAY)
3319 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3321 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3324 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3325 /* XXX Hack. Because we have to release the global rx lock when sending
3326 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3327 * in rxi_Start sending packets out because packets may move to the
3328 * freePacketQueue as a result of being here! So we drop these packets until
3329 * we're safely out of the traversal. Really ugly!
3330 * To make it even uglier, if we're using fine grain locking, we can
3331 * set the ack bits in the packets and have rxi_Start remove the packets
3332 * when it's done transmitting.
3334 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3337 if (call->flags & RX_CALL_TQ_BUSY) {
3338 #ifdef RX_ENABLE_LOCKS
3339 tp->flags |= RX_PKTFLAG_ACKED;
3340 call->flags |= RX_CALL_TQ_SOME_ACKED;
3341 #else /* RX_ENABLE_LOCKS */
3343 #endif /* RX_ENABLE_LOCKS */
3345 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3348 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3353 /* Give rate detector a chance to respond to ping requests */
3354 if (ap->reason == RX_ACK_PING_RESPONSE) {
3355 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3359 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3361 /* Now go through explicit acks/nacks and record the results in
3362 * the waiting packets. These are packets that can't be released
3363 * yet, even with a positive acknowledge. This positive
3364 * acknowledge only means the packet has been received by the
3365 * peer, not that it will be retained long enough to be sent to
3366 * the peer's upper level. In addition, reset the transmit timers
3367 * of any missing packets (those packets that must be missing
3368 * because this packet was out of sequence) */
3370 call->nSoftAcked = 0;
3371 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3372 /* Update round trip time if the ack was stimulated on receipt
3374 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3375 #ifdef RX_ENABLE_LOCKS
3376 if (tp->header.seq >= first) {
3377 #endif /* RX_ENABLE_LOCKS */
3378 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3379 if (tp->header.serial == serial) {
3380 /* Use RTT if not delayed by client. */
3381 if (ap->reason != RX_ACK_DELAY)
3382 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3384 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3387 else if ((tp->firstSerial == serial)) {
3388 /* Use RTT if not delayed by client. */
3389 if (ap->reason != RX_ACK_DELAY)
3390 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3392 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3395 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3396 #ifdef RX_ENABLE_LOCKS
3398 #endif /* RX_ENABLE_LOCKS */
3399 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3401 /* Set the acknowledge flag per packet based on the
3402 * information in the ack packet. An acknowledged packet can
3403 * be downgraded when the server has discarded a packet it
3404 * soft-acked previously, or when an ack packet is received
3405 * out of sequence. */
3406 if (tp->header.seq < first) {
3407 /* Implicit ack information */
3408 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3411 tp->flags |= RX_PKTFLAG_ACKED;
3413 else if (tp->header.seq < first + nAcks) {
3414 /* Explicit ack information: set it in the packet appropriately */
3415 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3416 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3418 tp->flags |= RX_PKTFLAG_ACKED;
3426 tp->flags &= ~RX_PKTFLAG_ACKED;
3431 tp->flags &= ~RX_PKTFLAG_ACKED;
3435 /* If packet isn't yet acked, and it has been transmitted at least
3436 * once, reset retransmit time using latest timeout
3437 * i.e., this should readjust the retransmit timer for all outstanding
3438 * packets... So we don't just retransmit when we should know better */
3440 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3441 tp->retryTime = tp->timeSent;
3442 clock_Add(&tp->retryTime, &peer->timeout);
3443 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3444 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3448 /* If the window has been extended by this acknowledge packet,
3449 * then wakeup a sender waiting in alloc for window space, or try
3450 * sending packets now, if it has been sitting on packets due to
3451 * lack of window space */
3452 if (call->tnext < (call->tfirst + call->twind)) {
3453 #ifdef RX_ENABLE_LOCKS
3454 CV_SIGNAL(&call->cv_twind);
3456 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3457 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3458 osi_rxWakeup(&call->twind);
3461 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3462 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3466 /* if the ack packet has a receivelen field hanging off it,
3467 * update our state */
3468 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3471 /* If the ack packet has a "recommended" size that is less than
3472 * what I am using now, reduce my size to match */
3473 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3474 sizeof(afs_int32), &tSize);
3475 tSize = (afs_uint32) ntohl(tSize);
3476 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3478 /* Get the maximum packet size to send to this peer */
3479 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3481 tSize = (afs_uint32)ntohl(tSize);
3482 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3483 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3485 /* sanity check - peer might have restarted with different params.
3486 * If peer says "send less", dammit, send less... Peer should never
3487 * be unable to accept packets of the size that prior AFS versions would
3488 * send without asking. */
3489 if (peer->maxMTU != tSize) {
3490 peer->maxMTU = tSize;
3491 peer->MTU = MIN(tSize, peer->MTU);
3492 call->MTU = MIN(call->MTU, tSize);
3496 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3498 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3499 sizeof(afs_int32), &tSize);
3500 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3501 if (tSize < call->twind) { /* smaller than our send */
3502 call->twind = tSize; /* window, we must send less... */
3503 call->ssthresh = MIN(call->twind, call->ssthresh);
3506 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3507 * network MTU confused with the loopback MTU. Calculate the
3508 * maximum MTU here for use in the slow start code below.
3510 maxMTU = peer->maxMTU;
3511 /* Did peer restart with older RX version? */
3512 if (peer->maxDgramPackets > 1) {
3513 peer->maxDgramPackets = 1;
3515 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3517 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3518 sizeof(afs_int32), &tSize);
3519 tSize = (afs_uint32) ntohl(tSize);
3521 * As of AFS 3.5 we set the send window to match the receive window.
3523 if (tSize < call->twind) {
3524 call->twind = tSize;
3525 call->ssthresh = MIN(call->twind, call->ssthresh);
3526 } else if (tSize > call->twind) {
3527 call->twind = tSize;
3531 * As of AFS 3.5, a jumbogram is more than one fixed size
3532 * packet transmitted in a single UDP datagram. If the remote
3533 * MTU is smaller than our local MTU then never send a datagram
3534 * larger than the natural MTU.
3536 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3537 sizeof(afs_int32), &tSize);
3538 maxDgramPackets = (afs_uint32) ntohl(tSize);
3539 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3540 maxDgramPackets = MIN(maxDgramPackets,
3541 (int)(peer->ifDgramPackets));
3542 maxDgramPackets = MIN(maxDgramPackets, tSize);
3543 if (maxDgramPackets > 1) {
3544 peer->maxDgramPackets = maxDgramPackets;
3545 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3547 peer->maxDgramPackets = 1;
3548 call->MTU = peer->natMTU;
3550 } else if (peer->maxDgramPackets > 1) {
3551 /* Restarted with lower version of RX */
3552 peer->maxDgramPackets = 1;
3554 } else if (peer->maxDgramPackets > 1 ||
3555 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3556 /* Restarted with lower version of RX */
3557 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3558 peer->natMTU = OLD_MAX_PACKET_SIZE;
3559 peer->MTU = OLD_MAX_PACKET_SIZE;
3560 peer->maxDgramPackets = 1;
3561 peer->nDgramPackets = 1;
3563 call->MTU = OLD_MAX_PACKET_SIZE;
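/* Layout of the words trailing the ack array, as parsed above
 * (editorial summary; offsets relative to rx_AckDataSize(ap->nAcks)):
 *   word 0: peer's maximum packet size  -> peer->maxMTU, call->MTU
 *   word 1: peer's "recommended" size   -> peer->natMTU
 *   word 2: peer's receive window       -> call->twind (and ssthresh)
 *   word 3: peer's max dgram packets    -> peer->maxDgramPackets
 * Shorter trailers mark older peers, handled by the fallback branches. */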
3568 * Calculate how many datagrams were successfully received after
3569 * the first missing packet and adjust the negative ack counter
3574 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3575 if (call->nNacks < nNacked) {
3576 call->nNacks = nNacked;
3585 if (call->flags & RX_CALL_FAST_RECOVER) {
3587 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3589 call->flags &= ~RX_CALL_FAST_RECOVER;
3590 call->cwind = call->nextCwind;
3591 call->nextCwind = 0;
3594 call->nCwindAcks = 0;
3596 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3597 /* Three negative acks in a row trigger congestion recovery */
3598 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3599 MUTEX_EXIT(&peer->peer_lock);
3600 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3601 /* someone else is waiting to start recovery */
3604 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3605 while (call->flags & RX_CALL_TQ_BUSY) {
3606 call->flags |= RX_CALL_TQ_WAIT;
3607 #ifdef RX_ENABLE_LOCKS
3608 CV_WAIT(&call->cv_tq, &call->lock);
3609 #else /* RX_ENABLE_LOCKS */
3610 osi_rxSleep(&call->tq);
3611 #endif /* RX_ENABLE_LOCKS */
3613 MUTEX_ENTER(&peer->peer_lock);
3614 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3615 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3616 call->flags |= RX_CALL_FAST_RECOVER;
3617 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3618 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3620 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3621 call->nextCwind = call->ssthresh;
3624 peer->MTU = call->MTU;
3625 peer->cwind = call->nextCwind;
3626 peer->nDgramPackets = call->nDgramPackets;
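/* Fast recovery above, in brief (editorial): once rx_nackThreshold
 * nacks accumulate, ssthresh = MAX(4, MIN(cwind, twind)) / 2, cwind is
 * opened to ssthresh + rx_nackThreshold for the retransmission burst,
 * nDgramPackets is halved, and (just below) the nacked packets get
 * retryTime zeroed so they resend as soon as the window permits. */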
3628 call->congestSeq = peer->congestSeq;
3629 /* Reset the resend times on the packets that were nacked
3630 * so we will retransmit as soon as the window permits */
3631 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3633 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3634 clock_Zero(&tp->retryTime);
3636 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3641 /* If cwind is smaller than ssthresh, then increase
3642 * the window one packet for each ack we receive (exponential growth).
3644 * If cwind is greater than or equal to ssthresh then increase
3645 * the congestion window by one packet for each cwind acks we
3646 * receive (linear growth). */
3647 if (call->cwind < call->ssthresh) {
3648 call->cwind = MIN((int)call->ssthresh,
3649 (int)(call->cwind + newAckCount));
3650 call->nCwindAcks = 0;
3652 call->nCwindAcks += newAckCount;
3653 if (call->nCwindAcks >= call->cwind) {
3654 call->nCwindAcks = 0;
3655 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
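/* Worked example of the growth rules above (editorial): with
 * ssthresh == 8 and cwind == 4, each new ack adds a packet until
 * cwind reaches 8 (slow start); after that it takes cwind acks to add
 * a single packet (congestion avoidance), never exceeding
 * rx_maxSendWindow. */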
3659 * If we have received several acknowledgements in a row then
3660 * it is time to increase the size of our datagrams
3662 if ((int)call->nAcks > rx_nDgramThreshold) {
3663 if (peer->maxDgramPackets > 1) {
3664 if (call->nDgramPackets < peer->maxDgramPackets) {
3665 call->nDgramPackets++;
3667 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3668 } else if (call->MTU < peer->maxMTU) {
3669 call->MTU += peer->natMTU;
3670 call->MTU = MIN(call->MTU, peer->maxMTU);
3676 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3678 /* Servers need to hold the call until all response packets have
3679 * been acknowledged. Soft acks are good enough since clients
3680 * are not allowed to clear their receive queues. */
3681 if (call->state == RX_STATE_HOLD &&
3682 call->tfirst + call->nSoftAcked >= call->tnext) {
3683 call->state = RX_STATE_DALLY;
3684 rxi_ClearTransmitQueue(call, 0);
3685 } else if (!queue_IsEmpty(&call->tq)) {
3686 rxi_Start(0, call, istack);
3691 /* Received a response to a challenge packet */
3692 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3693 register struct rx_connection *conn;
3694 register struct rx_packet *np;
3699 /* Ignore the packet if we're the client */
3700 if (conn->type == RX_CLIENT_CONNECTION) return np;
3702 /* If already authenticated, ignore the packet (it's probably a retry) */
3703 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3706 /* Otherwise, have the security object evaluate the response packet */
3707 error = RXS_CheckResponse(conn->securityObject, conn, np);
3709 /* If the response is invalid, reset the connection, sending
3710 * an abort to the peer */
3714 rxi_ConnectionError(conn, error);
3715 MUTEX_ENTER(&conn->conn_data_lock);
3716 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3717 MUTEX_EXIT(&conn->conn_data_lock);
3721 /* If the response is valid, any calls waiting to attach
3722 * servers can now do so */
3724 for (i=0; i<RX_MAXCALLS; i++) {
3725 struct rx_call *call = conn->call[i];
3727 MUTEX_ENTER(&call->lock);
3728 if (call->state == RX_STATE_PRECALL)
3729 rxi_AttachServerProc(call, -1, NULL, NULL);
3730 MUTEX_EXIT(&call->lock);
3737 /* A client has received an authentication challenge: the security
3738 * object is asked to cough up a respectable response packet to send
3739 * back to the server. The server is responsible for retrying the
3740 * challenge if it fails to get a response. */
3743 rxi_ReceiveChallengePacket(conn, np, istack)
3744 register struct rx_connection *conn;
3745 register struct rx_packet *np;
3750 /* Ignore the challenge if we're the server */
3751 if (conn->type == RX_SERVER_CONNECTION) return np;
3753 /* Ignore the challenge if the connection is otherwise idle; someone's
3754 * trying to use us as an oracle. */
3755 if (!rxi_HasActiveCalls(conn)) return np;
3757 /* Send the security object the challenge packet. It is expected to fill
3758 * in the response. */
3759 error = RXS_GetResponse(conn->securityObject, conn, np);
3761 /* If the security object is unable to return a valid response, reset the
3762 * connection and send an abort to the peer. Otherwise send the response
3763 * packet to the peer connection. */
3765 rxi_ConnectionError(conn, error);
3766 MUTEX_ENTER(&conn->conn_data_lock);
3767 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3768 MUTEX_EXIT(&conn->conn_data_lock);
3771 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3772 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3778 /* Find an available server process to service the current request in
3779 * the given call structure. If one isn't available, queue up this
3780 * call so it eventually gets one */
3782 rxi_AttachServerProc(call, socket, tnop, newcallp)
3783 register struct rx_call *call;
3784 register osi_socket socket;
3786 register struct rx_call **newcallp;
3788 register struct rx_serverQueueEntry *sq;
3789 register struct rx_service *service = call->conn->service;
3790 #ifdef RX_ENABLE_LOCKS
3791 register int haveQuota = 0;
3792 #endif /* RX_ENABLE_LOCKS */
3793 /* May already be attached */
3794 if (call->state == RX_STATE_ACTIVE) return;
3796 MUTEX_ENTER(&rx_serverPool_lock);
3797 #ifdef RX_ENABLE_LOCKS
3798 while(rxi_ServerThreadSelectingCall) {
3799 MUTEX_EXIT(&call->lock);
3800 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3801 MUTEX_EXIT(&rx_serverPool_lock);
3802 MUTEX_ENTER(&call->lock);
3803 MUTEX_ENTER(&rx_serverPool_lock);
3804 /* Call may have been attached */
3805 if (call->state == RX_STATE_ACTIVE) return;
3808 haveQuota = QuotaOK(service);
3809 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3810 /* If there are no processes available to service this call,
3811 * put the call on the incoming call queue (unless it's
3812 * already on the queue).
3815 ReturnToServerPool(service);
3816 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3817 call->flags |= RX_CALL_WAIT_PROC;
3818 MUTEX_ENTER(&rx_stats_mutex);
3820 MUTEX_EXIT(&rx_stats_mutex);
3821 rxi_calltrace(RX_CALL_ARRIVAL, call);
3822 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3823 queue_Append(&rx_incomingCallQueue, call);
3826 #else /* RX_ENABLE_LOCKS */
3827 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3828 /* If there are no processes available to service this call,
3829 * put the call on the incoming call queue (unless it's
3830 * already on the queue).
3832 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3833 call->flags |= RX_CALL_WAIT_PROC;
3835 rxi_calltrace(RX_CALL_ARRIVAL, call);
3836 queue_Append(&rx_incomingCallQueue, call);
3839 #endif /* RX_ENABLE_LOCKS */
3841 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3843 /* If hot threads are enabled, and both newcallp and sq->socketp
3844 * are non-null, then this thread will process the call, and the
3845 * idle server thread will start listening on this thread's socket.
3848 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3851 *sq->socketp = socket;
3852 clock_GetTime(&call->startTime);
3853 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3857 if (call->flags & RX_CALL_WAIT_PROC) {
3858 /* Conservative: I don't think this should happen */
3859 call->flags &= ~RX_CALL_WAIT_PROC;
3860 MUTEX_ENTER(&rx_stats_mutex);
3862 MUTEX_EXIT(&rx_stats_mutex);
3865 call->state = RX_STATE_ACTIVE;
3866 call->mode = RX_MODE_RECEIVING;
3867 if (call->flags & RX_CALL_CLEARED) {
3868 /* send an ack now to start the packet flow up again */
3869 call->flags &= ~RX_CALL_CLEARED;
3870 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3872 #ifdef RX_ENABLE_LOCKS
3875 service->nRequestsRunning++;
3876 if (service->nRequestsRunning <= service->minProcs)
3882 MUTEX_EXIT(&rx_serverPool_lock);
3885 /* Delay the sending of an acknowledge event for a short while, while
3886 * a new call is being prepared (in the case of a client) or a reply
3887 * is being prepared (in the case of a server). Rather than sending
3888 * an ack packet, an ACKALL packet is sent. */
3889 void rxi_AckAll(event, call, dummy)
3890 struct rxevent *event;
3891 register struct rx_call *call;
3894 #ifdef RX_ENABLE_LOCKS
3896 MUTEX_ENTER(&call->lock);
3897 call->delayedAckEvent = (struct rxevent *) 0;
3898 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3900 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3901 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3903 MUTEX_EXIT(&call->lock);
3904 #else /* RX_ENABLE_LOCKS */
3905 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3906 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3907 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3908 #endif /* RX_ENABLE_LOCKS */
3911 void rxi_SendDelayedAck(event, call, dummy)
3912 struct rxevent *event;
3913 register struct rx_call *call;
3916 #ifdef RX_ENABLE_LOCKS
3918 MUTEX_ENTER(&call->lock);
3919 if (event == call->delayedAckEvent)
3920 call->delayedAckEvent = (struct rxevent *) 0;
3921 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3923 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3925 MUTEX_EXIT(&call->lock);
3926 #else /* RX_ENABLE_LOCKS */
3927 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3928 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3929 #endif /* RX_ENABLE_LOCKS */
3933 #ifdef RX_ENABLE_LOCKS
3934 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3935 * clearing them out.
3937 static void rxi_SetAcksInTransmitQueue(call)
3938 register struct rx_call *call;
3940 register struct rx_packet *p, *tp;
3943 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3946 p->flags |= RX_PKTFLAG_ACKED;
3950 call->flags |= RX_CALL_TQ_CLEARME;
3951 call->flags |= RX_CALL_TQ_SOME_ACKED;
3954 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3955 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3956 call->tfirst = call->tnext;
3957 call->nSoftAcked = 0;
3959 if (call->flags & RX_CALL_FAST_RECOVER) {
3960 call->flags &= ~RX_CALL_FAST_RECOVER;
3961 call->cwind = call->nextCwind;
3962 call->nextCwind = 0;
3965 CV_SIGNAL(&call->cv_twind);
3967 #endif /* RX_ENABLE_LOCKS */
3969 /* Clear out the transmit queue for the current call (all packets have
3970 * been received by peer) */
3971 void rxi_ClearTransmitQueue(call, force)
3972 register struct rx_call *call;
3975 register struct rx_packet *p, *tp;
3977 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3978 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3980 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3983 p->flags |= RX_PKTFLAG_ACKED;
3987 call->flags |= RX_CALL_TQ_CLEARME;
3988 call->flags |= RX_CALL_TQ_SOME_ACKED;
3991 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3992 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3998 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3999 call->flags &= ~RX_CALL_TQ_CLEARME;
4001 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4003 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4004 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4005 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4006 call->nSoftAcked = 0;
4008 if (call->flags & RX_CALL_FAST_RECOVER) {
4009 call->flags &= ~RX_CALL_FAST_RECOVER;
4010 call->cwind = call->nextCwind;
4013 #ifdef RX_ENABLE_LOCKS
4014 CV_SIGNAL(&call->cv_twind);
4016 osi_rxWakeup(&call->twind);
4020 void rxi_ClearReceiveQueue(call)
4021 register struct rx_call *call;
4023 register struct rx_packet *p, *tp;
4024 if (queue_IsNotEmpty(&call->rq)) {
4025 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4030 rx_packetReclaims++;
4032 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4034 if (call->state == RX_STATE_PRECALL) {
4035 call->flags |= RX_CALL_CLEARED;
4039 /* Send an abort packet for the specified call */
4040 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4041 register struct rx_call *call;
4042 struct rx_packet *packet;
4052 /* Clients should never delay abort messages */
4053 if (rx_IsClientConn(call->conn))
4056 if (call->abortCode != call->error) {
4057 call->abortCode = call->error;
4058 call->abortCount = 0;
4061 if (force || rxi_callAbortThreshhold == 0 ||
4062 call->abortCount < rxi_callAbortThreshhold) {
4063 if (call->delayedAbortEvent) {
4064 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4066 error = htonl(call->error);
4068 packet = rxi_SendSpecial(call, call->conn, packet,
4069 RX_PACKET_TYPE_ABORT, (char *)&error,
4070 sizeof(error), istack);
4071 } else if (!call->delayedAbortEvent) {
4072 clock_GetTime(&when);
4073 clock_Addmsec(&when, rxi_callAbortDelay);
4074 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4075 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4081 /* Send an abort packet for the specified connection. Packet is an
4082 * optional pointer to a packet that can be used to send the abort.
4083 * Once the number of abort messages reaches the threshold, an
4084 * event is scheduled to send the abort. Setting the force flag
4085 * overrides sending delayed abort messages.
4087 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4088 * to send the abort packet.
4090 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4091 register struct rx_connection *conn;
4092 struct rx_packet *packet;
4102 /* Clients should never delay abort messages */
4103 if (rx_IsClientConn(conn))
4106 if (force || rxi_connAbortThreshhold == 0 ||
4107 conn->abortCount < rxi_connAbortThreshhold) {
4108 if (conn->delayedAbortEvent) {
4109 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4111 error = htonl(conn->error);
4113 MUTEX_EXIT(&conn->conn_data_lock);
4114 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4115 RX_PACKET_TYPE_ABORT, (char *)&error,
4116 sizeof(error), istack);
4117 MUTEX_ENTER(&conn->conn_data_lock);
4118 } else if (!conn->delayedAbortEvent) {
4119 clock_GetTime(&when);
4120 clock_Addmsec(&when, rxi_connAbortDelay);
4121 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4127 /* Associate an error with all of the calls owned by a connection. Called
4128 * with error non-zero. This is only for really fatal things, like
4129 * bad authentication responses. The connection itself is set in
4130 * error at this point, so that future packets received will be discarded. */
4132 void rxi_ConnectionError(conn, error)
4133 register struct rx_connection *conn;
4134 register afs_int32 error;
4138 if (conn->challengeEvent)
4139 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4140 for (i=0; i<RX_MAXCALLS; i++) {
4141 struct rx_call *call = conn->call[i];
4143 MUTEX_ENTER(&call->lock);
4144 rxi_CallError(call, error);
4145 MUTEX_EXIT(&call->lock);
4148 conn->error = error;
4149 MUTEX_ENTER(&rx_stats_mutex);
4150 rx_stats.fatalErrors++;
4151 MUTEX_EXIT(&rx_stats_mutex);
4155 void rxi_CallError(call, error)
4156 register struct rx_call *call;
4159 if (call->error) error = call->error;
4160 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4161 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4162 rxi_ResetCall(call, 0);
4165 rxi_ResetCall(call, 0);
4167 call->error = error;
4168 call->mode = RX_MODE_ERROR;
4171 /* Reset various fields in a call structure, and wake up waiting
4172 * processes. Some fields aren't changed: state & mode are not
4173 * touched (these must be set by the caller), and bufptr, nLeft, and
4174 * nFree are not reset, since these fields are manipulated by
4175 * unprotected macros, and may only be reset by non-interrupting code.
4178 /* this code requires that call->conn be set properly as a pre-condition. */
4179 #endif /* ADAPT_WINDOW */
4181 void rxi_ResetCall(call, newcall)
4182 register struct rx_call *call;
4183 register int newcall;
4186 register struct rx_peer *peer;
4187 struct rx_packet *packet;
4189 /* Notify anyone who is waiting for asynchronous packet arrival */
4190 if (call->arrivalProc) {
4191 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4192 call->arrivalProc = (VOID (*)()) 0;
4195 if (call->delayedAbortEvent) {
4196 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4197 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4199 rxi_SendCallAbort(call, packet, 0, 1);
4200 rxi_FreePacket(packet);
4205 * Update the peer with the congestion information in this call
4206 * so other calls on this connection can pick up where this call
4207 * left off. If the congestion sequence numbers don't match then
4208 * another call experienced a retransmission.
4210 peer = call->conn->peer;
4211 MUTEX_ENTER(&peer->peer_lock);
4213 if (call->congestSeq == peer->congestSeq) {
4214 peer->cwind = MAX(peer->cwind, call->cwind);
4215 peer->MTU = MAX(peer->MTU, call->MTU);
4216 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4219 call->abortCode = 0;
4220 call->abortCount = 0;
4222 if (peer->maxDgramPackets > 1) {
4223 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4225 call->MTU = peer->MTU;
4227 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4228 call->ssthresh = rx_maxSendWindow;
4229 call->nDgramPackets = peer->nDgramPackets;
4230 call->congestSeq = peer->congestSeq;
4231 MUTEX_EXIT(&peer->peer_lock);
4233 flags = call->flags;
4234 rxi_ClearReceiveQueue(call);
4235 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4236 if (call->flags & RX_CALL_TQ_BUSY) {
4237 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4238 call->flags |= (flags & RX_CALL_TQ_WAIT);
4240 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4242 rxi_ClearTransmitQueue(call, 0);
4243 queue_Init(&call->tq);
4246 queue_Init(&call->rq);
4248 call->rwind = rx_initReceiveWindow;
4249 call->twind = rx_initSendWindow;
4250 call->nSoftAcked = 0;
4251 call->nextCwind = 0;
4254 call->nCwindAcks = 0;
4255 call->nSoftAcks = 0;
4256 call->nHardAcks = 0;
4258 call->tfirst = call->rnext = call->tnext = 1;
4260 call->lastAcked = 0;
4261 call->localStatus = call->remoteStatus = 0;
4263 if (flags & RX_CALL_READER_WAIT) {
4264 #ifdef RX_ENABLE_LOCKS
4265 CV_BROADCAST(&call->cv_rq);
4267 osi_rxWakeup(&call->rq);
4270 if (flags & RX_CALL_WAIT_PACKETS) {
4271 MUTEX_ENTER(&rx_freePktQ_lock);
4272 rxi_PacketsUnWait(); /* XXX */
4273 MUTEX_EXIT(&rx_freePktQ_lock);
4276 #ifdef RX_ENABLE_LOCKS
4277 CV_SIGNAL(&call->cv_twind);
4279 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4280 osi_rxWakeup(&call->twind);
4283 #ifdef RX_ENABLE_LOCKS
4284 /* The following ensures that we don't mess with any queue while some
4285 * other thread might also be doing so. The call_queue_lock field is
4286 * only modified under the call lock. If the call is in the process
4287 * of being removed from a queue, the call is not locked until the
4288 * queue lock is dropped and only then is the call_queue_lock field
4289 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4290 * Note that any other routine which removes a call from a queue has to
4291 * obtain the queue lock before examining the queue and removing the call. */
4293 if (call->call_queue_lock) {
4294 MUTEX_ENTER(call->call_queue_lock);
4295 if (queue_IsOnQueue(call)) {
4297 if (flags & RX_CALL_WAIT_PROC) {
4298 MUTEX_ENTER(&rx_stats_mutex);
4300 MUTEX_EXIT(&rx_stats_mutex);
4303 MUTEX_EXIT(call->call_queue_lock);
4304 CLEAR_CALL_QUEUE_LOCK(call);
4306 #else /* RX_ENABLE_LOCKS */
4307 if (queue_IsOnQueue(call)) {
4309 if (flags & RX_CALL_WAIT_PROC)
4312 #endif /* RX_ENABLE_LOCKS */
4314 rxi_KeepAliveOff(call);
4315 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4318 /* Send an acknowledge for the indicated packet (seq,serial) of the
4319 * indicated call, for the indicated reason (reason). This
4320 * acknowledge will specifically acknowledge receiving the packet, and
4321 * will also specify which other packets for this call have been
4322 * received. This routine returns the packet that was used to the
4323 * caller. The caller is responsible for freeing it or re-using it.
4324 * This acknowledgement also returns the highest sequence number
4325 * actually read out by the higher level to the sender; the sender
4326 * promises to keep around packets that have not been read by the
4327 * higher level yet (unless, of course, the sender decides to abort
4328 * the call altogether). Any of p, seq, serial, pflags, or reason may
4329 * be set to zero without ill effect. That is, if they are zero, they
4330 * will not convey any information.
4331 * NOW there is a trailer field, after the ack where it will safely be
4332 * ignored by mundanes, which indicates the maximum size packet this
4333 * host can swallow. */
4334 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4335 register struct rx_call *call;
4336 register struct rx_packet *optionalPacket; /* used to send the ack (or null) */
4337 int seq; /* Sequence number of the packet we are acking */
4338 int serial; /* Serial number of the packet */
4339 int pflags; /* Flags field from packet header */
4340 int reason; /* Reason an acknowledge was prompted */
4343 struct rx_ackPacket *ap;
4344 register struct rx_packet *rqp;
4345 register struct rx_packet *nxp; /* For queue_Scan */
4346 register struct rx_packet *p;
4351 * Open the receive window once a thread starts reading packets
4353 if (call->rnext > 1) {
4354 call->rwind = rx_maxReceiveWindow;
4357 call->nHardAcks = 0;
4358 call->nSoftAcks = 0;
4359 if (call->rnext > call->lastAcked)
4360 call->lastAcked = call->rnext;
4364 rx_computelen(p, p->length); /* reset length, you never know */
4365 } /* where that's been... */
4367 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4368 /* We won't send the ack, but don't panic. */
4369 return optionalPacket;
4372 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4374 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4375 if (!optionalPacket) rxi_FreePacket(p);
4376 return optionalPacket;
4378 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4379 if (rx_Contiguous(p)<templ) {
4380 if (!optionalPacket) rxi_FreePacket(p);
4381 return optionalPacket;
4383 } /* MTUXXX failing to send an ack is very serious. We should */
4384 /* try as hard as possible to send even a partial ack; it's */
4385 /* better than nothing. */
4387 ap = (struct rx_ackPacket *) rx_DataOf(p);
4388 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4389 ap->reason = reason;
4391 /* The skew computation used to be bogus, I think it's better now. */
4392 /* We should start paying attention to skew. XXX */
4393 ap->serial = htonl(call->conn->maxSerial);
4394 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4396 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4397 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4399 /* No fear of running out of room in the ack packet here because there can only be at most
4400 * one window full of unacknowledged packets. The window size must be constrained
4401 * to be less than the maximum ack size, of course. Also, an ack should always
4402 * fit into a single packet -- it should not ever be fragmented. */
4403 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4404 if (!rqp || !call->rq.next
4405 || (rqp->header.seq > (call->rnext + call->rwind))) {
4406 if (!optionalPacket) rxi_FreePacket(p);
4407 rxi_CallError(call, RX_CALL_DEAD);
4408 return optionalPacket;
4411 while (rqp->header.seq > call->rnext + offset)
4412 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4413 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4415 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4416 if (!optionalPacket) rxi_FreePacket(p);
4417 rxi_CallError(call, RX_CALL_DEAD);
4418 return optionalPacket;
4423 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4425 /* these are new for AFS 3.3 */
4426 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4427 templ = htonl(templ);
4428 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4429 templ = htonl(call->conn->peer->ifMTU);
4430 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4432 /* new for AFS 3.4 */
4433 templ = htonl(call->rwind);
4434 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4436 /* new for AFS 3.5 */
4437 templ = htonl(call->conn->peer->ifDgramPackets);
4438 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4440 p->header.serviceId = call->conn->serviceId;
4441 p->header.cid = (call->conn->cid | call->channel);
4442 p->header.callNumber = *call->callNumber;
4443 p->header.seq = seq;
4444 p->header.securityIndex = call->conn->securityIndex;
4445 p->header.epoch = call->conn->epoch;
4446 p->header.type = RX_PACKET_TYPE_ACK;
4447 p->header.flags = RX_SLOW_START_OK;
4448 if (reason == RX_ACK_PING) {
4449 p->header.flags |= RX_REQUEST_ACK;
4451 clock_GetTime(&call->pingRequestTime);
4454 if (call->conn->type == RX_CLIENT_CONNECTION)
4455 p->header.flags |= RX_CLIENT_INITIATED;
4459 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4460 ap->reason, ntohl(ap->previousPacket),
4461 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4463 for (offset = 0; offset < ap->nAcks; offset++)
4464 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4471 register int i, nbytes = p->length;
4473 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4474 if (nbytes <= p->wirevec[i].iov_len) {
4475 register int savelen, saven;
4477 savelen = p->wirevec[i].iov_len;
4479 p->wirevec[i].iov_len = nbytes;
4481 rxi_Send(call, p, istack);
4482 p->wirevec[i].iov_len = savelen;
4486 else nbytes -= p->wirevec[i].iov_len;
4489 MUTEX_ENTER(&rx_stats_mutex);
4490 rx_stats.ackPacketsSent++;
4491 MUTEX_EXIT(&rx_stats_mutex);
4492 if (!optionalPacket) rxi_FreePacket(p);
4493 return optionalPacket; /* Return packet for re-use by caller */
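/* Illustrative sketch (not part of the original source): how a receiver
 * could pull the AFS 3.3/3.4/3.5 trailer words back out of an ack packet
 * built above. The helper and struct names are hypothetical; the field
 * order (maxMTU, ifMTU, rwind, maxDgramPackets) and the offsets mirror
 * the rx_packetwrite calls in rxi_SendAck. */
#if 0
struct example_ackTrailer {
    afs_uint32 maxMTU;          /* rxi_AdjustMaxMTU value, AFS 3.3 */
    afs_uint32 ifMTU;           /* interface MTU, AFS 3.3 */
    afs_uint32 rwind;           /* receive window, AFS 3.4 */
    afs_uint32 maxDgramPackets; /* jumbogram size, AFS 3.5 */
};

static void example_ReadAckTrailer(p, nAcks, t)
    struct rx_packet *p;
    int nAcks;
    struct example_ackTrailer *t;
{
    afs_int32 tSize;
    rx_packetread(p, rx_AckDataSize(nAcks), sizeof(afs_int32), &tSize);
    t->maxMTU = ntohl(tSize);
    rx_packetread(p, rx_AckDataSize(nAcks)+sizeof(afs_int32),
                  sizeof(afs_int32), &tSize);
    t->ifMTU = ntohl(tSize);
    rx_packetread(p, rx_AckDataSize(nAcks)+2*sizeof(afs_int32),
                  sizeof(afs_int32), &tSize);
    t->rwind = ntohl(tSize);
    rx_packetread(p, rx_AckDataSize(nAcks)+3*sizeof(afs_int32),
                  sizeof(afs_int32), &tSize);
    t->maxDgramPackets = ntohl(tSize);
}
#endif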
4496 /* Send all of the packets in the list in a single datagram */
4497 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4498 struct rx_call *call;
4499 struct rx_packet **list;
4504 struct clock *retryTime;
4510 struct rx_connection *conn = call->conn;
4511 struct rx_peer *peer = conn->peer;
4513 MUTEX_ENTER(&peer->peer_lock);
4515 if (resending) peer->reSends += len;
4516 MUTEX_ENTER(&rx_stats_mutex);
4517 rx_stats.dataPacketsSent += len;
4518 MUTEX_EXIT(&rx_stats_mutex);
4519 MUTEX_EXIT(&peer->peer_lock);
4521 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4525 /* Set the packet flags and schedule the resend events */
4526 /* Only request an ack for the last packet in the list */
4527 for (i = 0 ; i < len ; i++) {
4528 list[i]->retryTime = *retryTime;
4529 if (list[i]->header.serial) {
4530 /* Exponentially backoff retry times */
4531 if (list[i]->backoff < MAXBACKOFF) {
4532 /* so it can't stay == 0 */
4533 list[i]->backoff = (list[i]->backoff << 1) +1;
4535 else list[i]->backoff++;
4536 clock_Addmsec(&(list[i]->retryTime),
4537 ((afs_uint32) list[i]->backoff) << 8);
4540 /* Wait a little extra for the ack on the last packet */
4541 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4542 clock_Addmsec(&(list[i]->retryTime), 400);
4545 /* Record the time sent */
4546 list[i]->timeSent = *now;
4548 /* Ask for an ack on retransmitted packets, on every other packet
4549 * if the peer doesn't support slow start. Ask for an ack on every
4550 * packet until the congestion window reaches the ack rate. */
4551 if (list[i]->header.serial) {
4553 MUTEX_ENTER(&rx_stats_mutex);
4554 rx_stats.dataPacketsReSent++;
4555 MUTEX_EXIT(&rx_stats_mutex);
4557 /* improved RTO calculation- not Karn */
4558 list[i]->firstSent = *now;
4560 && (call->cwind <= (u_short)(conn->ackRate+1)
4561 || (!(call->flags & RX_CALL_SLOW_START_OK)
4562 && (list[i]->header.seq & 1)))) {
4567 MUTEX_ENTER(&peer->peer_lock);
4569 if (resending) peer->reSends++;
4570 MUTEX_ENTER(&rx_stats_mutex);
4571 rx_stats.dataPacketsSent++;
4572 MUTEX_EXIT(&rx_stats_mutex);
4573 MUTEX_EXIT(&peer->peer_lock);
4575 /* Tag this packet as not being the last in this group,
4576 * for the receiver's benefit */
4577 if (i < len-1 || moreFlag) {
4578 list[i]->header.flags |= RX_MORE_PACKETS;
4581 /* Install the new retransmit time for the packet, and
4582 * record the time sent */
4583 list[i]->timeSent = *now;
4587 list[len-1]->header.flags |= RX_REQUEST_ACK;
4590 /* Since we're about to send a data packet to the peer, it's
4591 * safe to nuke any scheduled end-of-packets ack */
4592 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4594 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4595 MUTEX_EXIT(&call->lock);
4597 rxi_SendPacketList(conn, list, len, istack);
4599 rxi_SendPacket(conn, list[0], istack);
4601 MUTEX_ENTER(&call->lock);
4602 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4604 /* Update last send time for this call (for keep-alive
4605 * processing), and for the connection (so that we can discover
4606 * idle connections) */
4607 conn->lastSendTime = call->lastSendTime = clock_Sec();
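/* Illustrative sketch (hypothetical helper, not in the original source):
 * the per-packet retransmit backoff used above, in isolation. A packet's
 * retry time starts at now + peer->timeout; each retransmission (i.e.
 * each packet that already carries a serial) grows the backoff as
 * (b << 1) + 1 up to MAXBACKOFF, slipping the retry time by a further
 * (backoff << 8) milliseconds, so successive retries spread out as
 * roughly 0.25 s, 0.75 s, 1.75 s, ... on top of the base timeout. */
#if 0
static void example_NextRetryTime(p, now, baseRetryTime)
    struct rx_packet *p;
    struct clock *now, *baseRetryTime;
{
    p->retryTime = *baseRetryTime;              /* now + peer->timeout */
    if (p->header.serial) {                     /* sent at least once before */
        if (p->backoff < MAXBACKOFF)
            p->backoff = (p->backoff << 1) + 1; /* 0, 1, 3, 7, 15, ... */
        else
            p->backoff++;
        clock_Addmsec(&p->retryTime, ((afs_uint32)p->backoff) << 8);
    }
    p->timeSent = *now;                         /* record the send time */
}
#endif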
4610 /* When sending packets we need to follow these rules:
4611 * 1. Never send more than maxDgramPackets in a jumbogram.
4612 * 2. Never send a packet with more than two iovecs in a jumbogram.
4613 * 3. Never send a retransmitted packet in a jumbogram.
4614 * 4. Never send more than cwind/4 packets in a jumbogram
4615 * We always keep the last list we should have sent so we
4616 * can set the RX_MORE_PACKETS flags correctly.
4618 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4619 struct rx_call *call;
4620 struct rx_packet **list;
4624 struct clock *retryTime;
4627 int i, cnt, lastCnt = 0;
4628 struct rx_packet **listP, **lastP = 0;
4629 struct rx_peer *peer = call->conn->peer;
4630 int morePackets = 0;
4632 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4633 /* Does the current packet force us to flush the current list? */
4635 && (list[i]->header.serial
4636 || (list[i]->flags & RX_PKTFLAG_ACKED)
4637 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4639 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4640 /* If the call enters an error state stop sending, or if
4641 * we entered congestion recovery mode, stop sending */
4642 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4650 /* Add the current packet to the list if it hasn't been acked.
4651 * Otherwise adjust the list pointer to skip the current packet. */
4652 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4654 /* Do we need to flush the list? */
4655 if (cnt >= (int)peer->maxDgramPackets
4656 || cnt >= (int)call->nDgramPackets
4657 || cnt >= (int)call->cwind
4658 || list[i]->header.serial
4659 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4661 rxi_SendList(call, lastP, lastCnt, istack, 1,
4662 now, retryTime, resending);
4663 /* If the call enters an error state stop sending, or if
4664 * we entered congestion recovery mode, stop sending */
4665 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4675 osi_Panic("rxi_SendList error");
4681 /* Send the whole list when the call is in receive mode, when
4682 * the call is in eof mode, when we are in fast recovery mode,
4683 * or when we have the last packet */
4684 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4685 || call->mode == RX_MODE_RECEIVING
4686 || call->mode == RX_MODE_EOF
4687 || (call->flags & RX_CALL_FAST_RECOVER)) {
4688 /* Check for the case where the current list contains
4689 * an acked packet. Since we always send retransmissions
4690 * in a separate packet, we only need to check the first
4691 * packet in the list */
4692 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4696 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4697 now, retryTime, resending);
4698 /* If the call enters an error state stop sending, or if
4699 * we entered congestion recovery mode, stop sending */
4700 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4704 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4706 } else if (lastCnt > 0) {
4707 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
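/* Illustrative sketch (hypothetical predicate, not in the original
 * source): the "do we need to flush the list?" test from the loop above,
 * restating the jumbogram rules from the comment before this function. */
#if 0
static int example_MustFlushJumbogram(call, peer, p, cnt)
    struct rx_call *call;
    struct rx_peer *peer;
    struct rx_packet *p;
    int cnt;
{
    return cnt >= (int)peer->maxDgramPackets  /* rule 1: datagram limit  */
        || cnt >= (int)call->nDgramPackets
        || cnt >= (int)call->cwind            /* rule 4: window limit    */
        || p->header.serial                   /* rule 3: retransmission  */
        || p->length != RX_JUMBOBUFFERSIZE;   /* odd-size packet ends it */
}
#endif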
4711 #ifdef RX_ENABLE_LOCKS
4712 /* Call rxi_Start, below, but with the call lock held. */
4713 void rxi_StartUnlocked(event, call, istack)
4714 struct rxevent *event;
4715 register struct rx_call *call;
4718 MUTEX_ENTER(&call->lock);
4719 rxi_Start(event, call, istack);
4720 MUTEX_EXIT(&call->lock);
4722 #endif /* RX_ENABLE_LOCKS */
4724 /* This routine is called when new packets are readied for
4725 * transmission and when retransmission may be necessary, or when the
4726 * transmission window or burst count are favourable. This should be
4727 * better optimized for new packets, the usual case, now that we've
4728 * got rid of queues of send packets. XXXXXXXXXXX */
4729 void rxi_Start(event, call, istack)
4730 struct rxevent *event;
4731 register struct rx_call *call;
4734 struct rx_packet *p;
4735 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4736 struct rx_peer *peer = call->conn->peer;
4737 struct clock now, retryTime;
4741 struct rx_packet **xmitList;
4744 /* If rxi_Start is being called as a result of a resend event,
4745 * then make sure that the event pointer is removed from the call
4746 * structure, since there is no longer a per-call retransmission event pending. */
4748 if (event && event == call->resendEvent) {
4749 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4750 call->resendEvent = NULL;
4752 if (queue_IsEmpty(&call->tq)) {
4756 /* Timeouts trigger congestion recovery */
4757 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4758 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4759 /* someone else is waiting to start recovery */
4762 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4763 while (call->flags & RX_CALL_TQ_BUSY) {
4764 call->flags |= RX_CALL_TQ_WAIT;
4765 #ifdef RX_ENABLE_LOCKS
4766 CV_WAIT(&call->cv_tq, &call->lock);
4767 #else /* RX_ENABLE_LOCKS */
4768 osi_rxSleep(&call->tq);
4769 #endif /* RX_ENABLE_LOCKS */
4771 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4772 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4773 call->flags |= RX_CALL_FAST_RECOVER;
4774 if (peer->maxDgramPackets > 1) {
4775 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4777 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4779 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4780 call->nDgramPackets = 1;
4782 call->nextCwind = 1;
4785 MUTEX_ENTER(&peer->peer_lock);
4786 peer->MTU = call->MTU;
4787 peer->cwind = call->cwind;
4788 peer->nDgramPackets = 1;
4790 call->congestSeq = peer->congestSeq;
4791 MUTEX_EXIT(&peer->peer_lock);
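/* Worked example (illustrative): entering recovery with cwind = 16 and
 * twind = 32 gives ssthresh = MAX(4, MIN(16, 32)) >> 1 = 8 above; the
 * window itself restarts from nextCwind = 1, so the call slow-starts
 * back up toward ssthresh rather than resuming at the old rate. */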
4792 /* Clear retry times on packets. Otherwise, it's possible for
4793 * some packets in the queue to force resends at rates faster
4794 * than recovery rates.
4796 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4797 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4798 clock_Zero(&p->retryTime);
4803 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4804 MUTEX_ENTER(&rx_stats_mutex);
4805 rx_tq_debug.rxi_start_in_error ++;
4806 MUTEX_EXIT(&rx_stats_mutex);
4811 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4812 /* Get clock to compute the re-transmit time for any packets
4813 * in this burst. Note, if we back off, it's reasonable to
4814 * back off all of the packets in the same manner, even if
4815 * some of them have been retransmitted more times than more
4816 * recent additions */
4817 clock_GetTime(&now);
4818 retryTime = now; /* initialize before use */
4819 MUTEX_ENTER(&peer->peer_lock);
4820 clock_Add(&retryTime, &peer->timeout);
4821 MUTEX_EXIT(&peer->peer_lock);
4823 /* Send (or resend) any packets that need it, subject to
4824 * window restrictions and congestion burst control
4825 * restrictions. Ask for an ack on the last packet sent in
4826 * this burst. For now, we're relying upon the window being
4827 * considerably bigger than the largest number of packets that
4828 * are typically sent at once by one initial call to
4829 * rxi_Start. This is probably bogus (perhaps we should ask
4830 * for an ack when we're half way through the current
4831 * window?). Also, for non file transfer applications, this
4832 * may end up asking for an ack for every packet. Bogus. XXXX
4835 * But check whether we're here recursively, and let the other guy do the work. */
4838 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4839 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4840 call->flags |= RX_CALL_TQ_BUSY;
4842 call->flags &= ~RX_CALL_NEED_START;
4843 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4845 maxXmitPackets = MIN(call->twind, call->cwind);
4846 xmitList = (struct rx_packet **)
4847 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4848 if (xmitList == NULL)
4849 osi_Panic("rxi_Start, failed to allocate xmit list");
4850 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4851 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4852 /* We shouldn't be sending packets if a thread is waiting
4853 * to initiate congestion recovery */
4856 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4857 /* Only send one packet during fast recovery */
4860 if ((p->flags & RX_PKTFLAG_FREE) ||
4861 (!queue_IsEnd(&call->tq, nxp)
4862 && (nxp->flags & RX_PKTFLAG_FREE)) ||
4863 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4864 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4865 osi_Panic("rxi_Start: xmit queue clobbered");
4867 if (p->flags & RX_PKTFLAG_ACKED) {
4868 MUTEX_ENTER(&rx_stats_mutex);
4869 rx_stats.ignoreAckedPacket++;
4870 MUTEX_EXIT(&rx_stats_mutex);
4871 continue; /* Ignore this packet if it has been acknowledged */
4874 /* Turn off all flags except these ones, which are the same
4875 * on each transmission */
4876 p->header.flags &= RX_PRESET_FLAGS;
4878 if (p->header.seq >= call->tfirst +
4879 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4880 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4881 /* Note: if we're waiting for more window space, we can
4882 * still send retransmits; hence we don't return here, but
4883 * break out to schedule a retransmit event */
4884 dpf(("call %d waiting for window", *(call->callNumber)));
4888 /* Transmit the packet if it needs to be sent. */
4889 if (!clock_Lt(&now, &p->retryTime)) {
4890 if (nXmitPackets == maxXmitPackets) {
4891 osi_Panic("rxi_Start: xmit list overflowed");
4893 xmitList[nXmitPackets++] = p;
4897 /* xmitList now holds pointers to all of the packets that are
4898 * ready to send. Now we loop to send the packets */
4899 if (nXmitPackets > 0) {
4900 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4901 &now, &retryTime, resending);
4903 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4905 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4907 * TQ references no longer protected by this flag; they must remain
4908 * protected by the global lock.
4910 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4911 call->flags &= ~RX_CALL_TQ_BUSY;
4912 if (call->flags & RX_CALL_TQ_WAIT) {
4913 call->flags &= ~RX_CALL_TQ_WAIT;
4914 #ifdef RX_ENABLE_LOCKS
4915 CV_BROADCAST(&call->cv_tq);
4916 #else /* RX_ENABLE_LOCKS */
4917 osi_rxWakeup(&call->tq);
4918 #endif /* RX_ENABLE_LOCKS */
4923 /* We went into the error state while sending packets. Now is
4924 * the time to reset the call. This will also inform the using
4925 * process that the call is in an error state.
4927 MUTEX_ENTER(&rx_stats_mutex);
4928 rx_tq_debug.rxi_start_aborted ++;
4929 MUTEX_EXIT(&rx_stats_mutex);
4930 call->flags &= ~RX_CALL_TQ_BUSY;
4931 if (call->flags & RX_CALL_TQ_WAIT) {
4932 call->flags &= ~RX_CALL_TQ_WAIT;
4933 #ifdef RX_ENABLE_LOCKS
4934 CV_BROADCAST(&call->cv_tq);
4935 #else /* RX_ENABLE_LOCKS */
4936 osi_rxWakeup(&call->tq);
4937 #endif /* RX_ENABLE_LOCKS */
4939 rxi_CallError(call, call->error);
4942 #ifdef RX_ENABLE_LOCKS
4943 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4944 register int missing;
4945 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4946 /* Some packets have received acks. If they all have, we can clear
4947 * the transmit queue.
4949 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4950 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
4958 call->flags |= RX_CALL_TQ_CLEARME;
4960 #endif /* RX_ENABLE_LOCKS */
4961 /* Don't bother doing retransmits if the TQ is cleared. */
4962 if (call->flags & RX_CALL_TQ_CLEARME) {
4963 rxi_ClearTransmitQueue(call, 1);
4965 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4968 /* Always post a resend event, if there is anything in the
4969 * queue, and resend is possible. There should be at least
4970 * one unacknowledged packet in the queue ... otherwise none
4971 * of these packets should be on the queue in the first place.
4973 if (call->resendEvent) {
4974 /* Cancel the existing event and post a new one */
4975 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4978 /* The retry time is the retry time on the first unacknowledged
4979 * packet inside the current window */
4980 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4981 /* Don't set timers for packets outside the window */
4982 if (p->header.seq >= call->tfirst + call->twind) {
4986 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
4988 retryTime = p->retryTime;
4993 /* Post a new event to re-run rxi_Start when retries may be needed */
4994 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4995 #ifdef RX_ENABLE_LOCKS
4996 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4997 call->resendEvent = rxevent_Post(&retryTime,
4999 (char *)call, istack);
5000 #else /* RX_ENABLE_LOCKS */
5001 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5002 (char *)call, (void*)(long)istack);
5003 #endif /* RX_ENABLE_LOCKS */
5006 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5007 } while (call->flags & RX_CALL_NEED_START);
5009 * TQ references no longer protected by this flag; they must remain
5010 * protected by the global lock.
5012 call->flags &= ~RX_CALL_TQ_BUSY;
5013 if (call->flags & RX_CALL_TQ_WAIT) {
5014 call->flags &= ~RX_CALL_TQ_WAIT;
5015 #ifdef RX_ENABLE_LOCKS
5016 CV_BROADCAST(&call->cv_tq);
5017 #else /* RX_ENABLE_LOCKS */
5018 osi_rxWakeup(&call->tq);
5019 #endif /* RX_ENABLE_LOCKS */
5022 call->flags |= RX_CALL_NEED_START;
5024 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5026 if (call->resendEvent) {
5027 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5032 /* Also adjusts the keep alive parameters for the call, to reflect
5033 * that we have just sent a packet (so keep alives aren't sent immediately). */
5035 void rxi_Send(call, p, istack)
5036 register struct rx_call *call;
5037 register struct rx_packet *p;
5040 register struct rx_connection *conn = call->conn;
5042 /* Stamp each packet with the user supplied status */
5043 p->header.userStatus = call->localStatus;
5045 /* Allow the security object controlling this call's security to
5046 * make any last-minute changes to the packet */
5047 RXS_SendPacket(conn->securityObject, call, p);
5049 /* Since we're about to send SOME sort of packet to the peer, it's
5050 * safe to nuke any scheduled end-of-packets ack */
5051 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5053 /* Actually send the packet, filling in more connection-specific fields */
5054 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5055 MUTEX_EXIT(&call->lock);
5056 rxi_SendPacket(conn, p, istack);
5057 MUTEX_ENTER(&call->lock);
5058 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5060 /* Update last send time for this call (for keep-alive
5061 * processing), and for the connection (so that we can discover
5062 * idle connections) */
5063 conn->lastSendTime = call->lastSendTime = clock_Sec();
5067 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5068 * that things are fine. Also called periodically to guarantee that nothing
5069 * falls through the cracks (e.g. (error + dally) connections have keepalive
5070 * turned off). Returns 0 if the call is well, -1 otherwise. If otherwise, the call may be freed. */
5073 #ifdef RX_ENABLE_LOCKS
5074 int rxi_CheckCall(call, haveCTLock)
5075 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5076 #else /* RX_ENABLE_LOCKS */
5077 int rxi_CheckCall(call)
5078 #endif /* RX_ENABLE_LOCKS */
5079 register struct rx_call *call;
5081 register struct rx_connection *conn = call->conn;
5082 register struct rx_service *tservice;
5084 afs_uint32 deadTime;
5086 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5087 if (call->flags & RX_CALL_TQ_BUSY) {
5088 /* Call is active and will be reset by rxi_Start if it's
5089 * in an error state.
5094 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5095 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5096 ((afs_uint32)conn->peer->rtt >> 3) +
5097 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
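/* Worked example (illustrative): with secondsUntilDead = 12, a smoothed
 * rtt of 100 ms (peer->rtt = 800, scaled by 8) and a deviation of 20 ms
 * (peer->rtt_dev = 80, scaled by 4), the expression above gives
 *   deadTime = ((12 << 10) + (800 >> 3) + (80 << 1) + 1023) >> 10
 *            = (12288 + 100 + 160 + 1023) >> 10 = 13 seconds,
 * treating 1024 ms as one second and rounding up to the next second. */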
5099 /* These are computed to the second (+- 1 second). But that's
5100 * good enough for these values, which should be a significant
5101 * number of seconds. */
5102 if (now > (call->lastReceiveTime + deadTime)) {
5103 if (call->state == RX_STATE_ACTIVE) {
5104 rxi_CallError(call, RX_CALL_DEAD);
5108 #ifdef RX_ENABLE_LOCKS
5109 /* Cancel pending events */
5110 rxevent_Cancel(call->delayedAckEvent, call,
5111 RX_CALL_REFCOUNT_DELAY);
5112 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5113 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5114 if (call->refCount == 0) {
5115 rxi_FreeCall(call, haveCTLock);
5119 #else /* RX_ENABLE_LOCKS */
5122 #endif /* RX_ENABLE_LOCKS */
5124 /* Non-active calls are destroyed if they are not responding
5125 * to pings; active calls are simply flagged in error, so the
5126 * attached process can die reasonably gracefully. */
5128 /* see if we have a non-activity timeout */
5129 tservice = conn->service;
5130 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5131 && tservice->idleDeadTime
5132 && ((call->startWait + tservice->idleDeadTime) < now)) {
5133 if (call->state == RX_STATE_ACTIVE) {
5134 rxi_CallError(call, RX_CALL_TIMEOUT);
5138 /* see if we have a hard timeout */
5139 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5140 if (call->state == RX_STATE_ACTIVE)
5141 rxi_CallError(call, RX_CALL_TIMEOUT);
5148 /* When a call is in progress, this routine is called occasionally to
5149 * make sure that some traffic has arrived from (or been sent to) the peer.
5150 * If nothing has arrived in a reasonable amount of time, the call is
5151 * declared dead; if nothing has been sent for a while, we send a
5152 * keep-alive packet (if we're actually trying to keep the call alive)
5154 void rxi_KeepAliveEvent(event, call, dummy)
5155 struct rxevent *event;
5156 register struct rx_call *call;
5158 struct rx_connection *conn;
5161 MUTEX_ENTER(&call->lock);
5162 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5163 if (event == call->keepAliveEvent)
5164 call->keepAliveEvent = (struct rxevent *) 0;
5167 #ifdef RX_ENABLE_LOCKS
5168 if(rxi_CheckCall(call, 0)) {
5169 MUTEX_EXIT(&call->lock);
5172 #else /* RX_ENABLE_LOCKS */
5173 if (rxi_CheckCall(call)) return;
5174 #endif /* RX_ENABLE_LOCKS */
5176 /* Don't try to keep alive dallying calls */
5177 if (call->state == RX_STATE_DALLY) {
5178 MUTEX_EXIT(&call->lock);
5183 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5184 /* Don't try to send keepalives if there is unacknowledged data */
5185 /* the rexmit code should be good enough, this little hack
5186 * doesn't quite work XXX */
5187 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5189 rxi_ScheduleKeepAliveEvent(call);
5190 MUTEX_EXIT(&call->lock);
5194 void rxi_ScheduleKeepAliveEvent(call)
5195 register struct rx_call *call;
5197 if (!call->keepAliveEvent) {
5199 clock_GetTime(&when);
5200 when.sec += call->conn->secondsUntilPing;
5201 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5202 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5206 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
5207 void rxi_KeepAliveOn(call)
5208 register struct rx_call *call;
5210 /* Pretend last packet received was received now--i.e. if another
5211 * packet isn't received within the keep alive time, then the call
5212 * will die; Initialize last send time to the current time--even
5213 * if a packet hasn't been sent yet. This will guarantee that a
5214 * keep-alive is sent within the ping time */
5215 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5216 rxi_ScheduleKeepAliveEvent(call);
5219 /* This routine is called to send connection abort messages
5220 * that have been delayed to throttle looping clients. */
5221 void rxi_SendDelayedConnAbort(event, conn, dummy)
5222 struct rxevent *event;
5223 register struct rx_connection *conn;
5227 struct rx_packet *packet;
5229 MUTEX_ENTER(&conn->conn_data_lock);
5230 conn->delayedAbortEvent = (struct rxevent *) 0;
5231 error = htonl(conn->error);
5233 MUTEX_EXIT(&conn->conn_data_lock);
5234 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5236 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5237 RX_PACKET_TYPE_ABORT, (char *)&error,
5239 rxi_FreePacket(packet);
5243 /* This routine is called to send call abort messages
5244 * that have been delayed to throttle looping clients. */
5245 void rxi_SendDelayedCallAbort(event, call, dummy)
5246 struct rxevent *event;
5247 register struct rx_call *call;
5251 struct rx_packet *packet;
5253 MUTEX_ENTER(&call->lock);
5254 call->delayedAbortEvent = (struct rxevent *) 0;
5255 error = htonl(call->error);
5257 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5259 packet = rxi_SendSpecial(call, call->conn, packet,
5260 RX_PACKET_TYPE_ABORT, (char *)&error,
5262 rxi_FreePacket(packet);
5264 MUTEX_EXIT(&call->lock);
5267 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5268 * seconds) to ask the client to authenticate itself. The routine
5269 * issues a challenge to the client, which is obtained from the
5270 * security object associated with the connection */
5271 void rxi_ChallengeEvent(event, conn, dummy)
5272 struct rxevent *event;
5273 register struct rx_connection *conn;
5276 conn->challengeEvent = (struct rxevent *) 0;
5277 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5278 register struct rx_packet *packet;
5280 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5282 /* If there's no packet available, do this later. */
5283 RXS_GetChallenge(conn->securityObject, conn, packet);
5284 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5285 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5286 rxi_FreePacket(packet);
5288 clock_GetTime(&when);
5289 when.sec += RX_CHALLENGE_TIMEOUT;
5290 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5294 /* Call this routine to start requesting the client to authenticate
5295 * itself. This will continue until authentication is established,
5296 * the call times out, or an invalid response is returned. The
5297 * security object associated with the connection is asked to create
5298 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5299 * defined earlier. */
5300 void rxi_ChallengeOn(conn)
5301 register struct rx_connection *conn;
5303 if (!conn->challengeEvent) {
5304 RXS_CreateChallenge(conn->securityObject, conn);
5305 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5310 /* Compute round trip time of the packet provided, in *rttp.
5313 /* rxi_ComputeRoundTripTime is called with peer locked. */
5314 void rxi_ComputeRoundTripTime(p, sentp, peer)
5315 register struct clock *sentp; /* may be null */
5316 register struct rx_peer *peer; /* may be null */
5317 register struct rx_packet *p;
5319 struct clock thisRtt, *rttp = &thisRtt;
5321 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5322 /* making year 2038 bugs to get this running now - stroucki */
5323 struct timeval temptime;
5325 register int rtt_timeout;
5327 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5328 /* yet again. This was the worst Heisenbug of the port - stroucki */
5329 clock_GetTime(&temptime);
5330 rttp->sec=(afs_int32)temptime.tv_sec;
5331 rttp->usec=(afs_int32)temptime.tv_usec;
5333 clock_GetTime(rttp);
5335 if (clock_Lt(rttp, sentp)) {
5337 return; /* somebody set the clock back, don't count this time. */
5339 clock_Sub(rttp, sentp);
5340 MUTEX_ENTER(&rx_stats_mutex);
5341 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5342 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5343 if (rttp->sec > 60) {
5344 MUTEX_EXIT(&rx_stats_mutex);
5345 return; /* somebody set the clock ahead */
5347 rx_stats.maxRtt = *rttp;
5349 clock_Add(&rx_stats.totalRtt, rttp);
5350 rx_stats.nRttSamples++;
5351 MUTEX_EXIT(&rx_stats_mutex);
5353 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5355 /* Apply VanJacobson round-trip estimations */
5360 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5361 * srtt is stored as fixed point with 3 bits after the binary
5362 * point (i.e., scaled by 8). The following magic is
5363 * equivalent to the smoothing algorithm in rfc793 with an
5364 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5365 * srtt*8 = srtt*8 + rtt - srtt
5366 * srtt = srtt + rtt/8 - srtt/8
5369 delta = MSEC(rttp) - (peer->rtt >> 3);
5373 * We accumulate a smoothed rtt variance (actually, a smoothed
5374 * mean difference), then set the retransmit timer to smoothed
5375 * rtt + 4 times the smoothed variance (was 2x in van's original
5376 * paper, but 4x works better for me, and apparently for him as
5378 * rttvar is stored as
5379 * fixed point with 2 bits after the binary point (scaled by
5380 * 4). The following is equivalent to rfc793 smoothing with
5381 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5382 * replaces rfc793's wired-in beta.
5383 * dev*4 = dev*4 + (|actual - expected| - dev)
5389 delta -= (peer->rtt_dev >> 2);
5390 peer->rtt_dev += delta;
5393 /* I don't have a stored RTT so I start with this value. Since I'm
5394 * probably just starting a call, and will be pushing more data down
5395 * this, I expect congestion to increase rapidly. So I fudge a
5396 * little, and I set deviance to half the rtt. In practice,
5397 * deviance tends to approach something a little less than
5398 * half the smoothed rtt. */
5399 peer->rtt = (MSEC(rttp) << 3) + 8;
5400 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5402 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5403 * the other of these connections is usually in a user process, and can
5404 * be switched and/or swapped out. So on fast, reliable networks, the
5405 * timeout would otherwise be too short.
5407 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5408 clock_Zero(&(peer->timeout));
5409 clock_Addmsec(&(peer->timeout), rtt_timeout);
5411 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5412 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5413 (peer->timeout.sec),(peer->timeout.usec)) );
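/* Illustrative sketch (hypothetical helper, not in the original source):
 * the fixed-point smoothing above in isolation. srtt8 holds the smoothed
 * rtt scaled by 8 and dev4 the smoothed mean deviation scaled by 4,
 * matching the representations of peer->rtt and peer->rtt_dev. */
#if 0
static int example_RttUpdate(srtt8, dev4, sample_ms)
    int *srtt8, *dev4, sample_ms;
{
    int delta = sample_ms - (*srtt8 >> 3);  /* sample - srtt, in ms     */
    *srtt8 += delta;                        /* srtt += delta/8 (scaled) */
    if (delta < 0)
        delta = -delta;
    delta -= (*dev4 >> 2);                  /* |delta| - dev, in ms     */
    *dev4 += delta;                         /* dev += (|delta|-dev)/4   */
    /* timeout = srtt + 4*dev + 350 ms, matching rtt_timeout above */
    return (*srtt8 >> 3) + *dev4 + 350;
}
#endif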
5417 /* Find all server connections that have not been active for a long time, and toss them */
5419 void rxi_ReapConnections()
5422 clock_GetTime(&now);
5424 /* Find server connection structures that haven't been used for
5425 * greater than rx_idleConnectionTime */
5426 { struct rx_connection **conn_ptr, **conn_end;
5427 int i, havecalls = 0;
5428 MUTEX_ENTER(&rx_connHashTable_lock);
5429 for (conn_ptr = &rx_connHashTable[0],
5430 conn_end = &rx_connHashTable[rx_hashTableSize];
5431 conn_ptr < conn_end; conn_ptr++) {
5432 struct rx_connection *conn, *next;
5433 struct rx_call *call;
5437 for (conn = *conn_ptr; conn; conn = next) {
5438 /* XXX -- Shouldn't the connection be locked? */
5441 for(i=0;i<RX_MAXCALLS;i++) {
5442 call = conn->call[i];
5445 MUTEX_ENTER(&call->lock);
5446 #ifdef RX_ENABLE_LOCKS
5447 result = rxi_CheckCall(call, 1);
5448 #else /* RX_ENABLE_LOCKS */
5449 result = rxi_CheckCall(call);
5450 #endif /* RX_ENABLE_LOCKS */
5451 MUTEX_EXIT(&call->lock);
5453 /* If CheckCall freed the call, it might
5454 * have destroyed the connection as well,
5455 * which screws up the linked lists.
5461 if (conn->type == RX_SERVER_CONNECTION) {
5462 /* This only actually destroys the connection if
5463 * there are no outstanding calls */
5464 MUTEX_ENTER(&conn->conn_data_lock);
5465 if (!havecalls && !conn->refCount &&
5466 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5467 conn->refCount++; /* it will be decr in rx_DestroyConn */
5468 MUTEX_EXIT(&conn->conn_data_lock);
5469 #ifdef RX_ENABLE_LOCKS
5470 rxi_DestroyConnectionNoLock(conn);
5471 #else /* RX_ENABLE_LOCKS */
5472 rxi_DestroyConnection(conn);
5473 #endif /* RX_ENABLE_LOCKS */
5475 #ifdef RX_ENABLE_LOCKS
5477 MUTEX_EXIT(&conn->conn_data_lock);
5479 #endif /* RX_ENABLE_LOCKS */
5483 #ifdef RX_ENABLE_LOCKS
5484 while (rx_connCleanup_list) {
5485 struct rx_connection *conn;
5486 conn = rx_connCleanup_list;
5487 rx_connCleanup_list = rx_connCleanup_list->next;
5488 MUTEX_EXIT(&rx_connHashTable_lock);
5489 rxi_CleanupConnection(conn);
5490 MUTEX_ENTER(&rx_connHashTable_lock);
5492 MUTEX_EXIT(&rx_connHashTable_lock);
5493 #endif /* RX_ENABLE_LOCKS */
5496 /* Find any peer structures that haven't been used (haven't had an
5497 * associated connection) for greater than rx_idlePeerTime */
5498 { struct rx_peer **peer_ptr, **peer_end;
5500 MUTEX_ENTER(&rx_rpc_stats);
5501 MUTEX_ENTER(&rx_peerHashTable_lock);
5502 for (peer_ptr = &rx_peerHashTable[0],
5503 peer_end = &rx_peerHashTable[rx_hashTableSize];
5504 peer_ptr < peer_end; peer_ptr++) {
5505 struct rx_peer *peer, *next, *prev;
5506 for (prev = peer = *peer_ptr; peer; peer = next) {
5508 code = MUTEX_TRYENTER(&peer->peer_lock);
5509 if ((code) && (peer->refCount == 0)
5510 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5511 rx_interface_stat_p rpc_stat, nrpc_stat;
5513 MUTEX_EXIT(&peer->peer_lock);
5514 MUTEX_DESTROY(&peer->peer_lock);
5515 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5516 rx_interface_stat)) {
5517 unsigned int num_funcs;
5518 if (!rpc_stat) break;
5519 queue_Remove(&rpc_stat->queue_header);
5520 queue_Remove(&rpc_stat->all_peers);
5521 num_funcs = rpc_stat->stats[0].func_total;
5522 space = sizeof(rx_interface_stat_t) +
5523 rpc_stat->stats[0].func_total *
5524 sizeof(rx_function_entry_v1_t);
5526 rxi_Free(rpc_stat, space);
5527 rxi_rpc_peer_stat_cnt -= num_funcs;
5530 MUTEX_ENTER(&rx_stats_mutex);
5531 rx_stats.nPeerStructs--;
5532 MUTEX_EXIT(&rx_stats_mutex);
5533 if (prev == *peer_ptr) {
5542 MUTEX_EXIT(&peer->peer_lock);
5548 MUTEX_EXIT(&rx_peerHashTable_lock);
5549 MUTEX_EXIT(&rx_rpc_stats);
5552 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5553 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5554 GC, just below. Really, we shouldn't have to keep moving packets from
5555 one place to another, but instead ought to always know if we can
5556 afford to hold onto a packet in its particular use. */
5557 MUTEX_ENTER(&rx_freePktQ_lock);
5558 if (rx_waitingForPackets) {
5559 rx_waitingForPackets = 0;
5560 #ifdef RX_ENABLE_LOCKS
5561 CV_BROADCAST(&rx_waitingForPackets_cv);
5563 osi_rxWakeup(&rx_waitingForPackets);
5566 MUTEX_EXIT(&rx_freePktQ_lock);
5568 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5569 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5573 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5574 * rx.h is sort of strange, this is better. This is called with a security
5575 * object before it is discarded. Each connection using a security object has
5576 * its own refcount to the object so it won't actually be freed until the last
5577 * connection is destroyed.
5579 * This is the only rxs module call. A hold could also be written but no one needs it. */
5582 int rxs_Release (aobj)
5583 struct rx_securityClass *aobj;
5585 return RXS_Close (aobj);
5589 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5590 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5591 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5592 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5594 /* Adjust our estimate of the transmission rate to this peer, given
5595 * that the packet p was just acked. We can adjust peer->timeout and
5596 * call->twind. Pragmatically, this is called
5597 * only with packets of maximal length.
5598 * Called with peer and call locked.
5601 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5602 register struct rx_peer *peer;
5603 register struct rx_call *call;
5604 struct rx_packet *p, *ackp;
5607 afs_int32 xferSize, xferMs;
5608 register afs_int32 minTime;
5611 /* Count down packets */
5612 if (peer->rateFlag > 0) peer->rateFlag--;
5613 /* Do nothing until we're enabled */
5614 if (peer->rateFlag != 0) return;
5615 if (!call->conn) return;
5617 /* Count only when the ack seems legitimate */
5618 switch (ackReason) {
5619 case RX_ACK_REQUESTED:
5620 xferSize = p->length + RX_HEADER_SIZE +
5621 call->conn->securityMaxTrailerSize;
5625 case RX_ACK_PING_RESPONSE:
5626 if (p) /* want the response to ping-request, not data send */
5628 clock_GetTime(&newTO);
5629 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5630 clock_Sub(&newTO, &call->pingRequestTime);
5631 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5635 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5642 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5643 ntohl(peer->host), ntohs(peer->port),
5644 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5645 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5648 /* Track only packets that are big enough. */
5649 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5653 /* absorb RTT data (in milliseconds) for these big packets */
5654 if (peer->smRtt == 0) {
5655 peer->smRtt = xferMs;
5657 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5658 if (!peer->smRtt) peer->smRtt = 1;
5661 if (peer->countDown) {
5665 peer->countDown = 10; /* recalculate only every so often */
5667 /* In practice, we can measure only the RTT for full packets,
5668 * because of the way Rx acks the data that it receives. (If it's
5669 * smaller than a full packet, it often gets implicitly acked
5670 * either by the call response (from a server) or by the next call
5671 * (from a client), and either case confuses transmission times
5672 * with processing times.) Therefore, replace the above
5673 * more-sophisticated processing with a simpler version, where the
5674 * smoothed RTT is kept for full-size packets, and the time to
5675 * transmit a windowful of full-size packets is simply RTT *
5676 * windowSize. Again, we take two steps:
5677 - ensure the timeout is large enough for a single packet's RTT;
5678 - ensure that the window is small enough to fit in the desired timeout.*/
5680 /* First, the timeout check. */
5681 minTime = peer->smRtt;
5682 /* Get a reasonable estimate for a timeout period */
5684 newTO.sec = minTime / 1000;
5685 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5687 /* Increase the timeout period so that we can always do at least
5688 * one packet exchange */
5689 if (clock_Gt(&newTO, &peer->timeout)) {
5691 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5692 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5693 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5696         peer->timeout = newTO;
5697     }
5699 /* Now, get an estimate for the transmit window size. */
5700 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5701 /* Now, convert to the number of full packets that could fit in a
5702 * reasonable fraction of that interval */
5703 minTime /= (peer->smRtt << 1);
5704 xferSize = minTime; /* (make a copy) */
5706 /* Now clamp the size to reasonable bounds. */
5707 if (minTime <= 1) minTime = 1;
5708 else if (minTime > rx_Window) minTime = rx_Window;
5709 /* if (minTime != peer->maxWindow) {
5710 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5711 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5712 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5714 peer->maxWindow = minTime;
5715         elide... call->twind = minTime;
5716 */
5719     /* Cut back on the peer timeout if it had earlier grown unreasonably.
5720      * Discern this by calculating the timeout necessary for rx_Window
5721      * full-size packets. */
5722 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5723 /* calculate estimate for transmission interval in milliseconds */
5724 minTime = rx_Window * peer->smRtt;
5725 if (minTime < 1000) {
5726 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5727 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5728 peer->timeout.usec, peer->smRtt,
5731 newTO.sec = 0; /* cut back on timeout by half a second */
5732 newTO.usec = 500000;
5733             clock_Sub(&peer->timeout, &newTO);
5734         }
5735     }
5736 
5738 } /* end of rxi_ComputeRate */
5739 #endif /* ADAPT_WINDOW */
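/*
 * The smoothing step above is an exponentially weighted moving average
 * with gain 1/16: smRtt' = (15*smRtt + sample + 4)/16, where the "+4"
 * rounds the right shift. A minimal standalone sketch of the same update
 * (illustrative only, not part of rx):
 */
#if 0
static afs_int32 ewma_rtt(afs_int32 smRtt, afs_int32 sampleMs)
{
    if (smRtt == 0)
        return sampleMs;                        /* first sample primes the average */
    smRtt = ((smRtt * 15) + sampleMs + 4) >> 4; /* 15/16 old + 1/16 new, rounded */
    return smRtt ? smRtt : 1;                   /* never let the estimate decay to 0 */
}
#endif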
5741 #ifdef RXDEBUG
5747 /* Don't call this debugging routine directly; use dpf */
5748 void
5749 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5750                a11, a12, a13, a14, a15)
5751   char *format;
5752 {
5753     struct clock now;
5754     clock_GetTime(&now);
5755     fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5756     fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5757     putc('\n', rx_Log);
5758 }
5763 * This function is used to process the rx_stats structure that is local
5764 * to a process as well as an rx_stats structure received from a remote
5765  * process (via rxdebug).  Therefore, it needs to do minimal version
5766  * checking.
5767  */
5768 void rx_PrintTheseStats (file, s, size, freePackets, version)
5769   FILE *file;
5770   struct rx_stats *s;
5771   int size;                             /* some idea of version control */
5772   afs_int32 freePackets;
5773   char version;
5774 {
5775     int i;
5777     if (size != sizeof(struct rx_stats)) {
5778         fprintf(file,
5779                 "Unexpected size of stats structure: was %d, expected %d\n",
5780                 size, sizeof(struct rx_stats));
5781     }
5784 "rx stats: free packets %d, allocs %d, ",
5788     if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5789         fprintf(file,
5790             "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5791 s->receivePktAllocFailures,
5792 s->receiveCbufPktAllocFailures,
5793 s->sendPktAllocFailures,
5794 s->sendCbufPktAllocFailures,
5795             s->specialPktAllocFailures);
5796     } else {
5797         fprintf(file,
5798             "alloc-failures(rcv %d,send %d,ack %d)\n",
5799 s->receivePktAllocFailures,
5800 s->sendPktAllocFailures,
5801             s->specialPktAllocFailures);
5802     }
5803 
5806 "bogusReads %d (last from host %x), "
5812 s->bogusPacketOnRead,
5815 s->noPacketBuffersOnRead,
5819 fprintf(file, " packets read: ");
5820 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5826 fprintf(file, "\n");
5829 " other read counters: data %d, "
5837 s->spuriousPacketsRead,
5838 s->ignorePacketDally);
5840 fprintf(file, " packets sent: ");
5841 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5847 fprintf(file, "\n");
5850 " other send counters: ack %d, "
5851 "data %d (not resends), "
5854 "acked&ignored %d\n",
5857 s->dataPacketsReSent,
5858 s->dataPacketsPushed,
5859 s->ignoreAckedPacket);
5862 " \t(these should be small) sendFailed %d, "
5865 (int) s->fatalErrors);
5867     if (s->nRttSamples) {
5868         fprintf(file,
5869                 "   Average rtt is %0.3f, with %d samples\n",
5870                 clock_Float(&s->totalRtt)/s->nRttSamples,
5871                 s->nRttSamples);
5872 
5873         fprintf(file,
5874                 "   Minimum rtt is %0.3f, maximum is %0.3f\n",
5875                 clock_Float(&s->minRtt),
5876                 clock_Float(&s->maxRtt));
5877     }
5880 " %d server connections, "
5881 "%d client connections, "
5884 "%d free call structs\n",
5889 s->nFreeCallStructs);
5891 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5893 " %d clock updates\n",
5899 /* for backward compatibility */
5900 void rx_PrintStats(file)
5901   FILE *file;
5902 {
5903     MUTEX_ENTER(&rx_stats_mutex);
5904     rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5905     MUTEX_EXIT(&rx_stats_mutex);
5906 }
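/*
 * Illustrative use of the printing entry points (the function name here
 * is hypothetical): dump the global counters to an already-open FILE.
 * rx_PrintStats takes rx_stats_mutex itself, so the caller needs no
 * extra locking.
 */
#if 0
static void example_DumpRxStats(FILE *f)
{
    rx_PrintStats(f);   /* snapshot of the global rx_stats */
}
#endif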
5908 void rx_PrintPeerStats(file, peer)
5909   FILE *file;
5910   struct rx_peer *peer;
5911 {
5915 "burst wait %u.%d.\n",
5918 (int) peer->burstSize,
5919 (int) peer->burstWait.sec,
5920 (int) peer->burstWait.usec);
5924 "retry time %u.%06d, "
5928 (int) peer->timeout.sec,
5929 (int) peer->timeout.usec,
5935 "max in packet skew %d, "
5936 "max out packet skew %d\n",
5938 (int) peer->inPacketSkew,
5939 (int) peer->outPacketSkew);
5942 #ifdef AFS_PTHREAD_ENV
5944 * This mutex protects the following static variables:
5945 * counter
5946 */
5948 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
5949 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
5950 #else
5951 #define LOCK_RX_DEBUG
5952 #define UNLOCK_RX_DEBUG
5953 #endif /* AFS_PTHREAD_ENV */
5955 static int MakeDebugCall(
5956   int socket,
5957   afs_uint32 remoteAddr,
5958   afs_uint16 remotePort,
5959   u_char type,
5960   void *inputData,
5961   size_t inputLength,
5962   void *outputData,
5963   size_t outputLength
5964 )
5965 {
5966     static afs_int32 counter = 100;
5967     afs_uint32 endTime;
5968     struct rx_header theader;
5969     char tbuffer[1500];
5970     register afs_int32 code;
5971     struct timeval tv;
5972     struct sockaddr_in taddr, faddr;
5973     int faddrLen;
5974     fd_set imask;
5975     register char *tp;
5976 
5977     endTime = time(0) + 20; /* try for 20 seconds */
5978     counter++;
5981     tp = &tbuffer[sizeof(struct rx_header)];
5982 taddr.sin_family = AF_INET;
5983 taddr.sin_port = remotePort;
5984 taddr.sin_addr.s_addr = remoteAddr;
5985     while(1) {
5986         memset(&theader, 0, sizeof(theader));
5987         theader.epoch = htonl(999);
5988         theader.cid = 0;
5989         theader.callNumber = htonl(counter);
5990         theader.seq = 0;
5991         theader.serial = 0;
5992         theader.type = type;
5993         theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
5994         theader.serviceId = 0;
5996 memcpy(tbuffer, &theader, sizeof(theader));
5997 memcpy(tp, inputData, inputLength);
5998 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
5999 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
6001         /* see if there's a packet available */
6002         FD_ZERO(&imask);
6003         FD_SET(socket, &imask);
6004         tv.tv_sec = 1;
6005         tv.tv_usec = 0;
6006         code = select(socket+1, &imask, 0, 0, &tv);
6007         if (code > 0) {
6008             /* now receive a packet */
6009             faddrLen = sizeof(struct sockaddr_in);
6010             code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6011                             (struct sockaddr *) &faddr, &faddrLen);
6012             if (code > 0) {
6013                 memcpy(&theader, tbuffer, sizeof(struct rx_header));
6014                 if (counter == ntohl(theader.callNumber)) break;
6015             }
6016         }
6017         /* see if we've timed out */
6018         if (endTime < time(0)) return -1;
6019     }
6020     code -= sizeof(struct rx_header);
6021     if (code > outputLength) code = outputLength;
6022     memcpy(outputData, tp, code);
6023     return code;
6024 }
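/*
 * MakeDebugCall is the transport for all of the rx_GetServer* queries
 * below: it stamps a debug-style header (epoch 999, a private call
 * number), sends one UDP packet, and polls up to 20 seconds for a reply
 * whose call number matches. A hypothetical caller fetching a server's
 * version string (mirroring rx_GetServerVersion below; sock, host and
 * port are assumed to be supplied, with host and port in network order):
 */
#if 0
static void example_PrintServerVersion(int sock, afs_uint32 host, afs_uint16 port)
{
    char version[64];
    afs_int32 len = MakeDebugCall(sock, host, port,
                                  RX_PACKET_TYPE_VERSION,
                                  "", 1,           /* minimal input payload */
                                  version, sizeof(version));
    if (len >= 0)
        printf("server version: %.*s\n", (int) len, version);
}
#endif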
6026 afs_int32 rx_GetServerDebug(
6027   int socket,
6028   afs_uint32 remoteAddr,
6029   afs_uint16 remotePort,
6030   struct rx_debugStats *stat,
6031   afs_uint32 *supportedValues
6032 )
6033 {
6034     struct rx_debugIn in;
6035     afs_int32 rc = 0;
6036 
6037     *supportedValues = 0;
6038     in.type = htonl(RX_DEBUGI_GETSTATS);
6039     in.index = 0;
6040 
6041     rc = MakeDebugCall(socket,
6042                        remoteAddr,
6043                        remotePort,
6044                        RX_PACKET_TYPE_DEBUG,
6045                        &in,
6046                        sizeof(in),
6047                        stat,
6048                        sizeof(*stat));
6049 
6050     /*
6051 * If the call was successful, fixup the version and indicate
6052 * what contents of the stat structure are valid.
6053  * Also do net to host conversion of fields here.
6054  */
6055 
6056     if (rc >= 0) {
6057         if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6058 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6060 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6061 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6063 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6064 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6066 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6067 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6069 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6070 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6072 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6073 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6075 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6076 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6077         }
6078 
6079         stat->nFreePackets = ntohl(stat->nFreePackets);
6080         stat->packetReclaims = ntohl(stat->packetReclaims);
6081         stat->callsExecuted = ntohl(stat->callsExecuted);
6082         stat->nWaiting = ntohl(stat->nWaiting);
6083         stat->idleThreads = ntohl(stat->idleThreads);
6084     }
6085 
6086     return rc;
6087 }
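/*
 * Sketch of how a client such as rxdebug might drive rx_GetServerDebug.
 * The bits returned in supportedValues say which fields an older server
 * actually filled in. sock, host and port are assumed (host and port in
 * network order), and the function name is illustrative.
 */
#if 0
static void example_QueryServer(int sock, afs_uint32 host, afs_uint16 port)
{
    struct rx_debugStats tstats;
    afs_uint32 supported;
    afs_int32 rc = rx_GetServerDebug(sock, host, port, &tstats, &supported);
    if (rc >= 0 && (supported & RX_SERVER_DEBUG_WAITER_CNT))
        printf("%d calls waiting for a thread\n", tstats.nWaiting);
}
#endif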
6089 afs_int32 rx_GetServerStats(
6090   int socket,
6091   afs_uint32 remoteAddr,
6092   afs_uint16 remotePort,
6093   struct rx_stats *stat,
6094   afs_uint32 *supportedValues
6095 )
6096 {
6097     struct rx_debugIn in;
6098     afs_int32 *lp = (afs_int32 *) stat;
6099     int i;
6100     afs_int32 rc = 0;
6101 
6102     /*
6103 * supportedValues is currently unused, but added to allow future
6104  * versioning of this function.
6105  */
6106 
6107 *supportedValues = 0;
6108 in.type = htonl(RX_DEBUGI_RXSTATS);
6110 memset(stat, 0, sizeof(*stat));
6112 rc = MakeDebugCall(socket,
6115 RX_PACKET_TYPE_DEBUG,
6123     /*
6124      * Do net to host conversion here
6125      */
6126     if (rc >= 0) {
6127         for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6128             *lp = ntohl(*lp);
6129         }
6130     }
6131 
6132     return rc;
6133 }
6135 afs_int32 rx_GetServerVersion(
6136   int socket,
6137   afs_uint32 remoteAddr,
6138   afs_uint16 remotePort,
6139   size_t version_length,
6140   char *version
6141 )
6142 {
6143     char a[1] = {0};
6144     return MakeDebugCall(socket,
6145                          remoteAddr,
6146                          remotePort,
6147                          RX_PACKET_TYPE_VERSION,
6148                          a,
6149                          1,
6150                          version,
6151                          version_length);
6152 }
6154 afs_int32 rx_GetServerConnections(
6155   int socket,
6156   afs_uint32 remoteAddr,
6157   afs_uint16 remotePort,
6158   afs_int32 *nextConnection,
6159   int allConnections,
6160   afs_uint32 debugSupportedValues,
6161   struct rx_debugConn *conn,
6162   afs_uint32 *supportedValues
6163 )
6164 {
6165     struct rx_debugIn in;
6166     afs_int32 rc = 0;
6167     int i;
6168 
6169     /*
6170 * supportedValues is currently unused, but added to allow future
6171  * versioning of this function.
6172  */
6173 
6174 *supportedValues = 0;
6175 if (allConnections) {
6176         in.type = htonl(RX_DEBUGI_GETALLCONN);
6177     } else {
6178         in.type = htonl(RX_DEBUGI_GETCONN);
6179     }
6180 in.index = htonl(*nextConnection);
6181 memset(conn, 0, sizeof(*conn));
6183     rc = MakeDebugCall(socket,
6184                        remoteAddr,
6185                        remotePort,
6186                        RX_PACKET_TYPE_DEBUG,
6187                        &in,
6188                        sizeof(in),
6189                        conn,
6190                        sizeof(*conn));
6191 
6192     if (rc >= 0) {
6193         *nextConnection += 1;
6194 
6195         /*
6196          * Convert old connection format to new structure.
6197          */
6198 
6199 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6200 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6201 #define MOVEvL(a) (conn->a = vL->a)
6203 /* any old or unrecognized version... */
6204 for (i=0;i<RX_MAXCALLS;i++) {
6205 MOVEvL(callState[i]);
6206 MOVEvL(callMode[i]);
6207 MOVEvL(callFlags[i]);
6208 MOVEvL(callOther[i]);
6210 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6211 MOVEvL(secStats.type);
6212 MOVEvL(secStats.level);
6213 MOVEvL(secStats.flags);
6214 MOVEvL(secStats.expires);
6215 MOVEvL(secStats.packetsReceived);
6216 MOVEvL(secStats.packetsSent);
6217 MOVEvL(secStats.bytesReceived);
6218 MOVEvL(secStats.bytesSent);
6219             }
6220         }
6221 
6222         /*
6223          * Do net to host conversion here
6225 * I don't convert host or port since we are most likely
6226          * going to want these in NBO.
6227          */
6228 conn->cid = ntohl(conn->cid);
6229 conn->serial = ntohl(conn->serial);
6230 for(i=0;i<RX_MAXCALLS;i++) {
6231             conn->callNumber[i] = ntohl(conn->callNumber[i]);
6232         }
6233 conn->error = ntohl(conn->error);
6234 conn->secStats.flags = ntohl(conn->secStats.flags);
6235 conn->secStats.expires = ntohl(conn->secStats.expires);
6236 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6237 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6238 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6239 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6240 conn->epoch = ntohl(conn->epoch);
6241         conn->natMTU = ntohl(conn->natMTU);
6242     }
6243 
6244     return rc;
6245 }
6247 afs_int32 rx_GetServerPeers(
6248   int socket,
6249   afs_uint32 remoteAddr,
6250   afs_uint16 remotePort,
6251   afs_int32 *nextPeer,
6252   afs_uint32 debugSupportedValues,
6253   struct rx_debugPeer *peer,
6254   afs_uint32 *supportedValues
6255 )
6256 {
6257     struct rx_debugIn in;
6258     afs_int32 rc = 0;
6259 
6260     /*
6261 * supportedValues is currently unused, but added to allow future
6262  * versioning of this function.
6263  */
6264 
6265 *supportedValues = 0;
6266 in.type = htonl(RX_DEBUGI_GETPEER);
6267 in.index = htonl(*nextPeer);
6268 memset(peer, 0, sizeof(*peer));
6270     rc = MakeDebugCall(socket,
6271                        remoteAddr,
6272                        remotePort,
6273                        RX_PACKET_TYPE_DEBUG,
6274                        &in,
6275                        sizeof(in),
6276                        peer,
6277                        sizeof(*peer));
6278 
6279     if (rc >= 0) {
6280         *nextPeer += 1;
6281 
6282         /*
6283 * Do net to host conversion here
6285 * I don't convert host or port since we are most likely
6286          * going to want these in NBO.
6287          */
6288 peer->ifMTU = ntohs(peer->ifMTU);
6289 peer->idleWhen = ntohl(peer->idleWhen);
6290 peer->refCount = ntohs(peer->refCount);
6291 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6292 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6293 peer->rtt = ntohl(peer->rtt);
6294 peer->rtt_dev = ntohl(peer->rtt_dev);
6295 peer->timeout.sec = ntohl(peer->timeout.sec);
6296 peer->timeout.usec = ntohl(peer->timeout.usec);
6297 peer->nSent = ntohl(peer->nSent);
6298 peer->reSends = ntohl(peer->reSends);
6299 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6300 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6301 peer->rateFlag = ntohl(peer->rateFlag);
6302 peer->natMTU = ntohs(peer->natMTU);
6303 peer->maxMTU = ntohs(peer->maxMTU);
6304 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6305 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6306 peer->MTU = ntohs(peer->MTU);
6307 peer->cwind = ntohs(peer->cwind);
6308 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6309 peer->congestSeq = ntohs(peer->congestSeq);
6310 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6311 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6312 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6313         peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6314     }
6315 
6316     return rc;
6317 }
6318 #endif /* RXDEBUG */
6320 void shutdown_rx(void)
6321 {
6322     struct rx_serverQueueEntry *np;
6323     register int i, j;
6324     register struct rx_call *call;
6325     register struct rx_serverQueueEntry *sq;
6328     if (rxinit_status == 1) {
6329         UNLOCK_RX_INIT
6330         return; /* Already shutdown. */
6331     }
6335 #ifndef AFS_PTHREAD_ENV
6336 FD_ZERO(&rx_selectMask);
6337 #endif /* AFS_PTHREAD_ENV */
6338 rxi_dataQuota = RX_MAX_QUOTA;
6339 #ifndef AFS_PTHREAD_ENV
6341 #endif /* AFS_PTHREAD_ENV */
6344 #ifndef AFS_PTHREAD_ENV
6345 #ifndef AFS_USE_GETTIMEOFDAY
6347 #endif /* AFS_USE_GETTIMEOFDAY */
6348 #endif /* AFS_PTHREAD_ENV */
6350     while (!queue_IsEmpty(&rx_freeCallQueue)) {
6351         call = queue_First(&rx_freeCallQueue, rx_call);
6352         queue_Remove(call);
6353         rxi_Free(call, sizeof(struct rx_call));
6354     }
6356 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6357 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6363 struct rx_peer **peer_ptr, **peer_end;
6364 for (peer_ptr = &rx_peerHashTable[0],
6365 peer_end = &rx_peerHashTable[rx_hashTableSize];
6366 peer_ptr < peer_end; peer_ptr++) {
6367 struct rx_peer *peer, *next;
6368 for (peer = *peer_ptr; peer; peer = next) {
6369             rx_interface_stat_p rpc_stat, nrpc_stat;
6370             size_t space;
6371             for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6372                            rx_interface_stat)) {
6373 unsigned int num_funcs;
6374 if (!rpc_stat) break;
6375 queue_Remove(&rpc_stat->queue_header);
6376 queue_Remove(&rpc_stat->all_peers);
6377 num_funcs = rpc_stat->stats[0].func_total;
6378 space = sizeof(rx_interface_stat_t) +
6379 rpc_stat->stats[0].func_total *
6380 sizeof(rx_function_entry_v1_t);
6382 rxi_Free(rpc_stat, space);
6383 MUTEX_ENTER(&rx_rpc_stats);
6384 rxi_rpc_peer_stat_cnt -= num_funcs;
6385                 MUTEX_EXIT(&rx_rpc_stats);
6386             }
6387             next = peer->next;
6388             rxi_FreePeer(peer);
6389             MUTEX_ENTER(&rx_stats_mutex);
6390 rx_stats.nPeerStructs--;
6391             MUTEX_EXIT(&rx_stats_mutex);
6392         }
6393     }
6395     for (i = 0; i<RX_MAX_SERVICES; i++) {
6396         if (rx_services[i])
6397             rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6398     }
6399 for (i = 0; i < rx_hashTableSize; i++) {
6400 register struct rx_connection *tc, *ntc;
6401 MUTEX_ENTER(&rx_connHashTable_lock);
6402         for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6403             ntc = tc->next;
6404             for (j = 0; j < RX_MAXCALLS; j++) {
6405                 if (tc->call[j])
6406                     rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6407             }
6408 
6409             rxi_Free(tc, sizeof(*tc));
6410         }
6411         MUTEX_EXIT(&rx_connHashTable_lock);
6412     }
6414 MUTEX_ENTER(&freeSQEList_lock);
6416 while ((np = rx_FreeSQEList)) {
6417 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6418 MUTEX_DESTROY(&np->lock);
6419         rxi_Free(np, sizeof(*np));
6420     }
6421 
6422     MUTEX_EXIT(&freeSQEList_lock);
6423 MUTEX_DESTROY(&freeSQEList_lock);
6424 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6425 MUTEX_DESTROY(&rx_connHashTable_lock);
6426 MUTEX_DESTROY(&rx_peerHashTable_lock);
6427 MUTEX_DESTROY(&rx_serverPool_lock);
6428 #ifndef KERNEL
6429     osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6430     osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6431 #else
6432     UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6433     UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6434 #endif
6435 rxi_FreeAllPackets();
6437 MUTEX_ENTER(&rx_stats_mutex);
6438 rxi_dataQuota = RX_MAX_QUOTA;
6439 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6440     MUTEX_EXIT(&rx_stats_mutex);
6441 
6442     rxinit_status = 1;
6443     UNLOCK_RX_INIT
6444 }
6446 #ifdef RX_ENABLE_LOCKS
6447 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6448 {
6449     if (!MUTEX_ISMINE(lockaddr))
6450         osi_Panic("Lock not held: %s", msg);
6451 }
6452 #endif /* RX_ENABLE_LOCKS */
6453 
6454 #ifndef KERNEL
6455 
6456 /*
6457  * Routines to implement connection specific data.
6458  */
6460 int rx_KeyCreate(rx_destructor_t rtn)
6461 {
6462     int key;
6463     MUTEX_ENTER(&rxi_keyCreate_lock);
6464     key = rxi_keyCreate_counter++;
6465     rxi_keyCreate_destructor = (rx_destructor_t *)
6466         realloc((void *)rxi_keyCreate_destructor,
6467                 (key+1) * sizeof(rx_destructor_t));
6468     rxi_keyCreate_destructor[key] = rtn;
6469     MUTEX_EXIT(&rxi_keyCreate_lock);
6470     return key;
6471 }
6473 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6474 {
6475     int i;
6476     MUTEX_ENTER(&conn->conn_data_lock);
6477     if (!conn->specific) {
6478         conn->specific = (void **)malloc((key+1)*sizeof(void *));
6479         for (i = 0 ; i < key ; i++)
6480             conn->specific[i] = NULL;
6481         conn->nSpecific = key+1;
6482         conn->specific[key] = ptr;
6483     } else if (key >= conn->nSpecific) {
6484         conn->specific = (void **)
6485             realloc(conn->specific,(key+1)*sizeof(void *));
6486         for (i = conn->nSpecific ; i < key ; i++)
6487             conn->specific[i] = NULL;
6488         conn->nSpecific = key+1;
6489         conn->specific[key] = ptr;
6490     } else {
6491         if (conn->specific[key] && rxi_keyCreate_destructor[key])
6492             (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6493         conn->specific[key] = ptr;
6494     }
6495     MUTEX_EXIT(&conn->conn_data_lock);
6496 }
6498 void *rx_GetSpecific(struct rx_connection *conn, int key)
6499 {
6500     void *ptr;
6501     MUTEX_ENTER(&conn->conn_data_lock);
6502     if (key >= conn->nSpecific)
6503         ptr = NULL;
6504     else
6505         ptr = conn->specific[key];
6506     MUTEX_EXIT(&conn->conn_data_lock);
6507     return ptr;
6508 }
6510 #endif /* !KERNEL */
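/*
 * Usage sketch for the connection-specific data routines above: allocate
 * a key once (optionally with a destructor, which rx_SetSpecific runs
 * when it overwrites an existing value), then hang per-connection state
 * off any rx_connection. All names below are illustrative.
 */
#if 0
static int my_key;

static void my_destructor(void *val)
{
    free(val);          /* invoked when a previous value is replaced */
}

static void example_UseSpecific(struct rx_connection *conn)
{
    my_key = rx_KeyCreate(my_destructor);
    rx_SetSpecific(conn, my_key, strdup("per-connection state"));
    printf("%s\n", (char *) rx_GetSpecific(conn, my_key));
}
#endif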
6513 * processStats is a queue used to store the statistics for the local
6514 * process. Its contents are similar to the contents of the rpcStats
6515 * queue on a rx_peer structure, but the actual data stored within
6516 * this queue contains totals across the lifetime of the process (assuming
6517 * the stats have not been reset) - unlike the per peer structures
6518 * which can come and go based upon the peer lifetime.
6521 static struct rx_queue processStats = {&processStats,&processStats};
6524 * peerStats is a queue used to store the statistics for all peer structs.
6525 * Its contents are the union of all the peer rpcStats queues.
6528 static struct rx_queue peerStats = {&peerStats,&peerStats};
6531  * rxi_monitor_processStats is used to turn process wide stat collection
6532  * on and off
6533  */
6535 static int rxi_monitor_processStats = 0;
6538  * rxi_monitor_peerStats is used to turn per peer stat collection on and off
6539  */
6541 static int rxi_monitor_peerStats = 0;
6544 * rxi_AddRpcStat - given all of the information for a particular rpc
6545 * call, create (if needed) and update the stat totals for the rpc.
6549 * IN stats - the queue of stats that will be updated with the new value
6551 * IN rxInterface - a unique number that identifies the rpc interface
6553 * IN currentFunc - the index of the function being invoked
6555 * IN totalFunc - the total number of functions in this interface
6557 * IN queueTime - the amount of time this function waited for a thread
6559 * IN execTime - the amount of time this function invocation took to execute
6561  * IN bytesSent - the number of bytes sent by this invocation
6562  *
6563  * IN bytesRcvd - the number of bytes received by this invocation
6565 * IN isServer - if true, this invocation was made to a server
6567 * IN remoteHost - the ip address of the remote host
6569 * IN remotePort - the port of the remote host
6571 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6573 * INOUT counter - if a new stats structure is allocated, the counter will
6574 * be updated with the new number of allocated stat structures
6581 static int rxi_AddRpcStat(
6582 struct rx_queue *stats,
6583 afs_uint32 rxInterface,
6584 afs_uint32 currentFunc,
6585 afs_uint32 totalFunc,
6586 struct clock *queueTime,
6587 struct clock *execTime,
6588 afs_hyper_t *bytesSent,
6589     afs_hyper_t *bytesRcvd,
6590     int isServer,
6591     afs_uint32 remoteHost,
6592     afs_uint32 remotePort,
6593     int addToPeerList,
6594     unsigned int *counter)
6595 {
6596     int rc = 0;
6597     rx_interface_stat_p rpc_stat, nrpc_stat;
6599     /*
6600      * See if there's already a structure for this interface
6601      */
6603 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6604 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6605 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6606     }
6607 
6608     /*
6609      * Didn't find a match so allocate a new structure and add it to the
6610      * queue.
6611      */
6613 if (queue_IsEnd(stats, rpc_stat) ||
6614 (rpc_stat == NULL) ||
6615 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6616         (rpc_stat->stats[0].remote_is_server != isServer)) {
6617         int i;
6618         size_t space;
6619 
6620         space = sizeof(rx_interface_stat_t) + totalFunc *
6621 sizeof(rx_function_entry_v1_t);
6623 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6624 if (rpc_stat == NULL) {
6628 *counter += totalFunc;
6629 for(i=0;i<totalFunc;i++) {
6630 rpc_stat->stats[i].remote_peer = remoteHost;
6631 rpc_stat->stats[i].remote_port = remotePort;
6632 rpc_stat->stats[i].remote_is_server = isServer;
6633 rpc_stat->stats[i].interfaceId = rxInterface;
6634 rpc_stat->stats[i].func_total = totalFunc;
6635 rpc_stat->stats[i].func_index = i;
6636 hzero(rpc_stat->stats[i].invocations);
6637 hzero(rpc_stat->stats[i].bytes_sent);
6638 hzero(rpc_stat->stats[i].bytes_rcvd);
6639 rpc_stat->stats[i].queue_time_sum.sec = 0;
6640 rpc_stat->stats[i].queue_time_sum.usec = 0;
6641 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6642 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6643 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6644 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6645 rpc_stat->stats[i].queue_time_max.sec = 0;
6646 rpc_stat->stats[i].queue_time_max.usec = 0;
6647 rpc_stat->stats[i].execution_time_sum.sec = 0;
6648 rpc_stat->stats[i].execution_time_sum.usec = 0;
6649 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6650 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6651 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6652 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6653 rpc_stat->stats[i].execution_time_max.sec = 0;
6654 rpc_stat->stats[i].execution_time_max.usec = 0;
6656         queue_Prepend(stats, rpc_stat);
6657         if (addToPeerList) {
6658             queue_Prepend(&peerStats, &rpc_stat->all_peers);
6659         }
6660     }
6662     /*
6663      * Increment the stats for this function
6664      */
6666 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6667 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6668 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6669 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6670 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6671 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6672 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6674 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6675 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6677 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6678 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6679 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6680 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6682 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6683 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6691 * rx_IncrementTimeAndCount - increment the times and count for a particular
6696 * IN peer - the peer who invoked the rpc
6698 * IN rxInterface - a unique number that identifies the rpc interface
6700 * IN currentFunc - the index of the function being invoked
6702 * IN totalFunc - the total number of functions in this interface
6704 * IN queueTime - the amount of time this function waited for a thread
6706 * IN execTime - the amount of time this function invocation took to execute
6708  * IN bytesSent - the number of bytes sent by this invocation
6709  *
6710  * IN bytesRcvd - the number of bytes received by this invocation
6712 * IN isServer - if true, this invocation was made to a server
6719 void rx_IncrementTimeAndCount(
6720 struct rx_peer *peer,
6721 afs_uint32 rxInterface,
6722 afs_uint32 currentFunc,
6723 afs_uint32 totalFunc,
6724 struct clock *queueTime,
6725 struct clock *execTime,
6726 afs_hyper_t *bytesSent,
6727     afs_hyper_t *bytesRcvd,
6728     int isServer)
6729 {
6730 
6731     MUTEX_ENTER(&rx_rpc_stats);
6732 MUTEX_ENTER(&peer->peer_lock);
6734 if (rxi_monitor_peerStats) {
6735 rxi_AddRpcStat(&peer->rpcStats,
6747 &rxi_rpc_peer_stat_cnt);
6750 if (rxi_monitor_processStats) {
6751 rxi_AddRpcStat(&processStats,
6763 &rxi_rpc_process_stat_cnt);
6766 MUTEX_EXIT(&peer->peer_lock);
6767     MUTEX_EXIT(&rx_rpc_stats);
6768 }
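/*
 * rx_IncrementTimeAndCount is normally driven from the rx call path, but
 * a hypothetical instrumented server stub could time the two phases with
 * the clock package and record them itself. rx_ConnectionOf/rx_PeerOf
 * are the usual rx.h accessors; the wrapper below and its parameter
 * values are assumptions, not part of rx.
 */
#if 0
static void example_RecordCall(struct rx_call *call,
                               afs_uint32 interfaceId,  /* e.g. an rxgen opcode base */
                               afs_uint32 currentFunc,
                               afs_uint32 totalFunc,
                               struct clock *queueTime,
                               struct clock *execTime,
                               afs_hyper_t *bytesSent,
                               afs_hyper_t *bytesRcvd)
{
    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
                             interfaceId, currentFunc, totalFunc,
                             queueTime, execTime,
                             bytesSent, bytesRcvd,
                             1 /* isServer */);
}
#endif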
6772 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6776 * IN callerVersion - the rpc stat version of the caller.
6778 * IN count - the number of entries to marshall.
6780 * IN stats - pointer to stats to be marshalled.
6782 * OUT ptr - Where to store the marshalled data.
6788 void rx_MarshallProcessRPCStats(
6789     afs_uint32 callerVersion,
6790     int count,
6791     rx_function_entry_v1_t *stats,
6792     afs_uint32 **ptrP)
6793 {
6794     int i;
6795     afs_uint32 *ptr;
6796 
6797     /*
6798      * We only support the first version
6799      */
6800     for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6801 *(ptr++) = stats->remote_peer;
6802 *(ptr++) = stats->remote_port;
6803 *(ptr++) = stats->remote_is_server;
6804 *(ptr++) = stats->interfaceId;
6805 *(ptr++) = stats->func_total;
6806 *(ptr++) = stats->func_index;
6807 *(ptr++) = hgethi(stats->invocations);
6808 *(ptr++) = hgetlo(stats->invocations);
6809 *(ptr++) = hgethi(stats->bytes_sent);
6810 *(ptr++) = hgetlo(stats->bytes_sent);
6811 *(ptr++) = hgethi(stats->bytes_rcvd);
6812 *(ptr++) = hgetlo(stats->bytes_rcvd);
6813 *(ptr++) = stats->queue_time_sum.sec;
6814 *(ptr++) = stats->queue_time_sum.usec;
6815 *(ptr++) = stats->queue_time_sum_sqr.sec;
6816 *(ptr++) = stats->queue_time_sum_sqr.usec;
6817 *(ptr++) = stats->queue_time_min.sec;
6818 *(ptr++) = stats->queue_time_min.usec;
6819 *(ptr++) = stats->queue_time_max.sec;
6820 *(ptr++) = stats->queue_time_max.usec;
6821 *(ptr++) = stats->execution_time_sum.sec;
6822 *(ptr++) = stats->execution_time_sum.usec;
6823 *(ptr++) = stats->execution_time_sum_sqr.sec;
6824 *(ptr++) = stats->execution_time_sum_sqr.usec;
6825 *(ptr++) = stats->execution_time_min.sec;
6826 *(ptr++) = stats->execution_time_min.usec;
6827 *(ptr++) = stats->execution_time_max.sec;
6828         *(ptr++) = stats->execution_time_max.usec;
6829     }
6830     *ptrP = ptr;
6831 }
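/*
 * The marshalled stream is a flat array of afs_uint32 in exactly the
 * field order written above (28 words per function entry in this first
 * version); each 64-bit counter travels as a high word followed by a low
 * word. A hypothetical reader consumes it in the same order (this sketch
 * assumes the two-word struct form of afs_hyper_t):
 */
#if 0
static void example_UnmarshallOne(afs_uint32 *p /* words from the marshaller */)
{
    rx_function_entry_v1_t e;
    e.remote_peer      = *(p++);
    e.remote_port      = *(p++);
    e.remote_is_server = *(p++);
    e.interfaceId      = *(p++);
    e.func_total       = *(p++);
    e.func_index       = *(p++);
    e.invocations.high = *(p++);        /* hi word, then lo word */
    e.invocations.low  = *(p++);
    /* ... the remaining 20 words follow in the order written above ... */
}
#endif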
6834 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6839 * IN callerVersion - the rpc stat version of the caller
6841 * OUT myVersion - the rpc stat version of this function
6843 * OUT clock_sec - local time seconds
6845 * OUT clock_usec - local time microseconds
6847 * OUT allocSize - the number of bytes allocated to contain stats
6849  * OUT statCount - the number of stats retrieved from this process.
6851 * OUT stats - the actual stats retrieved from this process.
6855  * If successful, *stats will be non-NULL.
6858 int rx_RetrieveProcessRPCStats(
6859 afs_uint32 callerVersion,
6860 afs_uint32 *myVersion,
6861 afs_uint32 *clock_sec,
6862 afs_uint32 *clock_usec,
6863     size_t *allocSize,
6864     afs_uint32 *statCount,
6865     afs_uint32 **stats)
6866 {
6867     size_t space = 0;
6868     afs_uint32 *ptr;
6869     struct clock now;
6870     int rc = 0;
6871 
6872     *stats = 0;
6873     *allocSize = 0;
6874     *statCount = 0;
6875     *myVersion = RX_STATS_RETRIEVAL_VERSION;
6877     /*
6878      * Check to see if stats are enabled
6879      */
6881 MUTEX_ENTER(&rx_rpc_stats);
6882 if (!rxi_monitor_processStats) {
6883         MUTEX_EXIT(&rx_rpc_stats);
6884         return rc;
6885     }
6887 clock_GetTime(&now);
6888 *clock_sec = now.sec;
6889 *clock_usec = now.usec;
6892 * Allocate the space based upon the caller version
6894 * If the client is at an older version than we are,
6895 * we return the statistic data in the older data format, but
6896 * we still return our version number so the client knows we
6897  * are maintaining more data than it can retrieve.
6898  */
6900 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6901 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6902 *statCount = rxi_rpc_process_stat_cnt;
6903     } else {
6904         /*
6905          * This can't happen yet, but in the future version changes
6906          * can be handled by adding additional code here
6907          */
6908     }
6910 if (space > (size_t) 0) {
6912 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6915 rx_interface_stat_p rpc_stat, nrpc_stat;
6918 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6919 rx_interface_stat)) {
6921 * Copy the data based upon the caller version
6923 rx_MarshallProcessRPCStats(callerVersion,
6924 rpc_stat->stats[0].func_total,
6925 rpc_stat->stats, &ptr);
6931     MUTEX_EXIT(&rx_rpc_stats);
6932 
6933     return rc;
6934 }
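/*
 * End-to-end sketch of the retrieval API: pull a snapshot of the process
 * stats, use it, and release it with rx_FreeRPCStats. The version
 * exchange lets an older caller talk to a newer library and vice versa.
 * The wrapper name is illustrative.
 */
#if 0
static void example_SnapshotProcessStats(void)
{
    afs_uint32 myVer, sec, usec, nStats, *data;
    size_t bytes;

    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVer,
                               &sec, &usec, &bytes, &nStats, &data);
    if (data != NULL) {
        /* nStats marshalled function entries, laid out as described above */
        rx_FreeRPCStats(data, bytes);
    }
}
#endif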
6936 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6940 * IN callerVersion - the rpc stat version of the caller
6942 * OUT myVersion - the rpc stat version of this function
6944 * OUT clock_sec - local time seconds
6946 * OUT clock_usec - local time microseconds
6948 * OUT allocSize - the number of bytes allocated to contain stats
6950 * OUT statCount - the number of stats retrieved from the individual
6953 * OUT stats - the actual stats retrieved from the individual peer structures.
6957  * If successful, *stats will be non-NULL.
6960 int rx_RetrievePeerRPCStats(
6961 afs_uint32 callerVersion,
6962 afs_uint32 *myVersion,
6963 afs_uint32 *clock_sec,
6964 afs_uint32 *clock_usec,
6965     size_t *allocSize,
6966     afs_uint32 *statCount,
6967     afs_uint32 **stats)
6968 {
6969     size_t space = 0;
6970     afs_uint32 *ptr;
6971     struct clock now;
6972     int rc = 0;
6973 
6974     *stats = 0;
6975     *allocSize = 0;
6976     *statCount = 0;
6977     *myVersion = RX_STATS_RETRIEVAL_VERSION;
6979     /*
6980      * Check to see if stats are enabled
6981      */
6983 MUTEX_ENTER(&rx_rpc_stats);
6984 if (!rxi_monitor_peerStats) {
6985         MUTEX_EXIT(&rx_rpc_stats);
6986         return rc;
6987     }
6989 clock_GetTime(&now);
6990 *clock_sec = now.sec;
6991 *clock_usec = now.usec;
6994 * Allocate the space based upon the caller version
6996 * If the client is at an older version than we are,
6997 * we return the statistic data in the older data format, but
6998 * we still return our version number so the client knows we
6999  * are maintaining more data than it can retrieve.
7000  */
7002 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
7003 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
7004 *statCount = rxi_rpc_peer_stat_cnt;
7005     } else {
7006         /*
7007          * This can't happen yet, but in the future version changes
7008          * can be handled by adding additional code here
7009          */
7010     }
7012 if (space > (size_t) 0) {
7014 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7017             rx_interface_stat_p rpc_stat, nrpc_stat;
7018             char *fix_offset;
7019 
7020 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7021 rx_interface_stat)) {
7022             /*
7023              * We have to fix the offset of rpc_stat since we are
7024 * keeping this structure on two rx_queues. The rx_queue
7025 * package assumes that the rx_queue member is the first
7026 * member of the structure. That is, rx_queue assumes that
7027 * any one item is only on one queue at a time. We are
7028 * breaking that assumption and so we have to do a little
7029              * math to fix our pointers.
7030              */
7031 
7032 fix_offset = (char *) rpc_stat;
7033 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7034 rpc_stat = (rx_interface_stat_p) fix_offset;
7037 * Copy the data based upon the caller version
7039 rx_MarshallProcessRPCStats(callerVersion,
7040 rpc_stat->stats[0].func_total,
7041 rpc_stat->stats, &ptr);
7047     MUTEX_EXIT(&rx_rpc_stats);
7048 
7049     return rc;
7050 }
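/*
 * The pointer fix-up above is the classic "container of" computation:
 * all_peers is not the first member of rx_interface_stat_t, so an entry
 * reached through the peerStats queue must be rewound by
 * offsetof(rx_interface_stat_t, all_peers) to recover the enclosing
 * structure. In isolation (illustrative types only):
 */
#if 0
struct item {
    struct rx_queue queue_header;       /* first queue: no adjustment needed */
    struct rx_queue all_peers;          /* second queue: embedded mid-struct */
};

static struct item *item_from_all_peers(struct rx_queue *q)
{
    return (struct item *)((char *) q - offsetof(struct item, all_peers));
}
#endif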
7052 * rx_FreeRPCStats - free memory allocated by
7053 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7057 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7058 * rx_RetrievePeerRPCStats
7060 * IN allocSize - the number of bytes in stats.
7067 void rx_FreeRPCStats(
7068     afs_uint32 *stats,
7069     size_t allocSize)
7070 {
7071     rxi_Free(stats, allocSize);
7072 }
7075 * rx_queryProcessRPCStats - see if process rpc stat collection is
7076 * currently enabled.
7082 * Returns 0 if stats are not enabled != 0 otherwise
7085 int rx_queryProcessRPCStats()
7086 {
7087     int rc;
7088     MUTEX_ENTER(&rx_rpc_stats);
7089     rc = rxi_monitor_processStats;
7090     MUTEX_EXIT(&rx_rpc_stats);
7091     return rc;
7092 }
7095 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7101 * Returns 0 if stats are not enabled != 0 otherwise
7104 int rx_queryPeerRPCStats()
7105 {
7106     int rc;
7107     MUTEX_ENTER(&rx_rpc_stats);
7108     rc = rxi_monitor_peerStats;
7109     MUTEX_EXIT(&rx_rpc_stats);
7110     return rc;
7111 }
7114 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7123 void rx_enableProcessRPCStats()
7124 {
7125     MUTEX_ENTER(&rx_rpc_stats);
7126     rx_enable_stats = 1;
7127     rxi_monitor_processStats = 1;
7128     MUTEX_EXIT(&rx_rpc_stats);
7129 }
7132 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7141 void rx_enablePeerRPCStats()
7142 {
7143     MUTEX_ENTER(&rx_rpc_stats);
7144     rx_enable_stats = 1;
7145     rxi_monitor_peerStats = 1;
7146     MUTEX_EXIT(&rx_rpc_stats);
7147 }
7150 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7159 void rx_disableProcessRPCStats()
7160 {
7161     rx_interface_stat_p rpc_stat, nrpc_stat;
7162     size_t space;
7163 
7164     MUTEX_ENTER(&rx_rpc_stats);
7165 
7166     /*
7167      * Turn off process statistics and if peer stats is also off, turn
7168      * off everything
7169      */
7170 
7171     rxi_monitor_processStats = 0;
7172 if (rxi_monitor_peerStats == 0) {
7173         rx_enable_stats = 0;
7174     }
7175 
7176 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7177 unsigned int num_funcs = 0;
7178 if (!rpc_stat) break;
7179 queue_Remove(rpc_stat);
7180 num_funcs = rpc_stat->stats[0].func_total;
7181 space = sizeof(rx_interface_stat_t) +
7182 rpc_stat->stats[0].func_total *
7183 sizeof(rx_function_entry_v1_t);
7185 rxi_Free(rpc_stat, space);
7186         rxi_rpc_process_stat_cnt -= num_funcs;
7187     }
7188     MUTEX_EXIT(&rx_rpc_stats);
7189 }
7192 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7201 void rx_disablePeerRPCStats()
7203     struct rx_peer **peer_ptr, **peer_end;
7204     int code;
7205 
7206     MUTEX_ENTER(&rx_rpc_stats);
7207 
7208     /*
7209      * Turn off peer statistics and if process stats is also off, turn
7210      * off everything
7211      */
7212 
7213     rxi_monitor_peerStats = 0;
7214 if (rxi_monitor_processStats == 0) {
7215         rx_enable_stats = 0;
7216     }
7217 
7218 MUTEX_ENTER(&rx_peerHashTable_lock);
7219 for (peer_ptr = &rx_peerHashTable[0],
7220 peer_end = &rx_peerHashTable[rx_hashTableSize];
7221 peer_ptr < peer_end; peer_ptr++) {
7222 struct rx_peer *peer, *next, *prev;
7223         for (prev = peer = *peer_ptr; peer; peer = next) {
7224             next = peer->next;
7225             code = MUTEX_TRYENTER(&peer->peer_lock);
7226             if (code) {
7227                 rx_interface_stat_p rpc_stat, nrpc_stat;
7228                 size_t space;
7229                 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7230 rx_interface_stat)) {
7231 unsigned int num_funcs = 0;
7232 if (!rpc_stat) break;
7233 queue_Remove(&rpc_stat->queue_header);
7234 queue_Remove(&rpc_stat->all_peers);
7235 num_funcs = rpc_stat->stats[0].func_total;
7236 space = sizeof(rx_interface_stat_t) +
7237 rpc_stat->stats[0].func_total *
7238 sizeof(rx_function_entry_v1_t);
7240 rxi_Free(rpc_stat, space);
7241 rxi_rpc_peer_stat_cnt -= num_funcs;
7243 MUTEX_EXIT(&peer->peer_lock);
7244 if (prev == *peer_ptr) {
7256 MUTEX_EXIT(&rx_peerHashTable_lock);
7257 MUTEX_EXIT(&rx_rpc_stats);
7261 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7266 * IN clearFlag - flag indicating which stats to clear
7273 void rx_clearProcessRPCStats(
7274     afs_uint32 clearFlag)
7275 {
7276     rx_interface_stat_p rpc_stat, nrpc_stat;
7278 MUTEX_ENTER(&rx_rpc_stats);
7280 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7281 unsigned int num_funcs = 0, i;
7282 num_funcs = rpc_stat->stats[0].func_total;
7283 for(i=0;i<num_funcs;i++) {
7284 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7285 hzero(rpc_stat->stats[i].invocations);
7287 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7288 hzero(rpc_stat->stats[i].bytes_sent);
7290 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7291 hzero(rpc_stat->stats[i].bytes_rcvd);
7293 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7294 rpc_stat->stats[i].queue_time_sum.sec = 0;
7295 rpc_stat->stats[i].queue_time_sum.usec = 0;
7297 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7298 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7299 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7301 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7302 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7303 rpc_stat->stats[i].queue_time_min.usec= 9999999;
7305 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7306 rpc_stat->stats[i].queue_time_max.sec = 0;
7307 rpc_stat->stats[i].queue_time_max.usec = 0;
7309 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7310 rpc_stat->stats[i].execution_time_sum.sec = 0;
7311 rpc_stat->stats[i].execution_time_sum.usec = 0;
7313 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7314 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7315 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7317 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7318 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7319 rpc_stat->stats[i].execution_time_min.usec= 9999999;
7321 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7322 rpc_stat->stats[i].execution_time_max.sec = 0;
7323                 rpc_stat->stats[i].execution_time_max.usec = 0;
7324             }
7325         }
7326     }
7327 
7328     MUTEX_EXIT(&rx_rpc_stats);
7329 }
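/*
 * clearFlag is a bit mask, so several counters can be reset in one call.
 * For example, a caller could zero every byte and invocation counter
 * while leaving the timing statistics intact:
 */
#if 0
rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_INVOCATIONS |
                        AFS_RX_STATS_CLEAR_BYTES_SENT |
                        AFS_RX_STATS_CLEAR_BYTES_RCVD);
#endif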
7332 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7337 * IN clearFlag - flag indicating which stats to clear
7344 void rx_clearPeerRPCStats(
7345     afs_uint32 clearFlag)
7346 {
7347     rx_interface_stat_p rpc_stat, nrpc_stat;
7349 MUTEX_ENTER(&rx_rpc_stats);
7351 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7352         unsigned int num_funcs = 0, i;
7353         char *fix_offset;
7354         /*
7355 * We have to fix the offset of rpc_stat since we are
7356 * keeping this structure on two rx_queues. The rx_queue
7357 * package assumes that the rx_queue member is the first
7358 * member of the structure. That is, rx_queue assumes that
7359 * any one item is only on one queue at a time. We are
7360 * breaking that assumption and so we have to do a little
7361          * math to fix our pointers.
7362          */
7363 
7364 fix_offset = (char *) rpc_stat;
7365 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7366 rpc_stat = (rx_interface_stat_p) fix_offset;
7368 num_funcs = rpc_stat->stats[0].func_total;
7369 for(i=0;i<num_funcs;i++) {
7370 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7371 hzero(rpc_stat->stats[i].invocations);
7373 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7374 hzero(rpc_stat->stats[i].bytes_sent);
7376 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7377 hzero(rpc_stat->stats[i].bytes_rcvd);
7379 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7380 rpc_stat->stats[i].queue_time_sum.sec = 0;
7381 rpc_stat->stats[i].queue_time_sum.usec = 0;
7383 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7384 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7385 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7387 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7388 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7389 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7391 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7392 rpc_stat->stats[i].queue_time_max.sec = 0;
7393 rpc_stat->stats[i].queue_time_max.usec = 0;
7395 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7396 rpc_stat->stats[i].execution_time_sum.sec = 0;
7397 rpc_stat->stats[i].execution_time_sum.usec = 0;
7399 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7400 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7401 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7403 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7404 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7405 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7407 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7408 rpc_stat->stats[i].execution_time_max.sec = 0;
7409                 rpc_stat->stats[i].execution_time_max.usec = 0;
7410             }
7411         }
7412     }
7413 
7414     MUTEX_EXIT(&rx_rpc_stats);
7415 }
7418 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7419 * is authorized to enable/disable/clear RX statistics.
7421 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7423 void rx_SetRxStatUserOk(
7424     int (*proc)(struct rx_call *call))
7425 {
7426     rxi_rxstat_userok = proc;
7427 }
7429 int rx_RxStatUserOk(
7430     struct rx_call *call)
7431 {
7432     if (!rxi_rxstat_userok)
7433         return 0;
7434     return rxi_rxstat_userok(call);
7435 }
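/*
 * A sample policy hook for the statistics RPCs: with no hook registered
 * rx_RxStatUserOk denies everyone, so a server that wants to expose
 * enable/disable/clear must install one. The authorization test shown is
 * illustrative; an AFS server would typically check the call against its
 * own notion of an administrator (e.g. afsconf_SuperUser).
 */
#if 0
static int example_rxstat_userok(struct rx_call *call)
{
    return 0;   /* deny unless the caller is trusted */
}

static void example_InstallRxStatHook(void)
{
    rx_SetRxStatUserOk(example_rxstat_userok);
}
#endif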