/*
 * Copyright 2000, International Business Machines Corporation and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

/* RX: Extended Remote Procedure Call */
#include "../afs/param.h"
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#ifdef	AFS_OSF_ENV
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#endif /* AFS_LINUX20_ENV */
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#endif
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#endif /* AFS_SGI_ENV */
#include "../afsint/afsint.h"
#ifdef	AFS_ALPHA_ENV
#undef kmem_alloc
#undef kmem_free
#undef mem_alloc
#undef mem_free
#undef register
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define	AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define	AFSOP_STOP_AFS		211	/* Stop AFS process */
#define	AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#ifdef	AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
#else /* KERNEL */
# include <afs/param.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx.h"
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
extern afs_uint32 LWP_ThreadId();

int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 * to ease NT porting
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
	assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
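/*
 * Usage note (added; not part of the original source): under
 * AFS_PTHREAD_ENV, exported entry points are expected to run the
 * one-time initializer before touching any of the locks above, e.g.:
 *
 *     int rx_SomeEntryPoint(void) {
 *         INIT_PTHREAD_LOCKS
 *         ...
 *     }
 *
 * pthread_once guarantees rxi_InitPthread runs exactly once even when
 * several threads race into Rx simultaneously.
 */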
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else /* AFS_PTHREAD_ENV */
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
extern void rxi_DeleteCachedConnections(void);

/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread has to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.  A sketch of the discipline follows.
 */
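#if 0
/* Illustrative sketch (added; not part of the original source) of the
 * call_queue_lock discipline described above: record the guarding queue
 * lock under the call lock when enqueueing, so rxi_ResetCall can later
 * synchronize with the queue's owner before dequeueing.  Assumes the
 * SET_CALL_QUEUE_LOCK macro defined later in this file and that
 * rx_serverPool_lock guards rx_incomingCallQueue. */
static void example_EnqueueIncomingCall(register struct rx_call *call)
{
    MUTEX_ENTER(&rx_serverPool_lock);	/* lock that guards the queue */
    MUTEX_ENTER(&call->lock);
    queue_Append(&rx_incomingCallQueue, call);
    SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&rx_serverPool_lock);
}
#endif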
extern void rxi_Delay(int);

static int rxi_ServerThreadSelectingCall;

#ifdef RX_ENABLE_LOCKS
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif

/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of these
 * tiers:
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *	rx_freeCallQueue_lock
 *	rx_connHashTable_lock
 *	rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *	peer_lock - locks peer data fields.
 *	conn_data_lock - ensures that no more than one thread updates a
 *	conn data field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 *      conn->peer was previously a constant for all intents and so has no
 *      lock protecting this field.  The multihomed client delta introduced
 *      a RX code change: change the peer field in the connection structure
 *      to that remote interface from which the last packet for this
 *      connection was sent out.  This may become an issue if further changes
 *      are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
void rxi_DestroyConnection();
void rxi_CleanupConnection();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */

#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
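    afs_uint32 epoch;
{
    /* Plausible completion of the elided body (added; a sketch, not
     * verbatim source): record the new epoch under the epoch mutex,
     * using the LOCK_EPOCH/UNLOCK_EPOCH macros defined above. */
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}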
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    char *htable, *ptable;
    int tmp_status;
#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	UNLOCK_RX_INIT
	return tmp_status; /* Already started; return previous error code. */
    }
#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
	return -1;
#endif
    /* Initialize anything necessary to provide a non-preemptive threading
     * environment. */
    rxi_InitializeThreadSupport();
    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	UNLOCK_RX_INIT
	return RX_ADDRINUSE;
    }

#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT,0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));	   /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif
#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    {
	struct sockaddr_in addr;
	int addrlen = sizeof(addr);
	if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	    return -1;
	}
	rx_port = addr.sin_port;
    }
#endif
    rx_stats.minRtt.sec = 9999999;
#ifdef	KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);		/* Start time of this package, rxkad
					 * will provide a randomer value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;
    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
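#if 0
/* Illustrative client-side sketch (added; not part of the original
 * source): bring the package up, talk to a service, and tear down.
 * The service id (4711) and security index 0 are hypothetical; host
 * and port are assumed to already be in network byte order, and the
 * rxnull security class stands in for a real one. */
static int example_client(afs_uint32 host, u_short port)
{
    struct rx_connection *conn;
    struct rx_call *call;
    afs_int32 code;

    if (rx_Init(0) != 0)		/* 0: let the kernel pick our port */
	return -1;
    conn = rx_NewConnection(host, port, 4711,
			    rxnull_NewClientSecurityObject(), 0);
    call = rx_NewCall(conn);
    /* ... rx_Write the request, rx_Read the reply ... */
    code = rx_EndCall(call, 0);
    rx_DestroyConnection(conn);
    return code;
}
#endif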
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}

static void ReturnToServerPool(aservice)
register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static QuotaOK(aservice)
register struct rx_service *aservice; {
    int rc=0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;
    int diff;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
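    /* Worked example (added for illustration): with two services, one
     * min/max 2/4 and one min/max 1/6, nProcs starts at 2+1 = 3 and
     * maxdiff is max(4-2, 6-1) = 5, so 3+5 = 8 procs are wanted in all;
     * any procs the caller already created are subtracted below. */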
    for (i=0; i<RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool. */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i, nProcs;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) {
#ifndef AFS_NT40_ENV
	pid_t pid;
	char name[32];
#ifdef AFS_PTHREAD_ENV
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	pid_t code;
	code = LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */

	sprintf(name,"srv_%d", ++nProcs);
	if (registerProgram)
	    (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
	rx_ServerProc(); /* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	    /* Server host */
    u_short sport;		    /* Server port */
    u_short sservice;		    /* Server service id */
    register struct rx_securityClass *securityObject;
    int	serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    SPLVAR;

    clock_NewTime();
    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef	RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    NETPRI;
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
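    /* Illustrative note (added): RX_MAXCALLS is 4, so consecutive client
     * connections get cids 4 apart, and the low-order bits of a packet's
     * cid select the call channel within its connection; that is why the
     * increment here is RX_MAXCALLS rather than 1. */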
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    USERPRI;
    return conn;
}
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
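/* Worked example (added for illustration): rx_SetConnDeadTime(conn, 50)
 * yields secondsUntilDead = 50 and secondsUntilPing = 8, so roughly six
 * keepalive intervals can be lost before the connection times out; any
 * argument below 6 is clamped up to 6 (ping interval 1 second). */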
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount ++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	for (i = 0 ; i < conn->nSpecific ; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;
    conn->nSpecific = 0;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    struct rx_peer *peer = conn->peer;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if (conn->refCount > 0) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    rxi_AckAll((struct rxevent *)0, call, 0);
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	} else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
					     conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    SPLVAR;

    NETPRI;
    rxi_DestroyConnection (conn);
    USERPRI;
}

/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_NewCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order
 * to ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;
    SPLVAR;

    clock_NewTime();
    dpf (("rx_NewCall(conn %x)\n", conn));

    NETPRI;
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
	for (i=0; i<RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    } else {
		call = rxi_NewCall(conn, i);
		MUTEX_ENTER(&call->lock);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef	RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
    USERPRI;

#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
/* Return 1 if this connection has any active or precall calls */
rxi_HasActiveCalls(aconn)
register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;
    for(i=0; i<RX_MAXCALLS; i++) {
	if (tcall = aconn->call[i]) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		return 1;
	    }
	}
    }
    return 0;
}

rxi_GetCallNumberVector(aconn, aint32s)
register struct rx_connection *aconn;
register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;
    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

rxi_SetCallNumberVector(aconn, aint32s)
register struct rx_connection *aconn;
register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;
    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on error. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
	      nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;	/* Name for identification purposes (e.g. the
			 * service name might be used for probing for
			 * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;
    SPLVAR;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }

    tservice = rxi_AllocService();
    NETPRI;
    for (i = 0; i<RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    USERPRI;
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    USERPRI;
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    rx_services[i] = service;	/* not visible until now */
	    USERPRI;
	    return service;
	}
    }
    USERPRI;
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
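#if 0
/* Illustrative server-side sketch (added; not part of the original
 * source): export one service and donate this thread to the pool.  The
 * port (1234, network byte order), service id (4711), name, and
 * ExampleExecuteRequest procedure are hypothetical; the rxnull security
 * class stands in for a real one. */
static int example_server(void)
{
    struct rx_securityClass *sc[1];
    struct rx_service *ts;

    if (rx_Init(htons(1234)) != 0)
	return -1;
    sc[0] = rxnull_NewServerSecurityObject();
    ts = rx_NewService(0, 4711, "example", sc, 1, ExampleExecuteRequest);
    if (!ts)
	return -1;
    ts->minProcs = 2;			/* see QuotaOK above */
    ts->maxProcs = 4;
    rx_StartServer(1);			/* donateMe: never returns */
    return 0;
}
#endif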
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
int threadID;
struct rx_call *newcall;
osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = (struct rx_call *) 0;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting (typically a smooth shutdown) then do not
	 * allow any new calls.
	 */
	if ( rx_tranquil && (call != NULL) ) {
	    SPLVAR;

	    NETPRI;
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	    USERPRI;
	    continue;
	}

	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;
    SPLVAR;

    NETPRI;
    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
    USERPRI;
}
/* Meltdown:
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * future options:
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
int tno;
struct rx_service *cur_service;
osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;
    MUTEX_ENTER(&freeSQEList_lock);

    if (sq = rx_FreeSQEList) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}

	if (call) {
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);
	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call*)0;
		continue;
	    }
	    call->flags &= (~RX_CALL_WAIT_PROC);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting--;
	    MUTEX_EXIT(&rx_stats_mutex);
	    rxi_ServerThreadSelectingCall = 0;
	    CV_SIGNAL(&rx_serverPool_cv);
	    MUTEX_EXIT(&rx_serverPool_lock);
	    break;
	} else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    sq->tno = tno;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
int tno;
struct rx_service *cur_service;
osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;
    SPLVAR;

    NETPRI;
    MUTEX_ENTER(&freeSQEList_lock);

    if (sq = rx_FreeSQEList) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
	    }
	    if (call) {
		break;
	    }
	}
    }

    if (call) {
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;

	/* MUTEX_EXIT(&call->lock); */
    }
    else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		USERPRI;
		return (struct rx_call *)0;
	    }
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    USERPRI;
    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call
 * or the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
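#if 0
/* Illustrative sketch (added; not part of the original source): register
 * an arrival procedure immediately after rx_NewCall, per the NOTE above.
 * ExampleArrived and the readyFlag handle are hypothetical; the argument
 * order (call, handle, arg) mirrors the three fields set above. */
static void ExampleArrived(call, handle, arg)
    register struct rx_call *call;
    register VOID *handle;
    register VOID *arg;
{
    /* runs once, when the first packet, an error, or completion arrives */
    *(int *)handle = 1;
}

static void example_use(struct rx_connection *conn, int *readyFlag)
{
    struct rx_call *call = rx_NewCall(conn);
    *readyFlag = 0;
    rx_SetArrivalProc(call, ExampleArrived, (VOID *)readyFlag, (VOID *)0);
    /* ... rx_Write the request, then check *readyFlag before reading ... */
}
#endif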
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */

afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    afs_int32 error;
    SPLVAR;

    dpf(("rx_EndCall(call %x)\n", call));

    NETPRI;
    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else { /* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call.  The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure.  We don't want to signal until
	 * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and nFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    }
    else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION)
	MUTEX_EXIT(&conn->conn_call_lock);
    USERPRI;

    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
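/* Usage note (added): a user-space client typically pairs rx_Init at
 * startup with rx_Finalize at exit, after finishing its calls and calling
 * rx_DestroyConnection on each connection it created, so that servers do
 * not keep retrying against a dead client. */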
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
	return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;	/* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef	RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);
	CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
	CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
	CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nCallStructs++;
	MUTEX_EXIT(&rx_stats_mutex);
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
	queue_Init(&call->iovq);
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free queue.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock; /* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
	(*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
	queue_Prepend(&rx_freeCallQueue, call);
    else
	queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);

    /* Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections).  Only do this, however, if there are no
     * outstanding calls.  Note that for fine grain locking, there appears
     * to be a deadlock in that rxi_FreeCall has a call locked and
     * DestroyConnectionNoLock locks each call in the conn.  But note a
     * few lines up where we have removed this call from the conn.
     * If someone else destroys a connection, they either have no
     * call lock held or are going through this section of code.
     */
    if (conn->flags & RX_CONN_DESTROY_ME) {
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->refCount++;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	if (haveCTLock)
	    rxi_DestroyConnectionNoLock(conn);
	else
	    rxi_DestroyConnection(conn);
#else /* RX_ENABLE_LOCKS */
	rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
    }
}
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
char *rxi_Alloc(size)
    register size_t size;
{
    register char *p;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
        AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt++; rxi_Allocsize += size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ)
        p = (char *) osi_AllocMediumSpace(size);
    else
        p = (char *) osi_AllocSmall(size, 1);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
        AFS_GUNLOCK();
#endif
#else
    p = (char *) osi_Alloc(size);
#endif
    if (!p) osi_Panic("rxi_Alloc error");
    bzero(p, size);
    return p;
}
void rxi_Free(addr, size)
    void *addr;
    register size_t size;
{
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
        AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt--; rxi_Allocsize -= size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ)
        osi_FreeMediumSpace(addr);
    else
        osi_FreeSmall(addr);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
        AFS_GUNLOCK();
#endif
#else
    osi_Free(addr, size);
#endif
}
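
/* rxi_Alloccnt and rxi_Allocsize form a crude census of live allocations
 * for debugging; callers must pass rxi_Free the same size they gave
 * rxi_Alloc, since (on the AIX/HP-UX kernel paths) the size also selects
 * between the small and medium allocator pools. */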
/* Find the peer process represented by the supplied (host,port)
 * combination.  If there is no appropriate active peer structure, a
 * new one will be allocated and initialized
 * The origPeer, if set, is a pointer to a peer structure on which the
 * refcount will be decremented. This is used to replace the peer
 * structure hanging off a connection structure */
struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
    register afs_uint32 host;
    register u_short port;
    struct rx_peer *origPeer;
    int create;
{
    register struct rx_peer *pp;
    int hashIndex;
    hashIndex = PEER_HASH(host, port);
    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
        if ((pp->host == host) && (pp->port == port)) break;
    }
    if (!pp) {
        if (create) {
            pp = rxi_AllocPeer();	/* This bzero's *pp */
            pp->host = host;	/* set here or in InitPeerParams is zero */
            pp->port = port;
            MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
            queue_Init(&pp->congestionQueue);
            queue_Init(&pp->rpcStats);
            pp->next = rx_peerHashTable[hashIndex];
            rx_peerHashTable[hashIndex] = pp;
            rxi_InitPeerParams(pp);
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.nPeerStructs++;
            MUTEX_EXIT(&rx_stats_mutex);
        }
    }
    if (pp && create) {
        pp->refCount++;
    }
    if (origPeer)
        origPeer->refCount--;
    MUTEX_EXIT(&rx_peerHashTable_lock);
    return pp;
}
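
/* The create reference and the origPeer release are both performed under
 * the single rx_peerHashTable_lock acquisition above, so replacing the
 * peer hanging off a connection is effectively an atomic refcount swap. */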
/* Find the connection at (host, port) started at epoch, and with the
 * given connection id.  Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired.  In both cases, (host, port) specify the
 * peer's (host, port) pair.  Client connections are not made
 * automatically by this routine.  The parameter socket gives the
 * socket descriptor on which the packet was received.  This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId).  Finally, the securityIndex
 * parameter must match the existing index for the connection.  If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service */
struct rx_connection *
rxi_FindConnection(socket, host, port, serviceId, cid,
                   epoch, type, securityIndex)
    osi_socket socket;
    register afs_int32 host;
    register u_short port;
    u_short serviceId;
    afs_uint32 cid;
    afs_uint32 epoch;
    int type;
    u_int securityIndex;
{
    int hashindex, flag;
    register struct rx_connection *conn;
    struct rx_peer *peer;
    hashindex = CONN_HASH(host, port, cid, epoch, type);
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxLastConn ? (conn = rxLastConn, flag = 0) :
                 (conn = rx_connHashTable[hashindex], flag = 1);
    for (; conn; ) {
        if ((conn->type == type) && ((cid & RX_CIDMASK) == conn->cid)
            && (epoch == conn->epoch)) {
            register struct rx_peer *pp = conn->peer;
            if (securityIndex != conn->securityIndex) {
                /* this isn't supposed to happen, but someone could forge a packet
                   like this, and there seems to be some CM bug that makes this
                   happen from time to time -- in which case, the fileserver
                   asserts. */
                MUTEX_EXIT(&rx_connHashTable_lock);
                return (struct rx_connection *) 0;
            }
            /* epoch's high order bits mean route for security reasons only on
             * the cid, not the host and port fields.
             */
            if (conn->epoch & 0x80000000) break;
            if (((type == RX_CLIENT_CONNECTION)
                 || (pp->host == host)) && (pp->port == port))
                break;
        }
        if (!flag) {
            /* the connection rxLastConn that was used the last time is not the
            ** one we are looking for now. Hence, start searching in the hash */
            flag = 1;
            conn = rx_connHashTable[hashindex];
        } else
            conn = conn->next;
    }
    if (!conn) {
        struct rx_service *service;
        if (type == RX_CLIENT_CONNECTION) {
            MUTEX_EXIT(&rx_connHashTable_lock);
            return (struct rx_connection *) 0;
        }
        service = rxi_FindService(socket, serviceId);
        if (!service || (securityIndex >= service->nSecurityObjects)
            || (service->securityObjects[securityIndex] == 0)) {
            MUTEX_EXIT(&rx_connHashTable_lock);
            return (struct rx_connection *) 0;
        }
        conn = rxi_AllocConnection();	/* This bzero's the connection */
        MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
                   MUTEX_DEFAULT, 0);
        MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
                   MUTEX_DEFAULT, 0);
        CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
        conn->next = rx_connHashTable[hashindex];
        rx_connHashTable[hashindex] = conn;
        peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
        conn->type = RX_SERVER_CONNECTION;
        conn->lastSendTime = clock_Sec();	/* don't GC immediately */
        conn->epoch = epoch;
        conn->cid = cid & RX_CIDMASK;
        /* conn->serial = conn->lastSerial = 0; */
        /* conn->timeout = 0; */
        conn->ackRate = RX_FAST_ACK_RATE;
        conn->service = service;
        conn->serviceId = serviceId;
        conn->securityIndex = securityIndex;
        conn->securityObject = service->securityObjects[securityIndex];
        conn->nSpecific = 0;
        conn->specific = NULL;
        rx_SetConnDeadTime(conn, service->connDeadTime);
        /* Notify security object of the new connection */
        RXS_NewConnection(conn->securityObject, conn);
        /* XXXX Connection timeout? */
        if (service->newConnProc) (*service->newConnProc)(conn);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nServerConns++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
    else {
        /* Ensure that the peer structure is set up in such a way that
        ** replies in this connection go back to that remote interface
        ** from which the last packet was sent out. In case, this packet's
        ** source IP address does not match the peer struct for this conn,
        ** then drop the refCount on conn->peer and get a new peer structure.
        ** We can check the host,port field in the peer structure without the
        ** rx_peerHashTable_lock because the peer structure has its refCount
        ** incremented and the only time the host,port in the peer struct gets
        ** updated is when the peer structure is created.
        */
        if (conn->peer->host == host)
            peer = conn->peer;	/* no change to the peer structure */
        else
            peer = rxi_FindPeer(host, port, conn->peer, 1);
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount++;
    MUTEX_EXIT(&conn->conn_data_lock);

    rxLastConn = conn;	/* store this connection as the last conn used */
    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
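
/* rxLastConn acts as a one-entry cache in front of the connection hash
 * table: a burst of packets for the same connection skips the chain walk,
 * and the flag variable records whether the chain must still be searched
 * when the cached entry does not match. */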
/* There are two packet tracing routines available for testing and monitoring
 * Rx.  One is called just after every packet is received and the other is
 * called just before every packet is sent.  Received packets have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address.  Both can be modified.  The return value, if
 * non-zero, indicates that the packet should be dropped.  */

int (*rx_justReceived)() = 0;
int (*rx_almostSent)() = 0;
/* A packet has been received off the interface.  Np is the packet, socket is
 * the socket number it was received from (useful in determining which service
 * this packet corresponds to), and (host, port) reflect the host,port of the
 * sender.  This call returns the packet to the caller if it is finished with
 * it, rather than de-allocating it, just as a small performance hack */

struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
    register struct rx_packet *np;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    register struct rx_call *call;
    register struct rx_connection *conn;
    int channel;
    afs_uint32 currentCallNumber;
    int type;
    int skew;
    char *packetType;
    struct rx_packet *tnp;

    /* We don't print out the packet until now because (1) the time may not be
     * accurate enough until now in the lwp implementation (rx_Listener only gets
     * the time after the packet is read) and (2) from a protocol point of view,
     * this is the first time the packet has been seen */
    packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
        ? rx_packetTypes[np->header.type-1] : "*UNKNOWN*";
    dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
         np->header.serial, packetType, host, port, np->header.serviceId,
         np->header.epoch, np->header.cid, np->header.callNumber,
         np->header.seq, np->header.flags, np));
    if (np->header.type == RX_PACKET_TYPE_VERSION) {
        return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
    }

    if (np->header.type == RX_PACKET_TYPE_DEBUG) {
        return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
    }

    /* If an input tracer function is defined, call it with the packet and
     * network address.  Note this function may modify its arguments. */
    if (rx_justReceived) {
        struct sockaddr_in addr;
        int drop;
        addr.sin_family = AF_INET;
        addr.sin_port = port;
        addr.sin_addr.s_addr = host;
#if defined(AFS_OSF_ENV) && defined(_KERNEL)
        addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
        drop = (*rx_justReceived) (np, &addr);
        /* drop packet if return value is non-zero */
        if (drop) return np;
        port = addr.sin_port;	/* in case fcn changed addr */
        host = addr.sin_addr.s_addr;
    }

    /* If packet was not sent by the client, then *we* must be the client */
    type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
        ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

    /* Find the connection (or fabricate one, if we're the server & if
     * necessary) associated with this packet */
    conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
                              np->header.cid, np->header.epoch, type,
                              np->header.securityIndex);
    if (!conn) {
        /* If no connection found or fabricated, just ignore the packet.
         * (An argument could be made for sending an abort packet for
         * the conn) */
        return np;
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->maxSerial < np->header.serial)
        conn->maxSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);

    /* If the connection is in an error state, send an abort packet and ignore
     * the incoming packet */
    if (conn->error) {
        /* Don't respond to an abort packet--we don't want loops! */
        MUTEX_ENTER(&conn->conn_data_lock);
        if (np->header.type != RX_PACKET_TYPE_ABORT)
            np = rxi_SendConnectionAbort(conn, np, 1, 0);
        conn->refCount--;
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }

    /* Check for connection-only requests (i.e. not call specific). */
    if (np->header.callNumber == 0) {
        switch (np->header.type) {
        case RX_PACKET_TYPE_ABORT:
            /* What if the supplied error is zero? */
            rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        case RX_PACKET_TYPE_CHALLENGE:
            tnp = rxi_ReceiveChallengePacket(conn, np, 1);
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        case RX_PACKET_TYPE_RESPONSE:
            tnp = rxi_ReceiveResponsePacket(conn, np, 1);
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        case RX_PACKET_TYPE_PARAMS:
        case RX_PACKET_TYPE_PARAMS+1:
        case RX_PACKET_TYPE_PARAMS+2:
            /* ignore these packet types for now */
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        default:
            /* Should not reach here, unless the peer is broken: send an
             * abort packet */
            rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
            MUTEX_ENTER(&conn->conn_data_lock);
            tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        }
    }
    channel = np->header.cid & RX_CHANNELMASK;
    call = conn->call[channel];
#ifdef RX_ENABLE_LOCKS
    if (call) {
        MUTEX_ENTER(&call->lock);
        /* Test to see if call struct is still attached to conn. */
        if (call != conn->call[channel]) {
            /* Call structure is now attached to a different conn */
            MUTEX_EXIT(&call->lock);
            if (type == RX_SERVER_CONNECTION) {
                call = conn->call[channel];
                /* If we started with no call attached and there is one now,
                 * another thread is also running this routine and has gotten
                 * the connection channel. We should drop this packet in the tests
                 * below. If there was a call on this connection and it's now
                 * gone, then we'll be making a new call below.
                 * If there was previously a call and it's now different then
                 * the old call was freed and another thread running this routine
                 * has created a call on this channel. One of these two threads
                 * has a packet for the old call and the code below handles those
                 * cases.
                 */
                if (call)
                    MUTEX_ENTER(&call->lock);
            }
            else {
                /* This packet can't be for this call. If the new call address is
                 * 0 then no call is running on this channel. If there is a call
                 * then, since this is a client connection we're getting data for
                 * it must be for the previous call.
                 */
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.spuriousPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            }
        }
    }
#endif
    currentCallNumber = conn->callNumber[channel];
    if (type == RX_SERVER_CONNECTION) {	/* We're the server */
        if (np->header.callNumber < currentCallNumber) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.spuriousPacketsRead++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }
        if (!call) {
            call = rxi_NewCall(conn, channel);
            MUTEX_ENTER(&call->lock);
            *call->callNumber = np->header.callNumber;
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
            rxi_KeepAliveOn(call);
        }
        else if (np->header.callNumber != currentCallNumber) {
            /* Wait until the transmit queue is idle before deciding
             * whether to reset the current call. Chances are that the
             * call will be in either DALLY or HOLD state once the TQ_BUSY
             * flag is cleared.
             */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            while ((call->state == RX_STATE_ACTIVE) &&
                   (call->flags & RX_CALL_TQ_BUSY)) {
                call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
                CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
                osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
            }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
             * the error condition in this call, so that it terminates as
             * quickly as possible */
            if (call->state == RX_STATE_ACTIVE) {
                struct rx_packet *tp;

                rxi_CallError(call, RX_CALL_DEAD);
                tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tp;
            }
            rxi_ResetCall(call, 0);
            *call->callNumber = np->header.callNumber;
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
            /*
             * If the number of queued calls exceeds the overload
             * threshold then abort this call.
             */
            if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
                struct rx_packet *tp;

                rxi_CallError(call, rx_BusyError);
                tp = rxi_SendCallAbort(call, np, 1, 0);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tp;
            }
            rxi_KeepAliveOn(call);
        }
        else {
            /* Continuing call; do nothing here. */
        }
    } else {	/* we're the client */
        /* Ignore all incoming acknowledgements for calls in DALLY state */
        if (call && (call->state == RX_STATE_DALLY)
            && (np->header.type == RX_PACKET_TYPE_ACK)) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.ignorePacketDally++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }

        /* Ignore anything that's not relevant to the current call.  If there
         * isn't a current call, then no packet is relevant. */
        if (!call || (np->header.callNumber != currentCallNumber)) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.spuriousPacketsRead++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }
        /* If the service security object index stamped in the packet does not
         * match the connection's security index, ignore the packet */
        if (np->header.securityIndex != conn->securityIndex) {
#ifdef RX_ENABLE_LOCKS
            MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }

        /* If we're receiving the response, then all transmit packets are
         * implicitly acknowledged.  Get rid of them. */
        if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            /* XXX Hack. Because we must release the global rx lock when
             * sending packets (osi_NetSend) we drop all acks while we're
             * traversing the tq in rxi_Start sending packets out because
             * packets may move to the freePacketQueue as result of being here!
             * So we drop these packets until we're safely out of the
             * traversing. Really ugly!
             * For fine grain RX locking, we set the acked field in the
             * packets and let rxi_Start remove them from the transmit queue.
             */
            if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
                rxi_SetAcksInTransmitQueue(call);
#else
                conn->refCount--;
                return np;	/* xmitting; drop packet */
#endif
            }
            else {
                rxi_ClearTransmitQueue(call, 0);
            }
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
            rxi_ClearTransmitQueue(call, 0);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        }
        if (np->header.type == RX_PACKET_TYPE_ACK) {
            /* now check to see if this is an ack packet acknowledging that the
             * server actually *lost* some hard-acked data.  If this happens we
             * ignore this packet, as it may indicate that the server restarted in
             * the middle of a call.  It is also possible that this is an old ack
             * packet.  We don't abort the connection in this case, because this
             * *might* just be an old ack packet.  The right way to detect a server
             * restart in the midst of a call is to notice that the server epoch
             * changed. */
            /* XXX I'm not sure this is exactly right, since tfirst **IS**
             * XXX unacknowledged.  I think that this is off-by-one, but
             * XXX I don't dare change it just yet, since it will
             * XXX interact badly with the server-restart detection
             * XXX code in receiveackpacket. */
            if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.spuriousPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            }
        } /* else not a data packet */
    }
    osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
    /* Set remote user defined status from packet */
    call->remoteStatus = np->header.userStatus;

    /* Note the gap between the expected next packet and the actual
     * packet that arrived, when the new packet has a smaller serial number
     * than expected.  Rioses frequently reorder packets all by themselves,
     * so this will be quite important with very large window sizes.
     * Skew is checked against 0 here to avoid any dependence on the type of
     * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
     * true!
     * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
     * see CalculateRoundTripTime for an example of how to keep smoothed values.
     * I think using a beta of 1/8 is probably appropriate.  93.04.21
     */
    MUTEX_ENTER(&conn->conn_data_lock);
    skew = conn->lastSerial - np->header.serial;
    conn->lastSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);
    if (skew > 0) {
        register struct rx_peer *peer;
        peer = conn->peer;
        if (skew > peer->inPacketSkew) {
            dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
            peer->inPacketSkew = skew;
        }
    }
    /* Now do packet type-specific processing */
    switch (np->header.type) {
        case RX_PACKET_TYPE_DATA:
            np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
                                       tnop, newcallp);
            break;
        case RX_PACKET_TYPE_ACK:
            /* Respond immediately to ack packets requesting acknowledgement
             * (ping packets) */
            if (np->header.flags & RX_REQUEST_ACK) {
                if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
                else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
            }
            np = rxi_ReceiveAckPacket(call, np, 1);
            break;
        case RX_PACKET_TYPE_ABORT:
            /* An abort packet: reset the connection, passing the error up to
             * the user */
            /* What if error is zero? */
            rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
            break;
        case RX_PACKET_TYPE_BUSY:
            /* XXXX */
            break;
        case RX_PACKET_TYPE_ACKALL:
            /* All packets acknowledged, so we can drop all packets previously
             * readied for sending */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            /* XXX Hack. Because we can't release the global rx lock when
             * sending packets (osi_NetSend) we drop all ack pkts while we're
             * traversing the tq in rxi_Start sending packets out because
             * packets may move to the freePacketQueue as result of being
             * here! So we drop these packets until we're safely out of the
             * traversing. Really ugly!
             * For fine grain RX locking, we set the acked field in the packets
             * and let rxi_Start remove the packets from the transmit queue.
             */
            if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
                rxi_SetAcksInTransmitQueue(call);
                break;
#else /* RX_ENABLE_LOCKS */
                conn->refCount--;
                return np;	/* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
            }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            rxi_ClearTransmitQueue(call, 0);
            break;
        default:
            /* Should not reach here, unless the peer is broken: send an abort
             * packet */
            rxi_CallError(call, RX_PROTOCOL_ERROR);
            np = rxi_SendCallAbort(call, np, 1, 0);
            break;
    }
    /* Note when this last legitimate packet was received, for keep-alive
     * processing.  Note, we delay getting the time until now in the hope that
     * the packet will be delivered to the user before any get time is required
     * (if not, then the time won't actually be re-evaluated here). */
    call->lastReceiveTime = clock_Sec();
    MUTEX_EXIT(&call->lock);
    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount--;
    MUTEX_EXIT(&conn->conn_data_lock);
    return np;
}
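
/* Every exit path from rxi_ReceivePacket must drop the connection
 * reference taken by rxi_FindConnection (under conn_data_lock) and hand a
 * packet buffer back to the caller; the type-specific receive routines
 * cooperate by returning either the original packet or a replacement for
 * re-use. */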
/* return true if this is an "interesting" connection from the point of view
   of someone trying to debug the system */
int rxi_IsConnInteresting(struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;
    if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
        return 1;
    for (i = 0; i < RX_MAXCALLS; i++) {
        tcall = aconn->call[i];
        if (tcall) {
            if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
                return 1;
            if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
                return 1;
        }
    }
    return 0;
}
/* if this is one of the last few packets AND it wouldn't be used by the
   receiving call to immediately satisfy a read request, then drop it on
   the floor, since accepting it might prevent a lock-holding thread from
   making progress in its reading. If a call has been cleared while in
   the precall state then ignore all subsequent packets until the call
   is assigned to a thread. */

static int TooLow(ap, acall)
    struct rx_call *acall;
    struct rx_packet *ap; {
    int rc = 0;
    MUTEX_ENTER(&rx_stats_mutex);
    if (((ap->header.seq != 1) &&
         (acall->flags & RX_CALL_CLEARED) &&
         (acall->state == RX_STATE_PRECALL)) ||
        ((rx_nFreePackets < rxi_dataQuota + 2) &&
         !((ap->header.seq < acall->rnext + rx_initSendWindow)
           && (acall->flags & RX_CALL_READER_WAIT)))) {
        rc = 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return rc;
}
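
/* TooLow trades throughput for liveness: when free packet buffers run
 * short, a new packet is accepted only if it would let a waiting reader
 * make immediate progress, so a lock-holding reader cannot be starved by
 * fresh arrivals. */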
/* try to attach call, if authentication is complete */
static void TryAttach(acall, socket, tnop, newcallp)
    register struct rx_call *acall;
    register osi_socket socket;
    register int *tnop;
    register struct rx_call **newcallp; {
    register struct rx_connection *conn;
    conn = acall->conn;
    if ((conn->type == RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
        /* Don't attach until we have any req'd. authentication. */
        if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
            rxi_AttachServerProc(acall, socket, tnop, newcallp);
            /* Note: this does not necessarily succeed; there
             * may not be any proc available */
        }
        else {
            rxi_ChallengeOn(acall->conn);
        }
    }
}
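
/* TryAttach is the server half of the security handshake: until
 * RXS_CheckAuthentication succeeds, calls stay parked in PRECALL and the
 * connection keeps (re)issuing challenges via rxi_ChallengeOn; once a
 * valid response arrives, rxi_ReceiveResponsePacket attaches the parked
 * calls. */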
/* A data packet has been received off the interface.  This packet is
 * appropriate to the call (the call is in the right state, etc.).  This
 * routine can return a packet to the caller, for re-use */

struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
                                        port, tnop, newcallp)
    register struct rx_call *call;
    register struct rx_packet *np;
    int istack;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    int ackNeeded = 0;
    int newPackets = 0;
    int didHardAck = 0;
    int haveLast = 0;
    afs_uint32 seq, serial, flags;
    int isFirst;
    struct rx_packet *tnp;
    struct clock when;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.dataPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);

    /* If there are no packet buffers, drop this new packet, unless we can find
     * packet buffers from inactive calls */
    if (!call->error &&
        (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
        MUTEX_ENTER(&rx_freePktQ_lock);
        rxi_NeedMorePackets = TRUE;
        MUTEX_EXIT(&rx_freePktQ_lock);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.noPacketBuffersOnRead++;
        MUTEX_EXIT(&rx_stats_mutex);
        call->rprev = np->header.serial;
        rxi_calltrace(RX_TRACE_DROP, call);
        dpf(("packet %x dropped on receipt - quota problems", np));
        rxi_ClearReceiveQueue(call);
        clock_GetTime(&when);
        clock_Add(&when, &rx_softAckDelay);
        if (!call->delayedAckEvent ||
            clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
            rxevent_Cancel(call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
                                                 call, 0);
        }
        /* we've damaged this call already, might as well do it in. */
        return np;
    }
    /*
     * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
     * packet is one of several packets transmitted as a single
     * datagram. Do not send any soft or hard acks until all packets
     * in a jumbogram have been processed. Send negative acks right away.
     */
    for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
        /* tnp is non-null when there are more packets in the
         * current jumbo gram */
        if (tnp) {
            if (np)
                rxi_FreePacket(np);
            np = tnp;
        }

        seq = np->header.seq;
        serial = np->header.serial;
        flags = np->header.flags;

        /* If the call is in an error state, send an abort message */
        if (call->error)
            return rxi_SendCallAbort(call, np, istack, 0);

        /* The RX_JUMBO_PACKET is set in all but the last packet in each
         * AFS 3.5 jumbogram. */
        if (flags & RX_JUMBO_PACKET) {
            tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
        } else {
            tnp = NULL;
        }

        if (np->header.spare != 0) {
            MUTEX_ENTER(&call->conn->conn_data_lock);
            call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
            MUTEX_EXIT(&call->conn->conn_data_lock);
        }
        /* The usual case is that this is the expected next packet */
        if (seq == call->rnext) {

            /* Check to make sure it is not a duplicate of one already queued */
            if (queue_IsNotEmpty(&call->rq)
                && queue_First(&call->rq, rx_packet)->header.seq == seq) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.dupPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                dpf(("packet %x dropped on receipt - duplicate", np));
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_DUPLICATE, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* It's the next packet. Stick it on the receive queue
             * for this call. Set newPackets to make sure we wake
             * the reader once all packets have been processed */
            queue_Prepend(&call->rq, np);
            call->nSoftAcks++;
            np = NULL;	/* We can't use this anymore */
            newPackets = 1;

            /* If an ack is requested then set a flag to make sure we
             * send an acknowledgement for this packet */
            if (flags & RX_REQUEST_ACK) {
                ackNeeded = 1;
            }

            /* Keep track of whether we have received the last packet */
            if (flags & RX_LAST_PACKET) {
                call->flags |= RX_CALL_HAVE_LAST;
                haveLast = 1;
            }

            /* Check whether we have all of the packets for this call */
            if (call->flags & RX_CALL_HAVE_LAST) {
                afs_uint32 tseq;	/* temporary sequence number */
                struct rx_packet *tp;	/* Temporary packet pointer */
                struct rx_packet *nxp;	/* Next pointer, for queue_Scan */

                for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                    if (tseq != tp->header.seq)
                        break;
                    if (tp->header.flags & RX_LAST_PACKET) {
                        call->flags |= RX_CALL_RECEIVE_DONE;
                        break;
                    }
                    tseq++;
                }
            }

            /* Provide asynchronous notification for those who want it
             * (e.g. multi rx) */
            if (call->arrivalProc) {
                (*call->arrivalProc)(call, call->arrivalProcHandle,
                                     call->arrivalProcArg);
                call->arrivalProc = (VOID (*)()) 0;
            }

            /* Update last packet received */
            call->rprev = seq;

            /* If there is no server process serving this call, grab
             * one, if available. We only need to do this once. If a
             * server thread is available, this thread becomes a server
             * thread and the server thread becomes a listener thread. */
            if (isFirst) {
                TryAttach(call, socket, tnop, newcallp);
            }
        }
        else {
            /* This is not the expected next packet. */

            /* Determine whether this is a new or old packet, and if it's
             * a new one, whether it fits into the current receive window.
             * Also figure out whether the packet was delivered in sequence.
             * We use the prev variable to determine whether the new packet
             * is the successor of its immediate predecessor in the
             * receive queue, and the missing flag to determine whether
             * any of this packet's predecessors are missing.  */

            afs_uint32 prev;		/* "Previous packet" sequence number */
            struct rx_packet *tp;	/* Temporary packet pointer */
            struct rx_packet *nxp;	/* Next pointer, for queue_Scan */
            int missing;		/* Are any predecessors missing? */

            /* If the new packet's sequence number has been sent to the
             * application already, then this is a duplicate */
            if (seq < call->rnext) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.dupPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_DUPLICATE, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* If the sequence number is greater than what can be
             * accommodated by the current window, then send a negative
             * acknowledge and drop the packet */
            if ((call->rnext + call->rwind) <= seq) {
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_EXCEEDS_WINDOW, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* Look for the packet in the queue of old received packets */
            for (prev = call->rnext - 1, missing = 0,
                 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                /* Check for duplicate packet */
                if (seq == tp->header.seq) {
                    MUTEX_ENTER(&rx_stats_mutex);
                    rx_stats.dupPacketsRead++;
                    MUTEX_EXIT(&rx_stats_mutex);
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    np = rxi_SendAck(call, np, seq, serial,
                                     flags, RX_ACK_DUPLICATE, istack);
                    ackNeeded = 0;
                    call->rprev = seq;
                    goto nextloop;
                }
                /* If we find a higher sequence packet, break out and
                 * insert the new packet here. */
                if (seq < tp->header.seq) break;
                /* Check for missing packet */
                if (tp->header.seq != prev+1) {
                    missing = 1;
                }

                prev = tp->header.seq;
            }

            /* Keep track of whether we have received the last packet. */
            if (flags & RX_LAST_PACKET) {
                call->flags |= RX_CALL_HAVE_LAST;
            }

            /* It's within the window: add it to the receive queue.
             * tp is left by the previous loop either pointing at the
             * packet before which to insert the new packet, or at the
             * queue head if the queue is empty or the packet should be
             * appended. */
            queue_InsertBefore(tp, np);
            call->nSoftAcks++;
            np = NULL;

            /* Check whether we have all of the packets for this call */
            if ((call->flags & RX_CALL_HAVE_LAST)
                && !(call->flags & RX_CALL_RECEIVE_DONE)) {
                afs_uint32 tseq;	/* temporary sequence number */

                for (tseq = call->rnext,
                     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                    if (tseq != tp->header.seq)
                        break;
                    if (tp->header.flags & RX_LAST_PACKET) {
                        call->flags |= RX_CALL_RECEIVE_DONE;
                        break;
                    }
                    tseq++;
                }
            }

            /* We need to send an ack if the packet is out of sequence,
             * or if an ack was requested by the peer. */
            if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
                ackNeeded = 1;
            }
            /* Acknowledge the last packet for each call */
            if (flags & RX_LAST_PACKET) {
                haveLast = 1;
            }

            call->rprev = seq;
        }
nextloop:;
    }

    /*
     * If the receiver is waiting for an iovec, fill the iovec
     * using the data from the receive queue */
    if (call->flags & RX_CALL_IOVEC_WAIT) {
        didHardAck = rxi_FillReadVec(call, seq, serial, flags);
        /* the call may have been aborted */
        if (call->error) {
            return NULL;
        }
        if (didHardAck) {
            ackNeeded = 0;
        }
    }

    /* Wakeup the reader if any */
    if ((call->flags & RX_CALL_READER_WAIT) &&
        (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
         (call->iovNext >= call->iovMax) ||
         (call->flags & RX_CALL_RECEIVE_DONE))) {
        call->flags &= ~RX_CALL_READER_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&call->cv_rq);
#else
        osi_rxWakeup(&call->rq);
#endif
    }

    /*
     * Send an ack when requested by the peer, or once every
     * rxi_SoftAckRate packets until the last packet has been
     * received. Always send a soft ack for the last packet in
     * the server's reply. */
    if (ackNeeded) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, seq, serial, flags,
                         RX_ACK_REQUESTED, istack);
    } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, seq, serial, flags,
                         RX_ACK_IDLE, istack);
    } else if (call->nSoftAcks) {
        clock_GetTime(&when);
        if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
            clock_Add(&when, &rx_lastAckDelay);
        } else {
            clock_Add(&when, &rx_softAckDelay);
        }
        if (!call->delayedAckEvent ||
            clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
            rxevent_Cancel(call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
                                                 call, 0);
        }
    } else if (call->flags & RX_CALL_RECEIVE_DONE) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
    }

    return np;
}
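
/* Ack pacing, in summary: an explicit RX_REQUEST_ACK is answered
 * immediately (RX_ACK_REQUESTED); accumulating more than rxi_SoftAckRate
 * unacknowledged packets forces an RX_ACK_IDLE; anything less just
 * (re)schedules the delayed-ack event for the sooner of its current
 * expiry and now plus rx_softAckDelay (rx_lastAckDelay for the final
 * packet of a server reply). */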
#ifdef ADAPT_WINDOW
static void rxi_ComputeRate();
#endif

/* The real smarts of the whole thing.  */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
    register struct rx_call *call;
    struct rx_packet *np;
    int istack;
{
    struct rx_ackPacket *ap;
    int nAcks;
    register struct rx_packet *tp;
    register struct rx_packet *nxp;	/* Next packet pointer for queue_Scan */
    register struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;
    afs_uint32 first;
    afs_uint32 serial;
    /* because there are CM's that are bogus, sending weird values for this. */
    afs_uint32 skew = 0;
    int needRxStart = 0;
    int nbytes;
    int missing;
    int acked;
    int nNacked = 0;
    int newAckCount = 0;
    u_short maxMTU = 0;		/* Set if peer supports AFS 3.4a jumbo datagrams */
    int maxDgramPackets = 0;	/* Set if peer supports AFS 3.5 jumbo datagrams */

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.ackPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
    ap = (struct rx_ackPacket *) rx_DataOf(np);
    nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
    if (nbytes < 0)
        return np;	/* truncated ack packet */

    /* depends on ack packet struct */
    nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
    first = ntohl(ap->firstPacket);
    serial = ntohl(ap->serial);
    /* temporarily disabled -- needs to degrade over time
     * skew = ntohs(ap->maxSkew); */

    /* Ignore ack packets received out of order */
    if (first < call->tfirst) {
        return np;
    }

    if (np->header.flags & RX_SLOW_START_OK) {
        call->flags |= RX_CALL_SLOW_START_OK;
    }

    if (rx_Log) {
        fprintf(rx_Log,
                "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
                ap->reason, ntohl(ap->previousPacket), np->header.seq, serial,
                skew, ntohl(ap->firstPacket));
        if (nAcks) {
            int offset;
            for (offset = 0; offset < nAcks; offset++)
                putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', rx_Log);
        }
        putc('\n', rx_Log);
    }
    /* if a server connection has been re-created, it doesn't remember what
     * serial # it was up to.  An ack will tell us, since the serial field
     * contains the largest serial received by the other side */
    MUTEX_ENTER(&conn->conn_data_lock);
    if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
        conn->serial = serial + 1;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Update the outgoing packet skew value to the latest value of
     * the peer's incoming packet skew value.  The ack packet, of
     * course, could arrive out of order, but that won't affect things
     * much */
    MUTEX_ENTER(&peer->peer_lock);
    peer->outPacketSkew = skew;

    /* Check for packets that no longer need to be transmitted, and
     * discard them.  This only applies to packets positively
     * acknowledged as having been sent to the peer's upper level.
     * All other packets must be retained.  So only packets with
     * sequence numbers < ap->firstPacket are candidates. */
    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
        if (tp->header.seq >= first) break;
        call->tfirst = tp->header.seq + 1;
        if (tp->header.serial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
        else if (tp->firstSerial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* XXX Hack. Because we have to release the global rx lock when sending
         * packets (osi_NetSend) we drop all acks while we're traversing the tq
         * in rxi_Start sending packets out because packets may move to the
         * freePacketQueue as result of being here! So we drop these packets until
         * we're safely out of the traversing. Really ugly!
         * To make it even uglier, if we're using fine grain locking, we can
         * set the ack bits in the packets and have rxi_Start remove the packets
         * when it's done transmitting.
         */
        if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
            tp->acked = 1;
            call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
            break;
#endif /* RX_ENABLE_LOCKS */
        } else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        {
            queue_Remove(tp);
            rxi_FreePacket(tp);	/* rxi_FreePacket mustn't wake up anyone, preemptively. */
        }
    }
#ifdef ADAPT_WINDOW
    /* Give rate detector a chance to respond to ping requests */
    if (ap->reason == RX_ACK_PING_RESPONSE) {
        rxi_ComputeRate(peer, call, 0, np, ap->reason);
    }
#endif

    /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */

    /* Now go through explicit acks/nacks and record the results in
     * the waiting packets.  These are packets that can't be released
     * yet, even with a positive acknowledge.  This positive
     * acknowledge only means the packet has been received by the
     * peer, not that it will be retained long enough to be sent to
     * the peer's upper level.  In addition, reset the transmit timers
     * of any missing packets (those packets that must be missing
     * because this packet was out of sequence) */

    call->nSoftAcked = 0;
    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
        /* Update round trip time if the ack was stimulated on receipt
         * of this packet */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
        if (tp->header.seq >= first) {
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        if (tp->header.serial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
        else if ((tp->firstSerial == serial)) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
        }
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

        /* Set the acknowledge flag per packet based on the
         * information in the ack packet. An acknowledged packet can
         * be downgraded when the server has discarded a packet it
         * soacked previously, or when an ack packet is received
         * out of sequence. */
        if (tp->header.seq < first) {
            /* Implicit ack information */
            if (!tp->acked) {
                newAckCount++;
            }
            tp->acked = 1;
        }
        else if (tp->header.seq < first + nAcks) {
            /* Explicit ack information:  set it in the packet appropriately */
            if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
                if (!tp->acked) {
                    newAckCount++;
                    tp->acked = 1;
                }
                if (missing) {
                    nNacked++;
                } else {
                    call->nSoftAcked++;
                }
            } else {
                tp->acked = 0;
                missing = 1;
            }
        }
        else {
            tp->acked = 0;
            missing = 1;
        }

        /* If packet isn't yet acked, and it has been transmitted at least
         * once, reset retransmit time using latest timeout
         * ie, this should readjust the retransmit timer for all outstanding
         * packets...  So we don't just retransmit when we should know better */
        if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
            tp->retryTime = tp->timeSent;
            clock_Add(&tp->retryTime, &peer->timeout);
            /* shift by eight because one quarter-sec ~ 256 milliseconds */
            clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
        }
    }
    /* If the window has been extended by this acknowledge packet,
     * then wakeup a sender waiting in alloc for window space, or try
     * sending packets now, if he's been sitting on packets due to
     * lack of window space */
    if (call->tnext < (call->tfirst + call->twind)) {
#ifdef RX_ENABLE_LOCKS
        CV_SIGNAL(&call->cv_twind);
#else
        if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
            call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
            osi_rxWakeup(&call->twind);
        }
#endif
        if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
            call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
            needRxStart = 1;
        }
    }

    /* if the ack packet has a receivelen field hanging off it,
     * update our state */
    if (np->length >= rx_AckDataSize(ap->nAcks) + sizeof(afs_int32)) {
        afs_uint32 tSize;

        /* If the ack packet has a "recommended" size that is less than
         * what I am using now, reduce my size to match */
        rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
                      sizeof(afs_int32), &tSize);
        tSize = (afs_uint32) ntohl(tSize);
        peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));

        /* Get the maximum packet size to send to this peer */
        rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
                      &tSize);
        tSize = (afs_uint32) ntohl(tSize);
        tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
        tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);

        /* sanity check - peer might have restarted with different params.
         * If peer says "send less", dammit, send less...  Peer should never
         * be unable to accept packets of the size that prior AFS versions would
         * send without asking.  */
        if (peer->maxMTU != tSize) {
            peer->maxMTU = tSize;
            peer->MTU = MIN(tSize, peer->MTU);
            call->MTU = MIN(call->MTU, tSize);
        }

        if (np->length == rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32)) {
            /* AFS 3.4a */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);	/* peer's receive window, if it's */
            if (tSize < call->twind) {		/* smaller than our send */
                call->twind = tSize;		/* window, we must send less... */
                call->ssthresh = MIN(call->twind, call->ssthresh);
            }

            /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
             * network MTU confused with the loopback MTU. Calculate the
             * maximum MTU here for use in the slow start code below.
             */
            maxMTU = peer->maxMTU;
            /* Did peer restart with older RX version? */
            if (peer->maxDgramPackets > 1) {
                peer->maxDgramPackets = 1;
            }
        } else if (np->length >= rx_AckDataSize(ap->nAcks) + 4*sizeof(afs_int32)) {
            /* AFS 3.5 */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);
            /*
             * As of AFS 3.5 we set the send window to match the receive window.
             */
            if (tSize < call->twind) {
                call->twind = tSize;
                call->ssthresh = MIN(call->twind, call->ssthresh);
            } else if (tSize > call->twind) {
                call->twind = tSize;
            }

            /*
             * As of AFS 3.5, a jumbogram is more than one fixed size
             * packet transmitted in a single UDP datagram. If the remote
             * MTU is smaller than our local MTU then never send a datagram
             * larger than the natural MTU.
             */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            maxDgramPackets = (afs_uint32) ntohl(tSize);
            maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
            maxDgramPackets = MIN(maxDgramPackets,
                                  (int)(peer->ifDgramPackets));
            maxDgramPackets = MIN(maxDgramPackets, tSize);
            if (maxDgramPackets > 1) {
                peer->maxDgramPackets = maxDgramPackets;
                call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
            } else {
                peer->maxDgramPackets = 1;
                call->MTU = peer->natMTU;
            }
        } else if (peer->maxDgramPackets > 1) {
            /* Restarted with lower version of RX */
            peer->maxDgramPackets = 1;
        }
    } else if (peer->maxDgramPackets > 1 ||
               peer->maxMTU != OLD_MAX_PACKET_SIZE) {
        /* Restarted with lower version of RX */
        peer->maxMTU = OLD_MAX_PACKET_SIZE;
        peer->natMTU = OLD_MAX_PACKET_SIZE;
        peer->MTU = OLD_MAX_PACKET_SIZE;
        peer->maxDgramPackets = 1;
        peer->nDgramPackets = 1;
        call->MTU = OLD_MAX_PACKET_SIZE;
    }
    /*
     * Calculate how many datagrams were successfully received after
     * the first missing packet and adjust the negative ack counter
     * accordingly.
     */
    if (nNacked) {
        nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
        if (call->nNacks < nNacked) {
            call->nNacks = nNacked;
        }
    }

    if (call->flags & RX_CALL_FAST_RECOVER) {
        if (nNacked) {
            call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
        } else {
            call->flags &= ~RX_CALL_FAST_RECOVER;
            call->cwind = call->nextCwind;
            call->nextCwind = 0;
            call->nAcks = 0;
        }
        call->nCwindAcks = 0;
    }
    else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
        /* Three negative acks in a row trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        MUTEX_EXIT(&peer->peer_lock);
        if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
            /* someone else is waiting to start recovery */
            return np;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
            CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
        }
        MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
        call->flags |= RX_CALL_FAST_RECOVER;
        call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
        call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
                          rx_maxSendWindow);
        call->nDgramPackets = MAX(2, (int)call->nDgramPackets) >> 1;
        call->nextCwind = call->ssthresh;
        call->nAcks = 0;
        call->nNacks = 0;
        peer->MTU = call->MTU;
        peer->cwind = call->nextCwind;
        peer->nDgramPackets = call->nDgramPackets;
        peer->congestSeq++;
        call->congestSeq = peer->congestSeq;
        /* Reset the resend times on the packets that were nacked
         * so we will retransmit as soon as the window permits */
        for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
            if (acked) {
                if (!tp->acked) {
                    clock_Zero(&tp->retryTime);
                }
            } else if (tp->acked) {
                acked = 1;
            }
        }
    } else {
        /* If cwind is smaller than ssthresh, then increase
         * the window one packet for each ack we receive (exponential
         * growth).
         * If cwind is greater than or equal to ssthresh then increase
         * the congestion window by one packet for each cwind acks we
         * receive (linear growth).  */
        if (call->cwind < call->ssthresh) {
            call->cwind = MIN((int)call->ssthresh,
                              (int)(call->cwind + newAckCount));
            call->nCwindAcks = 0;
        } else {
            call->nCwindAcks += newAckCount;
            if (call->nCwindAcks >= call->cwind) {
                call->nCwindAcks = 0;
                call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
            }
        }
        /*
         * If we have received several acknowledgements in a row then
         * it is time to increase the size of our datagrams
         */
        if ((int)call->nAcks > rx_nDgramThreshold) {
            if (peer->maxDgramPackets > 1) {
                if (call->nDgramPackets < peer->maxDgramPackets) {
                    call->nDgramPackets++;
                }
                call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
            } else if (call->MTU < peer->maxMTU) {
                call->MTU += peer->natMTU;
                call->MTU = MIN(call->MTU, peer->maxMTU);
            }
            call->nAcks = 0;
        }
    }

    MUTEX_EXIT(&peer->peer_lock);	/* rxi_Start will lock peer. */

    /* Servers need to hold the call until all response packets have
     * been acknowledged. Soft acks are good enough since clients
     * are not allowed to clear their receive queues. */
    if (call->state == RX_STATE_HOLD &&
        call->tfirst + call->nSoftAcked >= call->tnext) {
        call->state = RX_STATE_DALLY;
        rxi_ClearTransmitQueue(call, 0);
    } else if (!queue_IsEmpty(&call->tq)) {
        rxi_Start(0, call, istack);
    }
    return np;
}
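
/* Worked example of the window growth rules above: with ssthresh 16 and
 * cwind 4, an ack carrying three newly acknowledged packets moves cwind to
 * MIN(16, 4 + 3) = 7 (slow start); once cwind reaches ssthresh, a full
 * window's worth of acks (nCwindAcks >= cwind) is needed to grow the
 * window by a single packet (congestion avoidance). */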
/* Received a response to a challenge packet */
struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the packet if we're the client */
    if (conn->type == RX_CLIENT_CONNECTION) return np;

    /* If already authenticated, ignore the packet (it's probably a retry) */
    if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
        return np;

    /* Otherwise, have the security object evaluate the response packet */
    error = RXS_CheckResponse(conn->securityObject, conn, np);
    if (error) {
        /* If the response is invalid, reset the connection, sending
         * an abort to the peer */
        rxi_ConnectionError(conn, error);
        MUTEX_ENTER(&conn->conn_data_lock);
        np = rxi_SendConnectionAbort(conn, np, istack, 0);
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }
    else {
        /* If the response is valid, any calls waiting to attach
         * servers can now do so */
        int i;
        for (i = 0; i < RX_MAXCALLS; i++) {
            struct rx_call *call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_PRECALL)
                    rxi_AttachServerProc(call, -1, NULL, NULL);
                MUTEX_EXIT(&call->lock);
            }
        }
    }
    return np;
}
/* A client has received an authentication challenge: the security
 * object is asked to cough up a respectable response packet to send
 * back to the server.  The server is responsible for retrying the
 * challenge if it fails to get a response. */

struct rx_packet *
rxi_ReceiveChallengePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the challenge if we're the server */
    if (conn->type == RX_SERVER_CONNECTION) return np;

    /* Ignore the challenge if the connection is otherwise idle; someone's
     * trying to use us as an oracle. */
    if (!rxi_HasActiveCalls(conn)) return np;

    /* Send the security object the challenge packet.  It is expected to fill
     * in the response. */
    error = RXS_GetResponse(conn->securityObject, conn, np);

    /* If the security object is unable to return a valid response, reset the
     * connection and send an abort to the peer.  Otherwise send the response
     * packet to the peer connection. */
    if (error) {
        rxi_ConnectionError(conn, error);
        MUTEX_ENTER(&conn->conn_data_lock);
        np = rxi_SendConnectionAbort(conn, np, istack, 0);
        MUTEX_EXIT(&conn->conn_data_lock);
    }
    else {
        np = rxi_SendSpecial((struct rx_call *)0, conn, np,
                             RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
    }
    return np;
}
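
/* The idle-connection test above is a small oracle defense: without an
 * active call of our own there is no reason to compute a response, so a
 * forged challenge cannot trick this host into doing an attacker's
 * cryptography for it. */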
3765 /* Find an available server process to service the current request in
3766 * the given call structure. If one isn't available, queue up this
3767 * call so it eventually gets one */
3769 rxi_AttachServerProc(call, socket, tnop, newcallp)
3770 register struct rx_call *call;
3771 register osi_socket socket;
3773 register struct rx_call **newcallp;
3775 register struct rx_serverQueueEntry *sq;
3776 register struct rx_service *service = call->conn->service;
3777 #ifdef RX_ENABLE_LOCKS
3778 register int haveQuota = 0;
3779 #endif /* RX_ENABLE_LOCKS */
3780 /* May already be attached */
3781 if (call->state == RX_STATE_ACTIVE) return;
3783 MUTEX_ENTER(&rx_serverPool_lock);
3784 #ifdef RX_ENABLE_LOCKS
3785 while(rxi_ServerThreadSelectingCall) {
3786 MUTEX_EXIT(&call->lock);
3787 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3788 MUTEX_EXIT(&rx_serverPool_lock);
3789 MUTEX_ENTER(&call->lock);
3790 MUTEX_ENTER(&rx_serverPool_lock);
3791 /* Call may have been attached */
3792 if (call->state == RX_STATE_ACTIVE) return;
3795 haveQuota = QuotaOK(service);
3796 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3797 /* If there are no processes available to service this call,
3798 * put the call on the incoming call queue (unless it's
3799 * already on the queue).
3802 ReturnToServerPool(service);
3803 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3804 call->flags |= RX_CALL_WAIT_PROC;
3805 MUTEX_ENTER(&rx_stats_mutex);
3807 MUTEX_EXIT(&rx_stats_mutex);
3808 rxi_calltrace(RX_CALL_ARRIVAL, call);
3809 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3810 queue_Append(&rx_incomingCallQueue, call);
3813 #else /* RX_ENABLE_LOCKS */
3814 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3815 /* If there are no processes available to service this call,
3816 * put the call on the incoming call queue (unless it's
3817 * already on the queue).
3819 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3820 call->flags |= RX_CALL_WAIT_PROC;
3822 rxi_calltrace(RX_CALL_ARRIVAL, call);
3823 queue_Append(&rx_incomingCallQueue, call);
3826 #endif /* RX_ENABLE_LOCKS */
3828 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3830 /* If hot threads are enabled, and both newcallp and sq->socketp
3831 * are non-null, then this thread will process the call, and the
3832 * idle server thread will start listening on this thread's socket.
3835 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3838 *sq->socketp = socket;
3839 clock_GetTime(&call->startTime);
3840 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3844 if (call->flags & RX_CALL_WAIT_PROC) {
3845 /* Conservative: I don't think this should happen */
3846 call->flags &= ~RX_CALL_WAIT_PROC;
3847 MUTEX_ENTER(&rx_stats_mutex);
3849 MUTEX_EXIT(&rx_stats_mutex);
3852 call->state = RX_STATE_ACTIVE;
3853 call->mode = RX_MODE_RECEIVING;
3854 if (call->flags & RX_CALL_CLEARED) {
3855 /* send an ack now to start the packet flow up again */
3856 call->flags &= ~RX_CALL_CLEARED;
3857 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3859 #ifdef RX_ENABLE_LOCKS
3862 service->nRequestsRunning++;
3863 if (service->nRequestsRunning <= service->minProcs)
3869 MUTEX_EXIT(&rx_serverPool_lock);
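/* Editor's sketch (illustration only; the example_* helper below is
 * hypothetical, not part of rx): the "hot thread" handoff performed above.
 * The thread that received the packet keeps the new call, so the common
 * case avoids a context switch, while the dequeued idle server thread
 * inherits the receiver's socket and carries on listening. */
#if 0
static void example_hot_handoff(sq, newcallp, call, socket)
    struct rx_serverQueueEntry *sq;
    struct rx_call **newcallp;
    struct rx_call *call;
    osi_socket socket;
{
    *newcallp = call;	    /* receiving thread services the call itself */
    *sq->socketp = socket;  /* idle thread takes over the listen socket */
}
#endif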
3872 /* Delay the sending of an acknowledge event for a short while, while
3873 * a new call is being prepared (in the case of a client) or a reply
3874 * is being prepared (in the case of a server). Rather than sending
3875 * an ack packet, an ACKALL packet is sent. */
3876 void rxi_AckAll(event, call, dummy)
3877 struct rxevent *event;
3878 register struct rx_call *call;
3881 #ifdef RX_ENABLE_LOCKS
3883 MUTEX_ENTER(&call->lock);
3884 call->delayedAckEvent = (struct rxevent *) 0;
3885 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3887 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3888 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3890 MUTEX_EXIT(&call->lock);
3891 #else /* RX_ENABLE_LOCKS */
3892 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3893 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3894 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3895 #endif /* RX_ENABLE_LOCKS */
3898 void rxi_SendDelayedAck(event, call, dummy)
3899 struct rxevent *event;
3900 register struct rx_call *call;
3903 #ifdef RX_ENABLE_LOCKS
3905 MUTEX_ENTER(&call->lock);
3906 if (event == call->delayedAckEvent)
3907 call->delayedAckEvent = (struct rxevent *) 0;
3908 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3910 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3912 MUTEX_EXIT(&call->lock);
3913 #else /* RX_ENABLE_LOCKS */
3914 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3915 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3916 #endif /* RX_ENABLE_LOCKS */
3920 #ifdef RX_ENABLE_LOCKS
3921 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3922 * clearing them out.
3924 static void rxi_SetAcksInTransmitQueue(call)
3925 register struct rx_call *call;
3927 register struct rx_packet *p, *tp;
3930 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3937 call->flags |= RX_CALL_TQ_CLEARME;
3938 call->flags |= RX_CALL_TQ_SOME_ACKED;
3941 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3942 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3943 call->tfirst = call->tnext;
3944 call->nSoftAcked = 0;
3946 if (call->flags & RX_CALL_FAST_RECOVER) {
3947 call->flags &= ~RX_CALL_FAST_RECOVER;
3948 call->cwind = call->nextCwind;
3949 call->nextCwind = 0;
3952 CV_SIGNAL(&call->cv_twind);
3954 #endif /* RX_ENABLE_LOCKS */
3956 /* Clear out the transmit queue for the current call (all packets have
3957 * been received by peer) */
3958 void rxi_ClearTransmitQueue(call, force)
3959 register struct rx_call *call;
3962 register struct rx_packet *p, *tp;
3964 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3965 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3967 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3974 call->flags |= RX_CALL_TQ_CLEARME;
3975 call->flags |= RX_CALL_TQ_SOME_ACKED;
3978 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3979 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3985 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3986 call->flags &= ~RX_CALL_TQ_CLEARME;
3988 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3990 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3991 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3992 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3993 call->nSoftAcked = 0;
3995 if (call->flags & RX_CALL_FAST_RECOVER) {
3996 call->flags &= ~RX_CALL_FAST_RECOVER;
3997 call->cwind = call->nextCwind;
4000 #ifdef RX_ENABLE_LOCKS
4001 CV_SIGNAL(&call->cv_twind);
4003 osi_rxWakeup(&call->twind);
4007 void rxi_ClearReceiveQueue(call)
4008 register struct rx_call *call;
4010 register struct rx_packet *p, *tp;
4011 if (queue_IsNotEmpty(&call->rq)) {
4012 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4017 rx_packetReclaims++;
4019 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4021 if (call->state == RX_STATE_PRECALL) {
4022 call->flags |= RX_CALL_CLEARED;
4026 /* Send an abort packet for the specified call */
4027 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4028 register struct rx_call *call;
4029 struct rx_packet *packet;
4039 /* Clients should never delay abort messages */
4040 if (rx_IsClientConn(call->conn))
4041 force = 1;
4043 if (call->abortCode != call->error) {
4044 call->abortCode = call->error;
4045 call->abortCount = 0;
4048 if (force || rxi_callAbortThreshhold == 0 ||
4049 call->abortCount < rxi_callAbortThreshhold) {
4050 if (call->delayedAbortEvent) {
4051 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4053 error = htonl(call->error);
4055 packet = rxi_SendSpecial(call, call->conn, packet,
4056 RX_PACKET_TYPE_ABORT, (char *)&error,
4057 sizeof(error), istack);
4058 } else if (!call->delayedAbortEvent) {
4059 clock_GetTime(&when);
4060 clock_Addmsec(&when, rxi_callAbortDelay);
4061 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4062 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4068 /* Send an abort packet for the specified connection. Packet is an
4069 * optional pointer to a packet that can be used to send the abort.
4070 * Once the number of abort messages reaches the threshold, an
4071 * event is scheduled to send the abort. Setting the force flag
4072 * overrides sending delayed abort messages.
4074 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4075 * to send the abort packet.
4077 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4078 register struct rx_connection *conn;
4079 struct rx_packet *packet;
4089 /* Clients should never delay abort messages */
4090 if (rx_IsClientConn(conn))
4091 force = 1;
4093 if (force || rxi_connAbortThreshhold == 0 ||
4094 conn->abortCount < rxi_connAbortThreshhold) {
4095 if (conn->delayedAbortEvent) {
4096 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4098 error = htonl(conn->error);
4100 MUTEX_EXIT(&conn->conn_data_lock);
4101 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4102 RX_PACKET_TYPE_ABORT, (char *)&error,
4103 sizeof(error), istack);
4104 MUTEX_ENTER(&conn->conn_data_lock);
4105 } else if (!conn->delayedAbortEvent) {
4106 clock_GetTime(&when);
4107 clock_Addmsec(&when, rxi_connAbortDelay);
4108 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4114 /* Associate an error with all of the calls owned by a connection. Called
4115 * with error non-zero. This is only for really fatal things, like
4116 * bad authentication responses. The connection itself is set in
4117 * error at this point, so that future packets received will be
4118 * discarded. */
4119 void rxi_ConnectionError(conn, error)
4120 register struct rx_connection *conn;
4121 register afs_int32 error;
4125 if (conn->challengeEvent)
4126 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4127 for (i=0; i<RX_MAXCALLS; i++) {
4128 struct rx_call *call = conn->call[i];
4130 MUTEX_ENTER(&call->lock);
4131 rxi_CallError(call, error);
4132 MUTEX_EXIT(&call->lock);
4135 conn->error = error;
4136 MUTEX_ENTER(&rx_stats_mutex);
4137 rx_stats.fatalErrors++;
4138 MUTEX_EXIT(&rx_stats_mutex);
4142 void rxi_CallError(call, error)
4143 register struct rx_call *call;
4146 if (call->error) error = call->error;
4147 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4148 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4149 rxi_ResetCall(call, 0);
4152 rxi_ResetCall(call, 0);
4154 call->error = error;
4155 call->mode = RX_MODE_ERROR;
4158 /* Reset various fields in a call structure, and wakeup waiting
4159 * processes. Some fields aren't changed: state & mode are not
4160 * touched (these must be set by the caller), and bufptr, nLeft, and
4161 * nFree are not reset, since these fields are manipulated by
4162 * unprotected macros, and may only be reset by non-interrupting code.
4163 */
4164 #ifdef ADAPT_WINDOW
4165 /* this code requires that call->conn be set properly as a pre-condition. */
4166 #endif /* ADAPT_WINDOW */
4168 void rxi_ResetCall(call, newcall)
4169 register struct rx_call *call;
4170 register int newcall;
4173 register struct rx_peer *peer;
4174 struct rx_packet *packet;
4176 /* Notify anyone who is waiting for asynchronous packet arrival */
4177 if (call->arrivalProc) {
4178 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4179 call->arrivalProc = (VOID (*)()) 0;
4182 if (call->delayedAbortEvent) {
4183 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4184 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4186 rxi_SendCallAbort(call, packet, 0, 1);
4187 rxi_FreePacket(packet);
4192 * Update the peer with the congestion information in this call
4193 * so other calls on this connection can pick up where this call
4194 * left off. If the congestion sequence numbers don't match then
4195 * another call experienced a retransmission.
4197 peer = call->conn->peer;
4198 MUTEX_ENTER(&peer->peer_lock);
4200 if (call->congestSeq == peer->congestSeq) {
4201 peer->cwind = MAX(peer->cwind, call->cwind);
4202 peer->MTU = MAX(peer->MTU, call->MTU);
4203 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4206 call->abortCode = 0;
4207 call->abortCount = 0;
4209 if (peer->maxDgramPackets > 1) {
4210 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4212 call->MTU = peer->MTU;
4214 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4215 call->ssthresh = rx_maxSendWindow;
4216 call->nDgramPackets = peer->nDgramPackets;
4217 call->congestSeq = peer->congestSeq;
4218 MUTEX_EXIT(&peer->peer_lock);
4220 flags = call->flags;
4221 rxi_ClearReceiveQueue(call);
4222 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4223 if (call->flags & RX_CALL_TQ_BUSY) {
4224 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4225 call->flags |= (flags & RX_CALL_TQ_WAIT);
4227 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4229 rxi_ClearTransmitQueue(call, 0);
4230 queue_Init(&call->tq);
4233 queue_Init(&call->rq);
4235 call->rwind = rx_initReceiveWindow;
4236 call->twind = rx_initSendWindow;
4237 call->nSoftAcked = 0;
4238 call->nextCwind = 0;
4241 call->nCwindAcks = 0;
4242 call->nSoftAcks = 0;
4243 call->nHardAcks = 0;
4245 call->tfirst = call->rnext = call->tnext = 1;
4247 call->lastAcked = 0;
4248 call->localStatus = call->remoteStatus = 0;
4250 if (flags & RX_CALL_READER_WAIT) {
4251 #ifdef RX_ENABLE_LOCKS
4252 CV_BROADCAST(&call->cv_rq);
4254 osi_rxWakeup(&call->rq);
4257 if (flags & RX_CALL_WAIT_PACKETS) {
4258 MUTEX_ENTER(&rx_freePktQ_lock);
4259 rxi_PacketsUnWait(); /* XXX */
4260 MUTEX_EXIT(&rx_freePktQ_lock);
4263 #ifdef RX_ENABLE_LOCKS
4264 CV_SIGNAL(&call->cv_twind);
4266 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4267 osi_rxWakeup(&call->twind);
4270 #ifdef RX_ENABLE_LOCKS
4271 /* The following ensures that we don't mess with any queue while some
4272 * other thread might also be doing so. The call_queue_lock field is
4273 * only modified under the call lock. If the call is in the process
4274 * of being removed from a queue, the call is not locked until the
4275 * queue lock is dropped and only then is the call_queue_lock field
4276 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4277 * Note that any other routine which removes a call from a queue has to
4278 * obtain the queue lock before examining the queue and removing the call.
4280 if (call->call_queue_lock) {
4281 MUTEX_ENTER(call->call_queue_lock);
4282 if (queue_IsOnQueue(call)) {
4284 if (flags & RX_CALL_WAIT_PROC) {
4285 MUTEX_ENTER(&rx_stats_mutex);
4287 MUTEX_EXIT(&rx_stats_mutex);
4290 MUTEX_EXIT(call->call_queue_lock);
4291 CLEAR_CALL_QUEUE_LOCK(call);
4293 #else /* RX_ENABLE_LOCKS */
4294 if (queue_IsOnQueue(call)) {
4296 if (flags & RX_CALL_WAIT_PROC)
4299 #endif /* RX_ENABLE_LOCKS */
4301 rxi_KeepAliveOff(call);
4302 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
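/* Editor's sketch (illustration only; example_donate_congestion_state is
 * hypothetical): the congestion-state handoff at the top of rxi_ResetCall.
 * A finishing call donates its window parameters back to the peer, but
 * only if no other call on the connection saw a retransmission in the
 * meantime (i.e. congestSeq is unchanged). */
#if 0
static void example_donate_congestion_state(peer, call)
    struct rx_peer *peer;
    struct rx_call *call;
{
    if (call->congestSeq == peer->congestSeq) {
	peer->cwind = MAX(peer->cwind, call->cwind);
	peer->MTU = MAX(peer->MTU, call->MTU);
	peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
    }
}
#endif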
4305 /* Send an acknowledge for the indicated packet (seq,serial) of the
4306 * indicated call, for the indicated reason (reason). This
4307 * acknowledge will specifically acknowledge receiving the packet, and
4308 * will also specify which other packets for this call have been
4309 * received. This routine returns the packet that was used to the
4310 * caller. The caller is responsible for freeing it or re-using it.
4311 * This acknowledgement also returns the highest sequence number
4312 * actually read out by the higher level to the sender; the sender
4313 * promises to keep around packets that have not been read by the
4314 * higher level yet (unless, of course, the sender decides to abort
4315 * the call altogether). Any of p, seq, serial, pflags, or reason may
4316 * be set to zero without ill effect. That is, if they are zero, they
4317 * will not convey any information.
4318 * NOW there is a trailer field after the ack, where it will safely be
4319 * ignored by mundanes, which indicates the maximum size packet this
4320 * host can swallow. */
4321 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4322 register struct rx_call *call;
4323 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4324 int seq; /* Sequence number of the packet we are acking */
4325 int serial; /* Serial number of the packet */
4326 int pflags; /* Flags field from packet header */
4327 int reason; /* Reason an acknowledge was prompted */
4330 struct rx_ackPacket *ap;
4331 register struct rx_packet *rqp;
4332 register struct rx_packet *nxp; /* For queue_Scan */
4333 register struct rx_packet *p;
4338 * Open the receive window once a thread starts reading packets
4340 if (call->rnext > 1) {
4341 call->rwind = rx_maxReceiveWindow;
4344 call->nHardAcks = 0;
4345 call->nSoftAcks = 0;
4346 if (call->rnext > call->lastAcked)
4347 call->lastAcked = call->rnext;
4351 rx_computelen(p, p->length); /* reset length, you never know */
4352 } /* where that's been... */
4354 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4355 /* We won't send the ack, but don't panic. */
4356 return optionalPacket;
4359 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4361 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4362 if (!optionalPacket) rxi_FreePacket(p);
4363 return optionalPacket;
4365 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4366 if (rx_Contiguous(p)<templ) {
4367 if (!optionalPacket) rxi_FreePacket(p);
4368 return optionalPacket;
4370 } /* MTUXXX failing to send an ack is very serious. We should */
4371 /* try as hard as possible to send even a partial ack; it's */
4372 /* better than nothing. */
4374 ap = (struct rx_ackPacket *) rx_DataOf(p);
4375 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4376 ap->reason = reason;
4378 /* The skew computation used to be bogus, I think it's better now. */
4379 /* We should start paying attention to skew. XXX */
4380 ap->serial = htonl(call->conn->maxSerial);
4381 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4383 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4384 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4386 /* No fear of running out of room in the ack packet here because there can only be at most
4387 * one window full of unacknowledged packets. The window size must be constrained
4388 * to be less than the maximum ack size, of course. Also, an ack should always
4389 * fit into a single packet -- it should not ever be fragmented. */
4390 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4391 if (!rqp || !call->rq.next
4392 || (rqp->header.seq > (call->rnext + call->rwind))) {
4393 if (!optionalPacket) rxi_FreePacket(p);
4394 rxi_CallError(call, RX_CALL_DEAD);
4395 return optionalPacket;
4398 while (rqp->header.seq > call->rnext + offset)
4399 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4400 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4402 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4403 if (!optionalPacket) rxi_FreePacket(p);
4404 rxi_CallError(call, RX_CALL_DEAD);
4405 return optionalPacket;
4410 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4412 /* these are new for AFS 3.3 */
4413 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4414 templ = htonl(templ);
4415 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4416 templ = htonl(call->conn->peer->ifMTU);
4417 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4419 /* new for AFS 3.4 */
4420 templ = htonl(call->rwind);
4421 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4423 /* new for AFS 3.5 */
4424 templ = htonl(call->conn->peer->ifDgramPackets);
4425 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4427 p->header.serviceId = call->conn->serviceId;
4428 p->header.cid = (call->conn->cid | call->channel);
4429 p->header.callNumber = *call->callNumber;
4430 p->header.seq = seq;
4431 p->header.securityIndex = call->conn->securityIndex;
4432 p->header.epoch = call->conn->epoch;
4433 p->header.type = RX_PACKET_TYPE_ACK;
4434 p->header.flags = RX_SLOW_START_OK;
4435 if (reason == RX_ACK_PING) {
4436 p->header.flags |= RX_REQUEST_ACK;
4438 clock_GetTime(&call->pingRequestTime);
4441 if (call->conn->type == RX_CLIENT_CONNECTION)
4442 p->header.flags |= RX_CLIENT_INITIATED;
4446 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4447 ap->reason, ntohl(ap->previousPacket), p->header.seq,
4448 ntohl(ap->firstPacket));
4450 for (offset = 0; offset < ap->nAcks; offset++)
4451 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4458 register int i, nbytes = p->length;
4460 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4461 if (nbytes <= p->wirevec[i].iov_len) {
4462 register int savelen, saven;
4464 savelen = p->wirevec[i].iov_len;
4466 p->wirevec[i].iov_len = nbytes;
4468 rxi_Send(call, p, istack);
4469 p->wirevec[i].iov_len = savelen;
4473 else nbytes -= p->wirevec[i].iov_len;
4476 MUTEX_ENTER(&rx_stats_mutex);
4477 rx_stats.ackPacketsSent++;
4478 MUTEX_EXIT(&rx_stats_mutex);
4479 if (!optionalPacket) rxi_FreePacket(p);
4480 return optionalPacket; /* Return packet for re-use by caller */
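/* Editor's sketch (illustration only; example_ack_trailer is a
 * hypothetical overlay, not a structure rx defines): the trailer written
 * above is four network-order afs_int32 words placed immediately after
 * rx_AckDataSize(nAcks) bytes of ack data. Peers that predate a given
 * field simply ignore those bytes. */
#if 0
struct example_ack_trailer {
    afs_int32 maxMTU;	      /* AFS 3.3: rxi_AdjustMaxMTU(ifMTU, rx_maxReceiveSize) */
    afs_int32 ifMTU;	      /* AFS 3.3: raw interface MTU */
    afs_int32 rwind;	      /* AFS 3.4: advertised receive window */
    afs_int32 ifDgramPackets; /* AFS 3.5: packets per jumbogram */
};
#endif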
4483 /* Send all of the packets in the list in single datagram */
4484 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4485 struct rx_call *call;
4486 struct rx_packet **list;
4491 struct clock *retryTime;
4497 struct rx_connection *conn = call->conn;
4498 struct rx_peer *peer = conn->peer;
4500 MUTEX_ENTER(&peer->peer_lock);
4502 if (resending) peer->reSends += len;
4503 MUTEX_ENTER(&rx_stats_mutex);
4504 rx_stats.dataPacketsSent += len;
4505 MUTEX_EXIT(&rx_stats_mutex);
4506 MUTEX_EXIT(&peer->peer_lock);
4508 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4512 /* Set the packet flags and schedule the resend events */
4513 /* Only request an ack for the last packet in the list */
4514 for (i = 0 ; i < len ; i++) {
4515 list[i]->retryTime = *retryTime;
4516 if (list[i]->header.serial) {
4517 /* Exponentially backoff retry times */
4518 if (list[i]->backoff < MAXBACKOFF) {
4519 /* so it can't stay == 0 */
4520 list[i]->backoff = (list[i]->backoff << 1) +1;
4522 else list[i]->backoff++;
4523 clock_Addmsec(&(list[i]->retryTime),
4524 ((afs_uint32) list[i]->backoff) << 8);
4527 /* Wait a little extra for the ack on the last packet */
4528 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4529 clock_Addmsec(&(list[i]->retryTime), 400);
4532 /* Record the time sent */
4533 list[i]->timeSent = *now;
4535 /* Ask for an ack on retransmitted packets, on every other packet
4536 * if the peer doesn't support slow start. Ask for an ack on every
4537 * packet until the congestion window reaches the ack rate. */
4538 if (list[i]->header.serial) {
4540 MUTEX_ENTER(&rx_stats_mutex);
4541 rx_stats.dataPacketsReSent++;
4542 MUTEX_EXIT(&rx_stats_mutex);
4544 /* improved RTO calculation- not Karn */
4545 list[i]->firstSent = *now;
4547 && (call->cwind <= (u_short)(conn->ackRate+1)
4548 || (!(call->flags & RX_CALL_SLOW_START_OK)
4549 && (list[i]->header.seq & 1)))) {
4554 MUTEX_ENTER(&peer->peer_lock);
4556 if (resending) peer->reSends++;
4557 MUTEX_ENTER(&rx_stats_mutex);
4558 rx_stats.dataPacketsSent++;
4559 MUTEX_EXIT(&rx_stats_mutex);
4560 MUTEX_EXIT(&peer->peer_lock);
4562 /* Tag this packet as not being the last in this group,
4563 * for the receiver's benefit */
4564 if (i < len-1 || moreFlag) {
4565 list[i]->header.flags |= RX_MORE_PACKETS;
4568 /* Install the new retransmit time for the packet, and
4569 * record the time sent */
4570 list[i]->timeSent = *now;
4574 list[len-1]->header.flags |= RX_REQUEST_ACK;
4577 /* Since we're about to send a data packet to the peer, it's
4578 * safe to nuke any scheduled end-of-packets ack */
4579 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4581 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4582 MUTEX_EXIT(&call->lock);
4584 rxi_SendPacketList(conn, list, len, istack);
4586 rxi_SendPacket(conn, list[0], istack);
4588 MUTEX_ENTER(&call->lock);
4589 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4591 /* Update last send time for this call (for keep-alive
4592 * processing), and for the connection (so that we can discover
4593 * idle connections) */
4594 conn->lastSendTime = call->lastSendTime = clock_Sec();
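/* Editor's sketch (illustration only; the example_* helpers are
 * hypothetical): the retransmit backoff applied above, in isolation.
 * Each retransmission roughly doubles the backoff counter (0, 1, 3, 7,
 * 15, ... capped at MAXBACKOFF), and every unit of backoff adds 256 ms
 * (backoff << 8) to the packet's retryTime. */
#if 0
static afs_uint32 example_next_backoff(backoff)
    afs_uint32 backoff;
{
    if (backoff < MAXBACKOFF)
	backoff = (backoff << 1) + 1;	/* +1 so it can't stay 0 */
    else
	backoff++;
    return backoff;
}

static afs_uint32 example_backoff_delay_msec(backoff)
    afs_uint32 backoff;
{
    return backoff << 8;		/* 256 ms per backoff unit */
}
#endif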
4597 /* When sending packets we need to follow these rules:
4598 * 1. Never send more than maxDgramPackets in a jumbogram.
4599 * 2. Never send a packet with more than two iovecs in a jumbogram.
4600 * 3. Never send a retransmitted packet in a jumbogram.
4601 * 4. Never send more than cwind/4 packets in a jumbogram
4602 * We always keep the last list we should have sent so we
4603 * can set the RX_MORE_PACKETS flags correctly.
4605 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4606 struct rx_call *call;
4607 struct rx_packet **list;
4611 struct clock *retryTime;
4614 int i, cnt, lastCnt = 0;
4615 struct rx_packet **listP, **lastP = 0;
4616 struct rx_peer *peer = call->conn->peer;
4617 int morePackets = 0;
4619 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4620 /* Does the current packet force us to flush the current list? */
4622 && (list[i]->header.serial
4624 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4626 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4627 /* If the call enters an error state stop sending, or if
4628 * we entered congestion recovery mode, stop sending */
4629 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4637 /* Add the current packet to the list if it hasn't been acked.
4638 * Otherwise adjust the list pointer to skip the current packet. */
4639 if (!list[i]->acked) {
4641 /* Do we need to flush the list? */
4642 if (cnt >= (int)peer->maxDgramPackets
4643 || cnt >= (int)call->nDgramPackets
4644 || cnt >= (int)call->cwind
4645 || list[i]->header.serial
4646 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4648 rxi_SendList(call, lastP, lastCnt, istack, 1,
4649 now, retryTime, resending);
4650 /* If the call enters an error state stop sending, or if
4651 * we entered congestion recovery mode, stop sending */
4652 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4662 osi_Panic("rxi_SendList error");
4668 /* Send the whole list when the call is in receive mode, when
4669 * the call is in eof mode, when we are in fast recovery mode,
4670 * and when we have the last packet */
4671 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4672 || call->mode == RX_MODE_RECEIVING
4673 || call->mode == RX_MODE_EOF
4674 || (call->flags & RX_CALL_FAST_RECOVER)) {
4675 /* Check for the case where the current list contains
4676 * an acked packet. Since we always send retransmissions
4677 * in a separate packet, we only need to check the first
4678 * packet in the list */
4679 if (cnt > 0 && !listP[0]->acked) {
4683 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4684 now, retryTime, resending);
4685 /* If the call enters an error state stop sending, or if
4686 * we entered congestion recovery mode, stop sending */
4687 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4691 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4693 } else if (lastCnt > 0) {
4694 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
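/* Editor's sketch (illustration only; example_must_flush is
 * hypothetical): the test above that decides when the current jumbogram
 * must be flushed and a new one started. A packet travels alone when it
 * is a retransmission (header.serial != 0) or is not a full jumbo buffer,
 * and a datagram never exceeds what the peer, the call, or the congestion
 * window allows. */
#if 0
static int example_must_flush(call, peer, p, cnt)
    struct rx_call *call;
    struct rx_peer *peer;
    struct rx_packet *p;
    int cnt;
{
    return cnt >= (int)peer->maxDgramPackets
	|| cnt >= (int)call->nDgramPackets
	|| cnt >= (int)call->cwind
	|| p->header.serial		/* retransmits travel alone */
	|| p->length != RX_JUMBOBUFFERSIZE;
}
#endif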
4698 #ifdef RX_ENABLE_LOCKS
4699 /* Call rxi_Start, below, but with the call lock held. */
4700 void rxi_StartUnlocked(event, call, istack)
4701 struct rxevent *event;
4702 register struct rx_call *call;
4705 MUTEX_ENTER(&call->lock);
4706 rxi_Start(event, call, istack);
4707 MUTEX_EXIT(&call->lock);
4709 #endif /* RX_ENABLE_LOCKS */
4711 /* This routine is called when new packets are readied for
4712 * transmission and when retransmission may be necessary, or when the
4713 * transmission window or burst count is favourable. This should be
4714 * better optimized for new packets, the usual case, now that we've
4715 * got rid of queues of send packets. XXXXXXXXXXX */
4716 void rxi_Start(event, call, istack)
4717 struct rxevent *event;
4718 register struct rx_call *call;
4721 struct rx_packet *p;
4722 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4723 struct rx_peer *peer = call->conn->peer;
4724 struct clock now, retryTime;
4728 struct rx_packet **xmitList;
4731 /* If rxi_Start is being called as a result of a resend event,
4732 * then make sure that the event pointer is removed from the call
4733 * structure, since there is no longer a per-call retransmission
4735 if (event && event == call->resendEvent) {
4736 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4737 call->resendEvent = NULL;
4739 if (queue_IsEmpty(&call->tq)) {
4743 /* Timeouts trigger congestion recovery */
4744 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4745 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4746 /* someone else is waiting to start recovery */
4749 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4750 while (call->flags & RX_CALL_TQ_BUSY) {
4751 call->flags |= RX_CALL_TQ_WAIT;
4752 #ifdef RX_ENABLE_LOCKS
4753 CV_WAIT(&call->cv_tq, &call->lock);
4754 #else /* RX_ENABLE_LOCKS */
4755 osi_rxSleep(&call->tq);
4756 #endif /* RX_ENABLE_LOCKS */
4758 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4759 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4760 call->flags |= RX_CALL_FAST_RECOVER;
4761 if (peer->maxDgramPackets > 1) {
4762 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4764 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4766 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4767 call->nDgramPackets = 1;
4769 call->nextCwind = 1;
4772 MUTEX_ENTER(&peer->peer_lock);
4773 peer->MTU = call->MTU;
4774 peer->cwind = call->cwind;
4775 peer->nDgramPackets = 1;
4777 call->congestSeq = peer->congestSeq;
4778 MUTEX_EXIT(&peer->peer_lock);
4779 /* Clear retry times on packets. Otherwise, it's possible for
4780 * some packets in the queue to force resends at rates faster
4781 * than recovery rates.
4783 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4785 clock_Zero(&p->retryTime);
4790 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4791 MUTEX_ENTER(&rx_stats_mutex);
4792 rx_tq_debug.rxi_start_in_error ++;
4793 MUTEX_EXIT(&rx_stats_mutex);
4798 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4799 /* Get clock to compute the re-transmit time for any packets
4800 * in this burst. Note, if we back off, it's reasonable to
4801 * back off all of the packets in the same manner, even if
4802 * some of them have been retransmitted more times than more
4803 * recent additions */
4804 clock_GetTime(&now);
4805 retryTime = now; /* initialize before use */
4806 MUTEX_ENTER(&peer->peer_lock);
4807 clock_Add(&retryTime, &peer->timeout);
4808 MUTEX_EXIT(&peer->peer_lock);
4810 /* Send (or resend) any packets that need it, subject to
4811 * window restrictions and congestion burst control
4812 * restrictions. Ask for an ack on the last packet sent in
4813 * this burst. For now, we're relying upon the window being
4814 * considerably bigger than the largest number of packets that
4815 * are typically sent at once by one initial call to
4816 * rxi_Start. This is probably bogus (perhaps we should ask
4817 * for an ack when we're half way through the current
4818 * window?). Also, for non file transfer applications, this
4819 * may end up asking for an ack for every packet. Bogus. XXXX
4822 * But check whether we're here recursively, and let the other guy
4823 * do the work.
4824 */
4825 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4826 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4827 call->flags |= RX_CALL_TQ_BUSY;
4829 call->flags &= ~RX_CALL_NEED_START;
4830 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4832 maxXmitPackets = MIN(call->twind, call->cwind);
4833 xmitList = (struct rx_packet **)
4834 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4835 if (xmitList == NULL)
4836 osi_Panic("rxi_Start, failed to allocate xmit list");
4837 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4838 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4839 /* We shouldn't be sending packets if a thread is waiting
4840 * to initiate congestion recovery */
4843 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4844 /* Only send one packet during fast recovery */
4847 if ((p->header.flags == RX_FREE_PACKET) ||
4848 (!queue_IsEnd(&call->tq, nxp)
4849 && (nxp->header.flags == RX_FREE_PACKET)) ||
4850 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4851 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4852 osi_Panic("rxi_Start: xmit queue clobbered");
4855 MUTEX_ENTER(&rx_stats_mutex);
4856 rx_stats.ignoreAckedPacket++;
4857 MUTEX_EXIT(&rx_stats_mutex);
4858 continue; /* Ignore this packet if it has been acknowledged */
4861 /* Turn off all flags except these ones, which are the same
4862 * on each transmission */
4863 p->header.flags &= RX_PRESET_FLAGS;
4865 if (p->header.seq >= call->tfirst +
4866 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4867 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4868 /* Note: if we're waiting for more window space, we can
4869 * still send retransmits; hence we don't return here, but
4870 * break out to schedule a retransmit event */
4871 dpf(("call %d waiting for window", *(call->callNumber)));
4875 /* Transmit the packet if it needs to be sent. */
4876 if (!clock_Lt(&now, &p->retryTime)) {
4877 if (nXmitPackets == maxXmitPackets) {
4878 osi_Panic("rxi_Start: xmit list overflowed");
4880 xmitList[nXmitPackets++] = p;
4884 /* xmitList now holds pointers to all of the packets that are
4885 * ready to send. Now we loop to send the packets */
4886 if (nXmitPackets > 0) {
4887 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4888 &now, &retryTime, resending);
4890 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4892 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4894 * TQ references no longer protected by this flag; they must remain
4895 * protected by the global lock.
4897 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4898 call->flags &= ~RX_CALL_TQ_BUSY;
4899 if (call->flags & RX_CALL_TQ_WAIT) {
4900 call->flags &= ~RX_CALL_TQ_WAIT;
4901 #ifdef RX_ENABLE_LOCKS
4902 CV_BROADCAST(&call->cv_tq);
4903 #else /* RX_ENABLE_LOCKS */
4904 osi_rxWakeup(&call->tq);
4905 #endif /* RX_ENABLE_LOCKS */
4910 /* We went into the error state while sending packets. Now is
4911 * the time to reset the call. This will also inform the using
4912 * process that the call is in an error state.
4914 MUTEX_ENTER(&rx_stats_mutex);
4915 rx_tq_debug.rxi_start_aborted ++;
4916 MUTEX_EXIT(&rx_stats_mutex);
4917 call->flags &= ~RX_CALL_TQ_BUSY;
4918 if (call->flags & RX_CALL_TQ_WAIT) {
4919 call->flags &= ~RX_CALL_TQ_WAIT;
4920 #ifdef RX_ENABLE_LOCKS
4921 CV_BROADCAST(&call->cv_tq);
4922 #else /* RX_ENABLE_LOCKS */
4923 osi_rxWakeup(&call->tq);
4924 #endif /* RX_ENABLE_LOCKS */
4926 rxi_CallError(call, call->error);
4929 #ifdef RX_ENABLE_LOCKS
4930 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4931 register int missing;
4932 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4933 /* Some packets have received acks. If they all have, we can clear
4934 * the transmit queue.
4936 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4937 if (p->header.seq < call->tfirst && p->acked) {
4945 call->flags |= RX_CALL_TQ_CLEARME;
4947 #endif /* RX_ENABLE_LOCKS */
4948 /* Don't bother doing retransmits if the TQ is cleared. */
4949 if (call->flags & RX_CALL_TQ_CLEARME) {
4950 rxi_ClearTransmitQueue(call, 1);
4952 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4955 /* Always post a resend event, if there is anything in the
4956 * queue, and resend is possible. There should be at least
4957 * one unacknowledged packet in the queue ... otherwise none
4958 * of these packets should be on the queue in the first place.
4960 if (call->resendEvent) {
4961 /* Cancel the existing event and post a new one */
4962 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4965 /* The retry time is the retry time on the first unacknowledged
4966 * packet inside the current window */
4967 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4968 /* Don't set timers for packets outside the window */
4969 if (p->header.seq >= call->tfirst + call->twind) {
4973 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4975 retryTime = p->retryTime;
4980 /* Post a new event to re-run rxi_Start when retries may be needed */
4981 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4982 #ifdef RX_ENABLE_LOCKS
4983 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4984 call->resendEvent = rxevent_Post(&retryTime,
4986 (char *)call, istack);
4987 #else /* RX_ENABLE_LOCKS */
4988 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4989 (char *)call, (void*)(long)istack);
4990 #endif /* RX_ENABLE_LOCKS */
4993 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4994 } while (call->flags & RX_CALL_NEED_START);
4996 * TQ references no longer protected by this flag; they must remain
4997 * protected by the global lock.
4999 call->flags &= ~RX_CALL_TQ_BUSY;
5000 if (call->flags & RX_CALL_TQ_WAIT) {
5001 call->flags &= ~RX_CALL_TQ_WAIT;
5002 #ifdef RX_ENABLE_LOCKS
5003 CV_BROADCAST(&call->cv_tq);
5004 #else /* RX_ENABLE_LOCKS */
5005 osi_rxWakeup(&call->tq);
5006 #endif /* RX_ENABLE_LOCKS */
5009 call->flags |= RX_CALL_NEED_START;
5011 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5013 if (call->resendEvent) {
5014 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
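/* Editor's sketch (illustration only; example_enter_recovery is
 * hypothetical, and the cwind = 1 line is an assumption about the reset
 * value, since slow start restarts from one packet): the congestion
 * recovery performed near the top of rxi_Start when a retransmit timeout
 * fires. The call falls back to one packet per datagram, remembers half
 * its effective window (never less than 2) in ssthresh, and restarts
 * slow start. */
#if 0
static void example_enter_recovery(call, peer)
    struct rx_call *call;
    struct rx_peer *peer;
{
    if (peer->maxDgramPackets > 1)
	call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
    else
	call->MTU = MIN(peer->natMTU, peer->maxMTU);
    call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
    call->nDgramPackets = 1;
    call->cwind = 1;		/* assumed reset value for slow start */
    call->nextCwind = 1;
}
#endif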
5019 /* Also adjusts the keep alive parameters for the call, to reflect
5020 * that we have just sent a packet (so keep alives aren't sent
5022 void rxi_Send(call, p, istack)
5023 register struct rx_call *call;
5024 register struct rx_packet *p;
5027 register struct rx_connection *conn = call->conn;
5029 /* Stamp each packet with the user supplied status */
5030 p->header.userStatus = call->localStatus;
5032 /* Allow the security object controlling this call's security to
5033 * make any last-minute changes to the packet */
5034 RXS_SendPacket(conn->securityObject, call, p);
5036 /* Since we're about to send SOME sort of packet to the peer, it's
5037 * safe to nuke any scheduled end-of-packets ack */
5038 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5040 /* Actually send the packet, filling in more connection-specific fields */
5041 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5042 MUTEX_EXIT(&call->lock);
5043 rxi_SendPacket(conn, p, istack);
5044 MUTEX_ENTER(&call->lock);
5045 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5047 /* Update last send time for this call (for keep-alive
5048 * processing), and for the connection (so that we can discover
5049 * idle connections) */
5050 conn->lastSendTime = call->lastSendTime = clock_Sec();
5054 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5055 * that things are fine. Also called periodically to guarantee that nothing
5056 * falls through the cracks (e.g. (error + dally) connections have keepalive
5057 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5058 * may be freed!
5059 */
5060 #ifdef RX_ENABLE_LOCKS
5061 int rxi_CheckCall(call, haveCTLock)
5062 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5063 #else /* RX_ENABLE_LOCKS */
5064 int rxi_CheckCall(call)
5065 #endif /* RX_ENABLE_LOCKS */
5066 register struct rx_call *call;
5068 register struct rx_connection *conn = call->conn;
5069 register struct rx_service *tservice;
5071 afs_uint32 deadTime;
5073 #ifdef RX_GLOBAL_RXLOCK_KERNEL
5074 if (call->flags & RX_CALL_TQ_BUSY) {
5075 /* Call is active and will be reset by rxi_Start if it's
5076 * in an error state.
5081 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5082 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5083 ((afs_uint32)conn->peer->rtt >> 3) +
5084 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
5086 /* These are computed to the second (+- 1 second). But that's
5087 * good enough for these values, which should be a significant
5088 * number of seconds. */
5089 if (now > (call->lastReceiveTime + deadTime)) {
5090 if (call->state == RX_STATE_ACTIVE) {
5091 rxi_CallError(call, RX_CALL_DEAD);
5095 #ifdef RX_ENABLE_LOCKS
5096 /* Cancel pending events */
5097 rxevent_Cancel(call->delayedAckEvent, call,
5098 RX_CALL_REFCOUNT_DELAY);
5099 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5100 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5101 if (call->refCount == 0) {
5102 rxi_FreeCall(call, haveCTLock);
5106 #else /* RX_ENABLE_LOCKS */
5109 #endif /* RX_ENABLE_LOCKS */
5111 /* Non-active calls are destroyed if they are not responding
5112 * to pings; active calls are simply flagged in error, so the
5113 * attached process can die reasonably gracefully. */
5115 /* see if we have a non-activity timeout */
5116 tservice = conn->service;
5117 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5118 && tservice->idleDeadTime
5119 && ((call->startWait + tservice->idleDeadTime) < now)) {
5120 if (call->state == RX_STATE_ACTIVE) {
5121 rxi_CallError(call, RX_CALL_TIMEOUT);
5125 /* see if we have a hard timeout */
5126 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5127 if (call->state == RX_STATE_ACTIVE)
5128 rxi_CallError(call, RX_CALL_TIMEOUT);
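/* Editor's sketch (illustration only; example_dead_time is hypothetical):
 * the dead-time computation above. secondsUntilDead is in seconds,
 * peer->rtt is the smoothed rtt scaled by 8 (in ms), and peer->rtt_dev is
 * the smoothed deviation scaled by 4 (in ms); <<10 approximates a
 * seconds-to-milliseconds conversion, and "+ 1023, >> 10" rounds back up
 * to whole seconds. The result is roughly secondsUntilDead plus
 * (rtt + 8*mdev) ms, rounded up to the next second. */
#if 0
static afs_uint32 example_dead_time(secondsUntilDead, rtt8, dev4)
    afs_uint32 secondsUntilDead;
    afs_uint32 rtt8;	/* smoothed rtt, scaled by 8, in ms */
    afs_uint32 dev4;	/* smoothed deviation, scaled by 4, in ms */
{
    return ((secondsUntilDead << 10) +	/* seconds -> ~milliseconds */
	    (rtt8 >> 3) +		/* smoothed rtt in ms */
	    (dev4 << 1) +		/* 2 * (4*mdev) = 8*mdev in ms */
	    1023) >> 10;		/* round up to whole seconds */
}
#endif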
5135 /* When a call is in progress, this routine is called occasionally to
5136 * make sure that some traffic has arrived (or been sent to) the peer.
5137 * If nothing has arrived in a reasonable amount of time, the call is
5138 * declared dead; if nothing has been sent for a while, we send a
5139 * keep-alive packet (if we're actually trying to keep the call alive)
5141 void rxi_KeepAliveEvent(event, call, dummy)
5142 struct rxevent *event;
5143 register struct rx_call *call;
5145 struct rx_connection *conn;
5148 MUTEX_ENTER(&call->lock);
5149 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5150 if (event == call->keepAliveEvent)
5151 call->keepAliveEvent = (struct rxevent *) 0;
5154 #ifdef RX_ENABLE_LOCKS
5155 if(rxi_CheckCall(call, 0)) {
5156 MUTEX_EXIT(&call->lock);
5159 #else /* RX_ENABLE_LOCKS */
5160 if (rxi_CheckCall(call)) return;
5161 #endif /* RX_ENABLE_LOCKS */
5163 /* Don't try to keep alive dallying calls */
5164 if (call->state == RX_STATE_DALLY) {
5165 MUTEX_EXIT(&call->lock);
5170 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5171 /* Don't try to send keepalives if there is unacknowledged data */
5172 /* the rexmit code should be good enough, this little hack
5173 * doesn't quite work XXX */
5174 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5176 rxi_ScheduleKeepAliveEvent(call);
5177 MUTEX_EXIT(&call->lock);
5181 void rxi_ScheduleKeepAliveEvent(call)
5182 register struct rx_call *call;
5184 if (!call->keepAliveEvent) {
5186 clock_GetTime(&when);
5187 when.sec += call->conn->secondsUntilPing;
5188 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5189 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5193 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
5194 void rxi_KeepAliveOn(call)
5195 register struct rx_call *call;
5197 /* Pretend last packet received was received now--i.e. if another
5198 * packet isn't received within the keep alive time, then the call
5199 * will die; Initialize last send time to the current time--even
5200 * if a packet hasn't been sent yet. This will guarantee that a
5201 * keep-alive is sent within the ping time */
5202 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5203 rxi_ScheduleKeepAliveEvent(call);
5206 /* This routine is called to send connection abort messages
5207 * that have been delayed to throttle looping clients. */
5208 void rxi_SendDelayedConnAbort(event, conn, dummy)
5209 struct rxevent *event;
5210 register struct rx_connection *conn;
5214 struct rx_packet *packet;
5216 MUTEX_ENTER(&conn->conn_data_lock);
5217 conn->delayedAbortEvent = (struct rxevent *) 0;
5218 error = htonl(conn->error);
5220 MUTEX_EXIT(&conn->conn_data_lock);
5221 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5223 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5224 RX_PACKET_TYPE_ABORT, (char *)&error,
5226 rxi_FreePacket(packet);
5230 /* This routine is called to send call abort messages
5231 * that have been delayed to throttle looping clients. */
5232 void rxi_SendDelayedCallAbort(event, call, dummy)
5233 struct rxevent *event;
5234 register struct rx_call *call;
5238 struct rx_packet *packet;
5240 MUTEX_ENTER(&call->lock);
5241 call->delayedAbortEvent = (struct rxevent *) 0;
5242 error = htonl(call->error);
5244 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5246 packet = rxi_SendSpecial(call, call->conn, packet,
5247 RX_PACKET_TYPE_ABORT, (char *)&error,
5249 rxi_FreePacket(packet);
5251 MUTEX_EXIT(&call->lock);
5254 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5255 * seconds) to ask the client to authenticate itself. The routine
5256 * issues a challenge to the client, which is obtained from the
5257 * security object associated with the connection */
5258 void rxi_ChallengeEvent(event, conn, dummy)
5259 struct rxevent *event;
5260 register struct rx_connection *conn;
5263 conn->challengeEvent = (struct rxevent *) 0;
5264 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5265 register struct rx_packet *packet;
5267 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5269 /* If there's no packet available, do this later. */
5270 RXS_GetChallenge(conn->securityObject, conn, packet);
5271 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5272 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5273 rxi_FreePacket(packet);
5275 clock_GetTime(&when);
5276 when.sec += RX_CHALLENGE_TIMEOUT;
5277 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5281 /* Call this routine to start requesting the client to authenticate
5282 * itself. This will continue until authentication is established,
5283 * the call times out, or an invalid response is returned. The
5284 * security object associated with the connection is asked to create
5285 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5286 * defined earlier. */
5287 void rxi_ChallengeOn(conn)
5288 register struct rx_connection *conn;
5290 if (!conn->challengeEvent) {
5291 RXS_CreateChallenge(conn->securityObject, conn);
5292 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5297 /* Compute round trip time of the packet provided, in *rttp.
5298 */
5300 /* rxi_ComputeRoundTripTime is called with peer locked. */
5301 void rxi_ComputeRoundTripTime(p, sentp, peer)
5302 register struct clock *sentp; /* may be null */
5303 register struct rx_peer *peer; /* may be null */
5304 register struct rx_packet *p;
5306 struct clock thisRtt, *rttp = &thisRtt;
5308 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5309 /* making year 2038 bugs to get this running now - stroucki */
5310 struct timeval temptime;
5312 register int rtt_timeout;
5313 static char id[]="@(#)adaptive RTO";
5315 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5316 /* yet again. This was the worst Heisenbug of the port - stroucki */
5317 clock_GetTime(&temptime);
5318 rttp->sec=(afs_int32)temptime.tv_sec;
5319 rttp->usec=(afs_int32)temptime.tv_usec;
5320 #else
5321 clock_GetTime(rttp);
5322 #endif
5323 if (clock_Lt(rttp, sentp)) {
5325 return; /* somebody set the clock back, don't count this time. */
5327 clock_Sub(rttp, sentp);
5328 MUTEX_ENTER(&rx_stats_mutex);
5329 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5330 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5331 if (rttp->sec > 60) {
5332 MUTEX_EXIT(&rx_stats_mutex);
5333 return; /* somebody set the clock ahead */
5335 rx_stats.maxRtt = *rttp;
5337 clock_Add(&rx_stats.totalRtt, rttp);
5338 rx_stats.nRttSamples++;
5339 MUTEX_EXIT(&rx_stats_mutex);
5341 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5343 /* Apply VanJacobson round-trip estimations */
5348 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5349 * srtt is stored as fixed point with 3 bits after the binary
5350 * point (i.e., scaled by 8). The following magic is
5351 * equivalent to the smoothing algorithm in rfc793 with an
5352 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5353 * srtt*8 = srtt*8 + rtt - srtt
5354 * srtt = srtt + rtt/8 - srtt/8
5357 delta = MSEC(rttp) - (peer->rtt >> 3);
5361 * We accumulate a smoothed rtt variance (actually, a smoothed
5362 * mean difference), then set the retransmit timer to smoothed
5363 * rtt + 4 times the smoothed variance (was 2x in van's original
5364 * paper, but 4x works better for me, and apparently for him as
5366 * rttvar is stored as
5367 * fixed point with 2 bits after the binary point (scaled by
5368 * 4). The following is equivalent to rfc793 smoothing with
5369 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5370 * replaces rfc793's wired-in beta.
5371 * dev*4 = dev*4 + (|actual - expected| - dev)
5377 delta -= (peer->rtt_dev >> 2);
5378 peer->rtt_dev += delta;
5381 /* I don't have a stored RTT so I start with this value. Since I'm
5382 * probably just starting a call, and will be pushing more data down
5383 * this, I expect congestion to increase rapidly. So I fudge a
5384 * little, and I set deviance to half the rtt. In practice,
5385 * deviance tends to approach something a little less than
5386 * half the smoothed rtt. */
5387 peer->rtt = (MSEC(rttp) << 3) + 8;
5388 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5390 /* the timeout is RTT + 4*MDEV + 0.35 sec This is because one end or
5391 * the other of these connections is usually in a user process, and can
5392 * be switched and/or swapped out. So on fast, reliable networks, the
5393 * timeout would otherwise be too short.
5395 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5396 clock_Zero(&(peer->timeout));
5397 clock_Addmsec(&(peer->timeout), rtt_timeout);
5399 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5400 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5401 (peer->timeout.sec),(peer->timeout.usec)) );
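/* Editor's sketch (illustration only; example_vj_update is hypothetical):
 * the Van Jacobson smoothing above, in isolation. srtt is kept scaled by
 * 8 (3 fractional bits) and the deviation scaled by 4 (2 fractional
 * bits), so "x += error" implements x += error/2^k in real units without
 * a division. The retransmit timeout is srtt + 4*mdev plus 350 ms of slop
 * for user-space peers that may be scheduled out. */
#if 0
static void example_vj_update(srtt8, dev4, sample_ms, timeout_ms)
    int *srtt8;		/* smoothed rtt, scaled by 8 */
    int *dev4;		/* smoothed mean deviation, scaled by 4 */
    int sample_ms;	/* new rtt measurement, in ms */
    int *timeout_ms;	/* out: retransmit timeout, in ms */
{
    int delta = sample_ms - (*srtt8 >> 3);	/* error vs. smoothed rtt */
    *srtt8 += delta;				/* srtt += error/8, scaled */
    if (delta < 0)
	delta = -delta;
    delta -= (*dev4 >> 2);			/* error vs. smoothed dev */
    *dev4 += delta;				/* dev += error/4, scaled */
    *timeout_ms = (*srtt8 >> 3) + *dev4 + 350;	/* srtt + 4*mdev + slop */
}
#endif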
5405 /* Find all server connections that have not been active for a long time, and
5406 * toss them */
5407 void rxi_ReapConnections()
5410 clock_GetTime(&now);
5412 /* Find server connection structures that haven't been used for
5413 * greater than rx_idleConnectionTime */
5414 { struct rx_connection **conn_ptr, **conn_end;
5415 int i, havecalls = 0;
5416 MUTEX_ENTER(&rx_connHashTable_lock);
5417 for (conn_ptr = &rx_connHashTable[0],
5418 conn_end = &rx_connHashTable[rx_hashTableSize];
5419 conn_ptr < conn_end; conn_ptr++) {
5420 struct rx_connection *conn, *next;
5421 struct rx_call *call;
5425 for (conn = *conn_ptr; conn; conn = next) {
5426 /* XXX -- Shouldn't the connection be locked? */
5429 for(i=0;i<RX_MAXCALLS;i++) {
5430 call = conn->call[i];
5433 MUTEX_ENTER(&call->lock);
5434 #ifdef RX_ENABLE_LOCKS
5435 result = rxi_CheckCall(call, 1);
5436 #else /* RX_ENABLE_LOCKS */
5437 result = rxi_CheckCall(call);
5438 #endif /* RX_ENABLE_LOCKS */
5439 MUTEX_EXIT(&call->lock);
5441 /* If CheckCall freed the call, it might
5442 * have destroyed the connection as well,
5443 * which screws up the linked lists.
5449 if (conn->type == RX_SERVER_CONNECTION) {
5450 /* This only actually destroys the connection if
5451 * there are no outstanding calls */
5452 MUTEX_ENTER(&conn->conn_data_lock);
5453 if (!havecalls && !conn->refCount &&
5454 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5455 conn->refCount++; /* it will be decr in rx_DestroyConn */
5456 MUTEX_EXIT(&conn->conn_data_lock);
5457 #ifdef RX_ENABLE_LOCKS
5458 rxi_DestroyConnectionNoLock(conn);
5459 #else /* RX_ENABLE_LOCKS */
5460 rxi_DestroyConnection(conn);
5461 #endif /* RX_ENABLE_LOCKS */
5463 #ifdef RX_ENABLE_LOCKS
5465 MUTEX_EXIT(&conn->conn_data_lock);
5467 #endif /* RX_ENABLE_LOCKS */
5471 #ifdef RX_ENABLE_LOCKS
5472 while (rx_connCleanup_list) {
5473 struct rx_connection *conn;
5474 conn = rx_connCleanup_list;
5475 rx_connCleanup_list = rx_connCleanup_list->next;
5476 MUTEX_EXIT(&rx_connHashTable_lock);
5477 rxi_CleanupConnection(conn);
5478 MUTEX_ENTER(&rx_connHashTable_lock);
5480 MUTEX_EXIT(&rx_connHashTable_lock);
5481 #endif /* RX_ENABLE_LOCKS */
5484 /* Find any peer structures that haven't been used (haven't had an
5485 * associated connection) for greater than rx_idlePeerTime */
5486 { struct rx_peer **peer_ptr, **peer_end;
5488 MUTEX_ENTER(&rx_rpc_stats);
5489 MUTEX_ENTER(&rx_peerHashTable_lock);
5490 for (peer_ptr = &rx_peerHashTable[0],
5491 peer_end = &rx_peerHashTable[rx_hashTableSize];
5492 peer_ptr < peer_end; peer_ptr++) {
5493 struct rx_peer *peer, *next, *prev;
5494 for (prev = peer = *peer_ptr; peer; peer = next) {
5496 code = MUTEX_TRYENTER(&peer->peer_lock);
5497 if ((code) && (peer->refCount == 0)
5498 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5499 rx_interface_stat_p rpc_stat, nrpc_stat;
5501 MUTEX_EXIT(&peer->peer_lock);
5502 MUTEX_DESTROY(&peer->peer_lock);
5503 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5504 rx_interface_stat)) {
5505 unsigned int num_funcs;
5506 if (!rpc_stat) break;
5507 queue_Remove(&rpc_stat->queue_header);
5508 queue_Remove(&rpc_stat->all_peers);
5509 num_funcs = rpc_stat->stats[0].func_total;
5510 space = sizeof(rx_interface_stat_t) +
5511 rpc_stat->stats[0].func_total *
5512 sizeof(rx_function_entry_v1_t);
5514 rxi_Free(rpc_stat, space);
5515 rxi_rpc_peer_stat_cnt -= num_funcs;
5518 MUTEX_ENTER(&rx_stats_mutex);
5519 rx_stats.nPeerStructs--;
5520 MUTEX_EXIT(&rx_stats_mutex);
5521 if (prev == *peer_ptr) {
5530 MUTEX_EXIT(&peer->peer_lock);
5536 MUTEX_EXIT(&rx_peerHashTable_lock);
5537 MUTEX_EXIT(&rx_rpc_stats);
5540 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5541 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5542 GC, just below. Really, we shouldn't have to keep moving packets from
5543 one place to another, but instead ought to always know if we can
5544 afford to hold onto a packet in its particular use. */
5545 MUTEX_ENTER(&rx_freePktQ_lock);
5546 if (rx_waitingForPackets) {
5547 rx_waitingForPackets = 0;
5548 #ifdef RX_ENABLE_LOCKS
5549 CV_BROADCAST(&rx_waitingForPackets_cv);
5551 osi_rxWakeup(&rx_waitingForPackets);
5554 MUTEX_EXIT(&rx_freePktQ_lock);
5556 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5557 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
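/* Editor's sketch (illustration only; example_conn_is_reapable is
 * hypothetical): the idleness test the reaper applies to each server
 * connection above. Client connections are left alone; a server
 * connection is only destroyed when it has no calls, no references, and
 * has sent nothing for rx_idleConnectionTime seconds. */
#if 0
static int example_conn_is_reapable(conn, havecalls, now_sec)
    struct rx_connection *conn;
    int havecalls;
    afs_uint32 now_sec;
{
    return conn->type == RX_SERVER_CONNECTION
	&& !havecalls
	&& !conn->refCount
	&& (conn->lastSendTime + rx_idleConnectionTime) < now_sec;
}
#endif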
5561 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5562 * rx.h is sort of strange this is better. This is called with a security
5563 * object before it is discarded. Each connection using a security object has
5564 * its own refcount to the object so it won't actually be freed until the last
5565 * connection is destroyed.
5567 * This is the only rxs module call. A hold could also be written but no one
5568 * needs it as yet. */
5570 int rxs_Release (aobj)
5571 struct rx_securityClass *aobj;
5573 return RXS_Close (aobj);
5577 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5578 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5579 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5580 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5582 /* Adjust our estimate of the transmission rate to this peer, given
5583 * that the packet p was just acked. We can adjust peer->timeout and
5584 * call->twind. Pragmatically, this is called
5585 * only with packets of maximal length.
5586 * Called with peer and call locked.
5589 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5590 register struct rx_peer *peer;
5591 register struct rx_call *call;
5592 struct rx_packet *p, *ackp;
5595 afs_int32 xferSize, xferMs;
5596 register afs_int32 minTime;
5599 /* Count down packets */
5600 if (peer->rateFlag > 0) peer->rateFlag--;
5601 /* Do nothing until we're enabled */
5602 if (peer->rateFlag != 0) return;
5603 if (!call->conn) return;
5605 /* Count only when the ack seems legitimate */
5606 switch (ackReason) {
5607 case RX_ACK_REQUESTED:
5608 xferSize = p->length + RX_HEADER_SIZE +
5609 call->conn->securityMaxTrailerSize;
5613 case RX_ACK_PING_RESPONSE:
5614 if (p) /* want the response to ping-request, not data send */
5616 clock_GetTime(&newTO);
5617 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5618 clock_Sub(&newTO, &call->pingRequestTime);
5619 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5623 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5630 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5631 ntohl(peer->host), ntohs(peer->port),
5632 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5633 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5636 /* Track only packets that are big enough. */
5637 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5641 /* absorb RTT data (in milliseconds) for these big packets */
5642 if (peer->smRtt == 0) {
5643 peer->smRtt = xferMs;
5645 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5646 if (!peer->smRtt) peer->smRtt = 1;
5649 if (peer->countDown) {
5653 peer->countDown = 10; /* recalculate only every so often */
5655 /* In practice, we can measure only the RTT for full packets,
5656 * because of the way Rx acks the data that it receives. (If it's
5657 * smaller than a full packet, it often gets implicitly acked
5658 * either by the call response (from a server) or by the next call
5659 * (from a client), and either case confuses transmission times
5660 * with processing times.) Therefore, replace the above
5661 * more-sophisticated processing with a simpler version, where the
5662 * smoothed RTT is kept for full-size packets, and the time to
5663 * transmit a windowful of full-size packets is simply RTT *
5664 * windowSize. Again, we take two steps:
5665 - ensure the timeout is large enough for a single packet's RTT;
5666 - ensure that the window is small enough to fit in the desired timeout.*/
5668 /* First, the timeout check. */
5669 minTime = peer->smRtt;
5670 /* Get a reasonable estimate for a timeout period */
5672 newTO.sec = minTime / 1000;
5673 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5675 /* Increase the timeout period so that we can always do at least
5676 * one packet exchange */
5677 if (clock_Gt(&newTO, &peer->timeout)) {
5679 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5680 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5681 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5684 peer->timeout = newTO;
5687 /* Now, get an estimate for the transmit window size. */
5688 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5689 /* Now, convert to the number of full packets that could fit in a
5690 * reasonable fraction of that interval */
5691 minTime /= (peer->smRtt << 1);
5692 xferSize = minTime; /* (make a copy) */
5694 /* Now clamp the size to reasonable bounds. */
5695 if (minTime <= 1) minTime = 1;
5696 else if (minTime > rx_Window) minTime = rx_Window;
5697 /* if (minTime != peer->maxWindow) {
5698 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5699 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5700 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5702 peer->maxWindow = minTime;
5703 call->twind = minTime;
5707 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5708 * Discern this by calculating the timeout necessary for rx_Window full-size packets. */
5710 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5711 /* calculate estimate for transmission interval in milliseconds */
5712 minTime = rx_Window * peer->smRtt;
5713 if (minTime < 1000) {
5714 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5715 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5716 peer->timeout.usec, peer->smRtt,
5719 newTO.sec = 0; /* cut back on timeout by half a second */
5720 newTO.usec = 500000;
5721 clock_Sub(&peer->timeout, &newTO);
5726 } /* end of rxi_ComputeRate */
5727 #endif /* ADAPT_WINDOW */
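/*
 * Illustrative sketch (not called by anything in this file) of the two
 * computations performed above: the smoothed RTT is an exponential moving
 * average weighting the old estimate 15/16 and the new sample 1/16, and
 * the window is the number of round trips that fit in half of the current
 * timeout, clamped to [1, rx_Window].  The helper names are hypothetical.
 */
#if 0
static afs_int32 example_SmoothRtt(afs_int32 smRtt, afs_int32 sampleMs)
{
    if (smRtt == 0)
	return sampleMs;			/* first sample primes the average */
    smRtt = ((smRtt * 15) + sampleMs + 4) >> 4;	/* (15*old + new + 4) / 16 */
    return smRtt ? smRtt : 1;			/* keep the estimate non-zero */
}

static afs_int32 example_WindowFromTimeout(afs_int32 timeoutMs, afs_int32 smRttMs)
{
    afs_int32 w = timeoutMs / (smRttMs << 1);	/* round trips per half timeout */
    if (w < 1) w = 1;
    else if (w > rx_Window) w = rx_Window;
    return w;
}
#endif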
5735 /* Don't call this debugging routine directly; use dpf */
5737 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5738 a11, a12, a13, a14, a15)
5742 clock_GetTime(&now);
5743 fprintf(rx_Log, " %u.%.3u:", now.sec, now.usec/1000);
5744 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5751 * This function is used to process the rx_stats structure that is local
5752 * to a process as well as an rx_stats structure received from a remote
5753 * process (via rxdebug). Therefore, it needs to do minimal version checking. */
5756 void rx_PrintTheseStats (file, s, size, freePackets, version)
5759 int size; /* some idea of version control */
5760 afs_int32 freePackets;
5765 if (size != sizeof(struct rx_stats)) {
5767 "Unexpected size of stats structure: was %d, expected %d\n",
5768 size, (int) sizeof(struct rx_stats));
5772 "rx stats: free packets %d, "
5777 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5779 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5780 s->receivePktAllocFailures,
5781 s->receiveCbufPktAllocFailures,
5782 s->sendPktAllocFailures,
5783 s->sendCbufPktAllocFailures,
5784 s->specialPktAllocFailures);
5787 "alloc-failures(rcv %d,send %d,ack %d)\n",
5788 s->receivePktAllocFailures,
5789 s->sendPktAllocFailures,
5790 s->specialPktAllocFailures);
5795 "bogusReads %d (last from host %x), "
5801 s->bogusPacketOnRead,
5804 s->noPacketBuffersOnRead,
5808 fprintf(file, " packets read: ");
5809 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5815 fprintf(file, "\n");
5818 " other read counters: data %d, "
5826 s->spuriousPacketsRead,
5827 s->ignorePacketDally);
5829 fprintf(file, " packets sent: ");
5830 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5836 fprintf(file, "\n");
5839 " other send counters: ack %d, "
5840 "data %d (not resends), "
5843 "acked&ignored %d\n",
5846 s->dataPacketsReSent,
5847 s->dataPacketsPushed,
5848 s->ignoreAckedPacket);
5851 " \t(these should be small) sendFailed %d, "
5856 if (s->nRttSamples) {
5858 " Average rtt is %0.3f, with %d samples\n",
5859 clock_Float(&s->totalRtt)/s->nRttSamples,
5863 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5864 clock_Float(&s->minRtt),
5865 clock_Float(&s->maxRtt));
5869 " %d server connections, "
5870 "%d client connections, "
5873 "%d free call structs\n",
5878 s->nFreeCallStructs);
5880 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5882 " %d clock updates\n",
5888 /* for backward compatibility */
5889 void rx_PrintStats(file)
5892 MUTEX_ENTER(&rx_stats_mutex);
5893 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5894 MUTEX_EXIT(&rx_stats_mutex);
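/*
 * Example use (a minimal sketch): print the local statistics, then fetch
 * and print a remote server's statistics via the debug port.  The socket
 * and address arguments are hypothetical; rx_GetServerStats is defined
 * later in this file.  The remote free-packet count is not part of
 * struct rx_stats, so 0 is passed for it here.
 */
#if 0
static void example_PrintLocalAndRemote(int sock,
    afs_uint32 host, afs_uint16 port)
{
    struct rx_stats remote;
    afs_uint32 supported;

    rx_PrintStats(stdout);		/* local stats, current version */
    if (rx_GetServerStats(sock, host, port, &remote, &supported) >= 0)
	rx_PrintTheseStats(stdout, &remote, sizeof(remote),
			   0 /* free packet count unknown remotely */,
			   RX_DEBUGI_VERSION);
}
#endif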
5897 void rx_PrintPeerStats(file, peer)
5899 struct rx_peer *peer;
5904 "burst wait %u.%d.\n",
5908 peer->burstWait.sec,
5909 peer->burstWait.usec);
5913 "retry time %u.%06d, "
5924 "max in packet skew %d, "
5925 "max out packet skew %d\n",
5928 peer->outPacketSkew);
5931 #ifdef AFS_PTHREAD_ENV
5933 * This mutex protects the following static variables:
5937 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
5938 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
5940 #define LOCK_RX_DEBUG
5941 #define UNLOCK_RX_DEBUG
5942 #endif /* AFS_PTHREAD_ENV */
5944 static int MakeDebugCall(
5946 afs_uint32 remoteAddr,
5947 afs_uint16 remotePort,
5955 static afs_int32 counter = 100;
5957 struct rx_header theader;
5959 register afs_int32 code;
5961 struct sockaddr_in taddr, faddr;
5966 endTime = time(0) + 20; /* try for 20 seconds */
5970 tp = &tbuffer[sizeof(struct rx_header)];
5971 taddr.sin_family = AF_INET;
5972 taddr.sin_port = remotePort;
5973 taddr.sin_addr.s_addr = remoteAddr;
5975 memset(&theader, 0, sizeof(theader));
5976 theader.epoch = htonl(999);
5978 theader.callNumber = htonl(counter);
5981 theader.type = type;
5982 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
5983 theader.serviceId = 0;
5985 memcpy(tbuffer, &theader, sizeof(theader));
5986 memcpy(tp, inputData, inputLength);
5987 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
5988 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
5990 /* see if there's a packet available */
5992 FD_SET(socket, &imask);
5995 code = select(socket+1, &imask, 0, 0, &tv);
5997 /* now receive a packet */
5998 faddrLen = sizeof(struct sockaddr_in);
5999 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6000 (struct sockaddr *) &faddr, &faddrLen);
6002 memcpy(&theader, tbuffer, sizeof(struct rx_header));
6003 if (counter == ntohl(theader.callNumber)) break;
6006 /* see if we've timed out */
6007 if (endTime < time(0)) return -1;
6009 code -= sizeof(struct rx_header);
6010 if (code > outputLength) code = outputLength;
6011 memcpy(outputData, tp, code);
6015 afs_int32 rx_GetServerDebug(
6017 afs_uint32 remoteAddr,
6018 afs_uint16 remotePort,
6019 struct rx_debugStats *stat,
6020 afs_uint32 *supportedValues
6023 struct rx_debugIn in;
6026 *supportedValues = 0;
6027 in.type = htonl(RX_DEBUGI_GETSTATS);
6030 rc = MakeDebugCall(socket,
6033 RX_PACKET_TYPE_DEBUG,
6040 * If the call was successful, fix up the version and indicate
6041 * which contents of the stat structure are valid.
6042 * Also do net to host conversion of fields here.
6046 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6047 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6049 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6050 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6052 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6053 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6055 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6056 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6058 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6059 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6061 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6062 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6064 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6065 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6068 stat->nFreePackets = ntohl(stat->nFreePackets);
6069 stat->packetReclaims = ntohl(stat->packetReclaims);
6070 stat->callsExecuted = ntohl(stat->callsExecuted);
6071 stat->nWaiting = ntohl(stat->nWaiting);
6072 stat->idleThreads = ntohl(stat->idleThreads);
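/*
 * Example use (a minimal sketch): query a server's debug stats and test
 * one of the capability bits filled in above.  The socket and address
 * arguments are hypothetical.
 */
#if 0
static int example_ServerHasIdleThreadCounts(int sock,
    afs_uint32 host, afs_uint16 port)
{
    struct rx_debugStats tstats;
    afs_uint32 supported;

    if (rx_GetServerDebug(sock, host, port, &tstats, &supported) < 0)
	return 0;			/* call failed or timed out */
    return (supported & RX_SERVER_DEBUG_IDLE_THREADS) != 0;
}
#endif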
6078 afs_int32 rx_GetServerStats(
6080 afs_uint32 remoteAddr,
6081 afs_uint16 remotePort,
6082 struct rx_stats *stat,
6083 afs_uint32 *supportedValues
6086 struct rx_debugIn in;
6087 afs_int32 *lp = (afs_int32 *) stat;
6092 * supportedValues is currently unused, but added to allow future
6093 * versioning of this function.
6096 *supportedValues = 0;
6097 in.type = htonl(RX_DEBUGI_RXSTATS);
6099 memset(stat, 0, sizeof(*stat));
6101 rc = MakeDebugCall(socket,
6104 RX_PACKET_TYPE_DEBUG,
6113 * Do net to host conversion here
6116 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6124 afs_int32 rx_GetServerVersion(
6126 afs_uint32 remoteAddr,
6127 afs_uint16 remotePort,
6128 size_t version_length,
6133 return MakeDebugCall(socket,
6136 RX_PACKET_TYPE_VERSION,
6143 afs_int32 rx_GetServerConnections(
6145 afs_uint32 remoteAddr,
6146 afs_uint16 remotePort,
6147 afs_int32 *nextConnection,
6149 afs_uint32 debugSupportedValues,
6150 struct rx_debugConn *conn,
6151 afs_uint32 *supportedValues
6154 struct rx_debugIn in;
6159 * supportedValues is currently unused, but added to allow future
6160 * versioning of this function.
6163 *supportedValues = 0;
6164 if (allConnections) {
6165 in.type = htonl(RX_DEBUGI_GETALLCONN);
6167 in.type = htonl(RX_DEBUGI_GETCONN);
6169 in.index = htonl(*nextConnection);
6170 memset(conn, 0, sizeof(*conn));
6172 rc = MakeDebugCall(socket,
6175 RX_PACKET_TYPE_DEBUG,
6182 *nextConnection += 1;
6185 * Convert old connection format to new structure.
6188 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6189 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6190 #define MOVEvL(a) (conn->a = vL->a)
6192 /* any old or unrecognized version... */
6193 for (i=0;i<RX_MAXCALLS;i++) {
6194 MOVEvL(callState[i]);
6195 MOVEvL(callMode[i]);
6196 MOVEvL(callFlags[i]);
6197 MOVEvL(callOther[i]);
6199 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6200 MOVEvL(secStats.type);
6201 MOVEvL(secStats.level);
6202 MOVEvL(secStats.flags);
6203 MOVEvL(secStats.expires);
6204 MOVEvL(secStats.packetsReceived);
6205 MOVEvL(secStats.packetsSent);
6206 MOVEvL(secStats.bytesReceived);
6207 MOVEvL(secStats.bytesSent);
6212 * Do net to host conversion here
6214 * I don't convert host or port since we are most likely
6215 * going to want these in NBO.
6217 conn->cid = ntohl(conn->cid);
6218 conn->serial = ntohl(conn->serial);
6219 for(i=0;i<RX_MAXCALLS;i++) {
6220 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6222 conn->error = ntohl(conn->error);
6223 conn->secStats.flags = ntohl(conn->secStats.flags);
6224 conn->secStats.expires = ntohl(conn->secStats.expires);
6225 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6226 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6227 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6228 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6229 conn->epoch = ntohl(conn->epoch);
6230 conn->natMTU = ntohl(conn->natMTU);
6236 afs_int32 rx_GetServerPeers(
6238 afs_uint32 remoteAddr,
6239 afs_uint16 remotePort,
6240 afs_int32 *nextPeer,
6241 afs_uint32 debugSupportedValues,
6242 struct rx_debugPeer *peer,
6243 afs_uint32 *supportedValues
6246 struct rx_debugIn in;
6251 * supportedValues is currently unused, but added to allow future
6252 * versioning of this function.
6255 *supportedValues = 0;
6256 in.type = htonl(RX_DEBUGI_GETPEER);
6257 in.index = htonl(*nextPeer);
6258 memset(peer, 0, sizeof(*peer));
6260 rc = MakeDebugCall(socket,
6263 RX_PACKET_TYPE_DEBUG,
6273 * Do net to host conversion here
6275 * I don't convert host or port since we are most likely
6276 * going to want these in NBO.
6278 peer->ifMTU = ntohs(peer->ifMTU);
6279 peer->idleWhen = ntohl(peer->idleWhen);
6280 peer->refCount = ntohs(peer->refCount);
6281 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6282 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6283 peer->rtt = ntohl(peer->rtt);
6284 peer->rtt_dev = ntohl(peer->rtt_dev);
6285 peer->timeout.sec = ntohl(peer->timeout.sec);
6286 peer->timeout.usec = ntohl(peer->timeout.usec);
6287 peer->nSent = ntohl(peer->nSent);
6288 peer->reSends = ntohl(peer->reSends);
6289 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6290 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6291 peer->rateFlag = ntohl(peer->rateFlag);
6292 peer->natMTU = ntohs(peer->natMTU);
6293 peer->maxMTU = ntohs(peer->maxMTU);
6294 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6295 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6296 peer->MTU = ntohs(peer->MTU);
6297 peer->cwind = ntohs(peer->cwind);
6298 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6299 peer->congestSeq = ntohs(peer->congestSeq);
6300 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6301 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6302 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6303 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6308 #endif /* RXDEBUG */
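/*
 * Example use (a minimal sketch, RXDEBUG builds only): walk the peers
 * known to a remote server by letting rx_GetServerPeers advance the
 * nextPeer cursor, stopping when a call fails.  The socket and address
 * arguments are hypothetical, and 0 is passed for debugSupportedValues
 * to take the old-format conversion path if the server is old.
 */
#if 0
static void example_ListServerPeers(int sock,
    afs_uint32 host, afs_uint16 port)
{
    struct rx_debugPeer tpeer;
    afs_uint32 supported;
    afs_int32 nextPeer = 0;

    while (rx_GetServerPeers(sock, host, port, &nextPeer,
			     0 /* debugSupportedValues */,
			     &tpeer, &supported) >= 0) {
	/* host and port are left in network byte order (see above) */
	printf("peer %x/%u: rtt %u\n", tpeer.host, ntohs(tpeer.port),
	       tpeer.rtt);
    }
}
#endif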
6310 void shutdown_rx(void)
6312 struct rx_serverQueueEntry *np;
6314 register struct rx_call *call;
6315 register struct rx_serverQueueEntry *sq;
6318 if (rxinit_status == 1) {
6320 return; /* Already shut down. */
6325 #ifndef AFS_PTHREAD_ENV
6326 FD_ZERO(&rx_selectMask);
6327 #endif /* AFS_PTHREAD_ENV */
6328 rxi_dataQuota = RX_MAX_QUOTA;
6329 #ifndef AFS_PTHREAD_ENV
6331 #endif /* AFS_PTHREAD_ENV */
6334 #ifndef AFS_PTHREAD_ENV
6335 #ifndef AFS_USE_GETTIMEOFDAY
6337 #endif /* AFS_USE_GETTIMEOFDAY */
6338 #endif /* AFS_PTHREAD_ENV */
6340 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6341 call = queue_First(&rx_freeCallQueue, rx_call);
6343 rxi_Free(call, sizeof(struct rx_call));
6346 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6347 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6353 struct rx_peer **peer_ptr, **peer_end;
6354 for (peer_ptr = &rx_peerHashTable[0],
6355 peer_end = &rx_peerHashTable[rx_hashTableSize];
6356 peer_ptr < peer_end; peer_ptr++) {
6357 struct rx_peer *peer, *next;
6358 for (peer = *peer_ptr; peer; peer = next) {
6359 rx_interface_stat_p rpc_stat, nrpc_stat;
6361 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6362 rx_interface_stat)) {
6363 unsigned int num_funcs;
6364 if (!rpc_stat) break;
6365 queue_Remove(&rpc_stat->queue_header);
6366 queue_Remove(&rpc_stat->all_peers);
6367 num_funcs = rpc_stat->stats[0].func_total;
6368 space = sizeof(rx_interface_stat_t) +
6369 rpc_stat->stats[0].func_total *
6370 sizeof(rx_function_entry_v1_t);
6372 rxi_Free(rpc_stat, space);
6373 MUTEX_ENTER(&rx_rpc_stats);
6374 rxi_rpc_peer_stat_cnt -= num_funcs;
6375 MUTEX_EXIT(&rx_rpc_stats);
6379 MUTEX_ENTER(&rx_stats_mutex);
6380 rx_stats.nPeerStructs--;
6381 MUTEX_EXIT(&rx_stats_mutex);
6385 for (i = 0; i<RX_MAX_SERVICES; i++) {
6387 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6389 for (i = 0; i < rx_hashTableSize; i++) {
6390 register struct rx_connection *tc, *ntc;
6391 MUTEX_ENTER(&rx_connHashTable_lock);
6392 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6394 for (j = 0; j < RX_MAXCALLS; j++) {
6396 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6399 rxi_Free(tc, sizeof(*tc));
6401 MUTEX_EXIT(&rx_connHashTable_lock);
6404 MUTEX_ENTER(&freeSQEList_lock);
6406 while ((np = rx_FreeSQEList) != NULL) {
6407 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6408 MUTEX_DESTROY(&np->lock);
6409 rxi_Free(np, sizeof(*np));
6412 MUTEX_EXIT(&freeSQEList_lock);
6413 MUTEX_DESTROY(&freeSQEList_lock);
6414 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6415 MUTEX_DESTROY(&rx_connHashTable_lock);
6416 MUTEX_DESTROY(&rx_peerHashTable_lock);
6417 MUTEX_DESTROY(&rx_serverPool_lock);
6419 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6420 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6422 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6423 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6425 rxi_FreeAllPackets();
6427 MUTEX_ENTER(&rx_stats_mutex);
6428 rxi_dataQuota = RX_MAX_QUOTA;
6429 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6430 MUTEX_EXIT(&rx_stats_mutex);
6436 #ifdef RX_ENABLE_LOCKS
6437 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6439 if (!MUTEX_ISMINE(lockaddr))
6440 osi_Panic("Lock not held: %s", msg);
6442 #endif /* RX_ENABLE_LOCKS */
6447 * Routines to implement connection specific data.
6450 int rx_KeyCreate(rx_destructor_t rtn)
6453 MUTEX_ENTER(&rxi_keyCreate_lock);
6454 key = rxi_keyCreate_counter++;
6455 rxi_keyCreate_destructor = (rx_destructor_t *)
6456 realloc((void *)rxi_keyCreate_destructor,
6457 (key+1) * sizeof(rx_destructor_t));
6458 rxi_keyCreate_destructor[key] = rtn;
6459 MUTEX_EXIT(&rxi_keyCreate_lock);
6463 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6466 MUTEX_ENTER(&conn->conn_data_lock);
6467 if (!conn->specific) {
6468 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6469 for (i = 0 ; i < key ; i++)
6470 conn->specific[i] = NULL;
6471 conn->nSpecific = key+1;
6472 conn->specific[key] = ptr;
6473 } else if (key >= conn->nSpecific) {
6474 conn->specific = (void **)
6475 realloc(conn->specific,(key+1)*sizeof(void *));
6476 for (i = conn->nSpecific ; i < key ; i++)
6477 conn->specific[i] = NULL;
6478 conn->nSpecific = key+1;
6479 conn->specific[key] = ptr;
6481 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6482 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6483 conn->specific[key] = ptr;
6485 MUTEX_EXIT(&conn->conn_data_lock);
6488 void *rx_GetSpecific(struct rx_connection *conn, int key)
6491 MUTEX_ENTER(&conn->conn_data_lock);
6492 if (key >= conn->nSpecific)
6495 ptr = conn->specific[key];
6496 MUTEX_EXIT(&conn->conn_data_lock);
6500 #endif /* !KERNEL */
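/*
 * Example use (a minimal sketch): attach per-connection private data with
 * the routines above.  The destructor registered with rx_KeyCreate runs
 * when a value is overwritten (and when the connection is torn down).
 * All names below are hypothetical; strdup/free require the usual libc
 * headers, already included in user-level builds.
 */
#if 0
static int example_key = -1;

static void example_FreeUserData(void *ptr)
{
    free(ptr);
}

static void example_TagConnection(struct rx_connection *conn)
{
    if (example_key < 0)
	example_key = rx_KeyCreate(example_FreeUserData);
    rx_SetSpecific(conn, example_key, strdup("per-connection note"));
}

static char *example_ReadTag(struct rx_connection *conn)
{
    return (char *) rx_GetSpecific(conn, example_key);
}
#endif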
6503 * processStats is a queue used to store the statistics for the local
6504 * process. Its contents are similar to the contents of the rpcStats
6505 * queue on a rx_peer structure, but the actual data stored within
6506 * this queue contains totals across the lifetime of the process (assuming
6507 * the stats have not been reset), unlike the per-peer structures,
6508 * which can come and go with the peer lifetime.
6511 static struct rx_queue processStats = {&processStats,&processStats};
6514 * peerStats is a queue used to store the statistics for all peer structs.
6515 * Its contents are the union of all the peer rpcStats queues.
6518 static struct rx_queue peerStats = {&peerStats,&peerStats};
6521 * rxi_monitor_processStats is used to turn process-wide stat collection on and off.
6525 static int rxi_monitor_processStats = 0;
6528 * rxi_monitor_peerStats is used to turn per-peer stat collection on and off.
6531 static int rxi_monitor_peerStats = 0;
6534 * rxi_AddRpcStat - given all of the information for a particular rpc
6535 * call, create (if needed) and update the stat totals for the rpc.
6539 * IN stats - the queue of stats that will be updated with the new value
6541 * IN rxInterface - a unique number that identifies the rpc interface
6543 * IN currentFunc - the index of the function being invoked
6545 * IN totalFunc - the total number of functions in this interface
6547 * IN queueTime - the amount of time this function waited for a thread
6549 * IN execTime - the amount of time this function invocation took to execute
6551 * IN bytesSent - the number of bytes sent by this invocation
6553 * IN bytesRcvd - the number of bytes received by this invocation
6555 * IN isServer - if true, this invocation was made to a server
6557 * IN remoteHost - the ip address of the remote host
6559 * IN remotePort - the port of the remote host
6561 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6563 * INOUT counter - if a new stats structure is allocated, the counter will
6564 * be updated with the new number of allocated stat structures
6571 static int rxi_AddRpcStat(
6572 struct rx_queue *stats,
6573 afs_uint32 rxInterface,
6574 afs_uint32 currentFunc,
6575 afs_uint32 totalFunc,
6576 struct clock *queueTime,
6577 struct clock *execTime,
6578 afs_hyper_t *bytesSent,
6579 afs_hyper_t *bytesRcvd,
6581 afs_uint32 remoteHost,
6582 afs_uint32 remotePort,
6584 unsigned int *counter)
6587 rx_interface_stat_p rpc_stat, nrpc_stat;
6590 * See if there's already a structure for this interface
6593 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6594 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6595 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6599 * Didn't find a match, so allocate a new structure and add it to the queue.
6603 if ((rpc_stat == NULL) ||
6604 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6605 (rpc_stat->stats[0].remote_is_server != isServer)) {
6609 space = sizeof(rx_interface_stat_t) + totalFunc *
6610 sizeof(rx_function_entry_v1_t);
6612 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6613 if (rpc_stat == NULL) {
6617 *counter += totalFunc;
6618 for(i=0;i<totalFunc;i++) {
6619 rpc_stat->stats[i].remote_peer = remoteHost;
6620 rpc_stat->stats[i].remote_port = remotePort;
6621 rpc_stat->stats[i].remote_is_server = isServer;
6622 rpc_stat->stats[i].interfaceId = rxInterface;
6623 rpc_stat->stats[i].func_total = totalFunc;
6624 rpc_stat->stats[i].func_index = i;
6625 hzero(rpc_stat->stats[i].invocations);
6626 hzero(rpc_stat->stats[i].bytes_sent);
6627 hzero(rpc_stat->stats[i].bytes_rcvd);
6628 rpc_stat->stats[i].queue_time_sum.sec = 0;
6629 rpc_stat->stats[i].queue_time_sum.usec = 0;
6630 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6631 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6632 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6633 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6634 rpc_stat->stats[i].queue_time_max.sec = 0;
6635 rpc_stat->stats[i].queue_time_max.usec = 0;
6636 rpc_stat->stats[i].execution_time_sum.sec = 0;
6637 rpc_stat->stats[i].execution_time_sum.usec = 0;
6638 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6639 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6640 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6641 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6642 rpc_stat->stats[i].execution_time_max.sec = 0;
6643 rpc_stat->stats[i].execution_time_max.usec = 0;
6645 queue_Prepend(stats, rpc_stat);
6646 if (addToPeerList) {
6647 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6652 * Increment the stats for this function
6655 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6656 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6657 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6658 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6659 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6660 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6661 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6663 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6664 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6666 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6667 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6668 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6669 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6671 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6672 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6680 * rx_IncrementTimeAndCount - increment the times and count for a particular rpc function.
6685 * IN peer - the peer who invoked the rpc
6687 * IN rxInterface - a unique number that identifies the rpc interface
6689 * IN currentFunc - the index of the function being invoked
6691 * IN totalFunc - the total number of functions in this interface
6693 * IN queueTime - the amount of time this function waited for a thread
6695 * IN execTime - the amount of time this function invocation took to execute
6697 * IN bytesSent - the number of bytes sent by this invocation
6699 * IN bytesRcvd - the number of bytes received by this invocation
6701 * IN isServer - if true, this invocation was made to a server
6708 void rx_IncrementTimeAndCount(
6709 struct rx_peer *peer,
6710 afs_uint32 rxInterface,
6711 afs_uint32 currentFunc,
6712 afs_uint32 totalFunc,
6713 struct clock *queueTime,
6714 struct clock *execTime,
6715 afs_hyper_t *bytesSent,
6716 afs_hyper_t *bytesRcvd,
6720 MUTEX_ENTER(&rx_rpc_stats);
6721 MUTEX_ENTER(&peer->peer_lock);
6723 if (rxi_monitor_peerStats) {
6724 rxi_AddRpcStat(&peer->rpcStats,
6736 &rxi_rpc_peer_stat_cnt);
6739 if (rxi_monitor_processStats) {
6740 rxi_AddRpcStat(&processStats,
6752 &rxi_rpc_process_stat_cnt);
6755 MUTEX_EXIT(&peer->peer_lock);
6756 MUTEX_EXIT(&rx_rpc_stats);
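/*
 * Example use (a minimal sketch): how a server stub might time one RPC
 * and feed the result to rx_IncrementTimeAndCount.  The interface id,
 * function index, and function count are hypothetical placeholders;
 * rx_ConnectionOf/rx_PeerOf are the accessor macros from rx.h.
 */
#if 0
static void example_RecordCall(struct rx_call *call,
    struct clock *queued, struct clock *started, struct clock *finished,
    afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd)
{
    struct clock queueTime = *started, execTime = *finished;

    clock_Sub(&queueTime, queued);	/* time spent waiting for a thread */
    clock_Sub(&execTime, started);	/* time spent executing the call */
    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
			     12345 /* interface id */, 0 /* func index */,
			     7 /* funcs in interface */,
			     &queueTime, &execTime, bytesSent, bytesRcvd,
			     1 /* isServer */);
}
#endif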
6761 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6765 * IN callerVersion - the rpc stat version of the caller.
6767 * IN count - the number of entries to marshall.
6769 * IN stats - pointer to stats to be marshalled.
6771 * OUT ptr - Where to store the marshalled data.
6777 void rx_MarshallProcessRPCStats(
6778 afs_uint32 callerVersion,
6780 rx_function_entry_v1_t *stats,
6787 * We only support the first version
6789 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6790 *(ptr++) = stats->remote_peer;
6791 *(ptr++) = stats->remote_port;
6792 *(ptr++) = stats->remote_is_server;
6793 *(ptr++) = stats->interfaceId;
6794 *(ptr++) = stats->func_total;
6795 *(ptr++) = stats->func_index;
6796 *(ptr++) = hgethi(stats->invocations);
6797 *(ptr++) = hgetlo(stats->invocations);
6798 *(ptr++) = hgethi(stats->bytes_sent);
6799 *(ptr++) = hgetlo(stats->bytes_sent);
6800 *(ptr++) = hgethi(stats->bytes_rcvd);
6801 *(ptr++) = hgetlo(stats->bytes_rcvd);
6802 *(ptr++) = stats->queue_time_sum.sec;
6803 *(ptr++) = stats->queue_time_sum.usec;
6804 *(ptr++) = stats->queue_time_sum_sqr.sec;
6805 *(ptr++) = stats->queue_time_sum_sqr.usec;
6806 *(ptr++) = stats->queue_time_min.sec;
6807 *(ptr++) = stats->queue_time_min.usec;
6808 *(ptr++) = stats->queue_time_max.sec;
6809 *(ptr++) = stats->queue_time_max.usec;
6810 *(ptr++) = stats->execution_time_sum.sec;
6811 *(ptr++) = stats->execution_time_sum.usec;
6812 *(ptr++) = stats->execution_time_sum_sqr.sec;
6813 *(ptr++) = stats->execution_time_sum_sqr.usec;
6814 *(ptr++) = stats->execution_time_min.sec;
6815 *(ptr++) = stats->execution_time_min.usec;
6816 *(ptr++) = stats->execution_time_max.sec;
6817 *(ptr++) = stats->execution_time_max.usec;
6823 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6828 * IN callerVersion - the rpc stat version of the caller
6830 * OUT myVersion - the rpc stat version of this function
6832 * OUT clock_sec - local time seconds
6834 * OUT clock_usec - local time microseconds
6836 * OUT allocSize - the number of bytes allocated to contain stats
6838 * OUT statCount - the number of stats retrieved from this process.
6840 * OUT stats - the actual stats retrieved from this process.
6844 * Returns 0 on success; if successful, *stats will be non-NULL.
6847 int rx_RetrieveProcessRPCStats(
6848 afs_uint32 callerVersion,
6849 afs_uint32 *myVersion,
6850 afs_uint32 *clock_sec,
6851 afs_uint32 *clock_usec,
6853 afs_uint32 *statCount,
6864 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6867 * Check to see if stats are enabled
6870 MUTEX_ENTER(&rx_rpc_stats);
6871 if (!rxi_monitor_processStats) {
6872 MUTEX_EXIT(&rx_rpc_stats);
6876 clock_GetTime(&now);
6877 *clock_sec = now.sec;
6878 *clock_usec = now.usec;
6881 * Allocate the space based upon the caller version
6883 * If the client is at an older version than we are,
6884 * we return the statistic data in the older data format, but
6885 * we still return our version number so the client knows we
6886 * are maintaining more data than it can retrieve.
6889 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6890 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6891 *statCount = rxi_rpc_process_stat_cnt;
6894 * This can't happen yet, but in the future version changes
6895 * can be handled by adding additional code here
6899 if (space > (size_t) 0) {
6901 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6904 register struct rx_peer *pp;
6907 rx_interface_stat_p rpc_stat, nrpc_stat;
6910 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6911 rx_interface_stat)) {
6913 * Copy the data based upon the caller version
6915 rx_MarshallProcessRPCStats(callerVersion,
6916 rpc_stat->stats[0].func_total,
6917 rpc_stat->stats, &ptr);
6923 MUTEX_EXIT(&rx_rpc_stats);
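/*
 * Example use (a minimal sketch): enable process-wide collection, pull a
 * snapshot, and release it with rx_FreeRPCStats (defined below).  Per the
 * contract above, a non-NULL stats pointer indicates success; allocSize
 * is whatever the retrieval call reports back.
 */
#if 0
static void example_SnapshotProcessStats(void)
{
    afs_uint32 version, sec, usec, count;
    size_t alloc;
    afs_uint32 *stats = NULL;

    rx_enableProcessRPCStats();
    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &version,
			       &sec, &usec, &alloc, &count, &stats);
    if (stats) {
	/* ... walk 'count' marshalled rx_function_entry_v1_t records ... */
	rx_FreeRPCStats(stats, alloc);
    }
}
#endif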
6928 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6932 * IN callerVersion - the rpc stat version of the caller
6934 * OUT myVersion - the rpc stat version of this function
6936 * OUT clock_sec - local time seconds
6938 * OUT clock_usec - local time microseconds
6940 * OUT allocSize - the number of bytes allocated to contain stats
6942 * OUT statCount - the number of stats retrieved from the individual peer structures.
6945 * OUT stats - the actual stats retrieved from the individual peer structures.
6949 * Returns 0 on success; if successful, *stats will be non-NULL.
6952 int rx_RetrievePeerRPCStats(
6953 afs_uint32 callerVersion,
6954 afs_uint32 *myVersion,
6955 afs_uint32 *clock_sec,
6956 afs_uint32 *clock_usec,
6958 afs_uint32 *statCount,
6969 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6972 * Check to see if stats are enabled
6975 MUTEX_ENTER(&rx_rpc_stats);
6976 if (!rxi_monitor_peerStats) {
6977 MUTEX_EXIT(&rx_rpc_stats);
6981 clock_GetTime(&now);
6982 *clock_sec = now.sec;
6983 *clock_usec = now.usec;
6986 * Allocate the space based upon the caller version
6988 * If the client is at an older version than we are,
6989 * we return the statistic data in the older data format, but
6990 * we still return our version number so the client knows we
6991 * are maintaining more data than it can retrieve.
6994 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6995 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
6996 *statCount = rxi_rpc_peer_stat_cnt;
6999 * This can't happen yet, but in the future version changes
7000 * can be handled by adding additional code here
7004 if (space > (size_t) 0) {
7006 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7011 rx_interface_stat_p rpc_stat, nrpc_stat;
7014 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7015 rx_interface_stat)) {
7017 * We have to fix the offset of rpc_stat since we are
7018 * keeping this structure on two rx_queues. The rx_queue
7019 * package assumes that the rx_queue member is the first
7020 * member of the structure. That is, rx_queue assumes that
7021 * any one item is only on one queue at a time. We are
7022 * breaking that assumption and so we have to do a little
7023 * math to fix our pointers.
7026 fix_offset = (char *) rpc_stat;
7027 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7028 rpc_stat = (rx_interface_stat_p) fix_offset;
7031 * Copy the data based upon the caller version
7033 rx_MarshallProcessRPCStats(callerVersion,
7034 rpc_stat->stats[0].func_total,
7035 rpc_stat->stats, &ptr);
7041 MUTEX_EXIT(&rx_rpc_stats);
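/*
 * The offset fix-up above is a hand-rolled "container of" computation:
 * all_peers is not the first member of rx_interface_stat, so a pointer
 * to the queue element must be moved back by offsetof() to recover the
 * enclosing structure.  A generic sketch of the same idiom:
 */
#if 0
#define EXAMPLE_CONTAINER_OF(ptr, type, member) \
	((type *) ((char *) (ptr) - offsetof(type, member)))

/* rpc_stat = EXAMPLE_CONTAINER_OF(queue_elem, rx_interface_stat_t, all_peers); */
#endif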
7046 * rx_FreeRPCStats - free memory allocated by
7047 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7051 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7052 * rx_RetrievePeerRPCStats
7054 * IN allocSize - the number of bytes in stats.
7061 void rx_FreeRPCStats(
7065 rxi_Free(stats, allocSize);
7069 * rx_queryProcessRPCStats - see if process rpc stat collection is
7070 * currently enabled.
7076 * Returns 0 if stats are not enabled, != 0 otherwise
7079 int rx_queryProcessRPCStats()
7082 MUTEX_ENTER(&rx_rpc_stats);
7083 rc = rxi_monitor_processStats;
7084 MUTEX_EXIT(&rx_rpc_stats);
7089 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7095 * Returns 0 if stats are not enabled, != 0 otherwise
7098 int rx_queryPeerRPCStats()
7101 MUTEX_ENTER(&rx_rpc_stats);
7102 rc = rxi_monitor_peerStats;
7103 MUTEX_EXIT(&rx_rpc_stats);
7108 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7117 void rx_enableProcessRPCStats()
7119 MUTEX_ENTER(&rx_rpc_stats);
7120 rx_enable_stats = 1;
7121 rxi_monitor_processStats = 1;
7122 MUTEX_EXIT(&rx_rpc_stats);
7126 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7135 void rx_enablePeerRPCStats()
7137 MUTEX_ENTER(&rx_rpc_stats);
7138 rx_enable_stats = 1;
7139 rxi_monitor_peerStats = 1;
7140 MUTEX_EXIT(&rx_rpc_stats);
7144 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7153 void rx_disableProcessRPCStats()
7155 rx_interface_stat_p rpc_stat, nrpc_stat;
7158 MUTEX_ENTER(&rx_rpc_stats);
7161 * Turn off process statistics; if peer stat collection is also off, turn off statistics gathering altogether.
7165 rxi_monitor_processStats = 0;
7166 if (rxi_monitor_peerStats == 0) {
7167 rx_enable_stats = 0;
7170 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7171 unsigned int num_funcs = 0;
7172 if (!rpc_stat) break;
7173 queue_Remove(rpc_stat);
7174 num_funcs = rpc_stat->stats[0].func_total;
7175 space = sizeof(rx_interface_stat_t) +
7176 rpc_stat->stats[0].func_total *
7177 sizeof(rx_function_entry_v1_t);
7179 rxi_Free(rpc_stat, space);
7180 rxi_rpc_process_stat_cnt -= num_funcs;
7182 MUTEX_EXIT(&rx_rpc_stats);
7186 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7195 void rx_disablePeerRPCStats()
7197 struct rx_peer **peer_ptr, **peer_end;
7200 MUTEX_ENTER(&rx_rpc_stats);
7203 * Turn off peer statistics; if process stat collection is also off, turn off statistics gathering altogether.
7207 rxi_monitor_peerStats = 0;
7208 if (rxi_monitor_processStats == 0) {
7209 rx_enable_stats = 0;
7212 MUTEX_ENTER(&rx_peerHashTable_lock);
7213 for (peer_ptr = &rx_peerHashTable[0],
7214 peer_end = &rx_peerHashTable[rx_hashTableSize];
7215 peer_ptr < peer_end; peer_ptr++) {
7216 struct rx_peer *peer, *next, *prev;
7217 for (prev = peer = *peer_ptr; peer; peer = next) {
7219 code = MUTEX_TRYENTER(&peer->peer_lock);
7221 rx_interface_stat_p rpc_stat, nrpc_stat;
7223 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7224 rx_interface_stat)) {
7225 unsigned int num_funcs = 0;
7226 if (!rpc_stat) break;
7227 queue_Remove(&rpc_stat->queue_header);
7228 queue_Remove(&rpc_stat->all_peers);
7229 num_funcs = rpc_stat->stats[0].func_total;
7230 space = sizeof(rx_interface_stat_t) +
7231 rpc_stat->stats[0].func_total *
7232 sizeof(rx_function_entry_v1_t);
7234 rxi_Free(rpc_stat, space);
7235 rxi_rpc_peer_stat_cnt -= num_funcs;
7237 MUTEX_EXIT(&peer->peer_lock);
7238 if (prev == *peer_ptr) {
7250 MUTEX_EXIT(&rx_peerHashTable_lock);
7251 MUTEX_EXIT(&rx_rpc_stats);
7255 * rx_clearProcessRPCStats - clear the contents of the rpc stats according to the flags in clearFlag.
7260 * IN clearFlag - flag indicating which stats to clear
7267 void rx_clearProcessRPCStats(
7268 afs_uint32 clearFlag)
7270 rx_interface_stat_p rpc_stat, nrpc_stat;
7272 MUTEX_ENTER(&rx_rpc_stats);
7274 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7275 unsigned int num_funcs = 0, i;
7276 num_funcs = rpc_stat->stats[0].func_total;
7277 for(i=0;i<num_funcs;i++) {
7278 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7279 hzero(rpc_stat->stats[i].invocations);
7281 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7282 hzero(rpc_stat->stats[i].bytes_sent);
7284 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7285 hzero(rpc_stat->stats[i].bytes_rcvd);
7287 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7288 rpc_stat->stats[i].queue_time_sum.sec = 0;
7289 rpc_stat->stats[i].queue_time_sum.usec = 0;
7291 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7292 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7293 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7295 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7296 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7297 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7299 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7300 rpc_stat->stats[i].queue_time_max.sec = 0;
7301 rpc_stat->stats[i].queue_time_max.usec = 0;
7303 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7304 rpc_stat->stats[i].execution_time_sum.sec = 0;
7305 rpc_stat->stats[i].execution_time_sum.usec = 0;
7307 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7308 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7309 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7311 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7312 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7313 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7315 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7316 rpc_stat->stats[i].execution_time_max.sec = 0;
7317 rpc_stat->stats[i].execution_time_max.usec = 0;
7322 MUTEX_EXIT(&rx_rpc_stats);
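/*
 * Example use (a minimal sketch): the clearFlag bits may be OR'd together
 * to reset several counters in one pass, e.g. just the byte counts.
 */
#if 0
static void example_ClearByteCounts(void)
{
    rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_BYTES_SENT |
			    AFS_RX_STATS_CLEAR_BYTES_RCVD);
}
#endif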
7326 * rx_clearPeerRPCStats - clear the contents of the rpc stats according to the flags in clearFlag.
7331 * IN clearFlag - flag indicating which stats to clear
7338 void rx_clearPeerRPCStats(
7339 afs_uint32 clearFlag)
7341 rx_interface_stat_p rpc_stat, nrpc_stat;
7343 MUTEX_ENTER(&rx_rpc_stats);
7345 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7346 unsigned int num_funcs = 0, i;
7349 * We have to fix the offset of rpc_stat since we are
7350 * keeping this structure on two rx_queues. The rx_queue
7351 * package assumes that the rx_queue member is the first
7352 * member of the structure. That is, rx_queue assumes that
7353 * any one item is only on one queue at a time. We are
7354 * breaking that assumption and so we have to do a little
7355 * math to fix our pointers.
7358 fix_offset = (char *) rpc_stat;
7359 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7360 rpc_stat = (rx_interface_stat_p) fix_offset;
7362 num_funcs = rpc_stat->stats[0].func_total;
7363 for(i=0;i<num_funcs;i++) {
7364 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7365 hzero(rpc_stat->stats[i].invocations);
7367 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7368 hzero(rpc_stat->stats[i].bytes_sent);
7370 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7371 hzero(rpc_stat->stats[i].bytes_rcvd);
7373 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7374 rpc_stat->stats[i].queue_time_sum.sec = 0;
7375 rpc_stat->stats[i].queue_time_sum.usec = 0;
7377 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7378 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7379 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7381 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7382 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7383 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7385 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7386 rpc_stat->stats[i].queue_time_max.sec = 0;
7387 rpc_stat->stats[i].queue_time_max.usec = 0;
7389 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7390 rpc_stat->stats[i].execution_time_sum.sec = 0;
7391 rpc_stat->stats[i].execution_time_sum.usec = 0;
7393 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7394 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7395 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7397 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7398 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7399 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7401 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7402 rpc_stat->stats[i].execution_time_max.sec = 0;
7403 rpc_stat->stats[i].execution_time_max.usec = 0;
7408 MUTEX_EXIT(&rx_rpc_stats);
7412 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7413 * is authorized to enable/disable/clear RX statistics.
7415 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7417 void rx_SetRxStatUserOk(
7418 int (*proc)(struct rx_call *call))
7420 rxi_rxstat_userok = proc;
7423 int rx_RxStatUserOk(
7424 struct rx_call *call)
7426 if (!rxi_rxstat_userok)
7428 return rxi_rxstat_userok(call);
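/*
 * Example use (a minimal sketch): install a policy routine so only
 * authorized callers may enable/disable/clear statistics.  The check
 * below is hypothetical and only for illustration; a real server would
 * typically consult its security objects.  rx_ConnectionOf and
 * rx_SecurityClassOf are the accessor macros from rx.h.
 */
#if 0
static int example_StatsUserOk(struct rx_call *call)
{
    /* permit only unauthenticated (security index 0) callers */
    return rx_ConnectionOf(call) &&
	rx_SecurityClassOf(rx_ConnectionOf(call)) == 0;
}

static void example_InstallPolicy(void)
{
    rx_SetRxStatUserOk(example_StatsUserOk);
}
#endif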