/*
 * Copyright 2000, International Business Machines Corporation and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

/* RX:  Extended Remote Procedure Call */
#ifdef KERNEL
#include "../afs/param.h"
#include <afsconfig.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#ifdef AFS_OSF_ENV
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#endif
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#endif
#ifdef RXDEBUG
#undef RXDEBUG      /* turn off debugging */
#endif /* RXDEBUG */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#endif
#include "../afsint/afsint.h"
#ifdef AFS_ALPHA_ENV
#undef kmem_alloc
#undef kmem_free
#undef mem_alloc
#undef mem_free
#undef register
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
#define AFSOP_STOP_AFS          211     /* Stop AFS process */
#define AFSOP_STOP_BKG          212     /* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#ifdef AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
#else /* KERNEL */
# include <afs/param.h>
# include <afsconfig.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
#endif /* KERNEL */
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;

#if !defined(offsetof)
#include <stddef.h>     /* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 * to ease NT porting
 */

extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex, (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond, (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
        assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
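/*
 * Illustrative sketch (not part of the original source): any exported
 * entry point that can run before the rest of Rx is initialized forces
 * the one-time lock setup through pthread_once by invoking
 * INIT_PTHREAD_LOCKS first.  rx_SomeEntryPoint is a hypothetical caller.
 */
#if 0
void rx_SomeEntryPoint(void)
{
    INIT_PTHREAD_LOCKS          /* runs rxi_InitPthread exactly once */
    /* ... epoch_mutex, rx_init_mutex, etc. are now safe to take ... */
}
#endif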
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
extern void rxi_DeleteCachedConnections(void);

/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool() returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
 */
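/*
 * Illustrative sketch of the queue-lock protocol described above (not part
 * of the original source; some_queue and some_queue_lock are hypothetical
 * stand-ins, and the SET/CLEAR macros are defined below):
 */
#if 0
static void example_QueueLockProtocol(struct rx_call *call,
                                      struct rx_queue *some_queue,
                                      afs_kmutex_t *some_queue_lock)
{
    /* Putting a call on a queue, under the call lock: */
    MUTEX_ENTER(&call->lock);
    MUTEX_ENTER(some_queue_lock);
    queue_Append(some_queue, call);
    SET_CALL_QUEUE_LOCK(call, some_queue_lock); /* record who guards the call */
    MUTEX_EXIT(some_queue_lock);
    MUTEX_EXIT(&call->lock);

    /* How rxi_ResetCall can later remove the call safely: */
    MUTEX_ENTER(&call->lock);
    {
        afs_kmutex_t *qlock = call->call_queue_lock; /* read under call lock */
        if (qlock) {
            MUTEX_ENTER(qlock);
            queue_Remove(call);
            CLEAR_CALL_QUEUE_LOCK(call);
            MUTEX_EXIT(qlock);
        }
    }
    MUTEX_EXIT(&call->lock);
}
#endif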
extern void rxi_Delay(int);

static int rxi_ServerThreadSelectingCall;

#ifdef RX_ENABLE_LOCKS
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */

/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five tiers:
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *      rx_freeCallQueue_lock
 *      rx_connHashTable_lock
 *      rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer_lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 *      field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 *      conn->peer was previously a constant for all intents and so has no
 *      lock protecting this field. The multihomed client delta introduced
 *      an RX code change: the peer field in the connection structure now
 *      names the remote interface from which the last packet for this
 *      connection was sent out.  This may become an issue if further changes
 *      are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
void rxi_DestroyConnection();
void rxi_CleanupConnection();
struct rx_serverQueueEntry *rx_waitForPacket = 0;

/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */

#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */

#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
    afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
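/*
 * Illustrative sketch (not part of the original source): how a security
 * layer might pick a suitably random epoch and install it, per the note
 * above.  The seeding shown is an assumption, not rxkad's actual scheme.
 */
#if 0
static void example_SetRandomEpoch(void)
{
    struct timeval now;
    osi_GetTime(&now);
    /* mix seconds and microseconds so two starts in the same second differ */
    rx_SetEpoch((afs_uint32)(now.tv_sec ^ (now.tv_usec << 11)));
}
#endif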
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */

#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif /* AFS_PTHREAD_ENV */
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;
    struct timeval tv;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        UNLOCK_RX_INIT
        return tmp_status; /* Already started; return previous error code. */
    }

#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
        return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
        UNLOCK_RX_INIT
        return RX_ADDRINUSE;
    }

#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
               MUTEX_DEFAULT,0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */

    rx_connDeadTime = 12;
    rx_tranquil = 0;            /* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
    htable = (char *)
        osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));        /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);

#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#else
    osi_GetTime(&tv);
#endif

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    {
        struct sockaddr_in addr;
        int addrlen = sizeof(addr);
        if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
            UNLOCK_RX_INIT
            return -1;
        }
        rx_port = addr.sin_port;
    }
#endif

    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);            /* Start time of this package; rxkad
                                         * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;      /* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;      /* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;      /* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
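/*
 * Illustrative sketch (not part of the original source): minimal client
 * bootstrap.  Passing port 0 lets the kernel choose a UDP port;
 * RX_ADDRINUSE comes back when a requested port is already taken.
 */
#if 0
static int example_StartClient(void)
{
    int code = rx_Init(0);      /* 0 => kernel picks a UDP port */
    if (code)
        return code;            /* e.g. RX_ADDRINUSE */
    /* ... create connections with rx_NewConnection(), run calls ... */
    return 0;
}
#endif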
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
    register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
        return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (aservice->nRequestsRunning <= aservice->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        MUTEX_EXIT(&rx_stats_mutex);
        return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);

    return 0;
}

static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static QuotaOK(aservice)
    register struct rx_service *aservice; {
    int rc=0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        int diff;
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;  /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i, nProcs=0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
         */
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) {
#ifndef AFS_NT40_ENV
        char name[32];
#ifdef AFS_PTHREAD_ENV
        pid_t pid;
        pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
        PROCESS pid;
        int code;
        code = LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */

        sprintf(name,"srv_%d", ++nProcs);
        if (registerProgram)
            (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
        rx_ServerProc(); /* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;      /* Server host */
    u_short sport;                  /* Server port */
    u_short sservice;               /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;
    conn->error = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
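/*
 * Illustrative sketch (not part of the original source): creating a client
 * connection with the rxnull security class.  The address, port, and the
 * service id 4711 are arbitrary assumptions; host and port are passed in
 * network byte order.
 */
#if 0
static struct rx_connection *example_NewClientConn(void)
{
    struct rx_securityClass *so = rxnull_NewClientSecurityObject();
    struct rx_connection *tconn =
        rx_NewConnection(htonl(0x7f000001),     /* 127.0.0.1 */
                         htons(1234),           /* arbitrary server port */
                         4711,                  /* arbitrary service id */
                         so, 0 /* security index for rxnull */);
    rx_SetConnDeadTime(tconn, 12);
    /* ... rx_NewCall(tconn) to start RPCs ... */
    return tconn;
}
#endif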
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
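/* Worked example of the rule above: rx_SetConnDeadTime(conn, 12) leaves
 * secondsUntilDead = MAX(12, 6) = 12 and secondsUntilPing = 12/6 = 2, so
 * keepalives go out every 2 seconds and several of them can be lost before
 * the 12-second dead timer fires.  Requests below 6 seconds are rounded up:
 * a value of 3 still yields secondsUntilDead = 6, secondsUntilPing = 1. */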
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now. rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount ++;
            MUTEX_EXIT(&rx_stats_mutex);
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
    else
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
        for (i = 0 ; i < conn->nSpecific ; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        }
        free(conn->specific);
    }
    conn->specific = NULL;
    conn->nSpecific = 0;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
        MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    register int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        conn->refCount--;
    else {
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
        if (call) {
            havecalls = 1;
            if (conn->type == RX_CLIENT_CONNECTION) {
                MUTEX_ENTER(&call->lock);
                if (call->delayedAckEvent) {
                    /* Push the final acknowledgment out now--there
                     * won't be a subsequent call to acknowledge the
                     * last reply packets */
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    rxi_AckAll((struct rxevent *)0, call, 0);
                }
                MUTEX_EXIT(&call->lock);
            }
        }
    }

#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
        }
        else {
            /* Someone is accessing a packet right now. */
            havecalls = 1;
        }
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }

    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
        if (packet) {
            MUTEX_ENTER(&conn->conn_data_lock);
            rxi_SendConnectionAbort(conn, packet, 0, 1);
            MUTEX_EXIT(&conn->conn_data_lock);
            rxi_FreePacket(packet);
        }
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(conn->peer->host, conn->peer->port,
                                             conn->cid, conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;
            break;
        }
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
        rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
        rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up. This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we found a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_MakeCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
        for (i=0; i<RX_MAXCALLS; i++) {
            call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_DALLY) {
                    rxi_ResetCall(call, 0);
                    (*call->callNumber)++;
                    break;
                }
                MUTEX_EXIT(&call->lock);
            }
            else {
                call = rxi_NewCall(conn, i);
                MUTEX_ENTER(&call->lock);
                break;
            }
        }
        if (i < RX_MAXCALLS) {
            break;
        }
        /* No call slots are free; wait for one to come available. */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
        osi_rxSleep(conn);
#endif
    }

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int
rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i])) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {
                return 1;
            }
        }
    }
    return 0;
}

int
rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i]+1;
        else
            aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

int
rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
        else
            aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * if the service could not be created. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
              nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;  /* Name for identification purposes (e.g. the
                         * service name might be used for probing for
                         * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
        (osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
         serviceName);
        return 0;
    }
    if (port == 0) {
        if (rx_port == 0) {
            (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
            return 0;
        }
        port = rx_port;
        socket = rx_socket;
    }

    tservice = rxi_AllocService();

    for (i = 0; i<RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (service) {
            if (port == service->servicePort) {
                if (service->serviceId == serviceId) {
                    /* The identical service has already been
                     * installed; if the caller was intending to
                     * change the security classes used by this
                     * service, he/she loses. */
                    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
                    rxi_FreeService(tservice);
                    return service;
                }
                /* Different service, same port: re-use the socket
                 * which is bound to the same port */
                socket = service->socket;
            }
        } else {
            if (socket == OSI_NULLSOCKET) {
                /* If we don't already have a socket (from another
                 * service on same port) get a new one */
                socket = rxi_GetUDPSocket(port);
                if (socket == OSI_NULLSOCKET) {
                    rxi_FreeService(tservice);
                    return 0;
                }
            }
            service = tservice;
            service->socket = socket;
            service->servicePort = port;
            service->serviceId = serviceId;
            service->serviceName = serviceName;
            service->nSecurityObjects = nSecurityObjects;
            service->securityObjects = securityObjects;
            service->minProcs = 0;
            service->maxProcs = 1;
            service->idleDeadTime = 60;
            service->connDeadTime = rx_connDeadTime;
            service->executeRequestProc = serviceProc;
            rx_services[i] = service;   /* not visible until now */
            return service;
        }
    }

    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
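/*
 * Illustrative sketch (not part of the original source): registering a
 * service and entering the server loop.  DemoExecuteRequest, the service
 * id 4711, and the single rxnull security object are assumptions made for
 * the example.
 */
#if 0
static afs_int32 DemoExecuteRequest(call)
    register struct rx_call *call;
{
    /* read the request with rx_Read(), reply with rx_Write() */
    return 0;
}

static int example_StartDemoServer(void)
{
    struct rx_securityClass *so[1];
    struct rx_service *svc;

    if (rx_Init(htons(1234)) != 0)      /* bind the shared UDP port */
        return -1;
    so[0] = rxnull_NewServerSecurityObject();
    svc = rx_NewService(0, 4711, "demo", so, 1, DemoExecuteRequest);
    if (!svc)
        return -1;
    rx_StartServer(1);  /* donate this thread; never returns */
    return 0;
}
#endif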
/* Generic request processing loop. This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on. If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
    int threadID;
    struct rx_call *newcall;
    osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    while (1) {
        if (newcall) {
            call = newcall;
            newcall = (struct rx_call *) 0;
        } else {
            call = rx_GetCall(threadID, tservice, socketp);
            if (socketp && *socketp != OSI_NULLSOCKET) {
                /* We are now a listener thread */
                return;
            }
        }

        /* if server is restarting (typically a smooth shutdown) then do not
         * allow any new calls.
         */
        if ( rx_tranquil && (call != NULL) ) {
            MUTEX_ENTER(&call->lock);

            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

            MUTEX_EXIT(&call->lock);
            continue;
        }

        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
            AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
            afs_termState = AFSOP_STOP_AFS;
            afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
            AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
        }

        tservice = call->conn->service;

        if (tservice->beforeProc) (*tservice->beforeProc)(call);

        code = call->conn->service->executeRequestProc(call);

        if (tservice->afterProc) (*tservice->afterProc)(call, code);

        rx_EndCall(call, code);
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_nCalls++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/*
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 *
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
    }
    while (1) {
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
            register struct rx_call *tcall, *ncall;
            choice2 = (struct rx_call *) 0;
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
            /* One thread will process calls FCFS (to prevent starvation),
             * while the other threads may run ahead looking for calls which
             * have all their input data available immediately.  This helps
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
                service = tcall->conn->service;
                if (!QuotaOK(service)) {
                    continue;
                }
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1) {
                        if (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET)) {
                            call = tcall;
                        } else if (rxi_2dchoice && !choice2 &&
                                   !(tcall->flags & RX_CALL_CLEARED) &&
                                   (tcall->rprev > rxi_HardAckRate)) {
                            choice2 = tcall;
                        } else rxi_md2cnt++;
                    }
                }
                if (call) {
                    break;
                } else {
                    ReturnToServerPool(service);
                }
            }
        }

        if (call) {
            rxi_ServerThreadSelectingCall = 1;
            MUTEX_EXIT(&rx_serverPool_lock);
            MUTEX_ENTER(&call->lock);
            MUTEX_ENTER(&rx_serverPool_lock);

            if (queue_IsEmpty(&call->rq) ||
                queue_First(&call->rq, rx_packet)->header.seq != 1)
                rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

            CLEAR_CALL_QUEUE_LOCK(call);
            if (call->error) {
                MUTEX_EXIT(&call->lock);
                ReturnToServerPool(service);
                rxi_ServerThreadSelectingCall = 0;
                CV_SIGNAL(&rx_serverPool_cv);
                call = (struct rx_call*)0;
                continue;
            }
            call->flags &= (~RX_CALL_WAIT_PROC);
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting--;
            MUTEX_EXIT(&rx_stats_mutex);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            MUTEX_EXIT(&rx_serverPool_lock);
            break;
        }
        else {
            /* If there are no eligible incoming calls, add this process
             * to the idle server queue, to wait for one */
            sq->newcall = 0;
            if (socketp) {
                *socketp = OSI_NULLSOCKET;
            }
            sq->socketp = socketp;
            queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
            rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
            do {
                CV_WAIT(&sq->cv, &rx_serverPool_lock);
                if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                    MUTEX_EXIT(&rx_serverPool_lock);
                    return (struct rx_call *)0;
                }
            } while (!(call = sq->newcall) &&
                     !(socketp && *socketp != OSI_NULLSOCKET));
            MUTEX_EXIT(&rx_serverPool_lock);
            if (call) {
                MUTEX_ENTER(&call->lock);
            }
            break;
        }
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));

        CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
        MUTEX_EXIT(&call->lock);
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)
            rxi_minDeficit++;
        rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *) 0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                        call = tcall;
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                        choice2 = tcall;
                    } else rxi_md2cnt++;
                }
                if (call) {
                    break;
                }
            }
        }
    }

    if (call) {
        queue_Remove(call);
        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (service->nRequestsRunning <= service->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;

        /* MUTEX_EXIT(&call->lock); */
    }
    else {
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
        sq->newcall = 0;
        if (socketp) {
            *socketp = OSI_NULLSOCKET;
        }
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
        do {
            osi_rxSleep(sq);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                return (struct rx_call *)0;
            }
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty.
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller. */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    register int error;

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it. */
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        }
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        }
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
        } else {
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
        }
    }
    else { /* Client connection */
        char dummy;
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        }
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call. The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure. We don't want to signal until
         * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
         */
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_BUSY;
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#else
            osi_rxWakeup(conn);
#endif
        }
#ifdef RX_ENABLE_LOCKS
        else {
            MUTEX_EXIT(&conn->conn_data_lock);
        }
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *) 0;
        call->nLeft = call->nFree = call->curlen = 0;
    }
    else
        call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
        queue_Remove(tp);
        rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
        MUTEX_EXIT(&conn->conn_call_lock);
        conn->flags &= ~RX_CONN_BUSY;
    }

    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
        UNLOCK_RX_INIT
        return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                next = conn->next;
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    conn->refCount++;
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
                }
            }
        }
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
        return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return; /* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
        if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
            return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;        /* Call pointer temp */
    register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
        if (!(cp->flags & RX_CALL_TQ_BUSY)) {
            call = cp;
            break;
        }
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        queue_Remove(call);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nFreeCallStructs--;
        MUTEX_EXIT(&rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Now, if TQ wasn't cleared earlier, do it now. */
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 0);
            queue_Init(&call->tq);
        }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Bind the call to its connection structure */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    else {
        call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
        MUTEX_ENTER(&call->lock);
        CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
        CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
        CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nCallStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
        /* Initialize once-only items */
        queue_Init(&call->tq);
        queue_Init(&call->rq);
        queue_Init(&call->iovq);
        /* Bind the call to its connection structure (prereq for reset) */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
       the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock; /* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
        (*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
        queue_Prepend(&rx_freeCallQueue, call);
    else
        queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);

    /* Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections). Only do this, however, if there are no
     * outstanding calls. Note that for fine grain locking, there appears
     * to be a deadlock in that rxi_FreeCall has a call locked and
     * DestroyConnectionNoLock locks each call in the conn. But note a
     * few lines up where we have removed this call from the conn.
     * If someone else destroys a connection, they either have no
     * call lock held or are going through this section of code.
     */
    if (conn->flags & RX_CONN_DESTROY_ME) {
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->refCount++;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        if (haveCTLock)
            rxi_DestroyConnectionNoLock(conn);
        else
            rxi_DestroyConnection(conn);
#else /* RX_ENABLE_LOCKS */
        rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
    }
}
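/*
 * Illustrative sketch (not part of Rx): the deferred-destruction idiom used
 * above. A connection marked DESTROY_ME is only torn down after its last
 * call is freed, and the teardown path takes a temporary reference first so
 * the final release happens in exactly one place. All toy_* names are
 * hypothetical.
 */
#if 0
struct toy_conn {
    int flags;          /* TOY_DESTROY_ME set by the owner */
    int refCount;
};
#define TOY_DESTROY_ME 1

static void toy_reclaim(struct toy_conn *tc)
{
    /* release memory, timers, etc. (placeholder) */
}

static void toy_put(struct toy_conn *tc)
{
    if (--tc->refCount == 0)
        toy_reclaim(tc);
}

static void toy_free_last_call(struct toy_conn *tc)
{
    if (tc->flags & TOY_DESTROY_ME) {
        tc->refCount++;         /* pin across the destroy path */
        toy_put(tc);            /* drops the pin; may reclaim */
    }
}
#endif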
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
char *rxi_Alloc(size)
    register size_t size;
{
    register char *p;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation.
     */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
        AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt++; rxi_Allocsize += size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ) {
        p = (char *) osi_AllocMediumSpace(size);
    } else
        p = (char *) osi_AllocSmall(size, 1);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
        AFS_GUNLOCK();
#endif
#else
    p = (char *) osi_Alloc(size);
#endif
    if (!p) osi_Panic("rxi_Alloc error");
    bzero(p, size);
    return p;
}
void rxi_Free(addr, size)
    void *addr;
    register size_t size;
{
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation.
     */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
        AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt--; rxi_Allocsize -= size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ)
        osi_FreeMediumSpace(addr);
    else
        osi_FreeSmall(addr);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
        AFS_GUNLOCK();
#endif
#else
    osi_Free(addr, size);
#endif
}
/* Find the peer process represented by the supplied (host,port)
 * combination. If there is no appropriate active peer structure, a
 * new one will be allocated and initialized.
 * The origPeer, if set, is a pointer to a peer structure on which the
 * refcount will be decremented. This is used to replace the peer
 * structure hanging off a connection structure */
struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
    register afs_uint32 host;
    register u_short port;
    struct rx_peer *origPeer;
    int create;
{
    register struct rx_peer *pp;
    int hashIndex;

    hashIndex = PEER_HASH(host, port);
    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
        if ((pp->host == host) && (pp->port == port)) break;
    }
    if (!pp) {
        if (create) {
            pp = rxi_AllocPeer(); /* This bzero's *pp */
            pp->host = host;      /* set here or in InitPeerParams is zero */
            pp->port = port;
            MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
            queue_Init(&pp->congestionQueue);
            queue_Init(&pp->rpcStats);
            pp->next = rx_peerHashTable[hashIndex];
            rx_peerHashTable[hashIndex] = pp;
            rxi_InitPeerParams(pp);
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.nPeerStructs++;
            MUTEX_EXIT(&rx_stats_mutex);
        }
    }
    if (pp && create) {
        pp->refCount++;
    }
    if (origPeer)
        origPeer->refCount--;
    MUTEX_EXIT(&rx_peerHashTable_lock);
    return pp;
}
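/*
 * Illustrative sketch (not part of Rx): the lookup-or-create idiom used by
 * rxi_FindPeer. Walk the hash chain under the table lock; on a miss, link a
 * zeroed entry at the head of the chain. Everything named toy_* here is
 * hypothetical, and calloc assumes <stdlib.h>; the hash is merely the same
 * shape as a PEER_HASH-style macro, not Rx's actual definition.
 */
#if 0
struct toy_peer {
    unsigned int host;
    unsigned short port;
    int refCount;
    struct toy_peer *next;
};
#define TOY_HASH_SIZE 256
static struct toy_peer *toy_table[TOY_HASH_SIZE];

static struct toy_peer *toy_find_peer(unsigned int host, unsigned short port)
{
    unsigned int i = (host ^ port) % TOY_HASH_SIZE;
    struct toy_peer *pp;
    for (pp = toy_table[i]; pp; pp = pp->next)
        if (pp->host == host && pp->port == port) break;
    if (!pp) {
        pp = (struct toy_peer *) calloc(1, sizeof(*pp));  /* zeroed, as rxi_AllocPeer */
        pp->host = host;
        pp->port = port;
        pp->next = toy_table[i];
        toy_table[i] = pp;
    }
    pp->refCount++;
    return pp;
}
#endif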
/* Find the connection at (host, port) started at epoch, and with the
 * given connection id. Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired. In both cases, (host, port) specify the
 * peer's (host, port) pair. Client connections are not made
 * automatically by this routine. The parameter socket gives the
 * socket descriptor on which the packet was received. This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId). Finally, the securityIndex
 * parameter must match the existing index for the connection. If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service */
struct rx_connection *
rxi_FindConnection(socket, host, port, serviceId, cid,
                   epoch, type, securityIndex)
    osi_socket socket;
    register afs_int32 host;
    register u_short port;
    u_short serviceId;
    afs_uint32 cid;
    afs_uint32 epoch;
    int type;
    u_int securityIndex;
{
    int hashindex, flag;
    register struct rx_connection *conn;
    struct rx_peer *peer;

    hashindex = CONN_HASH(host, port, cid, epoch, type);
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxLastConn ? (conn = rxLastConn, flag = 0) :
                 (conn = rx_connHashTable[hashindex], flag = 1);
    for (; conn; ) {
        if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
            && (epoch == conn->epoch)) {
            register struct rx_peer *pp = conn->peer;
            if (securityIndex != conn->securityIndex) {
                /* this isn't supposed to happen, but someone could forge a packet
                   like this, and there seems to be some CM bug that makes this
                   happen from time to time -- in which case, the fileserver
                   asserts. */
                MUTEX_EXIT(&rx_connHashTable_lock);
                return (struct rx_connection *) 0;
            }
            /* epoch's high order bits mean route for security reasons only on
             * the cid, not the host and port fields.
             */
            if (conn->epoch & 0x80000000) break;
            if (((type == RX_CLIENT_CONNECTION)
                 || (pp->host == host)) && (pp->port == port))
                break;
        }
        if (!flag) {
            /* the connection rxLastConn that was used the last time is not the
            ** one we are looking for now. Hence, start searching in the hash */
            flag = 1;
            conn = rx_connHashTable[hashindex];
        }
        else
            conn = conn->next;
    }
    if (!conn) {
        struct rx_service *service;
        if (type == RX_CLIENT_CONNECTION) {
            MUTEX_EXIT(&rx_connHashTable_lock);
            return (struct rx_connection *) 0;
        }
        service = rxi_FindService(socket, serviceId);
        if (!service || (securityIndex >= service->nSecurityObjects)
            || (service->securityObjects[securityIndex] == 0)) {
            MUTEX_EXIT(&rx_connHashTable_lock);
            return (struct rx_connection *) 0;
        }
        conn = rxi_AllocConnection(); /* This bzero's the connection */
        MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
                   MUTEX_DEFAULT, 0);
        MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
                   MUTEX_DEFAULT, 0);
        CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
        conn->next = rx_connHashTable[hashindex];
        rx_connHashTable[hashindex] = conn;
        peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
        conn->type = RX_SERVER_CONNECTION;
        conn->lastSendTime = clock_Sec();   /* don't GC immediately */
        conn->epoch = epoch;
        conn->cid = cid & RX_CIDMASK;
        /* conn->serial = conn->lastSerial = 0; */
        /* conn->timeout = 0; */
        conn->ackRate = RX_FAST_ACK_RATE;
        conn->service = service;
        conn->serviceId = serviceId;
        conn->securityIndex = securityIndex;
        conn->securityObject = service->securityObjects[securityIndex];
        conn->nSpecific = 0;
        conn->specific = NULL;
        rx_SetConnDeadTime(conn, service->connDeadTime);
        /* Notify security object of the new connection */
        RXS_NewConnection(conn->securityObject, conn);
        /* XXXX Connection timeout? */
        if (service->newConnProc) (*service->newConnProc)(conn);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nServerConns++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
    else {
        /* Ensure that the peer structure is set up in such a way that
        ** replies in this connection go back to that remote interface
        ** from which the last packet was sent out. In case this packet's
        ** source IP address does not match the peer struct for this conn,
        ** then drop the refCount on conn->peer and get a new peer structure.
        ** We can check the host,port field in the peer structure without the
        ** rx_peerHashTable_lock because the peer structure has its refCount
        ** incremented and the only time the host,port in the peer struct gets
        ** updated is when the peer structure is created.
        */
        if (conn->peer->host == host)
            peer = conn->peer; /* no change to the peer structure */
        else
            peer = rxi_FindPeer(host, port, conn->peer, 1);
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount++;
    MUTEX_EXIT(&conn->conn_data_lock);

    rxLastConn = conn;  /* store this connection as the last conn used */
    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
/* There are two packet tracing routines available for testing and monitoring
 * Rx. One is called just after every packet is received and the other is
 * called just before every packet is sent. Received packets have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address. Both can be modified. The return value, if
 * non-zero, indicates that the packet should be dropped. */

int (*rx_justReceived)() = 0;
int (*rx_almostSent)() = 0;
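/*
 * Illustrative sketch (not part of Rx proper): a tracer a test harness
 * might install in rx_justReceived. Per the contract above, a non-zero
 * return drops the packet, and the tracer may rewrite the address. The
 * filter address below is, of course, hypothetical.
 */
#if 0
static int drop_from_testhost(np, addr)
    struct rx_packet *np;
    struct sockaddr_in *addr;
{
    /* Drop everything from 10.0.0.42 (address in network order, as above). */
    if (addr->sin_addr.s_addr == htonl(0x0a00002a))
        return 1;       /* non-zero: drop the packet */
    return 0;           /* zero: let rxi_ReceivePacket proceed */
}

/* ... during test setup ... */
rx_justReceived = drop_from_testhost;
#endif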
/* A packet has been received off the interface. Np is the packet, socket is
 * the socket number it was received from (useful in determining which service
 * this packet corresponds to), and (host, port) reflect the host,port of the
 * sender. This call returns the packet to the caller if it is finished with
 * it, rather than de-allocating it, just as a small performance hack */

struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
    register struct rx_packet *np;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    register struct rx_call *call;
    register struct rx_connection *conn;
    int channel;
    afs_uint32 currentCallNumber;
    int type;
    int skew;
#ifdef RXDEBUG
    char *packetType;
#endif
    struct rx_packet *tnp;

#ifdef RXDEBUG
/* We don't print out the packet until now because (1) the time may not be
 * accurate enough until now in the lwp implementation (rx_Listener only gets
 * the time after the packet is read) and (2) from a protocol point of view,
 * this is the first time the packet has been seen */
    packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
        ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
    dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
         np->header.serial, packetType, host, port, np->header.serviceId,
         np->header.epoch, np->header.cid, np->header.callNumber,
         np->header.seq, np->header.flags, np));
#endif

    if (np->header.type == RX_PACKET_TYPE_VERSION) {
        return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
    }

    if (np->header.type == RX_PACKET_TYPE_DEBUG) {
        return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
    }
#ifdef RXDEBUG
    /* If an input tracer function is defined, call it with the packet and
     * network address. Note this function may modify its arguments. */
    if (rx_justReceived) {
        struct sockaddr_in addr;
        int drop;
        addr.sin_family = AF_INET;
        addr.sin_port = port;
        addr.sin_addr.s_addr = host;
#if defined(AFS_OSF_ENV) && defined(_KERNEL)
        addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
        drop = (*rx_justReceived) (np, &addr);
        /* drop packet if return value is non-zero */
        if (drop) return np;
        port = addr.sin_port;   /* in case fcn changed addr */
        host = addr.sin_addr.s_addr;
    }
#endif
    /* If packet was not sent by the client, then *we* must be the client */
    type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
        ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

    /* Find the connection (or fabricate one, if we're the server & if
     * necessary) associated with this packet */
    conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
                              np->header.cid, np->header.epoch, type,
                              np->header.securityIndex);

    if (!conn) {
        /* If no connection found or fabricated, just ignore the packet.
         * (An argument could be made for sending an abort packet for
         * the conn) */
        return np;
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->maxSerial < np->header.serial)
        conn->maxSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);

    /* If the connection is in an error state, send an abort packet and ignore
     * the incoming packet */
    if (conn->error) {
        /* Don't respond to an abort packet--we don't want loops! */
        MUTEX_ENTER(&conn->conn_data_lock);
        if (np->header.type != RX_PACKET_TYPE_ABORT)
            np = rxi_SendConnectionAbort(conn, np, 1, 0);
        conn->refCount--;
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }
    /* Check for connection-only requests (i.e. not call specific). */
    if (np->header.callNumber == 0) {
        switch (np->header.type) {
            case RX_PACKET_TYPE_ABORT:
                /* What if the supplied error is zero? */
                rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            case RX_PACKET_TYPE_CHALLENGE:
                tnp = rxi_ReceiveChallengePacket(conn, np, 1);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tnp;
            case RX_PACKET_TYPE_RESPONSE:
                tnp = rxi_ReceiveResponsePacket(conn, np, 1);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tnp;
            case RX_PACKET_TYPE_PARAMS:
            case RX_PACKET_TYPE_PARAMS+1:
            case RX_PACKET_TYPE_PARAMS+2:
                /* ignore these packet types for now */
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;

            default:
                /* Should not reach here, unless the peer is broken: send an
                 * abort packet */
                rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
                MUTEX_ENTER(&conn->conn_data_lock);
                tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tnp;
        }
    }
    channel = np->header.cid & RX_CHANNELMASK;
    call = conn->call[channel];
#ifdef RX_ENABLE_LOCKS
    if (call) {
        MUTEX_ENTER(&call->lock);
        /* Test to see if call struct is still attached to conn. */
        if (call != conn->call[channel]) {
            /* Call structure changed: release the lock we just took. */
            MUTEX_EXIT(&call->lock);
            if (type == RX_SERVER_CONNECTION) {
                call = conn->call[channel];
                /* If we started with no call attached and there is one now,
                 * another thread is also running this routine and has gotten
                 * the connection channel. We should drop this packet in the tests
                 * below. If there was a call on this connection and it's now
                 * gone, then we'll be making a new call below.
                 * If there was previously a call and it's now different then
                 * the old call was freed and another thread running this routine
                 * has created a call on this channel. One of these two threads
                 * has a packet for the old call and the code below handles those
                 * cases.
                 */
                if (call)
                    MUTEX_ENTER(&call->lock);
            }
            else {
                /* This packet can't be for this call. If the new call address is
                 * 0 then no call is running on this channel. If there is a call
                 * then, since this is a client connection we're getting data for
                 * it must be for the previous call.
                 */
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.spuriousPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            }
        }
    }
#endif
    currentCallNumber = conn->callNumber[channel];
    if (type == RX_SERVER_CONNECTION) { /* We're the server */
        if (np->header.callNumber < currentCallNumber) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.spuriousPacketsRead++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }
        if (!call) {
            call = rxi_NewCall(conn, channel);
            MUTEX_ENTER(&call->lock);
            *call->callNumber = np->header.callNumber;
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
            rxi_KeepAliveOn(call);
        }
        else if (np->header.callNumber != currentCallNumber) {
            /* Wait until the transmit queue is idle before deciding
             * whether to reset the current call. Chances are that the
             * call will be in either DALLY or HOLD state once the TQ_BUSY
             * flag is cleared.
             */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            while ((call->state == RX_STATE_ACTIVE) &&
                   (call->flags & RX_CALL_TQ_BUSY)) {
                call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
                CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
                osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
            }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
             * the error condition in this call, so that it terminates as
             * quickly as possible */
            if (call->state == RX_STATE_ACTIVE) {
                struct rx_packet *tp;

                rxi_CallError(call, RX_CALL_DEAD);
                tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
                                     (char *) 0, 0, 1);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tp;
            }
            rxi_ResetCall(call, 0);
            *call->callNumber = np->header.callNumber;
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
            /*
             * If the number of queued calls exceeds the overload
             * threshold then abort this call.
             */
            if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
                struct rx_packet *tp;

                rxi_CallError(call, rx_BusyError);
                tp = rxi_SendCallAbort(call, np, 1, 0);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tp;
            }
            rxi_KeepAliveOn(call);
        }
        else {
            /* Continuing call; do nothing here. */
        }
    } else { /* we're the client */
        /* Ignore all incoming acknowledgements for calls in DALLY state */
        if (call && (call->state == RX_STATE_DALLY)
            && (np->header.type == RX_PACKET_TYPE_ACK)) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.ignorePacketDally++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call) {
                MUTEX_EXIT(&call->lock);
            }
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }

        /* Ignore anything that's not relevant to the current call. If there
         * isn't a current call, then no packet is relevant. */
        if (!call || (np->header.callNumber != currentCallNumber)) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.spuriousPacketsRead++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call) {
                MUTEX_EXIT(&call->lock);
            }
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }
        /* If the service security object index stamped in the packet does not
         * match the connection's security index, ignore the packet */
        if (np->header.securityIndex != conn->securityIndex) {
#ifdef RX_ENABLE_LOCKS
            MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }

        /* If we're receiving the response, then all transmit packets are
         * implicitly acknowledged. Get rid of them. */
        if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            /* XXX Hack. Because we must release the global rx lock when
             * sending packets (osi_NetSend) we drop all acks while we're
             * traversing the tq in rxi_Start sending packets out because
             * packets may move to the freePacketQueue as result of being here!
             * So we drop these packets until we're safely out of the
             * traversing. Really ugly!
             * For fine grain RX locking, we set the acked field in the
             * packets and let rxi_Start remove them from the transmit queue.
             */
            if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
                rxi_SetAcksInTransmitQueue(call);
#else
                conn->refCount--;
                return np;      /* xmitting; drop packet */
#endif
            }
            else {
                rxi_ClearTransmitQueue(call, 0);
            }
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
            rxi_ClearTransmitQueue(call, 0);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        }
        else if (np->header.type == RX_PACKET_TYPE_ACK) {
            /* now check to see if this is an ack packet acknowledging that the
             * server actually *lost* some hard-acked data. If this happens we
             * ignore this packet, as it may indicate that the server restarted in
             * the middle of a call. It is also possible that this is an old ack
             * packet. We don't abort the connection in this case, because this
             * *might* just be an old ack packet. The right way to detect a server
             * restart in the midst of a call is to notice that the server epoch
             * changed, btw. */
            /* XXX I'm not sure this is exactly right, since tfirst **IS**
             * XXX unacknowledged. I think that this is off-by-one, but
             * XXX I don't dare change it just yet, since it will
             * XXX interact badly with the server-restart detection
             * XXX code in receiveackpacket. */
            if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.spuriousPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            }
        } /* else not a data packet */
    }
    osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
    /* Set remote user defined status from packet */
    call->remoteStatus = np->header.userStatus;

    /* Note the gap between the expected next packet and the actual
     * packet that arrived, when the new packet has a smaller serial number
     * than expected. Rioses frequently reorder packets all by themselves,
     * so this will be quite important with very large window sizes.
     * Skew is checked against 0 here to avoid any dependence on the type of
     * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
     * true!
     * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
     * see CalculateRoundTripTime for an example of how to keep smoothed values.
     * I think using a beta of 1/8 is probably appropriate. 93.04.21
     */
    MUTEX_ENTER(&conn->conn_data_lock);
    skew = conn->lastSerial - np->header.serial;
    conn->lastSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);
    if (skew > 0) {
        register struct rx_peer *peer;
        peer = conn->peer;
        if (skew > peer->inPacketSkew) {
            dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
            peer->inPacketSkew = skew;
        }
    }
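    /* A smoothed alternative (sketch only), as the MTUXXX note above
     * suggests: keep a running value with beta = 1/8 instead of the
     * maximum recorded here, e.g.
     *     peer->inPacketSkew += (skew - peer->inPacketSkew) / 8;
     */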
    /* Now do packet type-specific processing */
    switch (np->header.type) {
        case RX_PACKET_TYPE_DATA:
            np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
                                       tnop, newcallp);
            break;
        case RX_PACKET_TYPE_ACK:
            /* Respond immediately to ack packets requesting acknowledgement
             * (ping packets) */
            if (np->header.flags & RX_REQUEST_ACK) {
                if (call->error)
                    (void) rxi_SendCallAbort(call, 0, 1, 0);
                else
                    (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
            }
            np = rxi_ReceiveAckPacket(call, np, 1);
            break;
        case RX_PACKET_TYPE_ABORT:
            /* An abort packet: reset the connection, passing the error up to
             * the user */
            /* What if error is zero? */
            rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
            break;
        case RX_PACKET_TYPE_BUSY:
            /* XXXX */
            break;
        case RX_PACKET_TYPE_ACKALL:
            /* All packets acknowledged, so we can drop all packets previously
             * readied for sending */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            /* XXX Hack. Because we can't release the global rx lock when
             * sending packets (osi_NetSend) we drop all ack pkts while we're
             * traversing the tq in rxi_Start sending packets out because
             * packets may move to the freePacketQueue as result of being
             * here! So we drop these packets until we're safely out of the
             * traversing. Really ugly!
             * For fine grain RX locking, we set the acked field in the packets
             * and let rxi_Start remove the packets from the transmit queue.
             */
            if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
                rxi_SetAcksInTransmitQueue(call);
                break;
#else /* RX_ENABLE_LOCKS */
                conn->refCount--;
                return np;      /* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
            }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            rxi_ClearTransmitQueue(call, 0);
            break;
        default:
            /* Should not reach here, unless the peer is broken: send an abort
             * packet */
            rxi_CallError(call, RX_PROTOCOL_ERROR);
            np = rxi_SendCallAbort(call, np, 1, 0);
            break;
    }
    /* Note when this last legitimate packet was received, for keep-alive
     * processing. Note, we delay getting the time until now in the hope that
     * the packet will be delivered to the user before any get time is required
     * (if not, then the time won't actually be re-evaluated here). */
    call->lastReceiveTime = clock_Sec();
    MUTEX_EXIT(&call->lock);
    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount--;
    MUTEX_EXIT(&conn->conn_data_lock);
    return np;
}
/* return true if this is an "interesting" connection from the point of view
   of someone trying to debug the system */
int rxi_IsConnInteresting(struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
        return 1;
    for (i = 0; i < RX_MAXCALLS; i++) {
        tcall = aconn->call[i];
        if (tcall) {
            if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
                return 1;
            if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
                return 1;
        }
    }
    return 0;
}
/* if this is one of the last few packets AND it wouldn't be used by the
   receiving call to immediately satisfy a read request, then drop it on
   the floor, since accepting it might prevent a lock-holding thread from
   making progress in its reading. If a call has been cleared while in
   the precall state then ignore all subsequent packets until the call
   is assigned to a thread. */

static int TooLow(ap, acall)
    struct rx_packet *ap;
    struct rx_call *acall;
{
    int rc = 0;

    MUTEX_ENTER(&rx_stats_mutex);
    if (((ap->header.seq != 1) &&
         (acall->flags & RX_CALL_CLEARED) &&
         (acall->state == RX_STATE_PRECALL)) ||
        ((rx_nFreePackets < rxi_dataQuota+2) &&
         !((ap->header.seq < acall->rnext+rx_initSendWindow)
           && (acall->flags & RX_CALL_READER_WAIT)))) {
        rc = 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return rc;
}
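/*
 * Illustrative sketch (not part of Rx): the buffer-quota test above in
 * plain form. A data packet is dropped when buffers are nearly exhausted
 * unless it would let a blocked reader make immediate progress, i.e. it
 * falls inside the reader's window while a reader is waiting. toy_* names
 * are hypothetical.
 */
#if 0
static int toy_too_low(int nFree, int quota, unsigned int seq,
                       unsigned int rnext, unsigned int window,
                       int readerWaiting)
{
    if (nFree >= quota + 2)
        return 0;       /* plenty of buffers: keep the packet */
    if (readerWaiting && seq < rnext + window)
        return 0;       /* a blocked reader can use it right now */
    return 1;           /* drop it */
}
#endif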
/* try to attach call, if authentication is complete */
static void TryAttach(acall, socket, tnop, newcallp)
    register struct rx_call *acall;
    register osi_socket socket;
    register int *tnop;
    register struct rx_call **newcallp;
{
    register struct rx_connection *conn;

    conn = acall->conn;
    if ((conn->type == RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
        /* Don't attach until we have any req'd. authentication. */
        if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
            rxi_AttachServerProc(acall, socket, tnop, newcallp);
            /* Note: this does not necessarily succeed; there
               may not be any proc available */
        }
        else {
            rxi_ChallengeOn(acall->conn);
        }
    }
}
/* A data packet has been received off the interface. This packet is
 * appropriate to the call (the call is in the right state, etc.). This
 * routine can return a packet to the caller, for re-use */

struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
                                        port, tnop, newcallp)
    register struct rx_call *call;
    register struct rx_packet *np;
    int istack;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    int ackNeeded = 0;
    int newPackets = 0;
    int didHardAck = 0;
    int haveLast = 0;
    afs_uint32 seq, serial, flags;
    int isFirst;
    struct rx_packet *tnp;
    struct clock when;

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.dataPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);

#ifdef KERNEL
    /* If there are no packet buffers, drop this new packet, unless we can find
     * packet buffers from inactive calls */
    if (!call->error &&
        (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
        MUTEX_ENTER(&rx_freePktQ_lock);
        rxi_NeedMorePackets = TRUE;
        MUTEX_EXIT(&rx_freePktQ_lock);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.noPacketBuffersOnRead++;
        MUTEX_EXIT(&rx_stats_mutex);
        call->rprev = np->header.serial;
        rxi_calltrace(RX_TRACE_DROP, call);
        dpf (("packet %x dropped on receipt - quota problems", np));
        rxi_ClearReceiveQueue(call);
        clock_GetTime(&when);
        clock_Add(&when, &rx_softAckDelay);
        if (!call->delayedAckEvent ||
            clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
            rxevent_Cancel(call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
                                                 call, 0);
        }
        /* we've damaged this call already, might as well do it in. */
        return np;
    }
#endif /* KERNEL */
    /*
     * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
     * packet is one of several packets transmitted as a single
     * datagram. Do not send any soft or hard acks until all packets
     * in a jumbogram have been processed. Send negative acks right away.
     */
    for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
        /* tnp is non-null when there are more packets in the
         * current jumbo gram */
        if (tnp) {
            if (np)
                rxi_FreePacket(np);
            np = tnp;
        }

        seq = np->header.seq;
        serial = np->header.serial;
        flags = np->header.flags;

        /* If the call is in an error state, send an abort message */
        if (call->error)
            return rxi_SendCallAbort(call, np, istack, 0);

        /* The RX_JUMBO_PACKET is set in all but the last packet in each
         * AFS 3.5 jumbogram. */
        if (flags & RX_JUMBO_PACKET) {
            tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
        } else {
            tnp = NULL;
        }

        if (np->header.spare != 0) {
            MUTEX_ENTER(&call->conn->conn_data_lock);
            call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
            MUTEX_EXIT(&call->conn->conn_data_lock);
        }
        /* The usual case is that this is the expected next packet */
        if (seq == call->rnext) {

            /* Check to make sure it is not a duplicate of one already queued */
            if (queue_IsNotEmpty(&call->rq)
                && queue_First(&call->rq, rx_packet)->header.seq == seq) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.dupPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                dpf (("packet %x dropped on receipt - duplicate", np));
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_DUPLICATE, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* It's the next packet. Stick it on the receive queue
             * for this call. Set newPackets to make sure we wake
             * the reader once all packets have been processed */
            queue_Prepend(&call->rq, np);
            call->nSoftAcks++;
            np = NULL; /* We can't use this anymore */
            newPackets = 1;

            /* If an ack is requested then set a flag to make sure we
             * send an acknowledgement for this packet */
            if (flags & RX_REQUEST_ACK) {
                ackNeeded = 1;
            }

            /* Keep track of whether we have received the last packet */
            if (flags & RX_LAST_PACKET) {
                call->flags |= RX_CALL_HAVE_LAST;
                haveLast = 1;
            }

            /* Check whether we have all of the packets for this call */
            if (call->flags & RX_CALL_HAVE_LAST) {
                afs_uint32 tseq;        /* temporary sequence number */
                struct rx_packet *tp;   /* Temporary packet pointer */
                struct rx_packet *nxp;  /* Next pointer, for queue_Scan */

                for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                    if (tseq != tp->header.seq)
                        break;
                    if (tp->header.flags & RX_LAST_PACKET) {
                        call->flags |= RX_CALL_RECEIVE_DONE;
                        break;
                    }
                    tseq++;
                }
            }

            /* Provide asynchronous notification for those who want it
             * (e.g. multi rx) */
            if (call->arrivalProc) {
                (*call->arrivalProc)(call, call->arrivalProcHandle,
                                     call->arrivalProcArg);
                call->arrivalProc = (VOID (*)()) 0;
            }

            /* Update last packet received */
            call->rprev = seq;

            /* If there is no server process serving this call, grab
             * one, if available. We only need to do this once. If a
             * server thread is available, this thread becomes a server
             * thread and the server thread becomes a listener thread. */
            if (isFirst) {
                TryAttach(call, socket, tnop, newcallp);
            }
        }
        /* This is not the expected next packet. */
        else {
            /* Determine whether this is a new or old packet, and if it's
             * a new one, whether it fits into the current receive window.
             * Also figure out whether the packet was delivered in sequence.
             * We use the prev variable to determine whether the new packet
             * is the successor of its immediate predecessor in the
             * receive queue, and the missing flag to determine whether
             * any of this packet's predecessors are missing. */

            afs_uint32 prev;            /* "Previous packet" sequence number */
            struct rx_packet *tp;       /* Temporary packet pointer */
            struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
            int missing;                /* Are any predecessors missing? */

            /* If the new packet's sequence number has been sent to the
             * application already, then this is a duplicate */
            if (seq < call->rnext) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.dupPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_DUPLICATE, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* If the sequence number is greater than what can be
             * accommodated by the current window, then send a negative
             * acknowledge and drop the packet */
            if ((call->rnext + call->rwind) <= seq) {
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_EXCEEDS_WINDOW, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* Look for the packet in the queue of old received packets */
            for (prev = call->rnext - 1, missing = 0,
                 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                /* Check for duplicate packet */
                if (seq == tp->header.seq) {
                    MUTEX_ENTER(&rx_stats_mutex);
                    rx_stats.dupPacketsRead++;
                    MUTEX_EXIT(&rx_stats_mutex);
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    np = rxi_SendAck(call, np, seq, serial,
                                     flags, RX_ACK_DUPLICATE, istack);
                    ackNeeded = 0;
                    call->rprev = seq;
                    goto nextloop;
                }
                /* If we find a higher sequence packet, break out and
                 * insert the new packet here. */
                if (seq < tp->header.seq) break;
                /* Check for missing packet */
                if (tp->header.seq != prev + 1) {
                    missing = 1;
                }

                prev = tp->header.seq;
            }

            /* Keep track of whether we have received the last packet. */
            if (flags & RX_LAST_PACKET) {
                call->flags |= RX_CALL_HAVE_LAST;
            }

            /* It's within the window: add it to the receive queue.
             * tp is left by the previous loop either pointing at the
             * packet before which to insert the new packet, or at the
             * queue head if the queue is empty or the packet should be
             * appended. */
            queue_InsertBefore(tp, np);
            call->nSoftAcks++;
            np = NULL;

            /* Check whether we have all of the packets for this call */
            if ((call->flags & RX_CALL_HAVE_LAST)
                && !(call->flags & RX_CALL_RECEIVE_DONE)) {
                afs_uint32 tseq;        /* temporary sequence number */

                for (tseq = call->rnext,
                     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                    if (tseq != tp->header.seq)
                        break;
                    if (tp->header.flags & RX_LAST_PACKET) {
                        call->flags |= RX_CALL_RECEIVE_DONE;
                        break;
                    }
                    tseq++;
                }
            }

            /* We need to send an ack if the packet is out of sequence,
             * or if an ack was requested by the peer. */
            if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
                ackNeeded = 1;
            }

            /* Acknowledge the last packet for each call */
            if (flags & RX_LAST_PACKET) {
                haveLast = 1;
            }

            call->rprev = seq;
        }
nextloop:;
    }
    if (newPackets) {
        /*
         * If the receiver is waiting for an iovec, fill the iovec
         * using the data from the receive queue */
        if (call->flags & RX_CALL_IOVEC_WAIT) {
            didHardAck = rxi_FillReadVec(call, seq, serial, flags);
            /* the call may have been aborted */
            if (call->error) {
                return NULL;
            }
            if (didHardAck) {
                ackNeeded = 0;
            }
        }

        /* Wakeup the reader if any */
        if ((call->flags & RX_CALL_READER_WAIT) &&
            (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
             (call->iovNext >= call->iovMax) ||
             (call->flags & RX_CALL_RECEIVE_DONE))) {
            call->flags &= ~RX_CALL_READER_WAIT;
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&call->cv_rq);
#else
            osi_rxWakeup(&call->rq);
#endif
        }
    }

    /*
     * Send an ack when requested by the peer, or once every
     * rxi_SoftAckRate packets until the last packet has been
     * received. Always send a soft ack for the last packet in
     * the server's reply. */
    if (ackNeeded) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, seq, serial, flags,
                         RX_ACK_REQUESTED, istack);
    } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, seq, serial, flags,
                         RX_ACK_IDLE, istack);
    } else if (call->nSoftAcks) {
        clock_GetTime(&when);
        if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
            clock_Add(&when, &rx_lastAckDelay);
        } else {
            clock_Add(&when, &rx_softAckDelay);
        }
        if (!call->delayedAckEvent ||
            clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
            rxevent_Cancel(call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
                                                 call, 0);
        }
    } else if (call->flags & RX_CALL_RECEIVE_DONE) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
    }

    return np;
}
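/*
 * Illustrative sketch (not part of Rx): the jumbogram framing handled by
 * the loop above. Every packet in an AFS 3.5 jumbogram except the last
 * carries RX_JUMBO_PACKET, so the receiver keeps splitting until the flag
 * clears. toy_split stands in for rxi_SplitJumboPacket and, like it, is
 * assumed to return the remainder of the datagram (or 0 at the end); all
 * toy_* names are hypothetical.
 */
#if 0
#define TOY_JUMBO 0x20                  /* hypothetical flag bit */
struct toy_pkt { int flags; };

extern struct toy_pkt *toy_split(struct toy_pkt *p);    /* hypothetical */
extern void toy_deliver(struct toy_pkt *p);             /* hypothetical */

static void toy_walk_jumbogram(struct toy_pkt *p)
{
    while (p) {
        struct toy_pkt *rest = (p->flags & TOY_JUMBO) ? toy_split(p) : 0;
        toy_deliver(p);         /* acks are deferred until rest == 0 */
        p = rest;
    }
}
#endif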
#ifdef ADAPT_WINDOW
static void rxi_ComputeRate();
#endif

/* The real smarts of the whole thing. */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
    register struct rx_call *call;
    struct rx_packet *np;
    int istack;
{
    struct rx_ackPacket *ap;
    int nAcks;
    register struct rx_packet *tp;
    register struct rx_packet *nxp;  /* Next packet pointer for queue_Scan */
    register struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;
    afs_uint32 first;
    afs_uint32 serial;
    /* because there are CM's that are bogus, sending weird values for this. */
    afs_uint32 skew = 0;
    int nbytes;
    int missing;
    int acked;
    int nNacked = 0;
    int newAckCount = 0;
    u_short maxMTU = 0;      /* Set if peer supports AFS 3.4a jumbo datagrams */
    int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.ackPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
    ap = (struct rx_ackPacket *) rx_DataOf(np);
    nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
    if (nbytes < 0)
        return np;      /* truncated ack packet */

    /* depends on ack packet struct */
    nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
    first = ntohl(ap->firstPacket);
    serial = ntohl(ap->serial);
    /* temporarily disabled -- needs to degrade over time
       skew = ntohs(ap->maxSkew); */

    /* Ignore ack packets received out of order */
    if (first < call->tfirst) {
        return np;
    }

    if (np->header.flags & RX_SLOW_START_OK) {
        call->flags |= RX_CALL_SLOW_START_OK;
    }

#ifdef RXDEBUG
    if (rx_Log) {
        fprintf(rx_Log,
                "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
                ap->reason, ntohl(ap->previousPacket), np->header.seq, serial,
                skew, ntohl(ap->firstPacket));
        if (nAcks) {
            int offset;
            for (offset = 0; offset < nAcks; offset++)
                putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', rx_Log);
        }
        putc('\n', rx_Log);
    }
#endif

    /* if a server connection has been re-created, it doesn't remember what
       serial # it was up to. An ack will tell us, since the serial field
       contains the largest serial received by the other side */
    MUTEX_ENTER(&conn->conn_data_lock);
    if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
        conn->serial = serial + 1;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Update the outgoing packet skew value to the latest value of
     * the peer's incoming packet skew value. The ack packet, of
     * course, could arrive out of order, but that won't affect things
     * much */
    MUTEX_ENTER(&peer->peer_lock);
    peer->outPacketSkew = skew;

    /* Check for packets that no longer need to be transmitted, and
     * discard them. This only applies to packets positively
     * acknowledged as having been sent to the peer's upper level.
     * All other packets must be retained. So only packets with
     * sequence numbers < ap->firstPacket are candidates. */
    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
        if (tp->header.seq >= first) break;
        call->tfirst = tp->header.seq + 1;
        if (tp->header.serial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
        else if (tp->firstSerial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* XXX Hack. Because we have to release the global rx lock when sending
         * packets (osi_NetSend) we drop all acks while we're traversing the tq
         * in rxi_Start sending packets out because packets may move to the
         * freePacketQueue as result of being here! So we drop these packets until
         * we're safely out of the traversing. Really ugly!
         * To make it even uglier, if we're using fine grain locking, we can
         * set the ack bits in the packets and have rxi_Start remove the packets
         * when it's done transmitting.
         */
        if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
            tp->acked = 1;
            call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
            break;
#endif /* RX_ENABLE_LOCKS */
        } else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        {
            queue_Remove(tp);
            rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
        }
    }

#ifdef ADAPT_WINDOW
    /* Give rate detector a chance to respond to ping requests */
    if (ap->reason == RX_ACK_PING_RESPONSE) {
        rxi_ComputeRate(peer, call, 0, np, ap->reason);
    }
#endif

    /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
    /* Now go through explicit acks/nacks and record the results in
     * the waiting packets. These are packets that can't be released
     * yet, even with a positive acknowledge. This positive
     * acknowledge only means the packet has been received by the
     * peer, not that it will be retained long enough to be sent to
     * the peer's upper level. In addition, reset the transmit timers
     * of any missing packets (those packets that must be missing
     * because this packet was out of sequence) */
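    /* Worked example of the ack bitmap layout used below: with
     * first = ntohl(ap->firstPacket) = 10 and nAcks = 4, ap->acks[0]
     * describes seq 10 and ap->acks[3] describes seq 13. Any seq < 10
     * is implicitly acked; any seq >= 14 is treated as missing. */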
    call->nSoftAcked = 0;
    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
        /* Update round trip time if the ack was stimulated on receipt
         * of this packet */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
        if (tp->header.seq >= first) {
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        if (tp->header.serial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
        else if ((tp->firstSerial == serial)) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
        }
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

        /* Set the acknowledge flag per packet based on the
         * information in the ack packet. An acknowledged packet can
         * be downgraded when the server has discarded a packet it
         * soft-acked previously, or when an ack packet is received
         * out of sequence. */
        if (tp->header.seq < first) {
            /* Implicit ack information */
            if (!tp->acked) {
                newAckCount++;
            }
            tp->acked = 1;
        }
        else if (tp->header.seq < first + nAcks) {
            /* Explicit ack information: set it in the packet appropriately */
            if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
                if (!tp->acked) {
                    newAckCount++;
                    tp->acked = 1;
                }
                if (missing) {
                    nNacked++;
                } else {
                    call->nSoftAcked++;
                }
            } else {
                tp->acked = 0;
                missing = 1;
            }
        }
        else {
            tp->acked = 0;
            missing = 1;
        }

        /* If packet isn't yet acked, and it has been transmitted at least
         * once, reset retransmit time using latest timeout
         * ie, this should readjust the retransmit timer for all outstanding
         * packets... So we don't just retransmit when we should know better */
        if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
            tp->retryTime = tp->timeSent;
            clock_Add(&tp->retryTime, &peer->timeout);
            /* shift by eight because one quarter-sec ~ 256 milliseconds */
            clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
        }
    }
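    /* Worked example of the backoff conversion above: tp->backoff counts
     * quarter seconds, so the exact extra delay would be backoff * 250 ms;
     * (backoff << 8) approximates it as backoff * 256 ms. For backoff = 3
     * that is 768 ms instead of 750 ms -- the cheap shift stays within a
     * few percent of exact. */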
    /* If the window has been extended by this acknowledge packet,
     * then wakeup a sender waiting in alloc for window space, or try
     * sending packets now, if he's been sitting on packets due to
     * lack of window space */
    if (call->tnext < (call->tfirst + call->twind)) {
#ifdef RX_ENABLE_LOCKS
        CV_SIGNAL(&call->cv_twind);
#else
        if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
            call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
            osi_rxWakeup(&call->twind);
        }
#endif
        if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
            call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
        }
    }

    /* if the ack packet has a receivelen field hanging off it,
     * update our state */
    if (np->length >= rx_AckDataSize(ap->nAcks) + sizeof(afs_int32)) {
        afs_uint32 tSize;

        /* If the ack packet has a "recommended" size that is less than
         * what I am using now, reduce my size to match */
        rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
                      sizeof(afs_int32), &tSize);
        tSize = (afs_uint32) ntohl(tSize);
        peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));

        /* Get the maximum packet size to send to this peer */
        rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
                      &tSize);
        tSize = (afs_uint32) ntohl(tSize);
        tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
        tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);

        /* sanity check - peer might have restarted with different params.
         * If peer says "send less", dammit, send less... Peer should never
         * be unable to accept packets of the size that prior AFS versions would
         * send without asking. */
        if (peer->maxMTU != tSize) {
            peer->maxMTU = tSize;
            peer->MTU = MIN(tSize, peer->MTU);
            call->MTU = MIN(call->MTU, tSize);
        }
        if (np->length == rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32)) {
            /* AFS 3.4a */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
            if (tSize < call->twind) {          /* smaller than our send */
                call->twind = tSize;            /* window, we must send less... */
                call->ssthresh = MIN(call->twind, call->ssthresh);
            }

            /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
             * network MTU confused with the loopback MTU. Calculate the
             * maximum MTU here for use in the slow start code below.
             */
            maxMTU = peer->maxMTU;
            /* Did peer restart with older RX version? */
            if (peer->maxDgramPackets > 1) {
                peer->maxDgramPackets = 1;
            }
        } else if (np->length >= rx_AckDataSize(ap->nAcks) + 4 * sizeof(afs_int32)) {
            /* AFS 3.5 */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);
            /*
             * As of AFS 3.5 we set the send window to match the receive window.
             */
            if (tSize < call->twind) {
                call->twind = tSize;
                call->ssthresh = MIN(call->twind, call->ssthresh);
            } else if (tSize > call->twind) {
                call->twind = tSize;
            }

            /*
             * As of AFS 3.5, a jumbogram is more than one fixed size
             * packet transmitted in a single UDP datagram. If the remote
             * MTU is smaller than our local MTU then never send a datagram
             * larger than the natural MTU.
             */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            maxDgramPackets = (afs_uint32) ntohl(tSize);
            maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
            maxDgramPackets = MIN(maxDgramPackets,
                                  (int)(peer->ifDgramPackets));
            maxDgramPackets = MIN(maxDgramPackets, tSize);
            if (maxDgramPackets > 1) {
                peer->maxDgramPackets = maxDgramPackets;
                call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
            } else {
                peer->maxDgramPackets = 1;
                call->MTU = peer->natMTU;
            }
        } else if (peer->maxDgramPackets > 1) {
            /* Restarted with lower version of RX */
            peer->maxDgramPackets = 1;
        }
    } else if (peer->maxDgramPackets > 1 ||
               peer->maxMTU != OLD_MAX_PACKET_SIZE) {
        /* Restarted with lower version of RX */
        peer->maxMTU = OLD_MAX_PACKET_SIZE;
        peer->natMTU = OLD_MAX_PACKET_SIZE;
        peer->MTU = OLD_MAX_PACKET_SIZE;
        peer->maxDgramPackets = 1;
        peer->nDgramPackets = 1;
        call->MTU = OLD_MAX_PACKET_SIZE;
    }
    if (nNacked) {
        /*
         * Calculate how many datagrams were successfully received after
         * the first missing packet and adjust the negative ack counter
         * accordingly.
         */
        call->nAcks = 0;
        call->nNacks++;
        nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
        if (call->nNacks < nNacked) {
            call->nNacks = nNacked;
        }
    } else {
        if (newAckCount) {
            call->nAcks++;
        }
        call->nNacks = 0;
    }
    if (call->flags & RX_CALL_FAST_RECOVER) {
        if (nNacked) {
            call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
        } else {
            call->flags &= ~RX_CALL_FAST_RECOVER;
            call->cwind = call->nextCwind;
            call->nextCwind = 0;
            call->nAcks = 0;
        }
        call->nCwindAcks = 0;
    }
    else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
        /* Three negative acks in a row trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        MUTEX_EXIT(&peer->peer_lock);
        if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
            /* someone else is waiting to start recovery */
            return np;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
            CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
        }
        MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
        call->flags |= RX_CALL_FAST_RECOVER;
        call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
        call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
                          rx_maxSendWindow);
        call->nDgramPackets = MAX(2, (int)call->nDgramPackets) >> 1;
        call->nextCwind = call->ssthresh;
        call->nAcks = 0;
        call->nNacks = 0;
        peer->MTU = call->MTU;
        peer->cwind = call->nextCwind;
        peer->nDgramPackets = call->nDgramPackets;
        peer->congestSeq++;
        call->congestSeq = peer->congestSeq;
        /* Reset the resend times on the packets that were nacked
         * so we will retransmit as soon as the window permits */
        for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
            if (acked) {
                if (!tp->acked) {
                    clock_Zero(&tp->retryTime);
                }
            } else if (tp->acked) {
                acked = 1;
            }
        }
    } else {
        /* If cwind is smaller than ssthresh, then increase
         * the window one packet for each ack we receive (exponential
         * growth).
         * If cwind is greater than or equal to ssthresh then increase
         * the congestion window by one packet for each cwind acks we
         * receive (linear growth). */
        if (call->cwind < call->ssthresh) {
            call->cwind = MIN((int)call->ssthresh,
                              (int)(call->cwind + newAckCount));
            call->nCwindAcks = 0;
        } else {
            call->nCwindAcks += newAckCount;
            if (call->nCwindAcks >= call->cwind) {
                call->nCwindAcks = 0;
                call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
            }
        }
        /*
         * If we have received several acknowledgements in a row then
         * it is time to increase the size of our datagrams
         */
        if ((int)call->nAcks > rx_nDgramThreshold) {
            if (peer->maxDgramPackets > 1) {
                if (call->nDgramPackets < peer->maxDgramPackets) {
                    call->nDgramPackets++;
                }
                call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
            } else if (call->MTU < peer->maxMTU) {
                call->MTU += peer->natMTU;
                call->MTU = MIN(call->MTU, peer->maxMTU);
            }
            call->nAcks = 0;
        }
    }

    MUTEX_EXIT(&peer->peer_lock);       /* rxi_Start will lock peer. */

    /* Servers need to hold the call until all response packets have
     * been acknowledged. Soft acks are good enough since clients
     * are not allowed to clear their receive queues. */
    if (call->state == RX_STATE_HOLD &&
        call->tfirst + call->nSoftAcked >= call->tnext) {
        call->state = RX_STATE_DALLY;
        rxi_ClearTransmitQueue(call, 0);
    } else if (!queue_IsEmpty(&call->tq)) {
        rxi_Start(0, call, istack);
    }
    return np;
}
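/*
 * Illustrative sketch (not part of Rx): the two window-growth regimes in
 * the congestion code above, in isolation. Below ssthresh the window grows
 * by one per newly acked packet (slow start); at or above it, by one per
 * full window of acks (congestion avoidance). toy_* names are hypothetical.
 */
#if 0
static void toy_grow_window(int newAckCount, int *cwind, int ssthresh,
                            int *cwindAcks, int maxSendWindow)
{
    if (*cwind < ssthresh) {
        /* slow start: one packet per new ack, capped at ssthresh */
        *cwind += newAckCount;
        if (*cwind > ssthresh)
            *cwind = ssthresh;
        *cwindAcks = 0;
    } else {
        /* congestion avoidance: one packet per cwind acks */
        *cwindAcks += newAckCount;
        if (*cwindAcks >= *cwind) {
            *cwindAcks = 0;
            if (*cwind < maxSendWindow)
                (*cwind)++;
        }
    }
}
#endif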
/* Received a response to a challenge packet */
struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the packet if we're the client */
    if (conn->type == RX_CLIENT_CONNECTION) return np;

    /* If already authenticated, ignore the packet (it's probably a retry) */
    if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
        return np;

    /* Otherwise, have the security object evaluate the response packet */
    error = RXS_CheckResponse(conn->securityObject, conn, np);
    if (error) {
        /* If the response is invalid, reset the connection, sending
         * an abort to the peer */
#ifndef KERNEL
        rxi_Delay(1);
#endif
        rxi_ConnectionError(conn, error);
        MUTEX_ENTER(&conn->conn_data_lock);
        np = rxi_SendConnectionAbort(conn, np, istack, 0);
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }
    else {
        /* If the response is valid, any calls waiting to attach
         * servers can now do so */
        int i;
        for (i = 0; i < RX_MAXCALLS; i++) {
            struct rx_call *call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_PRECALL)
                    rxi_AttachServerProc(call, -1, NULL, NULL);
                MUTEX_EXIT(&call->lock);
            }
        }
    }
    return np;
}
/* A client has received an authentication challenge: the security
 * object is asked to cough up a respectable response packet to send
 * back to the server. The server is responsible for retrying the
 * challenge if it fails to get a response. */

struct rx_packet *
rxi_ReceiveChallengePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the challenge if we're the server */
    if (conn->type == RX_SERVER_CONNECTION) return np;

    /* Ignore the challenge if the connection is otherwise idle; someone's
     * trying to use us as an oracle. */
    if (!rxi_HasActiveCalls(conn)) return np;

    /* Send the security object the challenge packet. It is expected to fill
     * in the response. */
    error = RXS_GetResponse(conn->securityObject, conn, np);

    /* If the security object is unable to return a valid response, reset the
     * connection and send an abort to the peer. Otherwise send the response
     * packet to the peer connection. */
    if (error) {
        rxi_ConnectionError(conn, error);
        MUTEX_ENTER(&conn->conn_data_lock);
        np = rxi_SendConnectionAbort(conn, np, istack, 0);
        MUTEX_EXIT(&conn->conn_data_lock);
    }
    else {
        np = rxi_SendSpecial((struct rx_call *)0, conn, np,
                             RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
    }
    return np;
}
3771 /* Find an available server process to service the current request in
3772 * the given call structure. If one isn't available, queue up this
3773 * call so it eventually gets one */
3775 rxi_AttachServerProc(call, socket, tnop, newcallp)
3776 register struct rx_call *call;
3777 register osi_socket socket;
3779 register struct rx_call **newcallp;
3781 register struct rx_serverQueueEntry *sq;
3782 register struct rx_service *service = call->conn->service;
3783 #ifdef RX_ENABLE_LOCKS
3784 register int haveQuota = 0;
3785 #endif /* RX_ENABLE_LOCKS */
3786 /* May already be attached */
3787 if (call->state == RX_STATE_ACTIVE) return;
3789 MUTEX_ENTER(&rx_serverPool_lock);
3790 #ifdef RX_ENABLE_LOCKS
3791 while(rxi_ServerThreadSelectingCall) {
3792 MUTEX_EXIT(&call->lock);
3793 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3794 MUTEX_EXIT(&rx_serverPool_lock);
3795 MUTEX_ENTER(&call->lock);
3796 MUTEX_ENTER(&rx_serverPool_lock);
3797 /* Call may have been attached */
3798 if (call->state == RX_STATE_ACTIVE) return;
3801 haveQuota = QuotaOK(service);
3802 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3803 /* If there are no processes available to service this call,
3804 * put the call on the incoming call queue (unless it's
3805 * already on the queue).
3808 ReturnToServerPool(service);
3809 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3810 call->flags |= RX_CALL_WAIT_PROC;
3811 MUTEX_ENTER(&rx_stats_mutex);
3813 MUTEX_EXIT(&rx_stats_mutex);
3814 rxi_calltrace(RX_CALL_ARRIVAL, call);
3815 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3816 queue_Append(&rx_incomingCallQueue, call);
3819 #else /* RX_ENABLE_LOCKS */
3820 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3821 /* If there are no processes available to service this call,
3822 * put the call on the incoming call queue (unless it's
3823 * already on the queue).
3825 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3826 call->flags |= RX_CALL_WAIT_PROC;
3828 rxi_calltrace(RX_CALL_ARRIVAL, call);
3829 queue_Append(&rx_incomingCallQueue, call);
3832 #endif /* RX_ENABLE_LOCKS */
3834 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3836 /* If hot threads are enabled, and both newcallp and sq->socketp
3837 * are non-null, then this thread will process the call, and the
3838 * idle server thread will start listening on this thread's socket.
3841 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3844 *sq->socketp = socket;
3845 clock_GetTime(&call->startTime);
3846 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3850 if (call->flags & RX_CALL_WAIT_PROC) {
3851 /* Conservative: I don't think this should happen */
3852 call->flags &= ~RX_CALL_WAIT_PROC;
3853 MUTEX_ENTER(&rx_stats_mutex);
3855 MUTEX_EXIT(&rx_stats_mutex);
3858 call->state = RX_STATE_ACTIVE;
3859 call->mode = RX_MODE_RECEIVING;
3860 if (call->flags & RX_CALL_CLEARED) {
3861 /* send an ack now to start the packet flow up again */
3862 call->flags &= ~RX_CALL_CLEARED;
3863 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3865 #ifdef RX_ENABLE_LOCKS
3868 service->nRequestsRunning++;
3869 if (service->nRequestsRunning <= service->minProcs)
3875 MUTEX_EXIT(&rx_serverPool_lock);
3878 /* Delay the sending of an acknowledge event for a short while, while
3879 * a new call is being prepared (in the case of a client) or a reply
3880 * is being prepared (in the case of a server). Rather than sending
3881 * an ack packet, an ACKALL packet is sent. */
3882 void rxi_AckAll(event, call, dummy)
3883 struct rxevent *event;
3884 register struct rx_call *call;
3887 #ifdef RX_ENABLE_LOCKS
3889 MUTEX_ENTER(&call->lock);
3890 call->delayedAckEvent = (struct rxevent *) 0;
3891 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3893 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3894 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3896 MUTEX_EXIT(&call->lock);
3897 #else /* RX_ENABLE_LOCKS */
3898 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3899 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3900 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3901 #endif /* RX_ENABLE_LOCKS */
3904 void rxi_SendDelayedAck(event, call, dummy)
3905 struct rxevent *event;
3906 register struct rx_call *call;
3909 #ifdef RX_ENABLE_LOCKS
3911 MUTEX_ENTER(&call->lock);
3912 if (event == call->delayedAckEvent)
3913 call->delayedAckEvent = (struct rxevent *) 0;
3914 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3916 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3918 MUTEX_EXIT(&call->lock);
3919 #else /* RX_ENABLE_LOCKS */
3920 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3921 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3922 #endif /* RX_ENABLE_LOCKS */
3926 #ifdef RX_ENABLE_LOCKS
3927 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3928 * clearing them out.
3930 static void rxi_SetAcksInTransmitQueue(call)
3931 register struct rx_call *call;
3933 register struct rx_packet *p, *tp;
3936 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3943 call->flags |= RX_CALL_TQ_CLEARME;
3944 call->flags |= RX_CALL_TQ_SOME_ACKED;
3947 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3948 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3949 call->tfirst = call->tnext;
3950 call->nSoftAcked = 0;
3952 if (call->flags & RX_CALL_FAST_RECOVER) {
3953 call->flags &= ~RX_CALL_FAST_RECOVER;
3954 call->cwind = call->nextCwind;
3955 call->nextCwind = 0;
3958 CV_SIGNAL(&call->cv_twind);
3960 #endif /* RX_ENABLE_LOCKS */
3962 /* Clear out the transmit queue for the current call (all packets have
3963 * been received by peer) */
3964 void rxi_ClearTransmitQueue(call, force)
3965 register struct rx_call *call;
3968 register struct rx_packet *p, *tp;
3970 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3971 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3973 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3980 call->flags |= RX_CALL_TQ_CLEARME;
3981 call->flags |= RX_CALL_TQ_SOME_ACKED;
3984 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3985 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3991 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3992 call->flags &= ~RX_CALL_TQ_CLEARME;
3994 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3996 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3997 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3998 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3999 call->nSoftAcked = 0;
4001 if (call->flags & RX_CALL_FAST_RECOVER) {
4002 call->flags &= ~RX_CALL_FAST_RECOVER;
4003 call->cwind = call->nextCwind;
4006 #ifdef RX_ENABLE_LOCKS
4007 CV_SIGNAL(&call->cv_twind);
4009 osi_rxWakeup(&call->twind);
4013 void rxi_ClearReceiveQueue(call)
4014 register struct rx_call *call;
4016 register struct rx_packet *p, *tp;
4017 if (queue_IsNotEmpty(&call->rq)) {
4018 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4023 rx_packetReclaims++;
4025 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4027 if (call->state == RX_STATE_PRECALL) {
4028 call->flags |= RX_CALL_CLEARED;
4032 /* Send an abort packet for the specified call */
4033 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4034 register struct rx_call *call;
4035 struct rx_packet *packet;
4045 /* Clients should never delay abort messages */
4046 if (rx_IsClientConn(call->conn))
4049 if (call->abortCode != call->error) {
4050 call->abortCode = call->error;
4051 call->abortCount = 0;
4054 if (force || rxi_callAbortThreshhold == 0 ||
4055 call->abortCount < rxi_callAbortThreshhold) {
4056 if (call->delayedAbortEvent) {
4057 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4059 error = htonl(call->error);
4061 packet = rxi_SendSpecial(call, call->conn, packet,
4062 RX_PACKET_TYPE_ABORT, (char *)&error,
4063 sizeof(error), istack);
4064 } else if (!call->delayedAbortEvent) {
4065 clock_GetTime(&when);
4066 clock_Addmsec(&when, rxi_callAbortDelay);
4067 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4068 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4074 /* Send an abort packet for the specified connection. Packet is an
4075 * optional pointer to a packet that can be used to send the abort.
4076 * Once the number of abort messages reaches the threshold, an
4077 * event is scheduled to send the abort. Setting the force flag
4078 * overrides sending delayed abort messages.
4080 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4081 * to send the abort packet.
4083 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4084 register struct rx_connection *conn;
4085 struct rx_packet *packet;
4095 /* Clients should never delay abort messages */
4096 if (rx_IsClientConn(conn))
4099 if (force || rxi_connAbortThreshhold == 0 ||
4100 conn->abortCount < rxi_connAbortThreshhold) {
4101 if (conn->delayedAbortEvent) {
4102 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4104 error = htonl(conn->error);
4106 MUTEX_EXIT(&conn->conn_data_lock);
4107 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4108 RX_PACKET_TYPE_ABORT, (char *)&error,
4109 sizeof(error), istack);
4110 MUTEX_ENTER(&conn->conn_data_lock);
4111 } else if (!conn->delayedAbortEvent) {
4112 clock_GetTime(&when);
4113 clock_Addmsec(&when, rxi_connAbortDelay);
4114 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4120 /* Associate an error with all of the calls owned by a connection. Called
4121 * with error non-zero. This is only for really fatal things, like
4122 * bad authentication responses. The connection itself is set in
4123 * error at this point, so that future packets received will be
4125 void rxi_ConnectionError(conn, error)
4126 register struct rx_connection *conn;
4127 register afs_int32 error;
4131 if (conn->challengeEvent)
4132 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4133 for (i=0; i<RX_MAXCALLS; i++) {
4134 struct rx_call *call = conn->call[i];
4136 MUTEX_ENTER(&call->lock);
4137 rxi_CallError(call, error);
4138 MUTEX_EXIT(&call->lock);
4141 conn->error = error;
4142 MUTEX_ENTER(&rx_stats_mutex);
4143 rx_stats.fatalErrors++;
4144 MUTEX_EXIT(&rx_stats_mutex);
4148 void rxi_CallError(call, error)
4149 register struct rx_call *call;
4152 if (call->error) error = call->error;
4153 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4154 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4155 rxi_ResetCall(call, 0);
4158 rxi_ResetCall(call, 0);
4160 call->error = error;
4161 call->mode = RX_MODE_ERROR;
4164 /* Reset various fields in a call structure, and wakeup waiting
4165 * processes. Some fields aren't changed: state & mode are not
4166 * touched (these must be set by the caller), and bufptr, nLeft, and
4167 * nFree are not reset, since these fields are manipulated by
4168 * unprotected macros, and may only be reset by non-interrupting code.
4171 /* this code requires that call->conn be set properly as a pre-condition. */
4172 #endif /* ADAPT_WINDOW */
4174 void rxi_ResetCall(call, newcall)
4175 register struct rx_call *call;
4176 register int newcall;
4179 register struct rx_peer *peer;
4180 struct rx_packet *packet;
4182 /* Notify anyone who is waiting for asynchronous packet arrival */
4183 if (call->arrivalProc) {
4184 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4185 call->arrivalProc = (VOID (*)()) 0;
4188 if (call->delayedAbortEvent) {
4189 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4190 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4192 rxi_SendCallAbort(call, packet, 0, 1);
4193 rxi_FreePacket(packet);
4198 * Update the peer with the congestion information in this call
4199 * so other calls on this connection can pick up where this call
4200 * left off. If the congestion sequence numbers don't match then
4201 * another call experienced a retransmission.
4203 peer = call->conn->peer;
4204 MUTEX_ENTER(&peer->peer_lock);
4206 if (call->congestSeq == peer->congestSeq) {
4207 peer->cwind = MAX(peer->cwind, call->cwind);
4208 peer->MTU = MAX(peer->MTU, call->MTU);
4209 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4212 call->abortCode = 0;
4213 call->abortCount = 0;
4215 if (peer->maxDgramPackets > 1) {
4216 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4218 call->MTU = peer->MTU;
4220 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4221 call->ssthresh = rx_maxSendWindow;
4222 call->nDgramPackets = peer->nDgramPackets;
4223 call->congestSeq = peer->congestSeq;
4224 MUTEX_EXIT(&peer->peer_lock);
4226 flags = call->flags;
4227 rxi_ClearReceiveQueue(call);
4228 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4229 if (call->flags & RX_CALL_TQ_BUSY) {
4230 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4231 call->flags |= (flags & RX_CALL_TQ_WAIT);
4233 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4235 rxi_ClearTransmitQueue(call, 0);
4236 queue_Init(&call->tq);
4239 queue_Init(&call->rq);
4241 call->rwind = rx_initReceiveWindow;
4242 call->twind = rx_initSendWindow;
4243 call->nSoftAcked = 0;
4244 call->nextCwind = 0;
4247 call->nCwindAcks = 0;
4248 call->nSoftAcks = 0;
4249 call->nHardAcks = 0;
4251 call->tfirst = call->rnext = call->tnext = 1;
4253 call->lastAcked = 0;
4254 call->localStatus = call->remoteStatus = 0;
4256 if (flags & RX_CALL_READER_WAIT) {
4257 #ifdef RX_ENABLE_LOCKS
4258 CV_BROADCAST(&call->cv_rq);
4260 osi_rxWakeup(&call->rq);
4263 if (flags & RX_CALL_WAIT_PACKETS) {
4264 MUTEX_ENTER(&rx_freePktQ_lock);
4265 rxi_PacketsUnWait(); /* XXX */
4266 MUTEX_EXIT(&rx_freePktQ_lock);
4269 #ifdef RX_ENABLE_LOCKS
4270 CV_SIGNAL(&call->cv_twind);
4272 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4273 osi_rxWakeup(&call->twind);
4276 #ifdef RX_ENABLE_LOCKS
4277 /* The following ensures that we don't mess with any queue while some
4278 * other thread might also be doing so. The call_queue_lock field is
4279 * only modified under the call lock. If the call is in the process
4280 * of being removed from a queue, the call is not locked until
4281 * the queue lock is dropped and only then is the call_queue_lock field
4282 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4283 * Note that any other routine which removes a call from a queue has to
4284 * obtain the queue lock before examining the queue and removing the call.
4286 if (call->call_queue_lock) {
4287 MUTEX_ENTER(call->call_queue_lock);
4288 if (queue_IsOnQueue(call)) {
4290 if (flags & RX_CALL_WAIT_PROC) {
4291 MUTEX_ENTER(&rx_stats_mutex);
4293 MUTEX_EXIT(&rx_stats_mutex);
4296 MUTEX_EXIT(call->call_queue_lock);
4297 CLEAR_CALL_QUEUE_LOCK(call);
4299 #else /* RX_ENABLE_LOCKS */
4300 if (queue_IsOnQueue(call)) {
4302 if (flags & RX_CALL_WAIT_PROC)
4305 #endif /* RX_ENABLE_LOCKS */
4307 rxi_KeepAliveOff(call);
4308 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4311 /* Send an acknowledge for the indicated packet (seq,serial) of the
4312 * indicated call, for the indicated reason (reason). This
4313 * acknowledge will specifically acknowledge receiving the packet, and
4314 * will also specify which other packets for this call have been
4315 * received. This routine returns the packet that was used to the
4316 * caller. The caller is responsible for freeing it or re-using it.
4317 * This acknowledgement also returns the highest sequence number
4318 * actually read out by the higher level to the sender; the sender
4319 * promises to keep around packets that have not been read by the
4320 * higher level yet (unless, of course, the sender decides to abort
4321 * the call altogether). Any of p, seq, serial, pflags, or reason may
4322 * be set to zero without ill effect. That is, if they are zero, they
4323 * will not convey any information.
4324 * NOW there is a trailer field after the ack, where it will safely be
4325 * ignored by mundanes, which indicates the maximum size packet this
4326 * host can swallow. */
4327 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4328 register struct rx_call *call;
4329 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4330 int seq; /* Sequence number of the packet we are acking */
4331 int serial; /* Serial number of the packet */
4332 int pflags; /* Flags field from packet header */
4333 int reason; /* Reason an acknowledge was prompted */
4336 struct rx_ackPacket *ap;
4337 register struct rx_packet *rqp;
4338 register struct rx_packet *nxp; /* For queue_Scan */
4339 register struct rx_packet *p;
4344 * Open the receive window once a thread starts reading packets
4346 if (call->rnext > 1) {
4347 call->rwind = rx_maxReceiveWindow;
4350 call->nHardAcks = 0;
4351 call->nSoftAcks = 0;
4352 if (call->rnext > call->lastAcked)
4353 call->lastAcked = call->rnext;
4357 rx_computelen(p, p->length); /* reset length, you never know */
4358 } /* where that's been... */
4360 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4361 /* We won't send the ack, but don't panic. */
4362 return optionalPacket;
4365 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4367 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4368 if (!optionalPacket) rxi_FreePacket(p);
4369 return optionalPacket;
4371 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4372 if (rx_Contiguous(p)<templ) {
4373 if (!optionalPacket) rxi_FreePacket(p);
4374 return optionalPacket;
4376 } /* MTUXXX failing to send an ack is very serious. We should */
4377 /* try as hard as possible to send even a partial ack; it's */
4378 /* better than nothing. */
4380 ap = (struct rx_ackPacket *) rx_DataOf(p);
4381 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4382 ap->reason = reason;
4384 /* The skew computation used to be bogus, I think it's better now. */
4385 /* We should start paying attention to skew. XXX */
4386 ap->serial = htonl(call->conn->maxSerial);
4387 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4389 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4390 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4392 /* No fear of running out of ack packet space here, because there can be at most
4393 * one window full of unacknowledged packets. The window size must be constrained
4394 * to be less than the maximum ack size, of course. Also, an ack should always
4395 * fit into a single packet -- it should not ever be fragmented. */
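/* Concretely: acks[k] describes sequence number rnext + k, so holes in
 * the receive queue show up as runs of RX_ACK_TYPE_NACK. For example,
 * with rnext = 5 and packets 5, 6 and 9 queued, the loop below produces
 * ACK, ACK, NACK, NACK, ACK (offset = 5). */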
4396 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4397 if (!rqp || !call->rq.next
4398 || (rqp->header.seq > (call->rnext + call->rwind))) {
4399 if (!optionalPacket) rxi_FreePacket(p);
4400 rxi_CallError(call, RX_CALL_DEAD);
4401 return optionalPacket;
4404 while (rqp->header.seq > call->rnext + offset)
4405 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4406 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4408 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4409 if (!optionalPacket) rxi_FreePacket(p);
4410 rxi_CallError(call, RX_CALL_DEAD);
4411 return optionalPacket;
4416 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4418 /* these are new for AFS 3.3 */
4419 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4420 templ = htonl(templ);
4421 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4422 templ = htonl(call->conn->peer->ifMTU);
4423 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4425 /* new for AFS 3.4 */
4426 templ = htonl(call->rwind);
4427 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4429 /* new for AFS 3.5 */
4430 templ = htonl(call->conn->peer->ifDgramPackets);
4431 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
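/* Layout of the four trailer words just written, each an afs_int32 in
 * network order at rx_AckDataSize(offset) + n*sizeof(afs_int32):
 *
 *   n = 0: max acceptable MTU  (new for AFS 3.3)
 *   n = 1: our interface MTU   (new for AFS 3.3)
 *   n = 2: receive window      (new for AFS 3.4)
 *   n = 3: ifDgramPackets      (new for AFS 3.5)
 *
 * This matches the p->length computed above (ack data plus four words);
 * older peers simply ignore trailer words they don't understand. */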
4433 p->header.serviceId = call->conn->serviceId;
4434 p->header.cid = (call->conn->cid | call->channel);
4435 p->header.callNumber = *call->callNumber;
4436 p->header.seq = seq;
4437 p->header.securityIndex = call->conn->securityIndex;
4438 p->header.epoch = call->conn->epoch;
4439 p->header.type = RX_PACKET_TYPE_ACK;
4440 p->header.flags = RX_SLOW_START_OK;
4441 if (reason == RX_ACK_PING) {
4442 p->header.flags |= RX_REQUEST_ACK;
4444 clock_GetTime(&call->pingRequestTime);
4447 if (call->conn->type == RX_CLIENT_CONNECTION)
4448 p->header.flags |= RX_CLIENT_INITIATED;
4452 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4453 ap->reason, ntohl(ap->previousPacket), p->header.seq,
4454 ntohl(ap->firstPacket));
4456 for (offset = 0; offset < ap->nAcks; offset++)
4457 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4464 register int i, nbytes = p->length;
4466 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4467 if (nbytes <= p->wirevec[i].iov_len) {
4468 register int savelen, saven;
4470 savelen = p->wirevec[i].iov_len;
4472 p->wirevec[i].iov_len = nbytes;
4474 rxi_Send(call, p, istack);
4475 p->wirevec[i].iov_len = savelen;
4479 else nbytes -= p->wirevec[i].iov_len;
4482 MUTEX_ENTER(&rx_stats_mutex);
4483 rx_stats.ackPacketsSent++;
4484 MUTEX_EXIT(&rx_stats_mutex);
4485 if (!optionalPacket) rxi_FreePacket(p);
4486 return optionalPacket; /* Return packet for re-use by caller */
4489 /* Send all of the packets in the list in a single datagram */
4490 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4491 struct rx_call *call;
4492 struct rx_packet **list;
4497 struct clock *retryTime;
4503 struct rx_connection *conn = call->conn;
4504 struct rx_peer *peer = conn->peer;
4506 MUTEX_ENTER(&peer->peer_lock);
4508 if (resending) peer->reSends += len;
4509 MUTEX_ENTER(&rx_stats_mutex);
4510 rx_stats.dataPacketsSent += len;
4511 MUTEX_EXIT(&rx_stats_mutex);
4512 MUTEX_EXIT(&peer->peer_lock);
4514 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4518 /* Set the packet flags and schedule the resend events */
4519 /* Only request an ack for the last packet in the list */
4520 for (i = 0 ; i < len ; i++) {
4521 list[i]->retryTime = *retryTime;
4522 if (list[i]->header.serial) {
4523 /* Exponentially backoff retry times */
4524 if (list[i]->backoff < MAXBACKOFF) {
4525 /* so it can't stay == 0 */
4526 list[i]->backoff = (list[i]->backoff << 1) +1;
4528 else list[i]->backoff++;
4529 clock_Addmsec(&(list[i]->retryTime),
4530 ((afs_uint32) list[i]->backoff) << 8);
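/* Worked example: a packet's backoff starts at 0, so successive
 * retransmissions see backoff 1, 3, 7, 15, ... (the shift-and-add
 * keeps it from sticking at zero) and the added retry delay is
 * backoff << 8 msec: 256, 768, 1792, 3840, ... capped once backoff
 * reaches MAXBACKOFF. */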
4533 /* Wait a little extra for the ack on the last packet */
4534 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4535 clock_Addmsec(&(list[i]->retryTime), 400);
4538 /* Record the time sent */
4539 list[i]->timeSent = *now;
4541 /* Ask for an ack on retransmitted packets, on every other packet
4542 * if the peer doesn't support slow start. Ask for an ack on every
4543 * packet until the congestion window reaches the ack rate. */
4544 if (list[i]->header.serial) {
4546 MUTEX_ENTER(&rx_stats_mutex);
4547 rx_stats.dataPacketsReSent++;
4548 MUTEX_EXIT(&rx_stats_mutex);
4550 /* improved RTO calculation- not Karn */
4551 list[i]->firstSent = *now;
4553 && (call->cwind <= (u_short)(conn->ackRate+1)
4554 || (!(call->flags & RX_CALL_SLOW_START_OK)
4555 && (list[i]->header.seq & 1)))) {
4560 MUTEX_ENTER(&peer->peer_lock);
4562 if (resending) peer->reSends++;
4563 MUTEX_ENTER(&rx_stats_mutex);
4564 rx_stats.dataPacketsSent++;
4565 MUTEX_EXIT(&rx_stats_mutex);
4566 MUTEX_EXIT(&peer->peer_lock);
4568 /* Tag this packet as not being the last in this group,
4569 * for the receiver's benefit */
4570 if (i < len-1 || moreFlag) {
4571 list[i]->header.flags |= RX_MORE_PACKETS;
4574 /* Install the new retransmit time for the packet, and
4575 * record the time sent */
4576 list[i]->timeSent = *now;
4580 list[len-1]->header.flags |= RX_REQUEST_ACK;
4583 /* Since we're about to send a data packet to the peer, it's
4584 * safe to nuke any scheduled end-of-packets ack */
4585 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4587 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4588 MUTEX_EXIT(&call->lock);
4590 rxi_SendPacketList(conn, list, len, istack);
4592 rxi_SendPacket(conn, list[0], istack);
4594 MUTEX_ENTER(&call->lock);
4595 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4597 /* Update last send time for this call (for keep-alive
4598 * processing), and for the connection (so that we can discover
4599 * idle connections) */
4600 conn->lastSendTime = call->lastSendTime = clock_Sec();
4603 /* When sending packets we need to follow these rules:
4604 * 1. Never send more than maxDgramPackets in a jumbogram.
4605 * 2. Never send a packet with more than two iovecs in a jumbogram.
4606 * 3. Never send a retransmitted packet in a jumbogram.
4607 * 4. Never send more than cwind/4 packets in a jumbogram.
4608 * We always keep the last list we should have sent so we
4609 * can set the RX_MORE_PACKETS flag correctly.
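 *
 * For instance (illustrative numbers only): with peer->maxDgramPackets
 * of 4 and a congestion window of 16 (so cwind/4 is also 4), nine
 * fresh full-size packets go out as jumbograms of 4 + 4 + 1, while a
 * packet with a nonzero serial (a retransmission) always flushes the
 * current list and travels in a datagram of its own.
 */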
4611 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4612 struct rx_call *call;
4613 struct rx_packet **list;
4617 struct clock *retryTime;
4620 int i, cnt, lastCnt = 0;
4621 struct rx_packet **listP, **lastP = 0;
4622 struct rx_peer *peer = call->conn->peer;
4623 int morePackets = 0;
4625 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4626 /* Does the current packet force us to flush the current list? */
4628 && (list[i]->header.serial
4630 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4632 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4633 /* If the call enters an error state stop sending, or if
4634 * we entered congestion recovery mode, stop sending */
4635 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4643 /* Add the current packet to the list if it hasn't been acked.
4644 * Otherwise adjust the list pointer to skip the current packet. */
4645 if (!list[i]->acked) {
4647 /* Do we need to flush the list? */
4648 if (cnt >= (int)peer->maxDgramPackets
4649 || cnt >= (int)call->nDgramPackets
4650 || cnt >= (int)call->cwind
4651 || list[i]->header.serial
4652 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4654 rxi_SendList(call, lastP, lastCnt, istack, 1,
4655 now, retryTime, resending);
4656 /* If the call enters an error state stop sending, or if
4657 * we entered congestion recovery mode, stop sending */
4658 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4668 osi_Panic("rxi_SendList error");
4674 /* Send the whole list when the call is in receive mode, when
4675 * the call is in eof mode, when we are in fast recovery mode,
4676 * or when we have the last packet */
4677 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4678 || call->mode == RX_MODE_RECEIVING
4679 || call->mode == RX_MODE_EOF
4680 || (call->flags & RX_CALL_FAST_RECOVER)) {
4681 /* Check for the case where the current list contains
4682 * an acked packet. Since we always send retransmissions
4683 * in a separate packet, we only need to check the first
4684 * packet in the list */
4685 if (cnt > 0 && !listP[0]->acked) {
4689 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4690 now, retryTime, resending);
4691 /* If the call enters an error state stop sending, or if
4692 * we entered congestion recovery mode, stop sending */
4693 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4697 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4699 } else if (lastCnt > 0) {
4700 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4704 #ifdef RX_ENABLE_LOCKS
4705 /* Call rxi_Start, below, but with the call lock held. */
4706 void rxi_StartUnlocked(event, call, istack)
4707 struct rxevent *event;
4708 register struct rx_call *call;
4711 MUTEX_ENTER(&call->lock);
4712 rxi_Start(event, call, istack);
4713 MUTEX_EXIT(&call->lock);
4715 #endif /* RX_ENABLE_LOCKS */
4717 /* This routine is called when new packets are readied for
4718 * transmission and when retransmission may be necessary, or when the
4719 * transmission window or burst count is favourable. This should be
4720 * better optimized for new packets, the usual case, now that we've
4721 * got rid of queues of send packets. XXXXXXXXXXX */
4722 void rxi_Start(event, call, istack)
4723 struct rxevent *event;
4724 register struct rx_call *call;
4727 struct rx_packet *p;
4728 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4729 struct rx_peer *peer = call->conn->peer;
4730 struct clock now, retryTime;
4734 struct rx_packet **xmitList;
4737 /* If rxi_Start is being called as a result of a resend event,
4738 * then make sure that the event pointer is removed from the call
4739 * structure, since there is no longer a per-call retransmission
4741 if (event && event == call->resendEvent) {
4742 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4743 call->resendEvent = NULL;
4745 if (queue_IsEmpty(&call->tq)) {
4749 /* Timeouts trigger congestion recovery */
4750 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4751 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4752 /* someone else is waiting to start recovery */
4755 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4756 while (call->flags & RX_CALL_TQ_BUSY) {
4757 call->flags |= RX_CALL_TQ_WAIT;
4758 #ifdef RX_ENABLE_LOCKS
4759 CV_WAIT(&call->cv_tq, &call->lock);
4760 #else /* RX_ENABLE_LOCKS */
4761 osi_rxSleep(&call->tq);
4762 #endif /* RX_ENABLE_LOCKS */
4764 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4765 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4766 call->flags |= RX_CALL_FAST_RECOVER;
4767 if (peer->maxDgramPackets > 1) {
4768 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4770 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4772 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4773 call->nDgramPackets = 1;
4775 call->nextCwind = 1;
4778 MUTEX_ENTER(&peer->peer_lock);
4779 peer->MTU = call->MTU;
4780 peer->cwind = call->cwind;
4781 peer->nDgramPackets = 1;
4783 call->congestSeq = peer->congestSeq;
4784 MUTEX_EXIT(&peer->peer_lock);
4785 /* Clear retry times on packets. Otherwise, it's possible for
4786 * some packets in the queue to force resends at rates faster
4787 * than recovery rates.
4789 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4791 clock_Zero(&p->retryTime);
4796 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4797 MUTEX_ENTER(&rx_stats_mutex);
4798 rx_tq_debug.rxi_start_in_error ++;
4799 MUTEX_EXIT(&rx_stats_mutex);
4804 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4805 /* Get clock to compute the re-transmit time for any packets
4806 * in this burst. Note, if we back off, it's reasonable to
4807 * back off all of the packets in the same manner, even if
4808 * some of them have been retransmitted more times than more
4809 * recent additions */
4810 clock_GetTime(&now);
4811 retryTime = now; /* initialize before use */
4812 MUTEX_ENTER(&peer->peer_lock);
4813 clock_Add(&retryTime, &peer->timeout);
4814 MUTEX_EXIT(&peer->peer_lock);
4816 /* Send (or resend) any packets that need it, subject to
4817 * window restrictions and congestion burst control
4818 * restrictions. Ask for an ack on the last packet sent in
4819 * this burst. For now, we're relying upon the window being
4820 * considerably bigger than the largest number of packets that
4821 * are typically sent at once by one initial call to
4822 * rxi_Start. This is probably bogus (perhaps we should ask
4823 * for an ack when we're half way through the current
4824 * window?). Also, for non file transfer applications, this
4825 * may end up asking for an ack for every packet. Bogus. XXXX
4828 * But check whether we're here recursively, and let the other guy
4831 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4832 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4833 call->flags |= RX_CALL_TQ_BUSY;
4835 call->flags &= ~RX_CALL_NEED_START;
4836 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4838 maxXmitPackets = MIN(call->twind, call->cwind);
4839 xmitList = (struct rx_packet **)
4840 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4841 if (xmitList == NULL)
4842 osi_Panic("rxi_Start, failed to allocate xmit list");
4843 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4844 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4845 /* We shouldn't be sending packets if a thread is waiting
4846 * to initiate congestion recovery */
4849 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4850 /* Only send one packet during fast recovery */
4853 if ((p->header.flags == RX_FREE_PACKET) ||
4854 (!queue_IsEnd(&call->tq, nxp)
4855 && (nxp->header.flags == RX_FREE_PACKET)) ||
4856 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4857 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4858 osi_Panic("rxi_Start: xmit queue clobbered");
4861 MUTEX_ENTER(&rx_stats_mutex);
4862 rx_stats.ignoreAckedPacket++;
4863 MUTEX_EXIT(&rx_stats_mutex);
4864 continue; /* Ignore this packet if it has been acknowledged */
4867 /* Turn off all flags except these ones, which are the same
4868 * on each transmission */
4869 p->header.flags &= RX_PRESET_FLAGS;
4871 if (p->header.seq >= call->tfirst +
4872 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4873 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4874 /* Note: if we're waiting for more window space, we can
4875 * still send retransmits; hence we don't return here, but
4876 * break out to schedule a retransmit event */
4877 dpf(("call %d waiting for window", *(call->callNumber)));
4881 /* Transmit the packet if it needs to be sent. */
4882 if (!clock_Lt(&now, &p->retryTime)) {
4883 if (nXmitPackets == maxXmitPackets) {
4884 osi_Panic("rxi_Start: xmit list overflowed");
4886 xmitList[nXmitPackets++] = p;
4890 /* xmitList now holds pointers to all of the packets that are
4891 * ready to send. Now we loop to send the packets */
4892 if (nXmitPackets > 0) {
4893 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4894 &now, &retryTime, resending);
4896 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4898 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4900 * TQ references no longer protected by this flag; they must remain
4901 * protected by the global lock.
4903 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4904 call->flags &= ~RX_CALL_TQ_BUSY;
4905 if (call->flags & RX_CALL_TQ_WAIT) {
4906 call->flags &= ~RX_CALL_TQ_WAIT;
4907 #ifdef RX_ENABLE_LOCKS
4908 CV_BROADCAST(&call->cv_tq);
4909 #else /* RX_ENABLE_LOCKS */
4910 osi_rxWakeup(&call->tq);
4911 #endif /* RX_ENABLE_LOCKS */
4916 /* We went into the error state while sending packets. Now is
4917 * the time to reset the call. This will also inform the using
4918 * process that the call is in an error state.
4920 MUTEX_ENTER(&rx_stats_mutex);
4921 rx_tq_debug.rxi_start_aborted ++;
4922 MUTEX_EXIT(&rx_stats_mutex);
4923 call->flags &= ~RX_CALL_TQ_BUSY;
4924 if (call->flags & RX_CALL_TQ_WAIT) {
4925 call->flags &= ~RX_CALL_TQ_WAIT;
4926 #ifdef RX_ENABLE_LOCKS
4927 CV_BROADCAST(&call->cv_tq);
4928 #else /* RX_ENABLE_LOCKS */
4929 osi_rxWakeup(&call->tq);
4930 #endif /* RX_ENABLE_LOCKS */
4932 rxi_CallError(call, call->error);
4935 #ifdef RX_ENABLE_LOCKS
4936 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4937 register int missing;
4938 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4939 /* Some packets have received acks. If they all have, we can clear
4940 * the transmit queue.
4942 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4943 if (p->header.seq < call->tfirst && p->acked) {
4951 call->flags |= RX_CALL_TQ_CLEARME;
4953 #endif /* RX_ENABLE_LOCKS */
4954 /* Don't bother doing retransmits if the TQ is cleared. */
4955 if (call->flags & RX_CALL_TQ_CLEARME) {
4956 rxi_ClearTransmitQueue(call, 1);
4958 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4961 /* Always post a resend event, if there is anything in the
4962 * queue, and resend is possible. There should be at least
4963 * one unacknowledged packet in the queue ... otherwise none
4964 * of these packets should be on the queue in the first place.
4966 if (call->resendEvent) {
4967 /* Cancel the existing event and post a new one */
4968 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4971 /* The retry time is the retry time on the first unacknowledged
4972 * packet inside the current window */
4973 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4974 /* Don't set timers for packets outside the window */
4975 if (p->header.seq >= call->tfirst + call->twind) {
4979 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4981 retryTime = p->retryTime;
4986 /* Post a new event to re-run rxi_Start when retries may be needed */
4987 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4988 #ifdef RX_ENABLE_LOCKS
4989 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4990 call->resendEvent = rxevent_Post(&retryTime,
4992 (char *)call, istack);
4993 #else /* RX_ENABLE_LOCKS */
4994 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4995 (char *)call, (void*)(long)istack);
4996 #endif /* RX_ENABLE_LOCKS */
4999 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5000 } while (call->flags & RX_CALL_NEED_START);
5002 * TQ references no longer protected by this flag; they must remain
5003 * protected by the global lock.
5005 call->flags &= ~RX_CALL_TQ_BUSY;
5006 if (call->flags & RX_CALL_TQ_WAIT) {
5007 call->flags &= ~RX_CALL_TQ_WAIT;
5008 #ifdef RX_ENABLE_LOCKS
5009 CV_BROADCAST(&call->cv_tq);
5010 #else /* RX_ENABLE_LOCKS */
5011 osi_rxWakeup(&call->tq);
5012 #endif /* RX_ENABLE_LOCKS */
5015 call->flags |= RX_CALL_NEED_START;
5017 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5019 if (call->resendEvent) {
5020 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5025 /* Also adjusts the keep alive parameters for the call, to reflect
5026 * that we have just sent a packet (so keep alives aren't sent
5028 void rxi_Send(call, p, istack)
5029 register struct rx_call *call;
5030 register struct rx_packet *p;
5033 register struct rx_connection *conn = call->conn;
5035 /* Stamp each packet with the user supplied status */
5036 p->header.userStatus = call->localStatus;
5038 /* Allow the security object controlling this call's security to
5039 * make any last-minute changes to the packet */
5040 RXS_SendPacket(conn->securityObject, call, p);
5042 /* Since we're about to send SOME sort of packet to the peer, it's
5043 * safe to nuke any scheduled end-of-packets ack */
5044 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5046 /* Actually send the packet, filling in more connection-specific fields */
5047 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5048 MUTEX_EXIT(&call->lock);
5049 rxi_SendPacket(conn, p, istack);
5050 MUTEX_ENTER(&call->lock);
5051 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5053 /* Update last send time for this call (for keep-alive
5054 * processing), and for the connection (so that we can discover
5055 * idle connections) */
5056 conn->lastSendTime = call->lastSendTime = clock_Sec();
5060 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5061 * that things are fine. Also called periodically to guarantee that nothing
5062 * falls through the cracks (e.g. (error + dally) connections have keepalive
5063 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5066 #ifdef RX_ENABLE_LOCKS
5067 int rxi_CheckCall(call, haveCTLock)
5068 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5069 #else /* RX_ENABLE_LOCKS */
5070 int rxi_CheckCall(call)
5071 #endif /* RX_ENABLE_LOCKS */
5072 register struct rx_call *call;
5074 register struct rx_connection *conn = call->conn;
5075 register struct rx_service *tservice;
5077 afs_uint32 deadTime;
5079 #ifdef RX_GLOBAL_RXLOCK_KERNEL
5080 if (call->flags & RX_CALL_TQ_BUSY) {
5081 /* Call is active and will be reset by rxi_Start if it's
5082 * in an error state.
5087 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5088 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5089 ((afs_uint32)conn->peer->rtt >> 3) +
5090 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
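/* Worked example with illustrative values: secondsUntilDead = 12,
 * rtt = 800 (100 ms, since rtt is stored scaled by 8) and
 * rtt_dev = 100 (25 ms, scaled by 4) give
 *
 *   deadTime = ((12 << 10) + (800 >> 3) + (100 << 1) + 1023) >> 10
 *            = (12288 + 100 + 200 + 1023) >> 10 = 13 seconds;
 *
 * the >> 10 treats 1024 units as one second and the + 1023 rounds up. */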
5092 /* These are computed to the second (+- 1 second). But that's
5093 * good enough for these values, which should be a significant
5094 * number of seconds. */
5095 if (now > (call->lastReceiveTime + deadTime)) {
5096 if (call->state == RX_STATE_ACTIVE) {
5097 rxi_CallError(call, RX_CALL_DEAD);
5101 #ifdef RX_ENABLE_LOCKS
5102 /* Cancel pending events */
5103 rxevent_Cancel(call->delayedAckEvent, call,
5104 RX_CALL_REFCOUNT_DELAY);
5105 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5106 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5107 if (call->refCount == 0) {
5108 rxi_FreeCall(call, haveCTLock);
5112 #else /* RX_ENABLE_LOCKS */
5115 #endif /* RX_ENABLE_LOCKS */
5117 /* Non-active calls are destroyed if they are not responding
5118 * to pings; active calls are simply flagged in error, so the
5119 * attached process can die reasonably gracefully. */
5121 /* see if we have a non-activity timeout */
5122 tservice = conn->service;
5123 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5124 && tservice->idleDeadTime
5125 && ((call->startWait + tservice->idleDeadTime) < now)) {
5126 if (call->state == RX_STATE_ACTIVE) {
5127 rxi_CallError(call, RX_CALL_TIMEOUT);
5131 /* see if we have a hard timeout */
5132 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5133 if (call->state == RX_STATE_ACTIVE)
5134 rxi_CallError(call, RX_CALL_TIMEOUT);
5141 /* When a call is in progress, this routine is called occasionally to
5142 * make sure that some traffic has arrived from (or been sent to) the peer.
5143 * If nothing has arrived in a reasonable amount of time, the call is
5144 * declared dead; if nothing has been sent for a while, we send a
5145 * keep-alive packet (if we're actually trying to keep the call alive)
5147 void rxi_KeepAliveEvent(event, call, dummy)
5148 struct rxevent *event;
5149 register struct rx_call *call;
5151 struct rx_connection *conn;
5154 MUTEX_ENTER(&call->lock);
5155 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5156 if (event == call->keepAliveEvent)
5157 call->keepAliveEvent = (struct rxevent *) 0;
5160 #ifdef RX_ENABLE_LOCKS
5161 if(rxi_CheckCall(call, 0)) {
5162 MUTEX_EXIT(&call->lock);
5165 #else /* RX_ENABLE_LOCKS */
5166 if (rxi_CheckCall(call)) return;
5167 #endif /* RX_ENABLE_LOCKS */
5169 /* Don't try to keep alive dallying calls */
5170 if (call->state == RX_STATE_DALLY) {
5171 MUTEX_EXIT(&call->lock);
5176 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5177 /* Don't try to send keepalives if there is unacknowledged data */
5178 /* the rexmit code should be good enough; this little hack
5179 * doesn't quite work XXX */
5180 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5182 rxi_ScheduleKeepAliveEvent(call);
5183 MUTEX_EXIT(&call->lock);
5187 void rxi_ScheduleKeepAliveEvent(call)
5188 register struct rx_call *call;
5190 if (!call->keepAliveEvent) {
5192 clock_GetTime(&when);
5193 when.sec += call->conn->secondsUntilPing;
5194 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5195 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5199 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5200 void rxi_KeepAliveOn(call)
5201 register struct rx_call *call;
5203 /* Pretend last packet received was received now--i.e. if another
5204 * packet isn't received within the keep alive time, then the call
5205 * will die; Initialize last send time to the current time--even
5206 * if a packet hasn't been sent yet. This will guarantee that a
5207 * keep-alive is sent within the ping time */
5208 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5209 rxi_ScheduleKeepAliveEvent(call);
5212 /* This routine is called to send connection abort messages
5213 * that have been delayed to throttle looping clients. */
5214 void rxi_SendDelayedConnAbort(event, conn, dummy)
5215 struct rxevent *event;
5216 register struct rx_connection *conn;
5220 struct rx_packet *packet;
5222 MUTEX_ENTER(&conn->conn_data_lock);
5223 conn->delayedAbortEvent = (struct rxevent *) 0;
5224 error = htonl(conn->error);
5226 MUTEX_EXIT(&conn->conn_data_lock);
5227 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5229 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5230 RX_PACKET_TYPE_ABORT, (char *)&error,
5232 rxi_FreePacket(packet);
5236 /* This routine is called to send call abort messages
5237 * that have been delayed to throttle looping clients. */
5238 void rxi_SendDelayedCallAbort(event, call, dummy)
5239 struct rxevent *event;
5240 register struct rx_call *call;
5244 struct rx_packet *packet;
5246 MUTEX_ENTER(&call->lock);
5247 call->delayedAbortEvent = (struct rxevent *) 0;
5248 error = htonl(call->error);
5250 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5252 packet = rxi_SendSpecial(call, call->conn, packet,
5253 RX_PACKET_TYPE_ABORT, (char *)&error,
5255 rxi_FreePacket(packet);
5257 MUTEX_EXIT(&call->lock);
5260 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5261 * seconds) to ask the client to authenticate itself. The routine
5262 * issues a challenge to the client, which is obtained from the
5263 * security object associated with the connection */
5264 void rxi_ChallengeEvent(event, conn, dummy)
5265 struct rxevent *event;
5266 register struct rx_connection *conn;
5269 conn->challengeEvent = (struct rxevent *) 0;
5270 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5271 register struct rx_packet *packet;
5273 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5275 /* If there's no packet available, do this later. */
5276 RXS_GetChallenge(conn->securityObject, conn, packet);
5277 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5278 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5279 rxi_FreePacket(packet);
5281 clock_GetTime(&when);
5282 when.sec += RX_CHALLENGE_TIMEOUT;
5283 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5287 /* Call this routine to start requesting the client to authenticate
5288 * itself. This will continue until authentication is established,
5289 * the call times out, or an invalid response is returned. The
5290 * security object associated with the connection is asked to create
5291 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5292 * defined earlier. */
5293 void rxi_ChallengeOn(conn)
5294 register struct rx_connection *conn;
5296 if (!conn->challengeEvent) {
5297 RXS_CreateChallenge(conn->securityObject, conn);
5298 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5303 /* Compute round trip time of the packet provided, in *rttp.
5306 /* rxi_ComputeRoundTripTime is called with peer locked. */
5307 void rxi_ComputeRoundTripTime(p, sentp, peer)
5308 register struct clock *sentp; /* may be null */
5309 register struct rx_peer *peer; /* may be null */
5310 register struct rx_packet *p;
5312 struct clock thisRtt, *rttp = &thisRtt;
5314 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5315 /* making year 2038 bugs to get this running now - stroucki */
5316 struct timeval temptime;
5318 register int rtt_timeout;
5319 static char id[]="@(#)adaptive RTO";
5321 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5322 /* yet again. This was the worst Heisenbug of the port - stroucki */
5323 clock_GetTime(&temptime);
5324 rttp->sec=(afs_int32)temptime.tv_sec;
5325 rttp->usec=(afs_int32)temptime.tv_usec;
5327 clock_GetTime(rttp);
5329 if (clock_Lt(rttp, sentp)) {
5331 return; /* somebody set the clock back, don't count this time. */
5333 clock_Sub(rttp, sentp);
5334 MUTEX_ENTER(&rx_stats_mutex);
5335 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5336 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5337 if (rttp->sec > 60) {
5338 MUTEX_EXIT(&rx_stats_mutex);
5339 return; /* somebody set the clock ahead */
5341 rx_stats.maxRtt = *rttp;
5343 clock_Add(&rx_stats.totalRtt, rttp);
5344 rx_stats.nRttSamples++;
5345 MUTEX_EXIT(&rx_stats_mutex);
5347 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5349 /* Apply VanJacobson round-trip estimations */
5354 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5355 * srtt is stored as fixed point with 3 bits after the binary
5356 * point (i.e., scaled by 8). The following magic is
5357 * equivalent to the smoothing algorithm in rfc793 with an
5358 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5359 * srtt*8 = srtt*8 + rtt - srtt
5360 * srtt = srtt + rtt/8 - srtt/8
5363 delta = MSEC(rttp) - (peer->rtt >> 3);
5367 * We accumulate a smoothed rtt variance (actually, a smoothed
5368 * mean difference), then set the retransmit timer to smoothed
5369 * rtt + 4 times the smoothed variance (was 2x in van's original
5370 * paper, but 4x works better for me, and apparently for him as
5372 * rttvar is stored as
5373 * fixed point with 2 bits after the binary point (scaled by
5374 * 4). The following is equivalent to rfc793 smoothing with
5375 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5376 * replaces rfc793's wired-in beta.
5377 * dev*4 = dev*4 + (|actual - expected| - dev)
5383 delta -= (peer->rtt_dev >> 2);
5384 peer->rtt_dev += delta;
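/* Worked example with illustrative numbers: a stored srtt of 800
 * (100 ms scaled by 8) and a new 120 ms sample give
 *
 *   delta = 120 - (800 >> 3) = 20,  so srtt*8 becomes 820 (102.5 ms),
 *
 * and |delta| then updates the deviation exactly as described above:
 * delta -= rtt_dev >> 2; rtt_dev += delta, i.e.
 * rttvar = rttvar*3/4 + |delta|/4 in x4 fixed point. */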
5387 /* I don't have a stored RTT so I start with this value. Since I'm
5388 * probably just starting a call, and will be pushing more data down
5389 * this, I expect congestion to increase rapidly. So I fudge a
5390 * little, and I set deviance to half the rtt. In practice,
5391 * deviance tends to approach something a little less than
5392 * half the smoothed rtt. */
5393 peer->rtt = (MSEC(rttp) << 3) + 8;
5394 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5396 /* the timeout is RTT + 4*MDEV + 0.35 sec This is because one end or
5397 * the other of these connections is usually in a user process, and can
5398 * be switched and/or swapped out. So on fast, reliable networks, the
5399 * timeout would otherwise be too short.
5401 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5402 clock_Zero(&(peer->timeout));
5403 clock_Addmsec(&(peer->timeout), rtt_timeout);
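/* In milliseconds this is rtt_timeout = srtt + 4*mdev + 350: the >> 3
 * undoes the x8 srtt scaling, and rtt_dev in its x4 fixed-point form
 * is already 4*mdev. E.g. srtt 100 ms and mdev 25 ms give a 550 ms
 * retransmit timeout. */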
5405 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5406 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5407 (peer->timeout.sec),(peer->timeout.usec)) );
5411 /* Find all server connections that have not been active for a long time, and
5413 void rxi_ReapConnections()
5416 clock_GetTime(&now);
5418 /* Find server connection structures that haven't been used for
5419 * greater than rx_idleConnectionTime */
5420 { struct rx_connection **conn_ptr, **conn_end;
5421 int i, havecalls = 0;
5422 MUTEX_ENTER(&rx_connHashTable_lock);
5423 for (conn_ptr = &rx_connHashTable[0],
5424 conn_end = &rx_connHashTable[rx_hashTableSize];
5425 conn_ptr < conn_end; conn_ptr++) {
5426 struct rx_connection *conn, *next;
5427 struct rx_call *call;
5431 for (conn = *conn_ptr; conn; conn = next) {
5432 /* XXX -- Shouldn't the connection be locked? */
5435 for(i=0;i<RX_MAXCALLS;i++) {
5436 call = conn->call[i];
5439 MUTEX_ENTER(&call->lock);
5440 #ifdef RX_ENABLE_LOCKS
5441 result = rxi_CheckCall(call, 1);
5442 #else /* RX_ENABLE_LOCKS */
5443 result = rxi_CheckCall(call);
5444 #endif /* RX_ENABLE_LOCKS */
5445 MUTEX_EXIT(&call->lock);
5447 /* If CheckCall freed the call, it might
5448 * have destroyed the connection as well,
5449 * which screws up the linked lists.
5455 if (conn->type == RX_SERVER_CONNECTION) {
5456 /* This only actually destroys the connection if
5457 * there are no outstanding calls */
5458 MUTEX_ENTER(&conn->conn_data_lock);
5459 if (!havecalls && !conn->refCount &&
5460 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5461 conn->refCount++; /* it will be decr in rx_DestroyConn */
5462 MUTEX_EXIT(&conn->conn_data_lock);
5463 #ifdef RX_ENABLE_LOCKS
5464 rxi_DestroyConnectionNoLock(conn);
5465 #else /* RX_ENABLE_LOCKS */
5466 rxi_DestroyConnection(conn);
5467 #endif /* RX_ENABLE_LOCKS */
5469 #ifdef RX_ENABLE_LOCKS
5471 MUTEX_EXIT(&conn->conn_data_lock);
5473 #endif /* RX_ENABLE_LOCKS */
5477 #ifdef RX_ENABLE_LOCKS
5478 while (rx_connCleanup_list) {
5479 struct rx_connection *conn;
5480 conn = rx_connCleanup_list;
5481 rx_connCleanup_list = rx_connCleanup_list->next;
5482 MUTEX_EXIT(&rx_connHashTable_lock);
5483 rxi_CleanupConnection(conn);
5484 MUTEX_ENTER(&rx_connHashTable_lock);
5486 MUTEX_EXIT(&rx_connHashTable_lock);
5487 #endif /* RX_ENABLE_LOCKS */
5490 /* Find any peer structures that haven't been used (haven't had an
5491 * associated connection) for greater than rx_idlePeerTime */
5492 { struct rx_peer **peer_ptr, **peer_end;
5494 MUTEX_ENTER(&rx_rpc_stats);
5495 MUTEX_ENTER(&rx_peerHashTable_lock);
5496 for (peer_ptr = &rx_peerHashTable[0],
5497 peer_end = &rx_peerHashTable[rx_hashTableSize];
5498 peer_ptr < peer_end; peer_ptr++) {
5499 struct rx_peer *peer, *next, *prev;
5500 for (prev = peer = *peer_ptr; peer; peer = next) {
5502 code = MUTEX_TRYENTER(&peer->peer_lock);
5503 if ((code) && (peer->refCount == 0)
5504 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5505 rx_interface_stat_p rpc_stat, nrpc_stat;
5507 MUTEX_EXIT(&peer->peer_lock);
5508 MUTEX_DESTROY(&peer->peer_lock);
5509 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5510 rx_interface_stat)) {
5511 unsigned int num_funcs;
5512 if (!rpc_stat) break;
5513 queue_Remove(&rpc_stat->queue_header);
5514 queue_Remove(&rpc_stat->all_peers);
5515 num_funcs = rpc_stat->stats[0].func_total;
5516 space = sizeof(rx_interface_stat_t) +
5517 rpc_stat->stats[0].func_total *
5518 sizeof(rx_function_entry_v1_t);
5520 rxi_Free(rpc_stat, space);
5521 rxi_rpc_peer_stat_cnt -= num_funcs;
5524 MUTEX_ENTER(&rx_stats_mutex);
5525 rx_stats.nPeerStructs--;
5526 MUTEX_EXIT(&rx_stats_mutex);
5527 if (prev == *peer_ptr) {
5536 MUTEX_EXIT(&peer->peer_lock);
5542 MUTEX_EXIT(&rx_peerHashTable_lock);
5543 MUTEX_EXIT(&rx_rpc_stats);
5546 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5547 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5548 GC, just below. Really, we shouldn't have to keep moving packets from
5549 one place to another, but instead ought to always know if we can
5550 afford to hold onto a packet in its particular use. */
5551 MUTEX_ENTER(&rx_freePktQ_lock);
5552 if (rx_waitingForPackets) {
5553 rx_waitingForPackets = 0;
5554 #ifdef RX_ENABLE_LOCKS
5555 CV_BROADCAST(&rx_waitingForPackets_cv);
5557 osi_rxWakeup(&rx_waitingForPackets);
5560 MUTEX_EXIT(&rx_freePktQ_lock);
5562 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5563 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5567 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5568 * rx.h is sort of strange, this is better. This is called with a security
5569 * object before it is discarded. Each connection using a security object has
5570 * its own refcount to the object so it won't actually be freed until the last
5571 * connection is destroyed.
5573 * This is the only rxs module call. A hold could also be written but no one
5576 int rxs_Release (aobj)
5577 struct rx_securityClass *aobj;
5579 return RXS_Close (aobj);
5583 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5584 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5585 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5586 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
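/* For scale (assuming the usual 28-byte Rx header and 28 bytes of IP+UDP
 * overhead -- see rx.h for the authoritative constants):
 *   RXRATE_PKT_OH    = 56 bytes
 *   RXRATE_SMALL_PKT = 56 + sizeof(struct rx_ackPacket)
 *   RXRATE_LARGE_PKT = RXRATE_SMALL_PKT + 256
 */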
5588 /* Adjust our estimate of the transmission rate to this peer, given
5589 * that the packet p was just acked. We can adjust peer->timeout and
5590 * call->twind. Pragmatically, this is called
5591 * only with packets of maximal length.
5592 * Called with peer and call locked.
5595 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5596 register struct rx_peer *peer;
5597 register struct rx_call *call;
5598 struct rx_packet *p, *ackp;
5601 afs_int32 xferSize, xferMs;
5602 register afs_int32 minTime;
5605 /* Count down packets */
5606 if (peer->rateFlag > 0) peer->rateFlag--;
5607 /* Do nothing until we're enabled */
5608 if (peer->rateFlag != 0) return;
5609 if (!call->conn) return;
5611 /* Count only when the ack seems legitimate */
5612 switch (ackReason) {
5613 case RX_ACK_REQUESTED:
5614 xferSize = p->length + RX_HEADER_SIZE +
5615 call->conn->securityMaxTrailerSize;
5619 case RX_ACK_PING_RESPONSE:
5620 if (p) /* want the response to ping-request, not data send */
5622 clock_GetTime(&newTO);
5623 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5624 clock_Sub(&newTO, &call->pingRequestTime);
5625 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5629 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5636 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5637 ntohl(peer->host), ntohs(peer->port),
5638 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5639 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5642 /* Track only packets that are big enough. */
5643 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5647 /* absorb RTT data (in milliseconds) for these big packets */
5648 if (peer->smRtt == 0) {
5649 peer->smRtt = xferMs;
5651 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5652 if (!peer->smRtt) peer->smRtt = 1;
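/* This is a 15/16 exponential average, roughly
 * smRtt = (15*smRtt + sample) / 16 with the + 4 supplying rounding;
 * e.g. smRtt = 100 ms and a 132 ms sample give
 * (1500 + 132 + 4) >> 4 = 102 ms. */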
5655 if (peer->countDown) {
5659 peer->countDown = 10; /* recalculate only every so often */
5661 /* In practice, we can measure only the RTT for full packets,
5662 * because of the way Rx acks the data that it receives. (If it's
5663 * smaller than a full packet, it often gets implicitly acked
5664 * either by the call response (from a server) or by the next call
5665 * (from a client), and either case confuses transmission times
5666 * with processing times.) Therefore, replace the above
5667 * more-sophisticated processing with a simpler version, where the
5668 * smoothed RTT is kept for full-size packets, and the time to
5669 * transmit a windowful of full-size packets is simply RTT *
5670 * windowSize. Again, we take two steps:
5671 * - ensure the timeout is large enough for a single packet's RTT;
5672 * - ensure that the window is small enough to fit in the desired timeout. */
5674 /* First, the timeout check. */
5675 minTime = peer->smRtt;
5676 /* Get a reasonable estimate for a timeout period */
5678 newTO.sec = minTime / 1000;
5679 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5681 /* Increase the timeout period so that we can always do at least
5682 * one packet exchange */
5683 if (clock_Gt(&newTO, &peer->timeout)) {
5685 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5686 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5687 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5690 peer->timeout = newTO;
5693 /* Now, get an estimate for the transmit window size. */
5694 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5695 /* Now, convert to the number of full packets that could fit in a
5696 * reasonable fraction of that interval */
5697 minTime /= (peer->smRtt << 1);
5698 xferSize = minTime; /* (make a copy) */
5700 /* Now clamp the size to reasonable bounds. */
5701 if (minTime <= 1) minTime = 1;
5702 else if (minTime > rx_Window) minTime = rx_Window;
5703 /* if (minTime != peer->maxWindow) {
5704 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5705 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5706 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5708 peer->maxWindow = minTime;
5709 elide... call->twind = minTime;
5713 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5714 * Discern this by calculating the timeout necessary for rx_Window
5716 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5717 /* calculate estimate for transmission interval in milliseconds */
5718 minTime = rx_Window * peer->smRtt;
5719 if (minTime < 1000) {
5720 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5721 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5722 peer->timeout.usec, peer->smRtt,
5725 newTO.sec = 0; /* cut back on timeout by half a second */
5726 newTO.usec = 500000;
5727 clock_Sub(&peer->timeout, &newTO);
5732 } /* end of rxi_ComputeRate */
5733 #endif /* ADAPT_WINDOW */
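/*
 * Illustrative sketch (not part of rx): the smoothed-RTT update above is a
 * fixed-point exponential weighted moving average -- keep 15/16 of the old
 * estimate, fold in 1/16 of the new sample, with the +4 rounding the >>4 to
 * nearest. A windowful of full-size packets then costs roughly RTT * window.
 * The helper below reproduces only that arithmetic.
 */
#if 0
#include <stdio.h>

static long smooth_rtt(long smRtt, long sampleMs)
{
    if (smRtt == 0)
	return sampleMs;		/* first sample primes the estimate */
    smRtt = ((smRtt * 15) + sampleMs + 4) >> 4;
    return smRtt ? smRtt : 1;		/* never let the estimate reach zero */
}

int main(void)
{
    long rtt = 0, samples[] = {100, 120, 80, 100};
    int i;
    for (i = 0; i < 4; i++) {
	rtt = smooth_rtt(rtt, samples[i]);
	printf("sample %ld ms -> smoothed %ld ms\n", samples[i], rtt);
    }
    return 0;
}
#endif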
/* Don't call this debugging routine directly; use dpf */
void
rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
	       a11, a12, a13, a14, a15)
    char *format;
{
    struct clock now;
    clock_GetTime(&now);
    fprintf(rx_Log, " %u.%.3u:", now.sec, now.usec/1000);
    fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
    putc('\n', rx_Log);
}
/*
 * This function is used to process the rx_stats structure that is local
 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug). Therefore, it needs to do minimal version
 * checking.
 */
void rx_PrintTheseStats (file, s, size, freePackets, version)
  FILE *file;
  struct rx_stats *s;
  int size;				/* some idea of version control */
  afs_int32 freePackets;
  char version;
{
    int i;

    if (size != sizeof(struct rx_stats)) {
	fprintf(file,
		"Unexpected size of stats structure: was %d, expected %d\n",
		size, sizeof(struct rx_stats));
    }
5778 "rx stats: free packets %d, "
5783 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5785 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5786 s->receivePktAllocFailures,
5787 s->receiveCbufPktAllocFailures,
5788 s->sendPktAllocFailures,
5789 s->sendCbufPktAllocFailures,
5790 s->specialPktAllocFailures);
5793 "alloc-failures(rcv %d,send %d,ack %d)\n",
5794 s->receivePktAllocFailures,
5795 s->sendPktAllocFailures,
5796 s->specialPktAllocFailures);
5801 "bogusReads %d (last from host %x), "
5807 s->bogusPacketOnRead,
5810 s->noPacketBuffersOnRead,
5814 fprintf(file, " packets read: ");
5815 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5821 fprintf(file, "\n");
5824 " other read counters: data %d, "
5832 s->spuriousPacketsRead,
5833 s->ignorePacketDally);
5835 fprintf(file, " packets sent: ");
5836 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5842 fprintf(file, "\n");
5845 " other send counters: ack %d, "
5846 "data %d (not resends), "
5849 "acked&ignored %d\n",
5852 s->dataPacketsReSent,
5853 s->dataPacketsPushed,
5854 s->ignoreAckedPacket);
5857 " \t(these should be small) sendFailed %d, "
5862 if (s->nRttSamples) {
5864 " Average rtt is %0.3f, with %d samples\n",
5865 clock_Float(&s->totalRtt)/s->nRttSamples,
5869 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5870 clock_Float(&s->minRtt),
5871 clock_Float(&s->maxRtt));
5875 " %d server connections, "
5876 "%d client connections, "
5879 "%d free call structs\n",
5884 s->nFreeCallStructs);
5886 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5888 " %d clock updates\n",
/* for backward compatibility */
void rx_PrintStats(file)
  FILE *file;
{
    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
}
void rx_PrintPeerStats(file, peer)
  FILE *file;
  struct rx_peer *peer;
{
    fprintf(file, "Peer %x.%d.  Burst size %d, burst wait %u.%d.\n",
	    ntohl(peer->host), peer->port, peer->burstSize,
	    peer->burstWait.sec, peer->burstWait.usec);
    fprintf(file, "   Rtt %d, retry time %u.%06d, total sent %d, resent %d\n",
	    peer->rtt, peer->timeout.sec, peer->timeout.usec,
	    peer->nSent, peer->reSends);
    fprintf(file, "   Packet size %d, max in packet skew %d, "
	    "max out packet skew %d\n",
	    peer->ifMTU, peer->inPacketSkew, peer->outPacketSkew);
}
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 * counter
 */
#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  u_char type,
  void *inputData,
  size_t inputLength,
  void *outputData,
  size_t outputLength
)
{
    static afs_int32 counter = 100;
    time_t endTime;
    struct rx_header theader;
    char tbuffer[1500];
    register afs_int32 code;
    struct timeval tv;
    struct sockaddr_in taddr, faddr;
    int faddrLen;
    fd_set imask;
    register char *tp;

    endTime = time(0) + 20; /* try for 20 seconds */
    LOCK_RX_DEBUG
    counter++;
    UNLOCK_RX_DEBUG
    tp = &tbuffer[sizeof(struct rx_header)];
    taddr.sin_family = AF_INET;
    taddr.sin_port = remotePort;
    taddr.sin_addr.s_addr = remoteAddr;
    while (1) {
	memset(&theader, 0, sizeof(theader));
	theader.epoch = htonl(999);
	theader.callNumber = htonl(counter);
	theader.type = type;
	theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
	theader.serviceId = 0;

	bcopy(&theader, tbuffer, sizeof(theader));
	bcopy(inputData, tp, inputLength);
	code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
		      (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

	/* see if there's a packet available */
	FD_ZERO(&imask);
	FD_SET(socket, &imask);
	tv.tv_sec = 1;
	tv.tv_usec = 0;
	code = select(socket+1, &imask, 0, 0, &tv);
	if (code > 0) {
	    /* now receive a packet */
	    faddrLen = sizeof(struct sockaddr_in);
	    code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
			    (struct sockaddr *) &faddr, &faddrLen);

	    bcopy(tbuffer, &theader, sizeof(struct rx_header));
	    if (counter == ntohl(theader.callNumber)) break;
	}

	/* see if we've timed out */
	if (endTime < time(0)) return -1;
    }
    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    bcopy(tp, outputData, code);
    return code;
}
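/*
 * Illustrative sketch (not part of rx): how a client such as rxdebug might
 * drive the debug protocol above. Any UDP socket works for the sendto/
 * recvfrom exchange; the address and port here are placeholders, and the
 * caller is expected to pass them in network byte order.
 */
#if 0
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdio.h>

static void query_version(afs_uint32 host /* NBO */, afs_uint16 port /* NBO */)
{
    char version[64];
    int sock = socket(AF_INET, SOCK_DGRAM, 0);	/* unbound is fine for sendto */
    if (sock < 0) return;
    if (rx_GetServerVersion(sock, host, port, sizeof(version), version) >= 0)
	printf("server version: %s\n", version);
    close(sock);
}
#endif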
afs_int32 rx_GetServerDebug(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_debugStats *stat,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);
    in.index = 0;

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), stat, sizeof(*stat));

    /*
     * If the call was successful, fixup the version and indicate
     * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */

    if (rc >= 0) {
	if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
	    *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
	    *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
	    *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
	    *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
	    *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
	    *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
	    *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
	}

	stat->nFreePackets = ntohl(stat->nFreePackets);
	stat->packetReclaims = ntohl(stat->packetReclaims);
	stat->callsExecuted = ntohl(stat->callsExecuted);
	stat->nWaiting = ntohl(stat->nWaiting);
	stat->idleThreads = ntohl(stat->idleThreads);
    }

    return rc;
}
afs_int32 rx_GetServerStats(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_stats *stat,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 *lp = (afs_int32 *) stat;
    int i;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_RXSTATS);
    in.index = 0;
    memset(stat, 0, sizeof(*stat));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), stat, sizeof(*stat));

    if (rc >= 0) {
	/*
	 * Do net to host conversion here
	 */
	for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
	    *lp = ntohl(*lp);
	}
    }

    return rc;
}
afs_int32 rx_GetServerVersion(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  size_t version_length,
  char *version
)
{
    char a[1] = {0};
    return MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_VERSION,
			 a, 1, version, version_length);
}
afs_int32 rx_GetServerConnections(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextConnection,
  int allConnections,
  afs_uint32 debugSupportedValues,
  struct rx_debugConn *conn,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    if (allConnections) {
	in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
	in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), conn, sizeof(*conn));

    if (rc >= 0) {
	*nextConnection += 1;
	/*
	 * Convert old connection format to new structure.
	 */
6194 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6195 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6196 #define MOVEvL(a) (conn->a = vL->a)
6198 /* any old or unrecognized version... */
6199 for (i=0;i<RX_MAXCALLS;i++) {
6200 MOVEvL(callState[i]);
6201 MOVEvL(callMode[i]);
6202 MOVEvL(callFlags[i]);
	    MOVEvL(callOther[i]);
	}
	if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
	    MOVEvL(secStats.type);
	    MOVEvL(secStats.level);
	    MOVEvL(secStats.flags);
	    MOVEvL(secStats.expires);
	    MOVEvL(secStats.packetsReceived);
	    MOVEvL(secStats.packetsSent);
	    MOVEvL(secStats.bytesReceived);
	    MOVEvL(secStats.bytesSent);
	}
    }
	/*
	 * Do net to host conversion here
	 * I don't convert host or port since we are most likely
	 * going to want these in NBO.
	 */
6223 conn->cid = ntohl(conn->cid);
6224 conn->serial = ntohl(conn->serial);
	for(i=0;i<RX_MAXCALLS;i++) {
	    conn->callNumber[i] = ntohl(conn->callNumber[i]);
	}
6228 conn->error = ntohl(conn->error);
6229 conn->secStats.flags = ntohl(conn->secStats.flags);
6230 conn->secStats.expires = ntohl(conn->secStats.expires);
6231 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6232 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6233 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6234 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6235 conn->epoch = ntohl(conn->epoch);
	conn->natMTU = ntohl(conn->natMTU);
    }

    return rc;
}
afs_int32 rx_GetServerPeers(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextPeer,
  afs_uint32 debugSupportedValues,
  struct rx_debugPeer *peer,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
		       &in, sizeof(in), peer, sizeof(*peer));

    if (rc >= 0) {
	*nextPeer += 1;

	/*
	 * Do net to host conversion here
	 * I don't convert host or port since we are most likely
	 * going to want these in NBO.
	 */
	peer->ifMTU = ntohs(peer->ifMTU);
	peer->idleWhen = ntohl(peer->idleWhen);
	peer->refCount = ntohs(peer->refCount);
	peer->burstWait.sec = ntohl(peer->burstWait.sec);
	peer->burstWait.usec = ntohl(peer->burstWait.usec);
	peer->rtt = ntohl(peer->rtt);
	peer->rtt_dev = ntohl(peer->rtt_dev);
	peer->timeout.sec = ntohl(peer->timeout.sec);
	peer->timeout.usec = ntohl(peer->timeout.usec);
	peer->nSent = ntohl(peer->nSent);
	peer->reSends = ntohl(peer->reSends);
	peer->inPacketSkew = ntohl(peer->inPacketSkew);
	peer->outPacketSkew = ntohl(peer->outPacketSkew);
	peer->rateFlag = ntohl(peer->rateFlag);
	peer->natMTU = ntohs(peer->natMTU);
	peer->maxMTU = ntohs(peer->maxMTU);
	peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
	peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
	peer->MTU = ntohs(peer->MTU);
	peer->cwind = ntohs(peer->cwind);
	peer->nDgramPackets = ntohs(peer->nDgramPackets);
	peer->congestSeq = ntohs(peer->congestSeq);
	peer->bytesSent.high = ntohl(peer->bytesSent.high);
	peer->bytesSent.low = ntohl(peer->bytesSent.low);
	peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
	peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
    }

    return rc;
}
6313 #endif /* RXDEBUG */
void shutdown_rx(void)
{
    struct rx_serverQueueEntry *np;
    register int i, j;
    register struct rx_call *call;
    register struct rx_serverQueueEntry *sq;

    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
6330 #ifndef AFS_PTHREAD_ENV
6331 FD_ZERO(&rx_selectMask);
6332 #endif /* AFS_PTHREAD_ENV */
6333 rxi_dataQuota = RX_MAX_QUOTA;
#ifndef AFS_PTHREAD_ENV
    rxi_StopListener();
#endif /* AFS_PTHREAD_ENV */
6339 #ifndef AFS_PTHREAD_ENV
#ifndef AFS_USE_GETTIMEOFDAY
    clock_UnInit();
#endif /* AFS_USE_GETTIMEOFDAY */
6343 #endif /* AFS_PTHREAD_ENV */
    while (!queue_IsEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
	queue_Remove(call);
	rxi_Free(call, sizeof(struct rx_call));
    }
    while (!queue_IsEmpty(&rx_idleServerQueue)) {
	sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
	queue_Remove(sq);
	rxi_Free(sq, sizeof(struct rx_serverQueueEntry));
    }

    {
	struct rx_peer **peer_ptr, **peer_end;
6359 for (peer_ptr = &rx_peerHashTable[0],
6360 peer_end = &rx_peerHashTable[rx_hashTableSize];
6361 peer_ptr < peer_end; peer_ptr++) {
	    struct rx_peer *peer, *next;
	    for (peer = *peer_ptr; peer; peer = next) {
		rx_interface_stat_p rpc_stat, nrpc_stat;
		size_t space;
		next = peer->next;
		for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
			       rx_interface_stat)) {
		    unsigned int num_funcs;
		    if (!rpc_stat) break;
		    queue_Remove(&rpc_stat->queue_header);
		    queue_Remove(&rpc_stat->all_peers);
		    num_funcs = rpc_stat->stats[0].func_total;
		    space = sizeof(rx_interface_stat_t) +
			    rpc_stat->stats[0].func_total *
			    sizeof(rx_function_entry_v1_t);

		    rxi_Free(rpc_stat, space);
		    MUTEX_ENTER(&rx_rpc_stats);
		    rxi_rpc_peer_stat_cnt -= num_funcs;
		    MUTEX_EXIT(&rx_rpc_stats);
		}
		rxi_FreePeer(peer);
		MUTEX_ENTER(&rx_stats_mutex);
		rx_stats.nPeerStructs--;
		MUTEX_EXIT(&rx_stats_mutex);
	    }
	}
    }
    for (i = 0; i<RX_MAX_SERVICES; i++) {
	if (rx_services[i])
	    rxi_Free(rx_services[i], sizeof(*rx_services[i]));
    }
    for (i = 0; i < rx_hashTableSize; i++) {
	register struct rx_connection *tc, *ntc;
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (tc = rx_connHashTable[i]; tc; tc = ntc) {
	    ntc = tc->next;
	    for (j = 0; j < RX_MAXCALLS; j++) {
		if (tc->call[j]) {
		    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
		}
	    }
	    rxi_Free(tc, sizeof(*tc));
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
6409 MUTEX_ENTER(&freeSQEList_lock);
6411 while ((np = rx_FreeSQEList)) {
6412 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6413 MUTEX_DESTROY(&np->lock);
	rxi_Free(np, sizeof(*np));
    }

    MUTEX_EXIT(&freeSQEList_lock);
6418 MUTEX_DESTROY(&freeSQEList_lock);
6419 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6420 MUTEX_DESTROY(&rx_connHashTable_lock);
6421 MUTEX_DESTROY(&rx_peerHashTable_lock);
6422 MUTEX_DESTROY(&rx_serverPool_lock);
#ifndef KERNEL
    osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#else
    UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#endif

    rxi_FreeAllPackets();
6432 MUTEX_ENTER(&rx_stats_mutex);
6433 rxi_dataQuota = RX_MAX_QUOTA;
6434 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);

    rxinit_status = 1;
    UNLOCK_RX_INIT
}
6441 #ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
{
    if (!MUTEX_ISMINE(lockaddr))
	osi_Panic("Lock not held: %s", msg);
}
6447 #endif /* RX_ENABLE_LOCKS */
#ifndef KERNEL

/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)
{
    int key;
    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
	realloc((void *)rxi_keyCreate_destructor,
		(key+1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);
    return key;
}
void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
{
    int i;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->specific) {
	conn->specific = (void **)malloc((key+1)*sizeof(void *));
	for (i = 0 ; i < key ; i++)
	    conn->specific[i] = NULL;
	conn->nSpecific = key+1;
	conn->specific[key] = ptr;
    } else if (key >= conn->nSpecific) {
	conn->specific = (void **)
	    realloc(conn->specific,(key+1)*sizeof(void *));
	for (i = conn->nSpecific ; i < key ; i++)
	    conn->specific[i] = NULL;
	conn->nSpecific = key+1;
	conn->specific[key] = ptr;
    } else {
	if (conn->specific[key] && rxi_keyCreate_destructor[key])
	    (*rxi_keyCreate_destructor[key])(conn->specific[key]);
	conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
}
void *rx_GetSpecific(struct rx_connection *conn, int key)
{
    void *ptr;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
	ptr = NULL;
    else
	ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;
}
6505 #endif /* !KERNEL */
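/*
 * Illustrative sketch (not part of rx): typical use of the connection
 * specific data hooks above. The destructor runs when an existing value is
 * overwritten via rx_SetSpecific; the key and helpers here are hypothetical.
 */
#if 0
#include <stdlib.h>

static int my_key = -1;

static void my_destructor(void *ptr)
{
    free(ptr);				/* reclaim the per-connection blob */
}

static void attach_state(struct rx_connection *conn)
{
    if (my_key < 0)
	my_key = rx_KeyCreate(my_destructor);	/* once, at startup */
    rx_SetSpecific(conn, my_key, malloc(128));
}

static void *lookup_state(struct rx_connection *conn)
{
    return rx_GetSpecific(conn, my_key);
}
#endif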
/*
 * processStats is a queue used to store the statistics for the local
6509 * process. Its contents are similar to the contents of the rpcStats
6510 * queue on a rx_peer structure, but the actual data stored within
6511 * this queue contains totals across the lifetime of the process (assuming
6512 * the stats have not been reset) - unlike the per peer structures
 * which can come and go based upon the peer lifetime.
 */
6516 static struct rx_queue processStats = {&processStats,&processStats};
/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats,&peerStats};
/*
 * rxi_monitor_processStats is used to turn process wide stat collection
 * on and off
 */

static int rxi_monitor_processStats = 0;
/*
 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
6540 * call, create (if needed) and update the stat totals for the rpc.
6544 * IN stats - the queue of stats that will be updated with the new value
6546 * IN rxInterface - a unique number that identifies the rpc interface
6548 * IN currentFunc - the index of the function being invoked
6550 * IN totalFunc - the total number of functions in this interface
6552 * IN queueTime - the amount of time this function waited for a thread
6554 * IN execTime - the amount of time this function invocation took to execute
6556 * IN bytesSent - the number bytes sent by this invocation
6558 * IN bytesRcvd - the number bytes received by this invocation
6560 * IN isServer - if true, this invocation was made to a server
6562 * IN remoteHost - the ip address of the remote host
6564 * IN remotePort - the port of the remote host
6566 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6568 * INOUT counter - if a new stats structure is allocated, the counter will
 *              be updated with the new number of allocated stat structures
 */
static int rxi_AddRpcStat(
    struct rx_queue *stats,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,
    int isServer,
    afs_uint32 remoteHost,
    afs_uint32 remotePort,
    int addToPeerList,
    unsigned int *counter)
{
    int rc = 0;
    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */

    for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
	    (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }
    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */

    if (queue_IsEnd(stats, rpc_stat) ||
	(rpc_stat == NULL) ||
	(rpc_stat->stats[0].interfaceId != rxInterface) ||
	(rpc_stat->stats[0].remote_is_server != isServer)) {
	int i;
	size_t space;

	space = sizeof(rx_interface_stat_t) + totalFunc *
		sizeof(rx_function_entry_v1_t);

	rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
	if (rpc_stat == NULL) {
	    rc = 1;
	    goto fail;
	}
	*counter += totalFunc;
6624 for(i=0;i<totalFunc;i++) {
6625 rpc_stat->stats[i].remote_peer = remoteHost;
6626 rpc_stat->stats[i].remote_port = remotePort;
6627 rpc_stat->stats[i].remote_is_server = isServer;
6628 rpc_stat->stats[i].interfaceId = rxInterface;
6629 rpc_stat->stats[i].func_total = totalFunc;
6630 rpc_stat->stats[i].func_index = i;
6631 hzero(rpc_stat->stats[i].invocations);
6632 hzero(rpc_stat->stats[i].bytes_sent);
6633 hzero(rpc_stat->stats[i].bytes_rcvd);
6634 rpc_stat->stats[i].queue_time_sum.sec = 0;
6635 rpc_stat->stats[i].queue_time_sum.usec = 0;
6636 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6637 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6638 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6639 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6640 rpc_stat->stats[i].queue_time_max.sec = 0;
6641 rpc_stat->stats[i].queue_time_max.usec = 0;
6642 rpc_stat->stats[i].execution_time_sum.sec = 0;
6643 rpc_stat->stats[i].execution_time_sum.usec = 0;
6644 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6645 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6646 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6647 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6648 rpc_stat->stats[i].execution_time_max.sec = 0;
	    rpc_stat->stats[i].execution_time_max.usec = 0;
	}
	queue_Prepend(stats, rpc_stat);
	if (addToPeerList) {
	    queue_Prepend(&peerStats, &rpc_stat->all_peers);
	}
    }
    /*
     * Increment the stats for this function
     */

    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6662 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6663 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6664 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6665 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6666 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
	rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
    }
6669 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
	rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
    }
6672 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6673 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6674 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
	rpc_stat->stats[currentFunc].execution_time_min = *execTime;
    }
    if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
	rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }

fail:
    return rc;
}
/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
6691 * IN peer - the peer who invoked the rpc
6693 * IN rxInterface - a unique number that identifies the rpc interface
6695 * IN currentFunc - the index of the function being invoked
6697 * IN totalFunc - the total number of functions in this interface
6699 * IN queueTime - the amount of time this function waited for a thread
6701 * IN execTime - the amount of time this function invocation took to execute
6703 * IN bytesSent - the number bytes sent by this invocation
6705 * IN bytesRcvd - the number bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 */
void rx_IncrementTimeAndCount(
    struct rx_peer *peer,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,
    int isServer)
{
    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
	rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
		       queueTime, execTime, bytesSent, bytesRcvd, isServer,
		       peer->host, peer->port, 1, &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
	rxi_AddRpcStat(&processStats, rxInterface, currentFunc, totalFunc,
		       queueTime, execTime, bytesSent, bytesRcvd, isServer,
		       0xffffffff, 0xffffffff, 0, &rxi_rpc_process_stat_cnt);
    }
    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
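/*
 * Illustrative sketch (not part of rx): how a server-side stub might feed an
 * RPC's timings into rx_IncrementTimeAndCount after the call completes. The
 * interface id, function index and count below are placeholders.
 */
#if 0
static void record_call(struct rx_call *call, struct clock *queued,
			struct clock *executed,
			afs_hyper_t *sent, afs_hyper_t *rcvd)
{
    rx_IncrementTimeAndCount(call->conn->peer,
			     12345 /* hypothetical interface id */,
			     0 /* this function's index */,
			     7 /* functions in the interface */,
			     queued, executed, sent, rcvd,
			     1 /* we are the server */);
}
#endif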
/*
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6771 * IN callerVersion - the rpc stat version of the caller.
6773 * IN count - the number of entries to marshall.
6775 * IN stats - pointer to stats to be marshalled.
 * OUT ptr - Where to store the marshalled data.
 */
void rx_MarshallProcessRPCStats(
    afs_uint32 callerVersion,
    int count,
    rx_function_entry_v1_t *stats,
    afs_uint32 **ptrP)
{
    int i;
    afs_uint32 *ptr;

    /*
     * We only support the first version
     */

    for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6796 *(ptr++) = stats->remote_peer;
6797 *(ptr++) = stats->remote_port;
6798 *(ptr++) = stats->remote_is_server;
6799 *(ptr++) = stats->interfaceId;
6800 *(ptr++) = stats->func_total;
6801 *(ptr++) = stats->func_index;
6802 *(ptr++) = hgethi(stats->invocations);
6803 *(ptr++) = hgetlo(stats->invocations);
6804 *(ptr++) = hgethi(stats->bytes_sent);
6805 *(ptr++) = hgetlo(stats->bytes_sent);
6806 *(ptr++) = hgethi(stats->bytes_rcvd);
6807 *(ptr++) = hgetlo(stats->bytes_rcvd);
6808 *(ptr++) = stats->queue_time_sum.sec;
6809 *(ptr++) = stats->queue_time_sum.usec;
6810 *(ptr++) = stats->queue_time_sum_sqr.sec;
6811 *(ptr++) = stats->queue_time_sum_sqr.usec;
6812 *(ptr++) = stats->queue_time_min.sec;
6813 *(ptr++) = stats->queue_time_min.usec;
6814 *(ptr++) = stats->queue_time_max.sec;
6815 *(ptr++) = stats->queue_time_max.usec;
6816 *(ptr++) = stats->execution_time_sum.sec;
6817 *(ptr++) = stats->execution_time_sum.usec;
6818 *(ptr++) = stats->execution_time_sum_sqr.sec;
6819 *(ptr++) = stats->execution_time_sum_sqr.usec;
6820 *(ptr++) = stats->execution_time_min.sec;
6821 *(ptr++) = stats->execution_time_min.usec;
6822 *(ptr++) = stats->execution_time_max.sec;
	*(ptr++) = stats->execution_time_max.usec;
    }
    *ptrP = ptr;
}
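/*
 * Illustrative sketch (not part of rx): the inverse of the marshalling
 * above -- pulling one rx_function_entry_v1_t back out of the afs_uint32
 * stream in the same field order. hset64 is assumed here to be the usual
 * afs hyper "set from hi/lo words" macro; only the first version and the
 * first few clock fields are shown.
 */
#if 0
static void unmarshall_one(afs_uint32 **ptrP, rx_function_entry_v1_t *stats)
{
    afs_uint32 *ptr = *ptrP;

    stats->remote_peer = *(ptr++);
    stats->remote_port = *(ptr++);
    stats->remote_is_server = *(ptr++);
    stats->interfaceId = *(ptr++);
    stats->func_total = *(ptr++);
    stats->func_index = *(ptr++);
    hset64(stats->invocations, *(ptr), *(ptr+1)); ptr += 2;
    hset64(stats->bytes_sent, *(ptr), *(ptr+1)); ptr += 2;
    hset64(stats->bytes_rcvd, *(ptr), *(ptr+1)); ptr += 2;
    stats->queue_time_sum.sec = *(ptr++);
    stats->queue_time_sum.usec = *(ptr++);
    /* ... the remaining clock fields unpack the same way ... */
    *ptrP = ptr;
}
#endif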
/*
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * this process
6834 * IN callerVersion - the rpc stat version of the caller
6836 * OUT myVersion - the rpc stat version of this function
6838 * OUT clock_sec - local time seconds
6840 * OUT clock_usec - local time microseconds
6842 * OUT allocSize - the number of bytes allocated to contain stats
6844 * OUT statCount - the number stats retrieved from this process.
6846 * OUT stats - the actual stats retrieved from this process.
 * Returns 0 on success.  If successful, stats will != NULL.
 */
int rx_RetrieveProcessRPCStats(
  afs_uint32 callerVersion,
  afs_uint32 *myVersion,
  afs_uint32 *clock_sec,
  afs_uint32 *clock_usec,
  size_t *allocSize,
  afs_uint32 *statCount,
  afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;
    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_processStats) {
	MUTEX_EXIT(&rx_rpc_stats);
	return rc;
    }
6882 clock_GetTime(&now);
6883 *clock_sec = now.sec;
6884 *clock_usec = now.usec;
    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */
    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
	space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
	*statCount = rxi_rpc_process_stat_cnt;
    } else {
	/*
	 * This can't happen yet, but in the future version changes
	 * can be handled by adding additional code here
	 */
    }

    if (space > (size_t) 0) {
	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

	if (ptr != NULL) {
	    rx_interface_stat_p rpc_stat, nrpc_stat;

	    for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
			   rx_interface_stat)) {
		/*
		 * Copy the data based upon the caller version
		 */
		rx_MarshallProcessRPCStats(callerVersion,
					   rpc_stat->stats[0].func_total,
					   rpc_stat->stats, &ptr);
	    }
	    *allocSize = space;
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 * of this process
6935 * IN callerVersion - the rpc stat version of the caller
6937 * OUT myVersion - the rpc stat version of this function
6939 * OUT clock_sec - local time seconds
6941 * OUT clock_usec - local time microseconds
6943 * OUT allocSize - the number of bytes allocated to contain stats
6945 * OUT statCount - the number of stats retrieved from the individual
6948 * OUT stats - the actual stats retrieved from the individual peer structures.
 * Returns 0 on success.  If successful, stats will != NULL.
 */
int rx_RetrievePeerRPCStats(
  afs_uint32 callerVersion,
  afs_uint32 *myVersion,
  afs_uint32 *clock_sec,
  afs_uint32 *clock_usec,
  size_t *allocSize,
  afs_uint32 *statCount,
  afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;
    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_peerStats) {
	MUTEX_EXIT(&rx_rpc_stats);
	return rc;
    }
6984 clock_GetTime(&now);
6985 *clock_sec = now.sec;
6986 *clock_usec = now.usec;
    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */
    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
	space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
	*statCount = rxi_rpc_peer_stat_cnt;
    } else {
	/*
	 * This can't happen yet, but in the future version changes
	 * can be handled by adding additional code here
	 */
    }

    if (space > (size_t) 0) {
	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

	if (ptr != NULL) {
	    rx_interface_stat_p rpc_stat, nrpc_stat;
	    char *fix_offset;

	    for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
			   rx_interface_stat)) {
		/*
		 * We have to fix the offset of rpc_stat since we are
		 * keeping this structure on two rx_queues. The rx_queue
		 * package assumes that the rx_queue member is the first
		 * member of the structure. That is, rx_queue assumes that
		 * any one item is only on one queue at a time. We are
		 * breaking that assumption and so we have to do a little
		 * math to fix our pointers.
		 */

		fix_offset = (char *) rpc_stat;
		fix_offset -= offsetof(rx_interface_stat_t, all_peers);
		rpc_stat = (rx_interface_stat_p) fix_offset;

		/*
		 * Copy the data based upon the caller version
		 */
		rx_MarshallProcessRPCStats(callerVersion,
					   rpc_stat->stats[0].func_total,
					   rpc_stat->stats, &ptr);
	    }
	    *allocSize = space;
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
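/*
 * Illustrative sketch (not part of rx): the pointer fix-up above is the
 * classic "containing record" idiom -- given a pointer to a queue member
 * that is not first in the struct, subtract the member's offset to recover
 * the enclosing structure. The macro name here is our own, not rx's.
 */
#if 0
#include <stddef.h>

#define CONTAINER_OF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* e.g., from an all_peers queue entry back to the interface stat:
 *   rx_interface_stat_p s =
 *       CONTAINER_OF(qp, rx_interface_stat_t, all_peers);
 */
#endif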
/*
 * rx_FreeRPCStats - free memory allocated by
7048 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7052 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7053 * rx_RetrievePeerRPCStats
 * IN allocSize - the number of bytes in stats.
 */
void rx_FreeRPCStats(
  afs_uint32 *stats,
  size_t allocSize)
{
    rxi_Free(stats, allocSize);
}
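/*
 * Illustrative sketch (not part of rx): the expected retrieve/free cycle.
 * Collection must have been enabled first, or stats comes back NULL.
 */
#if 0
static void dump_process_stats(void)
{
    afs_uint32 myVersion, sec, usec, statCount;
    afs_uint32 *stats = NULL;
    size_t allocSize = 0;

    rx_enableProcessRPCStats();
    /* ... let some RPCs run ... */
    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVersion,
			       &sec, &usec, &allocSize, &statCount, &stats);
    if (stats != NULL) {
	/* statCount entries of marshalled rx_function_entry_v1_t data */
	rx_FreeRPCStats(stats, allocSize);
    }
}
#endif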
/*
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryProcessRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryPeerRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enableProcessRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enablePeerRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disableProcessRPCStats()
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and if peer stats is also off, turn
     * off stats all together
     */

    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
	rx_enable_stats = 0;
    }
7171 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7172 unsigned int num_funcs = 0;
7173 if (!rpc_stat) break;
7174 queue_Remove(rpc_stat);
7175 num_funcs = rpc_stat->stats[0].func_total;
7176 space = sizeof(rx_interface_stat_t) +
7177 rpc_stat->stats[0].func_total *
7178 sizeof(rx_function_entry_v1_t);
	rxi_Free(rpc_stat, space);
	rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disablePeerRPCStats()
{
    struct rx_peer **peer_ptr, **peer_end;
    int code;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and if process stats is also off, turn
     * off stats all together
     */

    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
	rx_enable_stats = 0;
    }
7213 MUTEX_ENTER(&rx_peerHashTable_lock);
7214 for (peer_ptr = &rx_peerHashTable[0],
7215 peer_end = &rx_peerHashTable[rx_hashTableSize];
7216 peer_ptr < peer_end; peer_ptr++) {
	struct rx_peer *peer, *next, *prev;
	for (prev = peer = *peer_ptr; peer; peer = next) {
	    next = peer->next;
	    code = MUTEX_TRYENTER(&peer->peer_lock);
	    if (code) {
		rx_interface_stat_p rpc_stat, nrpc_stat;
		size_t space;
7224 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7225 rx_interface_stat)) {
7226 unsigned int num_funcs = 0;
7227 if (!rpc_stat) break;
7228 queue_Remove(&rpc_stat->queue_header);
7229 queue_Remove(&rpc_stat->all_peers);
7230 num_funcs = rpc_stat->stats[0].func_total;
7231 space = sizeof(rx_interface_stat_t) +
7232 rpc_stat->stats[0].func_total *
7233 sizeof(rx_function_entry_v1_t);
		    rxi_Free(rpc_stat, space);
		    rxi_rpc_peer_stat_cnt -= num_funcs;
		}
		MUTEX_EXIT(&peer->peer_lock);
7239 if (prev == *peer_ptr) {
7251 MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flag settings
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearProcessRPCStats(
  afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
7273 MUTEX_ENTER(&rx_rpc_stats);
7275 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7276 unsigned int num_funcs = 0, i;
7277 num_funcs = rpc_stat->stats[0].func_total;
7278 for(i=0;i<num_funcs;i++) {
7279 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7280 hzero(rpc_stat->stats[i].invocations);
7282 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7283 hzero(rpc_stat->stats[i].bytes_sent);
7285 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7286 hzero(rpc_stat->stats[i].bytes_rcvd);
7288 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7289 rpc_stat->stats[i].queue_time_sum.sec = 0;
7290 rpc_stat->stats[i].queue_time_sum.usec = 0;
7292 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7293 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7294 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7296 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7297 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7298 rpc_stat->stats[i].queue_time_min.usec= 9999999;
7300 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7301 rpc_stat->stats[i].queue_time_max.sec = 0;
7302 rpc_stat->stats[i].queue_time_max.usec = 0;
7304 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7305 rpc_stat->stats[i].execution_time_sum.sec = 0;
7306 rpc_stat->stats[i].execution_time_sum.usec = 0;
7308 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7309 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7310 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7312 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7313 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7314 rpc_stat->stats[i].execution_time_min.usec= 9999999;
7316 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
		rpc_stat->stats[i].execution_time_max.sec = 0;
		rpc_stat->stats[i].execution_time_max.usec = 0;
	    }
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
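/*
 * Illustrative sketch (not part of rx): the clear flags form a bitmask, so
 * they can be OR'd to reset several counters in one pass, e.g. just the
 * invocation and byte counters:
 *
 *   rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_INVOCATIONS |
 *                           AFS_RX_STATS_CLEAR_BYTES_SENT |
 *                           AFS_RX_STATS_CLEAR_BYTES_RCVD);
 */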
/*
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flag settings
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearPeerRPCStats(
  afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
7344 MUTEX_ENTER(&rx_rpc_stats);
7346 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	unsigned int num_funcs = 0, i;
	char *fix_offset;
	/*
	 * We have to fix the offset of rpc_stat since we are
	 * keeping this structure on two rx_queues. The rx_queue
	 * package assumes that the rx_queue member is the first
	 * member of the structure. That is, rx_queue assumes that
	 * any one item is only on one queue at a time. We are
	 * breaking that assumption and so we have to do a little
	 * math to fix our pointers.
	 */
7359 fix_offset = (char *) rpc_stat;
7360 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7361 rpc_stat = (rx_interface_stat_p) fix_offset;
7363 num_funcs = rpc_stat->stats[0].func_total;
7364 for(i=0;i<num_funcs;i++) {
7365 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7366 hzero(rpc_stat->stats[i].invocations);
7368 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7369 hzero(rpc_stat->stats[i].bytes_sent);
7371 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7372 hzero(rpc_stat->stats[i].bytes_rcvd);
7374 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7375 rpc_stat->stats[i].queue_time_sum.sec = 0;
7376 rpc_stat->stats[i].queue_time_sum.usec = 0;
7378 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7379 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7380 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7382 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7383 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7384 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7386 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7387 rpc_stat->stats[i].queue_time_max.sec = 0;
7388 rpc_stat->stats[i].queue_time_max.usec = 0;
7390 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7391 rpc_stat->stats[i].execution_time_sum.sec = 0;
7392 rpc_stat->stats[i].execution_time_sum.usec = 0;
7394 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7395 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7396 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7398 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7399 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7400 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7402 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7403 rpc_stat->stats[i].execution_time_max.sec = 0;
7404 rpc_stat->stats[i].execution_time_max.usec = 0;
7409 MUTEX_EXIT(&rx_rpc_stats);
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
void rx_SetRxStatUserOk(
    int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}
int rx_RxStatUserOk(
    struct rx_call *call)
{
    if (!rxi_rxstat_userok)
	return 0;
    return rxi_rxstat_userok(call);
}
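/*
 * Illustrative sketch (not part of rx): installing an authorization hook.
 * A real hook would inspect the call's connection and security data; this
 * placeholder simply permits everyone.
 */
#if 0
static int allow_all(struct rx_call *call)
{
    return 1;				/* 1 = caller may touch rx stats */
}

/* at service setup time:
 *   rx_SetRxStatUserOk(allow_all);
 */
#endif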