 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html

/* RX: Extended Remote Procedure Call */
#include <afsconfig.h>
#include "../afs/param.h"
#include <afs/param.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG      /* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
#define AFSOP_STOP_AFS          211     /* Stop AFS process */
#define AFSOP_STOP_BKG          212     /* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;    /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h>     /* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 * to ease NT porting
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
                              (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
                             (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
                             (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
        assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
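/*
 * A minimal sketch of how this macro is meant to be used (pthread_once
 * guarantees rxi_InitPthread runs exactly once per process, however many
 * threads race into the entry points concurrently):
 *
 *     int rx_Init(u_int port) {
 *         INIT_PTHREAD_LOCKS
 *         ...
 *     }
 */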
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else /* AFS_PTHREAD_ENV */
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to happen while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that the quota check and the reservation occur at
 * the same time.  A new function, ReturnToServerPool(), returns the
 * allocation.
 *
 * A call can be on several queues (but only on one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
 */
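/*
 * A minimal sketch of that protocol (illustrative only; the queue and its
 * lock here are hypothetical):
 *
 *     MUTEX_ENTER(&some_queue_lock);
 *     MUTEX_ENTER(&call->lock);
 *     SET_CALL_QUEUE_LOCK(call, &some_queue_lock); // record which lock guards us
 *     queue_Append(&some_queue, call);
 *     MUTEX_EXIT(&call->lock);
 *     MUTEX_EXIT(&some_queue_lock);
 *
 *     // later, dequeueing with call->lock held:
 *     queue_Remove(call);
 *     CLEAR_CALL_QUEUE_LOCK(call);                 // no queue guards us now
 *
 * rxi_ResetCall can then inspect call->call_queue_lock: when it is non-NULL
 * it must acquire that lock before pulling the call off its queue.
 */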
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */

/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * tiers:
 *
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *      rx_freeCallQueue_lock
 *      rx_connHashTable_lock
 *      rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *      peer_lock - locks peer data fields.
 *      conn_data_lock - ensures that no more than one thread updates a conn
 *          data field at the same time.
 *
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and so has no
 * lock protecting this field.  The multihomed client delta introduced
 * an RX code change: change the peer field in the connection structure
 * to that remote interface from which the last packet for this
 * connection was sent out.  This may become an issue if further changes
 * are made.
 */
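/*
 * An illustrative consequence of this hierarchy: a thread holding only
 * call->lock that also needs conn_call_lock must back out and re-acquire
 * in order (rx_EndCall below performs exactly this dance):
 *
 *     MUTEX_EXIT(&call->lock);
 *     MUTEX_ENTER(&conn->conn_call_lock);
 *     MUTEX_ENTER(&call->lock);
 */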
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL

#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
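/*
 * A minimal sketch of the intended use (hypothetical caller; a real
 * security package would derive the value from key material or another
 * entropy source):
 *
 *     afs_uint32 random_epoch = ...;   // suitably random 32-bit value
 *     rx_SetEpoch(random_epoch);       // all later connections share it
 */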
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
    afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * failure. */
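/*
 * A minimal usage sketch (hypothetical client start-up):
 *
 *     if (rx_Init(0) != 0) {           // 0: let the kernel pick a port
 *         fprintf(stderr, "rx_Init failed\n");
 *         exit(1);
 *     }
 */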
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        UNLOCK_RX_INIT
        return tmp_status; /* Already started; return previous error code. */
    }

#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
        return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
        UNLOCK_RX_INIT
        return RX_ADDRINUSE;
    }
#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
               MUTEX_DEFAULT,0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;        /* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
        osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));        /* XXXXX */
    memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    {
        struct sockaddr_in addr;
        int addrlen = sizeof(addr);
        if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
            rx_Finalize();
            return -1;
        }
        rx_port = addr.sin_port;
    }
#endif
    rx_stats.minRtt.sec = 9999999;
#ifdef  KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);        /* Start time of this package, rxkad
                                     * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;  /* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;  /* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;  /* 100 milliseconds */
    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
/* Called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or it would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
    register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
        return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (aservice->nRequestsRunning <= aservice->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        MUTEX_EXIT(&rx_stats_mutex);
        return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);

    return 0;
}

static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static int QuotaOK(aservice)
    register struct rx_service *aservice; {
    int rc=0;

    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
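/*
 * A worked example of the quota rule (illustrative numbers): a service with
 * minProcs = 2 and maxProcs = 5 already has 3 requests running.  A fourth
 * request is neither under min quota (3 >= 2) nor over max quota (3 < 5),
 * so it may only start if rxi_availProcs > rxi_minDeficit, i.e. if enough
 * idle threads remain for every other service to still reach its own
 * minProcs guarantee.
 */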
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;
    int diff;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
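    /*
     * For instance (illustrative numbers): with two services, one having
     * minProcs = 2 / maxProcs = 5 and the other minProcs = 3 / maxProcs = 4,
     * nProcs starts at 2 + 3 = 5 and maxdiff = max(5-2, 4-3) = 3, so
     * 5 + 3 = 8 procs are needed in total, less however many already exist.
     */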
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;  /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(donateMe)
    int donateMe;
{
    register struct rx_service *service;
    register int i, nProcs=0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created in rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);
    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
         */
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();
    if (donateMe) {
#ifndef AFS_NT40_ENV
        char name[32];
        pid_t pid;
#ifdef AFS_PTHREAD_ENV
        pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
        LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
        sprintf(name,"srv_%d", ++nProcs);
        if (registerProgram)
            (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
        rx_ServerProc(); /* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;      /* Server host */
    u_short sport;                  /* Server port */
    u_short sservice;               /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));
    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
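/*
 * A minimal client-side sketch (hypothetical values; rxnull provides the
 * "null" security object, with security index 0):
 *
 *     struct rx_securityClass *secobj = rxnull_NewClientSecurityObject();
 *     struct rx_connection *conn =
 *         rx_NewConnection(htonl(0x7f000001),  // server host (127.0.0.1)
 *                          htons(7000),        // server port
 *                          4,                  // service id
 *                          secobj, 0);
 *     struct rx_call *call = rx_NewCall(conn);
 *     // ... rx_Write()/rx_Read() on the call, then rx_EndCall(call, 0) ...
 *     rx_DestroyConnection(conn);
 */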
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
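/*
 * Worked example: with the default rx_connDeadTime of 12 seconds,
 * secondsUntilDead = 12 and secondsUntilPing = 2, so a keepalive goes out
 * every 2 seconds and roughly five consecutive keepalives can be lost
 * before the connection is declared dead.
 */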
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

/*
 * Clean up a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;
    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount ++;
            MUTEX_EXIT(&rx_stats_mutex);
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
    else
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
        for (i = 0 ; i < conn->nSpecific ; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        }
        free(conn->specific);
    }
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
        MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    struct rx_peer *peer = conn->peer;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        conn->refCount--;
    else {
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
        if (call) {
            havecalls = 1;
            if (conn->type == RX_CLIENT_CONNECTION) {
                MUTEX_ENTER(&call->lock);
                if (call->delayedAckEvent) {
                    /* Push the final acknowledgment out now--there
                     * won't be a subsequent call to acknowledge the
                     * last reply packets */
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    rxi_AckAll((struct rxevent *)0, call, 0);
                }
                MUTEX_EXIT(&call->lock);
            }
        }
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
        } else {
            /* Someone is accessing a packet right now. */
            havecalls = 1;
        }
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
        return;
    }
    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
        if (packet) {
            MUTEX_ENTER(&conn->conn_data_lock);
            rxi_SendConnectionAbort(conn, packet, 0, 1);
            MUTEX_EXIT(&conn->conn_data_lock);
            rxi_FreePacket(packet);
        }
    }
    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
                                             conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;
            break;
        }
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
        rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
        rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_MakeCall(conn %x)\n", conn));
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
        for (i=0; i<RX_MAXCALLS; i++) {
            call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_DALLY) {
                    rxi_ResetCall(call, 0);
                    (*call->callNumber)++;
                    break;
                }
                MUTEX_EXIT(&call->lock);
            } else {
                call = rxi_NewCall(conn, i);
                MUTEX_ENTER(&call->lock);
                break;
            }
        }
        if (i < RX_MAXCALLS) {
            break;
        }
        /* No call channel is free; wait for one to become available */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
        osi_rxSleep(conn);
#endif
    }
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i])) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {
                return 1;
            }
        }
    }
    return 0;
}
int rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i]+1;
        else
            aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}
int rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
        else
            aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * if the installation fails. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
              nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;   /* Name for identification purposes (e.g. the
                          * service name might be used for probing for
                          * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
        (osi_Msg "rx_NewService: the service id for service %s must be non-zero.\n",
         serviceName);
        return 0;
    }
    if (port == 0) {
        if (rx_port == 0) {
            (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
            return 0;
        }
        port = rx_port;
    }
    tservice = rxi_AllocService();

    for (i = 0; i<RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (service) {
            if (port == service->servicePort) {
                if (service->serviceId == serviceId) {
                    /* The identical service has already been
                     * installed; if the caller was intending to
                     * change the security classes used by this
                     * service, he/she loses. */
                    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
                    rxi_FreeService(tservice);
                    return service;
                }
                /* Different service, same port: re-use the socket
                 * which is bound to the same port */
                socket = service->socket;
            }
        }
        else {
            if (socket == OSI_NULLSOCKET) {
                /* If we don't already have a socket (from another
                 * service on same port) get a new one */
                socket = rxi_GetUDPSocket(port);
                if (socket == OSI_NULLSOCKET) {
                    rxi_FreeService(tservice);
                    return 0;
                }
            }
            service = tservice;
            service->socket = socket;
            service->servicePort = port;
            service->serviceId = serviceId;
            service->serviceName = serviceName;
            service->nSecurityObjects = nSecurityObjects;
            service->securityObjects = securityObjects;
            service->minProcs = 0;
            service->maxProcs = 1;
            service->idleDeadTime = 60;
            service->connDeadTime = rx_connDeadTime;
            service->executeRequestProc = serviceProc;
            rx_services[i] = service;   /* not visible until now */
            return service;
        }
    }

    /* The maximum number of services has been reached; fail */
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
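/*
 * A minimal server-side sketch (hypothetical ids and names; the security
 * objects would come from a package such as rxnull or rxkad):
 *
 *     struct rx_service *svc =
 *         rx_NewService(htons(7000), 4, "demo", secobjs, 1, DemoRequestProc);
 *     if (svc) {
 *         rx_SetMinProcs(svc, 2);
 *         rx_SetMaxProcs(svc, 10);
 *         rx_StartServer(1);   // donate this thread to the server pool
 *     }
 */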
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
    int threadID;
    struct rx_call *newcall;
    osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;
    for (;;) {
        if (newcall) {
            call = newcall;
            newcall = NULL;
        } else {
            call = rx_GetCall(threadID, tservice, socketp);
            if (socketp && *socketp != OSI_NULLSOCKET) {
                /* We are now a listener thread */
                return;
            }
        }

        /* if server is restarting (typically smooth shutdown) then do not
         * allow any new calls.
         */
        if ( rx_tranquil && (call != NULL) ) {
            MUTEX_ENTER(&call->lock);
            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
            MUTEX_EXIT(&call->lock);
            continue;
        }

        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
            AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
            afs_termState = AFSOP_STOP_AFS;
            afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
            AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
            return;
        }

        tservice = call->conn->service;
        if (tservice->beforeProc) (*tservice->beforeProc)(call);
        code = call->conn->service->executeRequestProc(call);
        if (tservice->afterProc) (*tservice->afterProc)(call, code);

        rx_EndCall(call, code);
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_nCalls++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
        osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/*
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 *
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */

/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        choice2 = (struct rx_call *) 0;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (!QuotaOK(service)) {
                continue;
            }
            if (!tno || !tcall->queue_item_header.next ) {
                /* If we're thread 0, then we'll just use
                 * this call.  If we haven't been able to find an optimal
                 * choice, and we're at the end of the list, then use a
                 * 2d choice if one has been identified.  Otherwise... */
                call = (choice2 ? choice2 : tcall);
                service = call->conn->service;
            } else if (!queue_IsEmpty(&tcall->rq)) {
                struct rx_packet *rp;
                rp = queue_First(&tcall->rq, rx_packet);
                if (rp->header.seq == 1) {
                    if (!meltdown_1pkt ||
                        (rp->header.flags & RX_LAST_PACKET)) {
                        call = tcall;
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                        choice2 = tcall;
                    } else rxi_md2cnt++;
                }
            }
            if (call) {
                break;
            } else {
                ReturnToServerPool(service);
            }
        }
    }
    if (call) {
        rxi_ServerThreadSelectingCall = 1;
        MUTEX_EXIT(&rx_serverPool_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&rx_serverPool_lock);

        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        CLEAR_CALL_QUEUE_LOCK(call);
        if (call->error) {
            MUTEX_EXIT(&call->lock);
            ReturnToServerPool(service);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            call = (struct rx_call*)0;
        } else {
            call->flags &= (~RX_CALL_WAIT_PROC);
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting--;
            MUTEX_EXIT(&rx_stats_mutex);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            MUTEX_EXIT(&rx_serverPool_lock);
        }
    }
    if (!call) {
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
        sq->newcall = 0;
        if (socketp) {
            *socketp = OSI_NULLSOCKET;
        }
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
        rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
        do {
            CV_WAIT(&sq->cv, &rx_serverPool_lock);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                MUTEX_EXIT(&rx_serverPool_lock);
                return (struct rx_call *)0;
            }
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
        MUTEX_EXIT(&rx_serverPool_lock);
        if (call) {
            MUTEX_ENTER(&call->lock);
        }
    }
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));

        CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
        MUTEX_EXIT(&call->lock);
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)
            rxi_minDeficit++;
        rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *) 0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call.  If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                        call = tcall;
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                        choice2 = tcall;
                    } else rxi_md2cnt++;
                }
                if (call) break;
            }
        }
    }
    if (call) {
        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
         * guarantee */
        if (service->nRequestsRunning <= service->minProcs)
            rxi_minDeficit--;
        rxi_availProcs--;
        /* MUTEX_EXIT(&call->lock); */
    } else {
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
        if (socketp) {
            *socketp = OSI_NULLSOCKET;
        }
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
        do {
            osi_rxSleep(sq);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                return (struct rx_call *)0;
            }
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;

        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));
    } else {
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the
 * call or the call is complete.  Used by multi rx to build a
 * selection function which determines which of several calls is
 * likely to be a good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));
    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
         */
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        }
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        }
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
        } else {
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
        }
    }
    else { /* Client connection */
        char dummy;
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        }
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call.  The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure.  We don't want to signal until
         * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
         */
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_BUSY;
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#else
            osi_rxWakeup(conn);
#endif
        }
#ifdef RX_ENABLE_LOCKS
        else {
            MUTEX_EXIT(&conn->conn_data_lock);
        }
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    }

    error = call->error;
    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *) 0;
        call->nLeft = call->nFree = call->curlen = 0;
    }
    else
        call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
        queue_Remove(tp);
        rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
        MUTEX_EXIT(&conn->conn_call_lock);
        conn->flags &= ~RX_CONN_BUSY;
    }

    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    LOCK_RX_INIT
    if (rxinit_status == 1) {
        UNLOCK_RX_INIT
        return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                next = conn->next;
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    conn->refCount++;
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
                }
            }
        }
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
        return;
    }
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return; /* still over quota */
    }
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
        if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
            return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;    /* Call pointer temp */
    register struct rx_call *nxp;   /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
        if (!(cp->flags & RX_CALL_TQ_BUSY)) {
            call = cp;
            break;
        }
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        queue_Remove(call);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nFreeCallStructs--;
        MUTEX_EXIT(&rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Now, if TQ wasn't cleared earlier, do it now. */
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 0);
            queue_Init(&call->tq);
        }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Bind the call to its connection structure */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    else {
        call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
        MUTEX_ENTER(&call->lock);
        CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
        CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
        CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nCallStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
        /* Initialize once-only items */
        queue_Init(&call->tq);
        queue_Init(&call->rq);
        queue_Init(&call->iovq);
        /* Bind the call to its connection structure (prereq for reset) */
        call->conn = conn;
        rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
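/*
 * Illustration of the call number rule above (hypothetical trace): the
 * first call ever placed on channel 2 finds conn->callNumber[2] == 0 and
 * starts as call number 1.  When that call ends and the channel is reused,
 * the stored number has been incremented (see rx_NewCall/rxi_FreeCall), so
 * the next call runs as call number 2, letting the peer tell late packets
 * of the old call apart from packets of the new one.
 */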
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock; /* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
        (*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
        queue_Prepend(&rx_freeCallQueue, call);
    else
        queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);
2049 /* Destroy the connection if it was previously slated for
2050 * destruction, i.e. the Rx client code previously called
2051 * rx_DestroyConnection (client connections), or
2052 * rxi_ReapConnections called the same routine (server
2053 * connections). Only do this, however, if there are no
2054 * outstanding calls. Note that for fine grain locking, there appears
2055 * to be a deadlock in that rxi_FreeCall has a call locked and
2056 * DestroyConnectionNoLock locks each call in the conn. But note a
2057 * few lines up where we have removed this call from the conn.
2058 * If someone else destroys a connection, they either have no
2059 * call lock held or are going through this section of code.
2061 if (conn->flags & RX_CONN_DESTROY_ME) {
2062 MUTEX_ENTER(&conn->conn_data_lock);
2064 MUTEX_EXIT(&conn->conn_data_lock);
2065 #ifdef RX_ENABLE_LOCKS
2067 rxi_DestroyConnectionNoLock(conn);
2069 rxi_DestroyConnection(conn);
2070 #else /* RX_ENABLE_LOCKS */
2071 rxi_DestroyConnection(conn);
2072 #endif /* RX_ENABLE_LOCKS */
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;

char *rxi_Alloc(size)
register size_t size;
{
    register char *p;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation. */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
        AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt++; rxi_Allocsize += size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ) {
        p = (char *) osi_AllocMediumSpace(size);
    } else
        p = (char *) osi_AllocSmall(size, 1);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
        AFS_GUNLOCK();
#endif
#else
    p = (char *) osi_Alloc(size);
#endif
    if (!p) osi_Panic("rxi_Alloc error");
    bzero(p, size);
    return p;
}
void rxi_Free(addr, size)
void *addr;
register size_t size;
{
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
     * implementation. */
    int glockOwner = ISAFS_GLOCK();
    if (!glockOwner)
        AFS_GLOCK();
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_Alloccnt--; rxi_Allocsize -= size;
    MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
    if (size > AFS_SMALLOCSIZ)
        osi_FreeMediumSpace(addr);
    else
        osi_FreeSmall(addr);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
    if (!glockOwner)
        AFS_GUNLOCK();
#endif
#else
    osi_Free(addr, size);
#endif
}
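
/* Illustrative sketch (not part of the original source): rxi_Alloc and
 * rxi_Free must be passed matching sizes, since the size both selects the
 * small/medium allocator on some platforms and keeps the rxi_Alloccnt /
 * rxi_Allocsize accounting balanced. The helper name is hypothetical. */
#if 0
static void example_AllocFree()
{
    struct rx_peer *tpeer;
    tpeer = (struct rx_peer *) rxi_Alloc(sizeof(struct rx_peer));
    /* ... use *tpeer ... */
    rxi_Free(tpeer, sizeof(struct rx_peer));	/* same size on both calls */
}
#endif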
/* Find the peer process represented by the supplied (host,port)
 * combination. If there is no appropriate active peer structure, a
 * new one will be allocated and initialized.
 * The origPeer, if set, is a pointer to a peer structure on which the
 * refcount will be decremented. This is used to replace the peer
 * structure hanging off a connection structure */
struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
    register afs_uint32 host;
    register u_short port;
    struct rx_peer *origPeer;
    int create;
{
    register struct rx_peer *pp;
    int hashIndex;
    hashIndex = PEER_HASH(host, port);
    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
        if ((pp->host == host) && (pp->port == port)) break;
    }
    if (!pp && create) {
        pp = rxi_AllocPeer();	/* This bzero's *pp */
        pp->host = host;	/* set here or in InitPeerParams is zero */
        pp->port = port;
        MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
        queue_Init(&pp->congestionQueue);
        queue_Init(&pp->rpcStats);
        pp->next = rx_peerHashTable[hashIndex];
        rx_peerHashTable[hashIndex] = pp;
        rxi_InitPeerParams(pp);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nPeerStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
    if (pp && create) {
        pp->refCount++;
    }
    if (origPeer)
        origPeer->refCount--;
    MUTEX_EXIT(&rx_peerHashTable_lock);
    return pp;
}
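
/* Illustrative sketch (not part of the original source), mirroring the
 * connection code later in this file: passing the old peer as origPeer
 * makes rxi_FindPeer drop the old reference and take the new one under a
 * single acquisition of rx_peerHashTable_lock. The helper name is
 * hypothetical. */
#if 0
static void example_RebindPeer(conn, newHost, newPort)
    struct rx_connection *conn;
    afs_uint32 newHost;
    u_short newPort;
{
    conn->peer = rxi_FindPeer(newHost, newPort, conn->peer, 1);
}
#endif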
/* Find the connection at (host, port) started at epoch, and with the
 * given connection id. Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired. In both cases, (host, port) specify the
 * peer's (host, port) pair. Client connections are not made
 * automatically by this routine. The parameter socket gives the
 * socket descriptor on which the packet was received. This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId). Finally, the securityIndex
 * parameter must match the existing index for the connection. If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service */
struct rx_connection *
rxi_FindConnection(socket, host, port, serviceId, cid,
                   epoch, type, securityIndex)
    osi_socket socket;
    register afs_int32 host;
    register u_short port;
    u_short serviceId;
    afs_uint32 cid;
    afs_uint32 epoch;
    int type;
    u_int securityIndex;
{
    int hashindex, flag;
    register struct rx_connection *conn;
    struct rx_peer *peer;
    hashindex = CONN_HASH(host, port, cid, epoch, type);
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxLastConn ? (conn = rxLastConn, flag = 0) :
                 (conn = rx_connHashTable[hashindex], flag = 1);
    for ( ; conn; ) {
        if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
            && (epoch == conn->epoch)) {
            register struct rx_peer *pp = conn->peer;
            if (securityIndex != conn->securityIndex) {
                /* this isn't supposed to happen, but someone could forge a packet
                 * like this, and there seems to be some CM bug that makes this
                 * happen from time to time -- in which case, the fileserver
                 * asserts. */
                MUTEX_EXIT(&rx_connHashTable_lock);
                return (struct rx_connection *) 0;
            }
            /* If the epoch's high-order bit is set, route on the cid alone,
             * for security reasons; not on the host and port fields. */
            if (conn->epoch & 0x80000000) break;
            if (((type == RX_CLIENT_CONNECTION)
                 || (pp->host == host)) && (pp->port == port))
                break;
        }
        if (!flag) {
            /* the connection rxLastConn that was used the last time is not the
             * one we are looking for now. Hence, start searching in the hash */
            flag = 1;
            conn = rx_connHashTable[hashindex];
        } else
            conn = conn->next;
    }
    if (!conn) {
        struct rx_service *service;
        if (type == RX_CLIENT_CONNECTION) {
            MUTEX_EXIT(&rx_connHashTable_lock);
            return (struct rx_connection *) 0;
        }
        service = rxi_FindService(socket, serviceId);
        if (!service || (securityIndex >= service->nSecurityObjects)
            || (service->securityObjects[securityIndex] == 0)) {
            MUTEX_EXIT(&rx_connHashTable_lock);
            return (struct rx_connection *) 0;
        }
        conn = rxi_AllocConnection();	/* This bzero's the connection */
        MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
                   MUTEX_DEFAULT, 0);
        MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
                   MUTEX_DEFAULT, 0);
        CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
        conn->next = rx_connHashTable[hashindex];
        rx_connHashTable[hashindex] = conn;
        peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
        conn->type = RX_SERVER_CONNECTION;
        conn->lastSendTime = clock_Sec();	/* don't GC immediately */
        conn->epoch = epoch;
        conn->cid = cid & RX_CIDMASK;
        /* conn->serial = conn->lastSerial = 0; */
        /* conn->timeout = 0; */
        conn->ackRate = RX_FAST_ACK_RATE;
        conn->service = service;
        conn->serviceId = serviceId;
        conn->securityIndex = securityIndex;
        conn->securityObject = service->securityObjects[securityIndex];
        conn->nSpecific = 0;
        conn->specific = NULL;
        rx_SetConnDeadTime(conn, service->connDeadTime);
        /* Notify security object of the new connection */
        RXS_NewConnection(conn->securityObject, conn);
        /* XXXX Connection timeout? */
        if (service->newConnProc) (*service->newConnProc)(conn);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nServerConns++;
        MUTEX_EXIT(&rx_stats_mutex);
    }
    /* Ensure that the peer structure is set up in such a way that
     * replies in this connection go back to that remote interface
     * from which the last packet was sent out. If this packet's
     * source IP address does not match the peer struct for this conn,
     * then drop the refCount on conn->peer and get a new peer structure.
     * We can check the host,port field in the peer structure without the
     * rx_peerHashTable_lock because the peer structure has its refCount
     * incremented and the only time the host,port in the peer struct gets
     * updated is when the peer structure is created. */
    if (conn->peer->host == host)
        peer = conn->peer;	/* no change to the peer structure */
    else
        peer = rxi_FindPeer(host, port, conn->peer, 1);

    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount++;
    MUTEX_EXIT(&conn->conn_data_lock);

    rxLastConn = conn;	/* store this connection as the last conn used */
    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
/* There are two packet tracing routines available for testing and monitoring
 * Rx. One is called just after every packet is received and the other is
 * called just before every packet is sent. Received packets have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address. Both can be modified. The return value, if
 * non-zero, indicates that the packet should be dropped. */

int (*rx_justReceived)() = 0;
int (*rx_almostSent)() = 0;
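
/* Illustrative sketch (not part of the original source): installing a
 * receive tracer. The hook below drops every packet arriving from one
 * (hypothetical) port; any non-zero return tells rx to discard the
 * packet. Helper names and the port number are hypothetical. */
#if 0
static int example_DropFromNoisyPort(np, addr)
    struct rx_packet *np;
    struct sockaddr_in *addr;
{
    if (addr->sin_port == htons(7009))	/* hypothetical port to squelch */
        return 1;			/* non-zero: drop this packet */
    return 0;				/* zero: process normally */
}

static void example_InstallTracer()
{
    rx_justReceived = example_DropFromNoisyPort;
}
#endif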
/* A packet has been received off the interface. Np is the packet, socket is
 * the socket number it was received from (useful in determining which service
 * this packet corresponds to), and (host, port) reflect the host,port of the
 * sender. This call returns the packet to the caller if it is finished with
 * it, rather than de-allocating it, just as a small performance hack */

struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
    register struct rx_packet *np;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    register struct rx_call *call;
    register struct rx_connection *conn;
    int channel;
    afs_uint32 currentCallNumber;
    int type;
    int skew;
    char *packetType;
    struct rx_packet *tnp;
#ifdef RXDEBUG
    /* We don't print out the packet until now because (1) the time may not be
     * accurate enough until now in the lwp implementation (rx_Listener only gets
     * the time after the packet is read) and (2) from a protocol point of view,
     * this is the first time the packet has been seen */
    packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
        ? rx_packetTypes[np->header.type-1] : "*UNKNOWN*";
    dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
         np->header.serial, packetType, host, port, np->header.serviceId,
         np->header.epoch, np->header.cid, np->header.callNumber,
         np->header.seq, np->header.flags, np));
#endif

    if (np->header.type == RX_PACKET_TYPE_VERSION) {
        return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
    }

    if (np->header.type == RX_PACKET_TYPE_DEBUG) {
        return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
    }

    /* If an input tracer function is defined, call it with the packet and
     * network address. Note this function may modify its arguments. */
    if (rx_justReceived) {
        struct sockaddr_in addr;
        int drop;
        addr.sin_family = AF_INET;
        addr.sin_port = port;
        addr.sin_addr.s_addr = host;
#if defined(AFS_OSF_ENV) && defined(_KERNEL)
        addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
        drop = (*rx_justReceived) (np, &addr);
        /* drop packet if return value is non-zero */
        if (drop) return np;
        port = addr.sin_port;	/* in case fcn changed addr */
        host = addr.sin_addr.s_addr;
    }
    /* If packet was not sent by the client, then *we* must be the client */
    type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
        ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

    /* Find the connection (or fabricate one, if we're the server & if
     * necessary) associated with this packet */
    conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
                              np->header.cid, np->header.epoch, type,
                              np->header.securityIndex);

    if (!conn) {
        /* If no connection found or fabricated, just ignore the packet.
         * (An argument could be made for sending an abort packet for
         * the cid.) */
        return np;
    }

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->maxSerial < np->header.serial)
        conn->maxSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);

    /* If the connection is in an error state, send an abort packet and ignore
     * the incoming packet */
    if (conn->error) {
        /* Don't respond to an abort packet--we don't want loops! */
        MUTEX_ENTER(&conn->conn_data_lock);
        if (np->header.type != RX_PACKET_TYPE_ABORT)
            np = rxi_SendConnectionAbort(conn, np, 1, 0);
        conn->refCount--;
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }
    /* Check for connection-only requests (i.e., not call specific). */
    if (np->header.callNumber == 0) {
        switch (np->header.type) {
        case RX_PACKET_TYPE_ABORT:
            /* What if the supplied error is zero? */
            rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        case RX_PACKET_TYPE_CHALLENGE:
            tnp = rxi_ReceiveChallengePacket(conn, np, 1);
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        case RX_PACKET_TYPE_RESPONSE:
            tnp = rxi_ReceiveResponsePacket(conn, np, 1);
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        case RX_PACKET_TYPE_PARAMS:
        case RX_PACKET_TYPE_PARAMS+1:
        case RX_PACKET_TYPE_PARAMS+2:
            /* ignore these packet types for now */
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        default:
            /* Should not reach here, unless the peer is broken: send an
             * abort packet */
            rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
            MUTEX_ENTER(&conn->conn_data_lock);
            tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        }
    }
    channel = np->header.cid & RX_CHANNELMASK;
    call = conn->call[channel];
#ifdef RX_ENABLE_LOCKS
    if (call) {
        MUTEX_ENTER(&call->lock);
        /* Test to see if call struct is still attached to conn. */
        if (call != conn->call[channel]) {
            MUTEX_EXIT(&call->lock);
            if (type == RX_SERVER_CONNECTION) {
                call = conn->call[channel];
                /* If we started with no call attached and there is one now,
                 * another thread is also running this routine and has gotten
                 * the connection channel. We should drop this packet in the tests
                 * below. If there was a call on this connection and it's now
                 * gone, then we'll be making a new call below.
                 * If there was previously a call and it's now different then
                 * the old call was freed and another thread running this routine
                 * has created a call on this channel. One of these two threads
                 * has a packet for the old call and the code below handles those
                 * cases. */
                if (call)
                    MUTEX_ENTER(&call->lock);
            } else {
                /* This packet can't be for this call. If the new call address is
                 * 0 then no call is running on this channel. If there is a call
                 * then, since this is a client connection we're getting data for
                 * it must be for the previous call. */
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.spuriousPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            }
        }
    }
#endif
    currentCallNumber = conn->callNumber[channel];
    if (type == RX_SERVER_CONNECTION) {	/* We're the server */
        if (np->header.callNumber < currentCallNumber) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.spuriousPacketsRead++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }
        if (!call) {
            call = rxi_NewCall(conn, channel);
            MUTEX_ENTER(&call->lock);
            *call->callNumber = np->header.callNumber;
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
            rxi_KeepAliveOn(call);
        }
        else if (np->header.callNumber != currentCallNumber) {
            /* Wait until the transmit queue is idle before deciding
             * whether to reset the current call. Chances are that the
             * call will be in either the DALLY or HOLD state once the
             * TQ_BUSY flag is cleared. */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
            while ((call->state == RX_STATE_ACTIVE) &&
                   (call->flags & RX_CALL_TQ_BUSY)) {
                call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
                CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
                osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
            }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
             * the error condition in this call, so that it terminates as
             * quickly as possible */
            if (call->state == RX_STATE_ACTIVE) {
                struct rx_packet *tp;

                rxi_CallError(call, RX_CALL_DEAD);
                tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tp;
            }
            rxi_ResetCall(call, 0);
            *call->callNumber = np->header.callNumber;
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
            /*
             * If the number of queued calls exceeds the overload
             * threshold then abort this call.
             */
            if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
                struct rx_packet *tp;

                rxi_CallError(call, rx_BusyError);
                tp = rxi_SendCallAbort(call, np, 1, 0);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return tp;
            }
            rxi_KeepAliveOn(call);
        }
        else {
            /* Continuing call; do nothing here. */
        }
    } else {	/* we're the client */
        /* Ignore all incoming acknowledgements for calls in DALLY state */
        if (call && (call->state == RX_STATE_DALLY)
            && (np->header.type == RX_PACKET_TYPE_ACK)) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.ignorePacketDally++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }

        /* Ignore anything that's not relevant to the current call. If there
         * isn't a current call, then no packet is relevant. */
        if (!call || (np->header.callNumber != currentCallNumber)) {
            MUTEX_ENTER(&rx_stats_mutex);
            rx_stats.spuriousPacketsRead++;
            MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
#endif
            MUTEX_ENTER(&conn->conn_data_lock);
            conn->refCount--;
            MUTEX_EXIT(&conn->conn_data_lock);
            return np;
        }
    }
    /* If the service security object index stamped in the packet does not
     * match the connection's security index, ignore the packet */
    if (np->header.securityIndex != conn->securityIndex) {
#ifdef RX_ENABLE_LOCKS
        MUTEX_EXIT(&call->lock);
#endif
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->refCount--;
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }

    /* If we're receiving the response, then all transmit packets are
     * implicitly acknowledged. Get rid of them. */
    if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* XXX Hack. Because we must release the global rx lock when
         * sending packets (osi_NetSend) we drop all acks while we're
         * traversing the tq in rxi_Start sending packets out because
         * packets may move to the freePacketQueue as result of being here!
         * So we drop these packets until we're safely out of the
         * traversing. Really ugly!
         * For fine grain RX locking, we set the acked field in the
         * packets and let rxi_Start remove them from the transmit queue. */
        if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
            rxi_SetAcksInTransmitQueue(call);
#else
            conn->refCount--;
            return np;		/* xmitting; drop packet */
#endif
        } else {
            rxi_ClearTransmitQueue(call, 0);
        }
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
        rxi_ClearTransmitQueue(call, 0);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    } else {
        if (np->header.type == RX_PACKET_TYPE_ACK) {
            /* now check to see if this is an ack packet acknowledging that the
             * server actually *lost* some hard-acked data. If this happens we
             * ignore this packet, as it may indicate that the server restarted in
             * the middle of a call. It is also possible that this is an old ack
             * packet. We don't abort the connection in this case, because this
             * *might* just be an old ack packet. The right way to detect a server
             * restart in the midst of a call is to notice that the server epoch
             * changed, btw. */
            /* XXX I'm not sure this is exactly right, since tfirst **IS**
             * XXX unacknowledged. I think that this is off-by-one, but
             * XXX I don't dare change it just yet, since it will
             * XXX interact badly with the server-restart detection
             * XXX code in receiveackpacket. */
            if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.spuriousPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                MUTEX_EXIT(&call->lock);
                MUTEX_ENTER(&conn->conn_data_lock);
                conn->refCount--;
                MUTEX_EXIT(&conn->conn_data_lock);
                return np;
            }
        }
    } /* else not a data packet */
    osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
    /* Set remote user defined status from packet */
    call->remoteStatus = np->header.userStatus;

    /* Note the gap between the expected next packet and the actual
     * packet that arrived, when the new packet has a smaller serial number
     * than expected. Rioses frequently reorder packets all by themselves,
     * so this will be quite important with very large window sizes.
     * Skew is checked against 0 here to avoid any dependence on the type of
     * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
     * true.
     * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
     * see CalculateRoundTripTime for an example of how to keep smoothed values.
     * I think using a beta of 1/8 is probably appropriate. 93.04.21 */
    MUTEX_ENTER(&conn->conn_data_lock);
    skew = conn->lastSerial - np->header.serial;
    conn->lastSerial = np->header.serial;
    MUTEX_EXIT(&conn->conn_data_lock);
    if (skew > 0) {
        register struct rx_peer *peer;
        peer = conn->peer;
        if (skew > peer->inPacketSkew) {
            dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
            peer->inPacketSkew = skew;
        }
    }
    /* Now do packet type-specific processing */
    switch (np->header.type) {
    case RX_PACKET_TYPE_DATA:
        np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
                                   tnop, newcallp);
        break;
    case RX_PACKET_TYPE_ACK:
        /* Respond immediately to ack packets requesting acknowledgement
         * (ping packets) */
        if (np->header.flags & RX_REQUEST_ACK) {
            if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
            else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
        }
        np = rxi_ReceiveAckPacket(call, np, 1);
        break;
    case RX_PACKET_TYPE_ABORT:
        /* An abort packet: reset the connection, passing the error up to
         * the user */
        /* What if error is zero? */
        rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
        break;
    case RX_PACKET_TYPE_BUSY:
        /* XXXX */
        break;
    case RX_PACKET_TYPE_ACKALL:
        /* All packets acknowledged, so we can drop all packets previously
         * readied for sending */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* XXX Hack. Because we can't release the global rx lock when
         * sending packets (osi_NetSend) we drop all ack pkts while we're
         * traversing the tq in rxi_Start sending packets out because
         * packets may move to the freePacketQueue as result of being
         * here! So we drop these packets until we're safely out of the
         * traversing. Really ugly!
         * For fine grain RX locking, we set the acked field in the packets
         * and let rxi_Start remove the packets from the transmit queue. */
        if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
            rxi_SetAcksInTransmitQueue(call);
            break;
#else /* RX_ENABLE_LOCKS */
            conn->refCount--;
            return np;		/* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
        }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        rxi_ClearTransmitQueue(call, 0);
        break;
    default:
        /* Should not reach here, unless the peer is broken: send an abort
         * packet */
        rxi_CallError(call, RX_PROTOCOL_ERROR);
        np = rxi_SendCallAbort(call, np, 1, 0);
        break;
    }
    /* Note when this last legitimate packet was received, for keep-alive
     * processing. Note, we delay getting the time until now in the hope that
     * the packet will be delivered to the user before any get time is required
     * (if not, then the time won't actually be re-evaluated here). */
    call->lastReceiveTime = clock_Sec();
    MUTEX_EXIT(&call->lock);
    MUTEX_ENTER(&conn->conn_data_lock);
    conn->refCount--;
    MUTEX_EXIT(&conn->conn_data_lock);
    return np;
}
/* return true if this is an "interesting" connection from the point of view
 * of someone trying to debug the system */
int rxi_IsConnInteresting(struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
        return 1;
    for (i = 0; i < RX_MAXCALLS; i++) {
        tcall = aconn->call[i];
        if (tcall) {
            if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
                return 1;
            if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
                return 1;
        }
    }
    return 0;
}
/* if this is one of the last few packets AND it wouldn't be used by the
 * receiving call to immediately satisfy a read request, then drop it on
 * the floor, since accepting it might prevent a lock-holding thread from
 * making progress in its reading. If a call has been cleared while in
 * the precall state then ignore all subsequent packets until the call
 * is assigned to a thread. */

static int TooLow(ap, acall)
    struct rx_call *acall;
    struct rx_packet *ap; {
    int rc = 0;
    MUTEX_ENTER(&rx_stats_mutex);
    if (((ap->header.seq != 1) &&
         (acall->flags & RX_CALL_CLEARED) &&
         (acall->state == RX_STATE_PRECALL)) ||
        ((rx_nFreePackets < rxi_dataQuota+2) &&
         !((ap->header.seq < acall->rnext+rx_initSendWindow)
           && (acall->flags & RX_CALL_READER_WAIT)))) {
        rc = 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return rc;
}
/* try to attach call, if authentication is complete */
static void TryAttach(acall, socket, tnop, newcallp)
    register struct rx_call *acall;
    register osi_socket socket;
    register int *tnop;
    register struct rx_call **newcallp; {
    register struct rx_connection *conn;
    conn = acall->conn;
    if ((conn->type == RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
        /* Don't attach until we have any req'd. authentication. */
        if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
            rxi_AttachServerProc(acall, socket, tnop, newcallp);
            /* Note: this does not necessarily succeed; there
             * may not be any proc available */
        }
        else {
            rxi_ChallengeOn(acall->conn);
        }
    }
}
/* A data packet has been received off the interface. This packet is
 * appropriate to the call (the call is in the right state, etc.). This
 * routine can return a packet to the caller, for re-use */

struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
                                        port, tnop, newcallp)
    register struct rx_call *call;
    register struct rx_packet *np;
    int istack;
    osi_socket socket;
    afs_uint32 host;
    u_short port;
    int *tnop;
    struct rx_call **newcallp;
{
    int ackNeeded = 0;
    int newPackets = 0;
    int didHardAck = 0;
    int haveLast = 0;
    afs_uint32 seq, serial, flags;
    int isFirst;
    struct rx_packet *tnp;
    struct clock when;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.dataPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
#ifdef KERNEL
    /* If there are no packet buffers, drop this new packet, unless we can find
     * packet buffers from inactive calls */
    if (!call->error &&
        (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
        MUTEX_ENTER(&rx_freePktQ_lock);
        rxi_NeedMorePackets = TRUE;
        MUTEX_EXIT(&rx_freePktQ_lock);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.noPacketBuffersOnRead++;
        MUTEX_EXIT(&rx_stats_mutex);
        call->rprev = np->header.serial;
        rxi_calltrace(RX_TRACE_DROP, call);
        dpf(("packet %x dropped on receipt - quota problems", np));
        rxi_ClearReceiveQueue(call);
        clock_GetTime(&when);
        clock_Add(&when, &rx_softAckDelay);
        if (!call->delayedAckEvent ||
            clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
            rxevent_Cancel(call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
                                                 call, 0);
        }
        /* we've damaged this call already, might as well do it in. */
        return np;
    }
#endif /* KERNEL */
    /*
     * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
     * packet is one of several packets transmitted as a single
     * datagram. Do not send any soft or hard acks until all packets
     * in a jumbogram have been processed. Send negative acks right away.
     */
    for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
        /* tnp is non-null when there are more packets in the
         * current jumbo gram */
        if (tnp) {
            if (np)
                rxi_FreePacket(np);
            np = tnp;
        }

        seq = np->header.seq;
        serial = np->header.serial;
        flags = np->header.flags;

        /* If the call is in an error state, send an abort message */
        if (call->error)
            return rxi_SendCallAbort(call, np, istack, 0);

        /* The RX_JUMBO_PACKET is set in all but the last packet in each
         * AFS 3.5 jumbogram. */
        if (flags & RX_JUMBO_PACKET) {
            tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
        } else {
            tnp = NULL;
        }

        if (np->header.spare != 0) {
            MUTEX_ENTER(&call->conn->conn_data_lock);
            call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
            MUTEX_EXIT(&call->conn->conn_data_lock);
        }
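
        /* (Editorial sketch, not original code: an AFS 3.5 jumbogram packs
         *  several fixed-size rx packets back to back in one UDP datagram;
         *  every embedded packet except the last carries RX_JUMBO_PACKET,
         *  and rxi_SplitJumboPacket peels one packet off the front per pass
         *  of this loop, returning the remainder in tnp.) */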
        /* The usual case is that this is the expected next packet */
        if (seq == call->rnext) {

            /* Check to make sure it is not a duplicate of one already queued */
            if (queue_IsNotEmpty(&call->rq)
                && queue_First(&call->rq, rx_packet)->header.seq == seq) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.dupPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                dpf(("packet %x dropped on receipt - duplicate", np));
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_DUPLICATE, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* It's the next packet. Stick it on the receive queue
             * for this call. Set newPackets to make sure we wake
             * the reader once all packets have been processed */
            queue_Prepend(&call->rq, np);
            call->nSoftAcks++;
            np = NULL;	/* We can't use this anymore */
            newPackets = 1;

            /* If an ack is requested then set a flag to make sure we
             * send an acknowledgement for this packet */
            if (flags & RX_REQUEST_ACK) {
                ackNeeded = 1;
            }

            /* Keep track of whether we have received the last packet */
            if (flags & RX_LAST_PACKET) {
                call->flags |= RX_CALL_HAVE_LAST;
                haveLast = 1;
            }
            /* Check whether we have all of the packets for this call */
            if (call->flags & RX_CALL_HAVE_LAST) {
                afs_uint32 tseq;	/* temporary sequence number */
                struct rx_packet *tp;	/* Temporary packet pointer */
                struct rx_packet *nxp;	/* Next pointer, for queue_Scan */

                for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                    if (tseq != tp->header.seq)
                        break;
                    if (tp->header.flags & RX_LAST_PACKET) {
                        call->flags |= RX_CALL_RECEIVE_DONE;
                        break;
                    }
                    tseq++;
                }
            }

            /* Provide asynchronous notification for those who want it
             * (e.g. multi rx) */
            if (call->arrivalProc) {
                (*call->arrivalProc)(call, call->arrivalProcHandle,
                                     call->arrivalProcArg);
                call->arrivalProc = (VOID (*)()) 0;
            }

            /* Update last packet received */
            call->rprev = seq;

            /* If there is no server process serving this call, grab
             * one, if available. We only need to do this once. If a
             * server thread is available, this thread becomes a server
             * thread and the server thread becomes a listener thread. */
            if (isFirst) {
                TryAttach(call, socket, tnop, newcallp);
            }
        }
        /* This is not the expected next packet. */
        else {
            /* Determine whether this is a new or old packet, and if it's
             * a new one, whether it fits into the current receive window.
             * Also figure out whether the packet was delivered in sequence.
             * We use the prev variable to determine whether the new packet
             * is the successor of its immediate predecessor in the
             * receive queue, and the missing flag to determine whether
             * any of this packet's predecessors are missing. */

            afs_uint32 prev;		/* "Previous packet" sequence number */
            struct rx_packet *tp;	/* Temporary packet pointer */
            struct rx_packet *nxp;	/* Next pointer, for queue_Scan */
            int missing;		/* Are any predecessors missing? */
            /* If the new packet's sequence number has been sent to the
             * application already, then this is a duplicate */
            if (seq < call->rnext) {
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.dupPacketsRead++;
                MUTEX_EXIT(&rx_stats_mutex);
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_DUPLICATE, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }

            /* If the sequence number is greater than what can be
             * accommodated by the current window, then send a negative
             * acknowledge and drop the packet */
            if ((call->rnext + call->rwind) <= seq) {
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, seq, serial,
                                 flags, RX_ACK_EXCEEDS_WINDOW, istack);
                ackNeeded = 0;
                call->rprev = seq;
                continue;
            }
            /* Look for the packet in the queue of old received packets */
            for (prev = call->rnext - 1, missing = 0,
                 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                /* Check for duplicate packet */
                if (seq == tp->header.seq) {
                    MUTEX_ENTER(&rx_stats_mutex);
                    rx_stats.dupPacketsRead++;
                    MUTEX_EXIT(&rx_stats_mutex);
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    np = rxi_SendAck(call, np, seq, serial,
                                     flags, RX_ACK_DUPLICATE, istack);
                    ackNeeded = 0;
                    call->rprev = seq;
                    goto nextloop;
                }
                /* If we find a higher sequence packet, break out and
                 * insert the new packet here. */
                if (seq < tp->header.seq) break;
                /* Check for missing packet */
                if (tp->header.seq != prev+1) {
                    missing = 1;
                }

                prev = tp->header.seq;
            }

            /* Keep track of whether we have received the last packet. */
            if (flags & RX_LAST_PACKET) {
                call->flags |= RX_CALL_HAVE_LAST;
            }

            /* It's within the window: add it to the receive queue.
             * tp is left by the previous loop either pointing at the
             * packet before which to insert the new packet, or at the
             * queue head if the queue is empty or the packet should be
             * appended. */
            queue_InsertBefore(tp, np);
            call->nSoftAcks++;
            np = NULL;
            /* Check whether we have all of the packets for this call */
            if ((call->flags & RX_CALL_HAVE_LAST)
                && !(call->flags & RX_CALL_RECEIVE_DONE)) {
                afs_uint32 tseq;	/* temporary sequence number */

                for (tseq = call->rnext,
                     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
                    if (tseq != tp->header.seq)
                        break;
                    if (tp->header.flags & RX_LAST_PACKET) {
                        call->flags |= RX_CALL_RECEIVE_DONE;
                        break;
                    }
                    tseq++;
                }
            }

            /* We need to send an ack if the packet is out of sequence,
             * or if an ack was requested by the peer. */
            if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
                ackNeeded = 1;
            }

            /* Acknowledge the last packet for each call */
            if (flags & RX_LAST_PACKET) {
                haveLast = 1;
            }

            call->rprev = seq;
        nextloop:;
        }
    }

    /*
     * If the receiver is waiting for an iovec, fill the iovec
     * using the data from the receive queue */
    if (call->flags & RX_CALL_IOVEC_WAIT) {
        didHardAck = rxi_FillReadVec(call, seq, serial, flags);
        /* the call may have been aborted */
        if (call->error) {
            return NULL;
        }
        if (didHardAck) {
            ackNeeded = 0;
        }
    }
    /* Wakeup the reader if any */
    if ((call->flags & RX_CALL_READER_WAIT) &&
        (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
         (call->iovNext >= call->iovMax) ||
         (call->flags & RX_CALL_RECEIVE_DONE))) {
        call->flags &= ~RX_CALL_READER_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&call->cv_rq);
#else
        osi_rxWakeup(&call->rq);
#endif
    }
    /*
     * Send an ack when requested by the peer, or once every
     * rxi_SoftAckRate packets until the last packet has been
     * received. Always send a soft ack for the last packet in
     * the server's reply. */
    if (ackNeeded) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, seq, serial, flags,
                         RX_ACK_REQUESTED, istack);
    } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, seq, serial, flags,
                         RX_ACK_IDLE, istack);
    } else if (call->nSoftAcks) {
        clock_GetTime(&when);
        if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
            clock_Add(&when, &rx_lastAckDelay);
        } else {
            clock_Add(&when, &rx_softAckDelay);
        }
        if (!call->delayedAckEvent ||
            clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
            rxevent_Cancel(call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
                                                 call, 0);
        }
    } else if (call->flags & RX_CALL_RECEIVE_DONE) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
    }

    return np;
}
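
/* Illustrative summary (not part of the original source) of the ack policy
 * implemented above; the helper name is hypothetical. Return values:
 * 1 = ack immediately, 2 = schedule a delayed ack, 0 = nothing to send. */
#if 0
static int example_AckAction(ackNeeded, nSoftAcks, receiveDone)
    int ackNeeded, nSoftAcks, receiveDone;
{
    if (ackNeeded) return 1;			/* peer asked, or out of sequence */
    if (nSoftAcks > rxi_SoftAckRate) return 1;	/* enough unacknowledged packets */
    if (nSoftAcks) return 2;			/* a few pending: delay the ack */
    if (receiveDone) return 0;			/* done: cancel any delayed ack */
    return 0;
}
#endif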
static void rxi_ComputeRate();

/* The real smarts of the whole thing. */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
    register struct rx_call *call;
    struct rx_packet *np;
    int istack;
{
    struct rx_ackPacket *ap;
    int nAcks;
    register struct rx_packet *tp;
    register struct rx_packet *nxp;	/* Next packet pointer for queue_Scan */
    register struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;
    afs_uint32 first;
    afs_uint32 serial;
    /* because there are CM's that are bogus, sending weird values for this. */
    afs_uint32 skew = 0;
    int nbytes;
    int missing;
    int acked;
    int nNacked = 0;
    int newAckCount = 0;
    u_short maxMTU = 0;		/* Set if peer supports AFS 3.4a jumbo datagrams */
    int maxDgramPackets = 0;	/* Set if peer supports AFS 3.5 jumbo datagrams */

    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.ackPacketsRead++;
    MUTEX_EXIT(&rx_stats_mutex);
    ap = (struct rx_ackPacket *) rx_DataOf(np);
    nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
    if (nbytes < 0)
        return np;	/* truncated ack packet */

    /* depends on ack packet struct */
    nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
    first = ntohl(ap->firstPacket);
    serial = ntohl(ap->serial);
    /* temporarily disabled -- needs to degrade over time
       skew = ntohs(ap->maxSkew); */

    /* Ignore ack packets received out of order */
    if (first < call->tfirst) {
        return np;
    }

    if (np->header.flags & RX_SLOW_START_OK) {
        call->flags |= RX_CALL_SLOW_START_OK;
    }

#ifdef RXDEBUG
    if (rx_Log) {
        fprintf(rx_Log,
                "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
                ap->reason, ntohl(ap->previousPacket),
                (unsigned int) np->header.seq, (unsigned int) serial,
                (unsigned int) skew, ntohl(ap->firstPacket));
        if (nAcks) {
            int offset;
            for (offset = 0; offset < nAcks; offset++)
                putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', rx_Log);
        }
        putc('\n', rx_Log);
    }
#endif
    /* if a server connection has been re-created, it doesn't remember what
     * serial # it was up to. An ack will tell us, since the serial field
     * contains the largest serial received by the other side */
    MUTEX_ENTER(&conn->conn_data_lock);
    if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
        conn->serial = serial + 1;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Update the outgoing packet skew value to the latest value of
     * the peer's incoming packet skew value. The ack packet, of
     * course, could arrive out of order, but that won't affect things
     * much. */
    MUTEX_ENTER(&peer->peer_lock);
    peer->outPacketSkew = skew;

    /* Check for packets that no longer need to be transmitted, and
     * discard them. This only applies to packets positively
     * acknowledged as having been sent to the peer's upper level.
     * All other packets must be retained. So only packets with
     * sequence numbers < ap->firstPacket are candidates. */
    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
        if (tp->header.seq >= first) break;
        call->tfirst = tp->header.seq + 1;
        if (tp->header.serial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
        else if (tp->firstSerial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* XXX Hack. Because we have to release the global rx lock when sending
         * packets (osi_NetSend) we drop all acks while we're traversing the tq
         * in rxi_Start sending packets out because packets may move to the
         * freePacketQueue as result of being here! So we drop these packets until
         * we're safely out of the traversing. Really ugly!
         * To make it even uglier, if we're using fine grain locking, we can
         * set the ack bits in the packets and have rxi_Start remove the packets
         * when it's done transmitting.
         */
        if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
            tp->acked = 1;
            call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
            break;
#endif /* RX_ENABLE_LOCKS */
        } else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        {
            queue_Remove(tp);
            rxi_FreePacket(tp);	/* rxi_FreePacket mustn't wake up anyone, preemptively. */
        }
    }

#ifdef ADAPT_WINDOW
    /* Give rate detector a chance to respond to ping requests */
    if (ap->reason == RX_ACK_PING_RESPONSE) {
        rxi_ComputeRate(peer, call, 0, np, ap->reason);
    }
#endif

    /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
    /* Now go through explicit acks/nacks and record the results in
     * the waiting packets. These are packets that can't be released
     * yet, even with a positive acknowledge. This positive
     * acknowledge only means the packet has been received by the
     * peer, not that it will be retained long enough to be sent to
     * the peer's upper level. In addition, reset the transmit timers
     * of any missing packets (those packets that must be missing
     * because this packet was out of sequence) */

    call->nSoftAcked = 0;
    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
        /* Update round trip time if the ack was stimulated on receipt
         * of this packet */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
        if (tp->header.seq >= first) {
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        if (tp->header.serial == serial) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
        else if ((tp->firstSerial == serial)) {
            /* Use RTT if not delayed by client. */
            if (ap->reason != RX_ACK_DELAY)
                rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
            rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
        }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
        }
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Set the acknowledge flag per packet based on the
         * information in the ack packet. An acknowledged packet can
         * be downgraded when the server has discarded a packet it
         * soacked previously, or when an ack packet is received
         * out of sequence. */
        if (tp->header.seq < first) {
            /* Implicit ack information */
            if (!tp->acked) {
                newAckCount++;
            }
            tp->acked = 1;
        }
        else if (tp->header.seq < first + nAcks) {
            /* Explicit ack information: set it in the packet appropriately */
            if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
                if (!tp->acked) {
                    newAckCount++;
                    tp->acked = 1;
                }
                if (missing) {
                    nNacked++;
                } else {
                    call->nSoftAcked++;
                }
            } else {
                tp->acked = 0;
                missing = 1;
            }
        }
        else {
            tp->acked = 0;
            missing = 1;
        }

        /* If packet isn't yet acked, and it has been transmitted at least
         * once, reset retransmit time using latest timeout
         * ie, this should readjust the retransmit timer for all outstanding
         * packets... So we don't just retransmit when we should know better */
        if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
            tp->retryTime = tp->timeSent;
            clock_Add(&tp->retryTime, &peer->timeout);
            /* shift by eight because one quarter-sec ~ 256 milliseconds */
            clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
        }
    }
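
    /* (Editorial note, not original code: tp->backoff is kept in quarter-
     * second units; the shift by eight above multiplies by 256, treating one
     * quarter-second as roughly 256 msec, so e.g. a backoff of 3 adds about
     * 768 msec to the retry time.) */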
    /* If the window has been extended by this acknowledge packet,
     * then wakeup a sender waiting in alloc for window space, or try
     * sending packets now, if he's been sitting on packets due to
     * lack of window space */
    if (call->tnext < (call->tfirst + call->twind)) {
#ifdef RX_ENABLE_LOCKS
        CV_SIGNAL(&call->cv_twind);
#else
        if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
            call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
            osi_rxWakeup(&call->twind);
        }
#endif
        if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
            call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
        }
    }

    /* if the ack packet has a receivelen field hanging off it,
     * update our state */
    if (np->length >= rx_AckDataSize(ap->nAcks) + sizeof(afs_int32)) {
        afs_int32 tSize;
        /* If the ack packet has a "recommended" size that is less than
         * what I am using now, reduce my size to match */
        rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
                      sizeof(afs_int32), &tSize);
        tSize = (afs_uint32) ntohl(tSize);
        peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));

        /* Get the maximum packet size to send to this peer */
        rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
                      &tSize);
        tSize = (afs_uint32) ntohl(tSize);
        tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
        tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);

        /* sanity check - peer might have restarted with different params.
         * If peer says "send less", dammit, send less... Peer should never
         * be unable to accept packets of the size that prior AFS versions would
         * send without asking. */
        if (peer->maxMTU != tSize) {
            peer->maxMTU = tSize;
            peer->MTU = MIN(tSize, peer->MTU);
            call->MTU = MIN(call->MTU, tSize);
        }
        if (np->length == rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32)) {
            /* AFS 3.4a */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);	/* peer's receive window, if it's */
            if (tSize < call->twind) {		/* smaller than our send */
                call->twind = tSize;		/* window, we must send less... */
                call->ssthresh = MIN(call->twind, call->ssthresh);
            }

            /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
             * network MTU confused with the loopback MTU. Calculate the
             * maximum MTU here for use in the slow start code below. */
            maxMTU = peer->maxMTU;
            /* Did peer restart with older RX version? */
            if (peer->maxDgramPackets > 1) {
                peer->maxDgramPackets = 1;
            }
        } else if (np->length >= rx_AckDataSize(ap->nAcks) + 4*sizeof(afs_int32)) {
            /* AFS 3.5 */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);
            /*
             * As of AFS 3.5 we set the send window to match the receive window.
             */
            if (tSize < call->twind) {
                call->twind = tSize;
                call->ssthresh = MIN(call->twind, call->ssthresh);
            } else if (tSize > call->twind) {
                call->twind = tSize;
            }

            /*
             * As of AFS 3.5, a jumbogram is more than one fixed size
             * packet transmitted in a single UDP datagram. If the remote
             * MTU is smaller than our local MTU then never send a datagram
             * larger than the natural MTU.
             */
            rx_packetread(np, rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            maxDgramPackets = (afs_uint32) ntohl(tSize);
            maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
            maxDgramPackets = MIN(maxDgramPackets,
                                  (int)(peer->ifDgramPackets));
            maxDgramPackets = MIN(maxDgramPackets, tSize);
            if (maxDgramPackets > 1) {
                peer->maxDgramPackets = maxDgramPackets;
                call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
            } else {
                peer->maxDgramPackets = 1;
                call->MTU = peer->natMTU;
            }
        } else if (peer->maxDgramPackets > 1) {
            /* Restarted with lower version of RX */
            peer->maxDgramPackets = 1;
        }
    } else if (peer->maxDgramPackets > 1 ||
               peer->maxMTU != OLD_MAX_PACKET_SIZE) {
        /* Restarted with lower version of RX */
        peer->maxMTU = OLD_MAX_PACKET_SIZE;
        peer->natMTU = OLD_MAX_PACKET_SIZE;
        peer->MTU = OLD_MAX_PACKET_SIZE;
        peer->maxDgramPackets = 1;
        peer->nDgramPackets = 1;
        call->MTU = OLD_MAX_PACKET_SIZE;
    }
    /*
     * Calculate how many datagrams were successfully received after
     * the first missing packet and adjust the negative ack counter
     * accordingly.
     */
    if (nNacked) {
        nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
        if (call->nNacks < nNacked) {
            call->nNacks = nNacked;
        }
    } else {
        if (newAckCount) {
            call->nAcks++;
        }
        call->nNacks = 0;
    }

    if (call->flags & RX_CALL_FAST_RECOVER) {
        if (nNacked) {
            call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
        } else {
            call->flags &= ~RX_CALL_FAST_RECOVER;
            call->cwind = call->nextCwind;
            call->nextCwind = 0;
            call->nAcks = 0;
        }
        call->nCwindAcks = 0;
    }
    else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
        /* Three negative acks in a row trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        MUTEX_EXIT(&peer->peer_lock);
        if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
            /* someone else is waiting to start recovery */
            return np;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
            CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
        }
        MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
        call->flags |= RX_CALL_FAST_RECOVER;
        call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
        call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
                          rx_maxSendWindow);
        call->nDgramPackets = MAX(2, (int)call->nDgramPackets) >> 1;
        call->nextCwind = call->ssthresh;
        call->nAcks = 0;
        call->nNacks = 0;
        peer->MTU = call->MTU;
        peer->cwind = call->nextCwind;
        peer->nDgramPackets = call->nDgramPackets;
        peer->congestSeq++;
        call->congestSeq = peer->congestSeq;
        /* Reset the resend times on the packets that were nacked
         * so we will retransmit as soon as the window permits */
        for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
            if (acked) {
                if (!tp->acked) {
                    clock_Zero(&tp->retryTime);
                }
            } else if (tp->acked) {
                acked = 1;
            }
        }
    } else {
        /* If cwind is smaller than ssthresh, then increase
         * the window one packet for each ack we receive (exponential
         * growth).
         * If cwind is greater than or equal to ssthresh then increase
         * the congestion window by one packet for each cwind acks we
         * receive (linear growth). */
        if (call->cwind < call->ssthresh) {
            call->cwind = MIN((int)call->ssthresh,
                              (int)(call->cwind + newAckCount));
            call->nCwindAcks = 0;
        } else {
            call->nCwindAcks += newAckCount;
            if (call->nCwindAcks >= call->cwind) {
                call->nCwindAcks = 0;
                call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
            }
        }
        /*
         * If we have received several acknowledgements in a row then
         * it is time to increase the size of our datagrams
         */
        if ((int)call->nAcks > rx_nDgramThreshold) {
            if (peer->maxDgramPackets > 1) {
                if (call->nDgramPackets < peer->maxDgramPackets) {
                    call->nDgramPackets++;
                }
                call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
            } else if (call->MTU < peer->maxMTU) {
                call->MTU += peer->natMTU;
                call->MTU = MIN(call->MTU, peer->maxMTU);
            }
            call->nAcks = 0;
        }
    }

    MUTEX_EXIT(&peer->peer_lock);	/* rxi_Start will lock peer. */

    /* Servers need to hold the call until all response packets have
     * been acknowledged. Soft acks are good enough since clients
     * are not allowed to clear their receive queues. */
    if (call->state == RX_STATE_HOLD &&
        call->tfirst + call->nSoftAcked >= call->tnext) {
        call->state = RX_STATE_DALLY;
        rxi_ClearTransmitQueue(call, 0);
    } else if (!queue_IsEmpty(&call->tq)) {
        rxi_Start(0, call, istack);
    }
    return np;
}
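
/* Illustrative sketch (not part of the original source) of the window
 * growth policy applied above: below ssthresh the congestion window grows
 * by one packet per new ack (slow start, exponential); at or above
 * ssthresh it grows by one packet per cwind acks (congestion avoidance,
 * linear). The helper name is hypothetical. */
#if 0
static int example_GrowCwind(cwind, ssthresh, newAcks, pCwindAcks, maxWindow)
    int cwind, ssthresh, newAcks, maxWindow;
    int *pCwindAcks;
{
    if (cwind < ssthresh) {		/* slow start */
        cwind = MIN(ssthresh, cwind + newAcks);
        *pCwindAcks = 0;
    } else {				/* congestion avoidance */
        *pCwindAcks += newAcks;
        if (*pCwindAcks >= cwind) {
            *pCwindAcks = 0;
            cwind = MIN(cwind + 1, maxWindow);
        }
    }
    return cwind;
}
#endif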
/* Received a response to a challenge packet */
struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the packet if we're the client */
    if (conn->type == RX_CLIENT_CONNECTION) return np;

    /* If already authenticated, ignore the packet (it's probably a retry) */
    if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
        return np;

    /* Otherwise, have the security object evaluate the response packet */
    error = RXS_CheckResponse(conn->securityObject, conn, np);
    if (error) {
        /* If the response is invalid, reset the connection, sending
         * an abort to the peer */
        rxi_ConnectionError(conn, error);
        MUTEX_ENTER(&conn->conn_data_lock);
        np = rxi_SendConnectionAbort(conn, np, istack, 0);
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
    }
    else {
        /* If the response is valid, any calls waiting to attach
         * servers can now do so */
        int i;
        for (i = 0; i < RX_MAXCALLS; i++) {
            struct rx_call *call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
                if (call->state == RX_STATE_PRECALL)
                    rxi_AttachServerProc(call, -1, NULL, NULL);
                MUTEX_EXIT(&call->lock);
            }
        }
    }
    return np;
}
/* A client has received an authentication challenge: the security
 * object is asked to cough up a respectable response packet to send
 * back to the server. The server is responsible for retrying the
 * challenge if it fails to get a response. */

struct rx_packet *
rxi_ReceiveChallengePacket(conn, np, istack)
    register struct rx_connection *conn;
    register struct rx_packet *np;
    int istack;
{
    int error;

    /* Ignore the challenge if we're the server */
    if (conn->type == RX_SERVER_CONNECTION) return np;

    /* Ignore the challenge if the connection is otherwise idle; someone's
     * trying to use us as an oracle. */
    if (!rxi_HasActiveCalls(conn)) return np;

    /* Send the security object the challenge packet. It is expected to fill
     * in the response. */
    error = RXS_GetResponse(conn->securityObject, conn, np);

    /* If the security object is unable to return a valid response, reset the
     * connection and send an abort to the peer. Otherwise send the response
     * packet to the peer connection. */
    if (error) {
        rxi_ConnectionError(conn, error);
        MUTEX_ENTER(&conn->conn_data_lock);
        np = rxi_SendConnectionAbort(conn, np, istack, 0);
        MUTEX_EXIT(&conn->conn_data_lock);
    }
    else {
        np = rxi_SendSpecial((struct rx_call *)0, conn, np,
                             RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
    }
    return np;
}
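
/* (Editorial sketch, not original code: the challenge/response handshake
 * as seen from this file --
 *
 *    server                                 client
 *    RXS_CheckAuthentication fails
 *    rxi_ChallengeOn   --- CHALLENGE --->   rxi_ReceiveChallengePacket
 *                                           RXS_GetResponse
 *                      <--- RESPONSE ---    rxi_SendSpecial
 *    rxi_ReceiveResponsePacket
 *    RXS_CheckResponse ok: calls held in
 *    RX_STATE_PRECALL attach via rxi_AttachServerProc.) */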
/* Find an available server process to service the current request in
 * the given call structure. If one isn't available, queue up this
 * call so it eventually gets one */
void
rxi_AttachServerProc(call, socket, tnop, newcallp)
    register struct rx_call *call;
    register osi_socket socket;
    register int *tnop;
    register struct rx_call **newcallp;
{
    register struct rx_serverQueueEntry *sq;
    register struct rx_service *service = call->conn->service;
#ifdef RX_ENABLE_LOCKS
    register int haveQuota = 0;
#endif /* RX_ENABLE_LOCKS */
    /* May already be attached */
    if (call->state == RX_STATE_ACTIVE) return;

    MUTEX_ENTER(&rx_serverPool_lock);
#ifdef RX_ENABLE_LOCKS
    while (rxi_ServerThreadSelectingCall) {
        MUTEX_EXIT(&call->lock);
        CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
        MUTEX_EXIT(&rx_serverPool_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&rx_serverPool_lock);
        /* Call may have been attached */
        if (call->state == RX_STATE_ACTIVE) return;
    }

    haveQuota = QuotaOK(service);
    if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
        /* If there are no processes available to service this call,
         * put the call on the incoming call queue (unless it's
         * already on the queue).
         */
        if (haveQuota)
            ReturnToServerPool(service);
        if (!(call->flags & RX_CALL_WAIT_PROC)) {
            call->flags |= RX_CALL_WAIT_PROC;
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting++;
            MUTEX_EXIT(&rx_stats_mutex);
            rxi_calltrace(RX_CALL_ARRIVAL, call);
            SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
            queue_Append(&rx_incomingCallQueue, call);
        }
    }
#else /* RX_ENABLE_LOCKS */
    if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
        /* If there are no processes available to service this call,
         * put the call on the incoming call queue (unless it's
         * already on the queue).
         */
        if (!(call->flags & RX_CALL_WAIT_PROC)) {
            call->flags |= RX_CALL_WAIT_PROC;
            rx_nWaiting++;
            rxi_calltrace(RX_CALL_ARRIVAL, call);
            queue_Append(&rx_incomingCallQueue, call);
        }
    }
#endif /* RX_ENABLE_LOCKS */
    else {
3836 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3838 /* If hot threads are enabled, and both newcallp and sq->socketp
3839 * are non-null, then this thread will process the call, and the
3840 * idle server thread will start listening on this thread's socket.
3843 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3846 *sq->socketp = socket;
3847 clock_GetTime(&call->startTime);
3848 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3852 if (call->flags & RX_CALL_WAIT_PROC) {
3853 /* Conservative: I don't think this should happen */
3854 call->flags &= ~RX_CALL_WAIT_PROC;
3855 MUTEX_ENTER(&rx_stats_mutex);
3857 MUTEX_EXIT(&rx_stats_mutex);
3860 call->state = RX_STATE_ACTIVE;
3861 call->mode = RX_MODE_RECEIVING;
3862 if (call->flags & RX_CALL_CLEARED) {
3863 /* send an ack now to start the packet flow up again */
3864 call->flags &= ~RX_CALL_CLEARED;
3865 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3867 #ifdef RX_ENABLE_LOCKS
3870 service->nRequestsRunning++;
3871 if (service->nRequestsRunning <= service->minProcs)
3877 MUTEX_EXIT(&rx_serverPool_lock);
3880 /* Delay the sending of an acknowledge event for a short while, while
3881 * a new call is being prepared (in the case of a client) or a reply
3882 * is being prepared (in the case of a server). Rather than sending
3883 * an ack packet, an ACKALL packet is sent. */
3884 void rxi_AckAll(event, call, dummy)
3885 struct rxevent *event;
3886 register struct rx_call *call;
3889 #ifdef RX_ENABLE_LOCKS
3891 MUTEX_ENTER(&call->lock);
3892 call->delayedAckEvent = (struct rxevent *) 0;
3893 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3895 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3896 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3898 MUTEX_EXIT(&call->lock);
3899 #else /* RX_ENABLE_LOCKS */
3900 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3901 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3902 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3903 #endif /* RX_ENABLE_LOCKS */
3906 void rxi_SendDelayedAck(event, call, dummy)
3907 struct rxevent *event;
3908 register struct rx_call *call;
3911 #ifdef RX_ENABLE_LOCKS
3913 MUTEX_ENTER(&call->lock);
3914 if (event == call->delayedAckEvent)
3915 call->delayedAckEvent = (struct rxevent *) 0;
3916 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3918 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3920 MUTEX_EXIT(&call->lock);
3921 #else /* RX_ENABLE_LOCKS */
3922 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3923 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3924 #endif /* RX_ENABLE_LOCKS */
3928 #ifdef RX_ENABLE_LOCKS
3929 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3930 * clearing them out.
3932 static void rxi_SetAcksInTransmitQueue(call)
3933 register struct rx_call *call;
3935 register struct rx_packet *p, *tp;
3938 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3945 call->flags |= RX_CALL_TQ_CLEARME;
3946 call->flags |= RX_CALL_TQ_SOME_ACKED;
3949 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3950 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3951 call->tfirst = call->tnext;
3952 call->nSoftAcked = 0;
3954 if (call->flags & RX_CALL_FAST_RECOVER) {
3955 call->flags &= ~RX_CALL_FAST_RECOVER;
3956 call->cwind = call->nextCwind;
3957 call->nextCwind = 0;
3960 CV_SIGNAL(&call->cv_twind);
3962 #endif /* RX_ENABLE_LOCKS */
3964 /* Clear out the transmit queue for the current call (all packets have
3965 * been received by peer) */
3966 void rxi_ClearTransmitQueue(call, force)
3967 register struct rx_call *call;
3970 register struct rx_packet *p, *tp;
3972 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3973 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3975 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3982 call->flags |= RX_CALL_TQ_CLEARME;
3983 call->flags |= RX_CALL_TQ_SOME_ACKED;
3986 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3987 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3993 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3994 call->flags &= ~RX_CALL_TQ_CLEARME;
3996 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3998 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3999 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4000 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4001 call->nSoftAcked = 0;
4003 if (call->flags & RX_CALL_FAST_RECOVER) {
4004 call->flags &= ~RX_CALL_FAST_RECOVER;
4005 call->cwind = call->nextCwind;
4008 #ifdef RX_ENABLE_LOCKS
4009 CV_SIGNAL(&call->cv_twind);
4011 osi_rxWakeup(&call->twind);
4015 void rxi_ClearReceiveQueue(call)
4016 register struct rx_call *call;
4018 register struct rx_packet *p, *tp;
4019 if (queue_IsNotEmpty(&call->rq)) {
4020 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4025 rx_packetReclaims++;
4027 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4029 if (call->state == RX_STATE_PRECALL) {
4030 call->flags |= RX_CALL_CLEARED;
4034 /* Send an abort packet for the specified call */
4035 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4036 register struct rx_call *call;
4037 struct rx_packet *packet;
4047 /* Clients should never delay abort messages */
4048 if (rx_IsClientConn(call->conn))
4051 if (call->abortCode != call->error) {
4052 call->abortCode = call->error;
4053 call->abortCount = 0;
4056 if (force || rxi_callAbortThreshhold == 0 ||
4057 call->abortCount < rxi_callAbortThreshhold) {
4058 if (call->delayedAbortEvent) {
4059 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4061 error = htonl(call->error);
4063 packet = rxi_SendSpecial(call, call->conn, packet,
4064 RX_PACKET_TYPE_ABORT, (char *)&error,
4065 sizeof(error), istack);
4066 } else if (!call->delayedAbortEvent) {
4067 clock_GetTime(&when);
4068 clock_Addmsec(&when, rxi_callAbortDelay);
4069 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4070 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4076 /* Send an abort packet for the specified connection. Packet is an
4077 * optional pointer to a packet that can be used to send the abort.
4078 * Once the number of abort messages reaches the threshold, an
4079 * event is scheduled to send the abort. Setting the force flag
4080 * overrides sending delayed abort messages.
4082 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4083 * to send the abort packet.
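/* To make the throttling concrete (a sketch of the logic below, using the
 * tunables rxi_connAbortThreshhold and rxi_connAbortDelay at whatever
 * values they are configured to): the first rxi_connAbortThreshhold
 * aborts on a connection go out immediately; after that, aborts collapse
 * into a single delayed event that fires rxi_connAbortDelay milliseconds
 * later, so a looping client cannot extract one abort per bogus packet. */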
4085 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4086 register struct rx_connection *conn;
4087 struct rx_packet *packet;
4097 /* Clients should never delay abort messages */
4098 if (rx_IsClientConn(conn))
4101 if (force || rxi_connAbortThreshhold == 0 ||
4102 conn->abortCount < rxi_connAbortThreshhold) {
4103 if (conn->delayedAbortEvent) {
4104 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4106 error = htonl(conn->error);
4108 MUTEX_EXIT(&conn->conn_data_lock);
4109 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4110 RX_PACKET_TYPE_ABORT, (char *)&error,
4111 sizeof(error), istack);
4112 MUTEX_ENTER(&conn->conn_data_lock);
4113 } else if (!conn->delayedAbortEvent) {
4114 clock_GetTime(&when);
4115 clock_Addmsec(&when, rxi_connAbortDelay);
4116 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4122 /* Associate an error with all of the calls owned by a connection. Called
4123 * with error non-zero. This is only for really fatal things, like
4124 * bad authentication responses. The connection itself is set in
4125 * error at this point, so that future packets received will be discarded. */
4127 void rxi_ConnectionError(conn, error)
4128 register struct rx_connection *conn;
4129 register afs_int32 error;
4133 if (conn->challengeEvent)
4134 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4135 for (i=0; i<RX_MAXCALLS; i++) {
4136 struct rx_call *call = conn->call[i];
4138 MUTEX_ENTER(&call->lock);
4139 rxi_CallError(call, error);
4140 MUTEX_EXIT(&call->lock);
4143 conn->error = error;
4144 MUTEX_ENTER(&rx_stats_mutex);
4145 rx_stats.fatalErrors++;
4146 MUTEX_EXIT(&rx_stats_mutex);
4150 void rxi_CallError(call, error)
4151 register struct rx_call *call;
4154 if (call->error) error = call->error;
4155 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4156 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4157 rxi_ResetCall(call, 0);
4160 rxi_ResetCall(call, 0);
4162 call->error = error;
4163 call->mode = RX_MODE_ERROR;
4166 /* Reset various fields in a call structure, and wakeup waiting
4167 * processes. Some fields aren't changed: state & mode are not
4168 * touched (these must be set by the caller), and bufptr, nLeft, and
4169 * nFree are not reset, since these fields are manipulated by
4170 * unprotected macros, and may only be reset by non-interrupting code.
4173 /* this code requires that call->conn be set properly as a pre-condition. */
4174 #endif /* ADAPT_WINDOW */
4176 void rxi_ResetCall(call, newcall)
4177 register struct rx_call *call;
4178 register int newcall;
4181 register struct rx_peer *peer;
4182 struct rx_packet *packet;
4184 /* Notify anyone who is waiting for asynchronous packet arrival */
4185 if (call->arrivalProc) {
4186 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4187 call->arrivalProc = (VOID (*)()) 0;
4190 if (call->delayedAbortEvent) {
4191 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4192 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4194 rxi_SendCallAbort(call, packet, 0, 1);
4195 rxi_FreePacket(packet);
4200 * Update the peer with the congestion information in this call
4201 * so other calls on this connection can pick up where this call
4202 * left off. If the congestion sequence numbers don't match then
4203 * another call experienced a retransmission.
4205 peer = call->conn->peer;
4206 MUTEX_ENTER(&peer->peer_lock);
4208 if (call->congestSeq == peer->congestSeq) {
4209 peer->cwind = MAX(peer->cwind, call->cwind);
4210 peer->MTU = MAX(peer->MTU, call->MTU);
4211 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4214 call->abortCode = 0;
4215 call->abortCount = 0;
4217 if (peer->maxDgramPackets > 1) {
4218 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4220 call->MTU = peer->MTU;
4222 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4223 call->ssthresh = rx_maxSendWindow;
4224 call->nDgramPackets = peer->nDgramPackets;
4225 call->congestSeq = peer->congestSeq;
4226 MUTEX_EXIT(&peer->peer_lock);
4228 flags = call->flags;
4229 rxi_ClearReceiveQueue(call);
4230 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4231 if (call->flags & RX_CALL_TQ_BUSY) {
4232 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4233 call->flags |= (flags & RX_CALL_TQ_WAIT);
4235 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4237 rxi_ClearTransmitQueue(call, 0);
4238 queue_Init(&call->tq);
4241 queue_Init(&call->rq);
4243 call->rwind = rx_initReceiveWindow;
4244 call->twind = rx_initSendWindow;
4245 call->nSoftAcked = 0;
4246 call->nextCwind = 0;
4249 call->nCwindAcks = 0;
4250 call->nSoftAcks = 0;
4251 call->nHardAcks = 0;
4253 call->tfirst = call->rnext = call->tnext = 1;
4255 call->lastAcked = 0;
4256 call->localStatus = call->remoteStatus = 0;
4258 if (flags & RX_CALL_READER_WAIT) {
4259 #ifdef RX_ENABLE_LOCKS
4260 CV_BROADCAST(&call->cv_rq);
4262 osi_rxWakeup(&call->rq);
4265 if (flags & RX_CALL_WAIT_PACKETS) {
4266 MUTEX_ENTER(&rx_freePktQ_lock);
4267 rxi_PacketsUnWait(); /* XXX */
4268 MUTEX_EXIT(&rx_freePktQ_lock);
4271 #ifdef RX_ENABLE_LOCKS
4272 CV_SIGNAL(&call->cv_twind);
4274 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4275 osi_rxWakeup(&call->twind);
4278 #ifdef RX_ENABLE_LOCKS
4279 /* The following ensures that we don't mess with any queue while some
4280 * other thread might also be doing so. The call_queue_lock field is
4281 * only modified under the call lock. If the call is in the process
4282 * of being removed from a queue, the call is not locked until the
4283 * queue lock is dropped and only then is the call_queue_lock field
4284 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4285 * Note that any other routine which removes a call from a queue has to
4286 * obtain the queue lock before examining the queue and removing the call.
4288 if (call->call_queue_lock) {
4289 MUTEX_ENTER(call->call_queue_lock);
4290 if (queue_IsOnQueue(call)) {
4292 if (flags & RX_CALL_WAIT_PROC) {
4293 MUTEX_ENTER(&rx_stats_mutex);
4295 MUTEX_EXIT(&rx_stats_mutex);
4298 MUTEX_EXIT(call->call_queue_lock);
4299 CLEAR_CALL_QUEUE_LOCK(call);
4301 #else /* RX_ENABLE_LOCKS */
4302 if (queue_IsOnQueue(call)) {
4304 if (flags & RX_CALL_WAIT_PROC)
4307 #endif /* RX_ENABLE_LOCKS */
4309 rxi_KeepAliveOff(call);
4310 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4313 /* Send an acknowledge for the indicated packet (seq,serial) of the
4314 * indicated call, for the indicated reason (reason). This
4315 * acknowledge will specifically acknowledge receiving the packet, and
4316 * will also specify which other packets for this call have been
4317 * received. This routine returns the packet that was used to the
4318 * caller. The caller is responsible for freeing it or re-using it.
4319 * This acknowledgement also returns the highest sequence number
4320 * actually read out by the higher level to the sender; the sender
4321 * promises to keep around packets that have not been read by the
4322 * higher level yet (unless, of course, the sender decides to abort
4323 * the call altogether). Any of p, seq, serial, pflags, or reason may
4324 * be set to zero without ill effect. That is, if they are zero, they
4325 * will not convey any information.
4326 * NOW there is a trailer field, after the ack where it will safely be
4327 * ignored by mundanes, which indicates the maximum size packet this
4328 * host can swallow. */
4329 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4330 register struct rx_call *call;
4331 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4332 int seq; /* Sequence number of the packet we are acking */
4333 int serial; /* Serial number of the packet */
4334 int pflags; /* Flags field from packet header */
4335 int reason; /* Reason an acknowledge was prompted */
4338 struct rx_ackPacket *ap;
4339 register struct rx_packet *rqp;
4340 register struct rx_packet *nxp; /* For queue_Scan */
4341 register struct rx_packet *p;
4346 * Open the receive window once a thread starts reading packets
4348 if (call->rnext > 1) {
4349 call->rwind = rx_maxReceiveWindow;
4352 call->nHardAcks = 0;
4353 call->nSoftAcks = 0;
4354 if (call->rnext > call->lastAcked)
4355 call->lastAcked = call->rnext;
4359 rx_computelen(p, p->length); /* reset length, you never know */
4360 } /* where that's been... */
4362 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4363 /* We won't send the ack, but don't panic. */
4364 return optionalPacket;
4367 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4369 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4370 if (!optionalPacket) rxi_FreePacket(p);
4371 return optionalPacket;
4373 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4374 if (rx_Contiguous(p)<templ) {
4375 if (!optionalPacket) rxi_FreePacket(p);
4376 return optionalPacket;
4378 } /* MTUXXX failing to send an ack is very serious. We should */
4379 /* try as hard as possible to send even a partial ack; it's */
4380 /* better than nothing. */
4382 ap = (struct rx_ackPacket *) rx_DataOf(p);
4383 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4384 ap->reason = reason;
4386 /* The skew computation used to be bogus, I think it's better now. */
4387 /* We should start paying attention to skew. XXX */
4388 ap->serial = htonl(call->conn->maxSerial);
4389 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4391 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4392 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4394 /* No fear of running out of room in the ack packet here because there can only be at most
4395 * one window full of unacknowledged packets. The window size must be constrained
4396 * to be less than the maximum ack size, of course. Also, an ack should always
4397 * fit into a single packet -- it should not ever be fragmented. */
4398 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4399 if (!rqp || !call->rq.next
4400 || (rqp->header.seq > (call->rnext + call->rwind))) {
4401 if (!optionalPacket) rxi_FreePacket(p);
4402 rxi_CallError(call, RX_CALL_DEAD);
4403 return optionalPacket;
4406 while (rqp->header.seq > call->rnext + offset)
4407 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4408 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4410 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4411 if (!optionalPacket) rxi_FreePacket(p);
4412 rxi_CallError(call, RX_CALL_DEAD);
4413 return optionalPacket;
4418 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
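/* Worked example (illustrative): if rnext is 5 and the receive queue
 * holds packets 5, 6 and 8, the loop above produces
 *
 *   ap->acks[0] = RX_ACK_TYPE_ACK;     // seq 5
 *   ap->acks[1] = RX_ACK_TYPE_ACK;     // seq 6
 *   ap->acks[2] = RX_ACK_TYPE_NACK;    // seq 7, the hole
 *   ap->acks[3] = RX_ACK_TYPE_ACK;     // seq 8
 *
 * leaving offset == 4, so the peer learns it need only retransmit
 * packet 7. */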
4420 /* these are new for AFS 3.3 */
4421 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4422 templ = htonl(templ);
4423 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4424 templ = htonl(call->conn->peer->ifMTU);
4425 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4427 /* new for AFS 3.4 */
4428 templ = htonl(call->rwind);
4429 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4431 /* new for AFS 3.5 */
4432 templ = htonl(call->conn->peer->ifDgramPackets);
4433 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
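/* Summarizing the trailer written above: four afs_int32s, all in network
 * order, follow the ack array on the wire:
 *
 *   [0] largest MTU we will accept   (AFS 3.3)  rxi_AdjustMaxMTU(...)
 *   [1] our interface MTU            (AFS 3.3)  peer->ifMTU
 *   [2] our receive window           (AFS 3.4)  call->rwind
 *   [3] our jumbogram capacity       (AFS 3.5)  peer->ifDgramPackets
 *
 * As noted above rxi_SendAck, older peers simply ignore whatever trails
 * the ack proper, so these fields need no version negotiation. */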
4435 p->header.serviceId = call->conn->serviceId;
4436 p->header.cid = (call->conn->cid | call->channel);
4437 p->header.callNumber = *call->callNumber;
4438 p->header.seq = seq;
4439 p->header.securityIndex = call->conn->securityIndex;
4440 p->header.epoch = call->conn->epoch;
4441 p->header.type = RX_PACKET_TYPE_ACK;
4442 p->header.flags = RX_SLOW_START_OK;
4443 if (reason == RX_ACK_PING) {
4444 p->header.flags |= RX_REQUEST_ACK;
4446 clock_GetTime(&call->pingRequestTime);
4449 if (call->conn->type == RX_CLIENT_CONNECTION)
4450 p->header.flags |= RX_CLIENT_INITIATED;
4454 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4455 ap->reason, ntohl(ap->previousPacket),
4456 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4458 for (offset = 0; offset < ap->nAcks; offset++)
4459 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4466 register int i, nbytes = p->length;
4468 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4469 if (nbytes <= p->wirevec[i].iov_len) {
4470 register int savelen, saven;
4472 savelen = p->wirevec[i].iov_len;
4474 p->wirevec[i].iov_len = nbytes;
4476 rxi_Send(call, p, istack);
4477 p->wirevec[i].iov_len = savelen;
4481 else nbytes -= p->wirevec[i].iov_len;
4484 MUTEX_ENTER(&rx_stats_mutex);
4485 rx_stats.ackPacketsSent++;
4486 MUTEX_EXIT(&rx_stats_mutex);
4487 if (!optionalPacket) rxi_FreePacket(p);
4488 return optionalPacket; /* Return packet for re-use by caller */
4491 /* Send all of the packets in the list in a single datagram */
4492 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4493 struct rx_call *call;
4494 struct rx_packet **list;
4499 struct clock *retryTime;
4505 struct rx_connection *conn = call->conn;
4506 struct rx_peer *peer = conn->peer;
4508 MUTEX_ENTER(&peer->peer_lock);
4510 if (resending) peer->reSends += len;
4511 MUTEX_ENTER(&rx_stats_mutex);
4512 rx_stats.dataPacketsSent += len;
4513 MUTEX_EXIT(&rx_stats_mutex);
4514 MUTEX_EXIT(&peer->peer_lock);
4516 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4520 /* Set the packet flags and schedule the resend events */
4521 /* Only request an ack for the last packet in the list */
4522 for (i = 0 ; i < len ; i++) {
4523 list[i]->retryTime = *retryTime;
4524 if (list[i]->header.serial) {
4525 /* Exponentially backoff retry times */
4526 if (list[i]->backoff < MAXBACKOFF) {
4527 /* so it can't stay == 0 */
4528 list[i]->backoff = (list[i]->backoff << 1) +1;
4530 else list[i]->backoff++;
4531 clock_Addmsec(&(list[i]->retryTime),
4532 ((afs_uint32) list[i]->backoff) << 8);
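/* Worked example (illustrative): backoff starts at 0, so successive
 * retransmissions of the same packet compute
 *
 *   backoff:      0 -> 1 -> 3 -> 7 -> 15 ...         ((b << 1) + 1)
 *   added delay:  256ms, 768ms, 1792ms, 3840ms ...   (backoff << 8)
 *
 * i.e. the retry interval roughly doubles per retransmission until
 * backoff reaches MAXBACKOFF, after which it grows only linearly. */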
4535 /* Wait a little extra for the ack on the last packet */
4536 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4537 clock_Addmsec(&(list[i]->retryTime), 400);
4540 /* Record the time sent */
4541 list[i]->timeSent = *now;
4543 /* Ask for an ack on retransmitted packets, on every other packet
4544 * if the peer doesn't support slow start. Ask for an ack on every
4545 * packet until the congestion window reaches the ack rate. */
4546 if (list[i]->header.serial) {
4548 MUTEX_ENTER(&rx_stats_mutex);
4549 rx_stats.dataPacketsReSent++;
4550 MUTEX_EXIT(&rx_stats_mutex);
4552 /* improved RTO calculation- not Karn */
4553 list[i]->firstSent = *now;
4555 && (call->cwind <= (u_short)(conn->ackRate+1)
4556 || (!(call->flags & RX_CALL_SLOW_START_OK)
4557 && (list[i]->header.seq & 1)))) {
4562 MUTEX_ENTER(&peer->peer_lock);
4564 if (resending) peer->reSends++;
4565 MUTEX_ENTER(&rx_stats_mutex);
4566 rx_stats.dataPacketsSent++;
4567 MUTEX_EXIT(&rx_stats_mutex);
4568 MUTEX_EXIT(&peer->peer_lock);
4570 /* Tag this packet as not being the last in this group,
4571 * for the receiver's benefit */
4572 if (i < len-1 || moreFlag) {
4573 list[i]->header.flags |= RX_MORE_PACKETS;
4576 /* Install the new retransmit time for the packet, and
4577 * record the time sent */
4578 list[i]->timeSent = *now;
4582 list[len-1]->header.flags |= RX_REQUEST_ACK;
4585 /* Since we're about to send a data packet to the peer, it's
4586 * safe to nuke any scheduled end-of-packets ack */
4587 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4589 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4590 MUTEX_EXIT(&call->lock);
4592 rxi_SendPacketList(conn, list, len, istack);
4594 rxi_SendPacket(conn, list[0], istack);
4596 MUTEX_ENTER(&call->lock);
4597 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4599 /* Update last send time for this call (for keep-alive
4600 * processing), and for the connection (so that we can discover
4601 * idle connections) */
4602 conn->lastSendTime = call->lastSendTime = clock_Sec();
4605 /* When sending packets we need to follow these rules:
4606 * 1. Never send more than maxDgramPackets in a jumbogram.
4607 * 2. Never send a packet with more than two iovecs in a jumbogram.
4608 * 3. Never send a retransmitted packet in a jumbogram.
4609 * 4. Never send more than cwind/4 packets in a jumbogram
4610 * We always keep the last list we should have sent so we
4611 * can set the RX_MORE_PACKETS flags correctly.
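/* For instance (an illustrative reading of the rules, not a trace): with
 * maxDgramPackets = 3 and a queue of six never-sent full-size packets
 * followed by one retransmission, rule 1 splits the first six into two
 * three-packet jumbograms, and rule 3 sends the retransmitted packet by
 * itself as an ordinary datagram. */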
4613 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4614 struct rx_call *call;
4615 struct rx_packet **list;
4619 struct clock *retryTime;
4622 int i, cnt, lastCnt = 0;
4623 struct rx_packet **listP, **lastP = 0;
4624 struct rx_peer *peer = call->conn->peer;
4625 int morePackets = 0;
4627 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4628 /* Does the current packet force us to flush the current list? */
4630 && (list[i]->header.serial
4632 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4634 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4635 /* If the call enters an error state stop sending, or if
4636 * we entered congestion recovery mode, stop sending */
4637 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4645 /* Add the current packet to the list if it hasn't been acked.
4646 * Otherwise adjust the list pointer to skip the current packet. */
4647 if (!list[i]->acked) {
4649 /* Do we need to flush the list? */
4650 if (cnt >= (int)peer->maxDgramPackets
4651 || cnt >= (int)call->nDgramPackets
4652 || cnt >= (int)call->cwind
4653 || list[i]->header.serial
4654 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4656 rxi_SendList(call, lastP, lastCnt, istack, 1,
4657 now, retryTime, resending);
4658 /* If the call enters an error state stop sending, or if
4659 * we entered congestion recovery mode, stop sending */
4660 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4670 osi_Panic("rxi_SendList error");
4676 /* Send the whole list when the call is in receive mode, when
4677 * the call is in eof mode, when we are in fast recovery mode,
4678 * and when we have the last packet */
4679 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4680 || call->mode == RX_MODE_RECEIVING
4681 || call->mode == RX_MODE_EOF
4682 || (call->flags & RX_CALL_FAST_RECOVER)) {
4683 /* Check for the case where the current list contains
4684 * an acked packet. Since we always send retransmissions
4685 * in a separate packet, we only need to check the first
4686 * packet in the list */
4687 if (cnt > 0 && !listP[0]->acked) {
4691 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4692 now, retryTime, resending);
4693 /* If the call enters an error state stop sending, or if
4694 * we entered congestion recovery mode, stop sending */
4695 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4699 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4701 } else if (lastCnt > 0) {
4702 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4706 #ifdef RX_ENABLE_LOCKS
4707 /* Call rxi_Start, below, but with the call lock held. */
4708 void rxi_StartUnlocked(event, call, istack)
4709 struct rxevent *event;
4710 register struct rx_call *call;
4713 MUTEX_ENTER(&call->lock);
4714 rxi_Start(event, call, istack);
4715 MUTEX_EXIT(&call->lock);
4717 #endif /* RX_ENABLE_LOCKS */
4719 /* This routine is called when new packets are readied for
4720 * transmission and when retransmission may be necessary, or when the
4721 * transmission window or burst count are favourable. This should be
4722 * better optimized for new packets, the usual case, now that we've
4723 * got rid of queues of send packets. XXXXXXXXXXX */
4724 void rxi_Start(event, call, istack)
4725 struct rxevent *event;
4726 register struct rx_call *call;
4729 struct rx_packet *p;
4730 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4731 struct rx_peer *peer = call->conn->peer;
4732 struct clock now, retryTime;
4736 struct rx_packet **xmitList;
4739 /* If rxi_Start is being called as a result of a resend event,
4740 * then make sure that the event pointer is removed from the call
4741 * structure, since there is no longer a per-call retransmission event. */
4743 if (event && event == call->resendEvent) {
4744 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4745 call->resendEvent = NULL;
4747 if (queue_IsEmpty(&call->tq)) {
4751 /* Timeouts trigger congestion recovery */
4752 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4753 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4754 /* someone else is waiting to start recovery */
4757 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4758 while (call->flags & RX_CALL_TQ_BUSY) {
4759 call->flags |= RX_CALL_TQ_WAIT;
4760 #ifdef RX_ENABLE_LOCKS
4761 CV_WAIT(&call->cv_tq, &call->lock);
4762 #else /* RX_ENABLE_LOCKS */
4763 osi_rxSleep(&call->tq);
4764 #endif /* RX_ENABLE_LOCKS */
4766 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4767 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4768 call->flags |= RX_CALL_FAST_RECOVER;
4769 if (peer->maxDgramPackets > 1) {
4770 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4772 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4774 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4775 call->nDgramPackets = 1;
4777 call->nextCwind = 1;
4780 MUTEX_ENTER(&peer->peer_lock);
4781 peer->MTU = call->MTU;
4782 peer->cwind = call->cwind;
4783 peer->nDgramPackets = 1;
4785 call->congestSeq = peer->congestSeq;
4786 MUTEX_EXIT(&peer->peer_lock);
4787 /* Clear retry times on packets. Otherwise, it's possible for
4788 * some packets in the queue to force resends at rates faster
4789 * than recovery rates.
4791 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4793 clock_Zero(&p->retryTime);
4798 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4799 MUTEX_ENTER(&rx_stats_mutex);
4800 rx_tq_debug.rxi_start_in_error ++;
4801 MUTEX_EXIT(&rx_stats_mutex);
4806 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4807 /* Get clock to compute the re-transmit time for any packets
4808 * in this burst. Note, if we back off, it's reasonable to
4809 * back off all of the packets in the same manner, even if
4810 * some of them have been retransmitted more times than more
4811 * recent additions */
4812 clock_GetTime(&now);
4813 retryTime = now; /* initialize before use */
4814 MUTEX_ENTER(&peer->peer_lock);
4815 clock_Add(&retryTime, &peer->timeout);
4816 MUTEX_EXIT(&peer->peer_lock);
4818 /* Send (or resend) any packets that need it, subject to
4819 * window restrictions and congestion burst control
4820 * restrictions. Ask for an ack on the last packet sent in
4821 * this burst. For now, we're relying upon the window being
4822 * considerably bigger than the largest number of packets that
4823 * are typically sent at once by one initial call to
4824 * rxi_Start. This is probably bogus (perhaps we should ask
4825 * for an ack when we're half way through the current
4826 * window?). Also, for non file transfer applications, this
4827 * may end up asking for an ack for every packet. Bogus. XXXX
4830 * But check whether we're here recursively, and let the other guy do the work. */
4833 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4834 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4835 call->flags |= RX_CALL_TQ_BUSY;
4837 call->flags &= ~RX_CALL_NEED_START;
4838 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4840 maxXmitPackets = MIN(call->twind, call->cwind);
4841 xmitList = (struct rx_packet **)
4842 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4843 if (xmitList == NULL)
4844 osi_Panic("rxi_Start, failed to allocate xmit list");
4845 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4846 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4847 /* We shouldn't be sending packets if a thread is waiting
4848 * to initiate congestion recovery */
4851 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4852 /* Only send one packet during fast recovery */
4855 if ((p->header.flags == RX_FREE_PACKET) ||
4856 (!queue_IsEnd(&call->tq, nxp)
4857 && (nxp->header.flags == RX_FREE_PACKET)) ||
4858 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4859 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4860 osi_Panic("rxi_Start: xmit queue clobbered");
4863 MUTEX_ENTER(&rx_stats_mutex);
4864 rx_stats.ignoreAckedPacket++;
4865 MUTEX_EXIT(&rx_stats_mutex);
4866 continue; /* Ignore this packet if it has been acknowledged */
4869 /* Turn off all flags except these ones, which are the same
4870 * on each transmission */
4871 p->header.flags &= RX_PRESET_FLAGS;
4873 if (p->header.seq >= call->tfirst +
4874 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4875 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4876 /* Note: if we're waiting for more window space, we can
4877 * still send retransmits; hence we don't return here, but
4878 * break out to schedule a retransmit event */
4879 dpf(("call %d waiting for window", *(call->callNumber)));
4883 /* Transmit the packet if it needs to be sent. */
4884 if (!clock_Lt(&now, &p->retryTime)) {
4885 if (nXmitPackets == maxXmitPackets) {
4886 osi_Panic("rxi_Start: xmit list overflowed");
4888 xmitList[nXmitPackets++] = p;
4892 /* xmitList now holds pointers to all of the packets that are
4893 * ready to send. Now we loop to send the packets */
4894 if (nXmitPackets > 0) {
4895 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4896 &now, &retryTime, resending);
4898 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4900 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4902 * TQ references no longer protected by this flag; they must remain
4903 * protected by the global lock.
4905 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4906 call->flags &= ~RX_CALL_TQ_BUSY;
4907 if (call->flags & RX_CALL_TQ_WAIT) {
4908 call->flags &= ~RX_CALL_TQ_WAIT;
4909 #ifdef RX_ENABLE_LOCKS
4910 CV_BROADCAST(&call->cv_tq);
4911 #else /* RX_ENABLE_LOCKS */
4912 osi_rxWakeup(&call->tq);
4913 #endif /* RX_ENABLE_LOCKS */
4918 /* We went into the error state while sending packets. Now is
4919 * the time to reset the call. This will also inform the using
4920 * process that the call is in an error state.
4922 MUTEX_ENTER(&rx_stats_mutex);
4923 rx_tq_debug.rxi_start_aborted ++;
4924 MUTEX_EXIT(&rx_stats_mutex);
4925 call->flags &= ~RX_CALL_TQ_BUSY;
4926 if (call->flags & RX_CALL_TQ_WAIT) {
4927 call->flags &= ~RX_CALL_TQ_WAIT;
4928 #ifdef RX_ENABLE_LOCKS
4929 CV_BROADCAST(&call->cv_tq);
4930 #else /* RX_ENABLE_LOCKS */
4931 osi_rxWakeup(&call->tq);
4932 #endif /* RX_ENABLE_LOCKS */
4934 rxi_CallError(call, call->error);
4937 #ifdef RX_ENABLE_LOCKS
4938 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4939 register int missing;
4940 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4941 /* Some packets have received acks. If they all have, we can clear
4942 * the transmit queue.
4944 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4945 if (p->header.seq < call->tfirst && p->acked) {
4953 call->flags |= RX_CALL_TQ_CLEARME;
4955 #endif /* RX_ENABLE_LOCKS */
4956 /* Don't bother doing retransmits if the TQ is cleared. */
4957 if (call->flags & RX_CALL_TQ_CLEARME) {
4958 rxi_ClearTransmitQueue(call, 1);
4960 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4963 /* Always post a resend event, if there is anything in the
4964 * queue, and resend is possible. There should be at least
4965 * one unacknowledged packet in the queue ... otherwise none
4966 * of these packets should be on the queue in the first place.
4968 if (call->resendEvent) {
4969 /* Cancel the existing event and post a new one */
4970 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4973 /* The retry time is the retry time on the first unacknowledged
4974 * packet inside the current window */
4975 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4976 /* Don't set timers for packets outside the window */
4977 if (p->header.seq >= call->tfirst + call->twind) {
4981 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4983 retryTime = p->retryTime;
4988 /* Post a new event to re-run rxi_Start when retries may be needed */
4989 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4990 #ifdef RX_ENABLE_LOCKS
4991 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4992 call->resendEvent = rxevent_Post(&retryTime,
4994 (char *)call, istack);
4995 #else /* RX_ENABLE_LOCKS */
4996 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4997 (char *)call, (void*)(long)istack);
4998 #endif /* RX_ENABLE_LOCKS */
5001 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5002 } while (call->flags & RX_CALL_NEED_START);
5004 * TQ references no longer protected by this flag; they must remain
5005 * protected by the global lock.
5007 call->flags &= ~RX_CALL_TQ_BUSY;
5008 if (call->flags & RX_CALL_TQ_WAIT) {
5009 call->flags &= ~RX_CALL_TQ_WAIT;
5010 #ifdef RX_ENABLE_LOCKS
5011 CV_BROADCAST(&call->cv_tq);
5012 #else /* RX_ENABLE_LOCKS */
5013 osi_rxWakeup(&call->tq);
5014 #endif /* RX_ENABLE_LOCKS */
5017 call->flags |= RX_CALL_NEED_START;
5019 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5021 if (call->resendEvent) {
5022 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5027 /* Also adjusts the keep alive parameters for the call, to reflect
5028 * that we have just sent a packet (so keep alives aren't sent immediately). */
5030 void rxi_Send(call, p, istack)
5031 register struct rx_call *call;
5032 register struct rx_packet *p;
5035 register struct rx_connection *conn = call->conn;
5037 /* Stamp each packet with the user supplied status */
5038 p->header.userStatus = call->localStatus;
5040 /* Allow the security object controlling this call's security to
5041 * make any last-minute changes to the packet */
5042 RXS_SendPacket(conn->securityObject, call, p);
5044 /* Since we're about to send SOME sort of packet to the peer, it's
5045 * safe to nuke any scheduled end-of-packets ack */
5046 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5048 /* Actually send the packet, filling in more connection-specific fields */
5049 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5050 MUTEX_EXIT(&call->lock);
5051 rxi_SendPacket(conn, p, istack);
5052 MUTEX_ENTER(&call->lock);
5053 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5055 /* Update last send time for this call (for keep-alive
5056 * processing), and for the connection (so that we can discover
5057 * idle connections) */
5058 conn->lastSendTime = call->lastSendTime = clock_Sec();
5062 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5063 * that things are fine. Also called periodically to guarantee that nothing
5064 * falls through the cracks (e.g. (error + dally) connections have keepalive
5065 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, the call may be freed. */
5068 #ifdef RX_ENABLE_LOCKS
5069 int rxi_CheckCall(call, haveCTLock)
5070 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5071 #else /* RX_ENABLE_LOCKS */
5072 int rxi_CheckCall(call)
5073 #endif /* RX_ENABLE_LOCKS */
5074 register struct rx_call *call;
5076 register struct rx_connection *conn = call->conn;
5077 register struct rx_service *tservice;
5079 afs_uint32 deadTime;
5081 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5082 if (call->flags & RX_CALL_TQ_BUSY) {
5083 /* Call is active and will be reset by rxi_Start if it's
5084 * in an error state.
5089 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5090 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5091 ((afs_uint32)conn->peer->rtt >> 3) +
5092 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
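/* Worked example (illustrative): with secondsUntilDead = 12, a smoothed
 * rtt of 100ms (peer->rtt = 800, scaled by 8) and a mean deviation of
 * 25ms (peer->rtt_dev = 100, scaled by 4):
 *
 *   deadTime = ((12 << 10) + (800 >> 3) + (100 << 1) + 1023) >> 10
 *            = (12288 + 100 + 200 + 1023) >> 10  =  13 seconds
 *
 * The <<10 / >>10 pair trades seconds for "milliseconds" using a factor
 * of 1024 rather than 1000, which is plenty accurate at this scale. */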
5094 /* These are computed to the second (+- 1 second). But that's
5095 * good enough for these values, which should be a significant
5096 * number of seconds. */
5097 if (now > (call->lastReceiveTime + deadTime)) {
5098 if (call->state == RX_STATE_ACTIVE) {
5099 rxi_CallError(call, RX_CALL_DEAD);
5103 #ifdef RX_ENABLE_LOCKS
5104 /* Cancel pending events */
5105 rxevent_Cancel(call->delayedAckEvent, call,
5106 RX_CALL_REFCOUNT_DELAY);
5107 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5108 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5109 if (call->refCount == 0) {
5110 rxi_FreeCall(call, haveCTLock);
5114 #else /* RX_ENABLE_LOCKS */
5117 #endif /* RX_ENABLE_LOCKS */
5119 /* Non-active calls are destroyed if they are not responding
5120 * to pings; active calls are simply flagged in error, so the
5121 * attached process can die reasonably gracefully. */
5123 /* see if we have a non-activity timeout */
5124 tservice = conn->service;
5125 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5126 && tservice->idleDeadTime
5127 && ((call->startWait + tservice->idleDeadTime) < now)) {
5128 if (call->state == RX_STATE_ACTIVE) {
5129 rxi_CallError(call, RX_CALL_TIMEOUT);
5133 /* see if we have a hard timeout */
5134 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5135 if (call->state == RX_STATE_ACTIVE)
5136 rxi_CallError(call, RX_CALL_TIMEOUT);
5143 /* When a call is in progress, this routine is called occasionally to
5144 * make sure that some traffic has arrived from (or been sent to) the peer.
5145 * If nothing has arrived in a reasonable amount of time, the call is
5146 * declared dead; if nothing has been sent for a while, we send a
5147 * keep-alive packet (if we're actually trying to keep the call alive)
5149 void rxi_KeepAliveEvent(event, call, dummy)
5150 struct rxevent *event;
5151 register struct rx_call *call;
5153 struct rx_connection *conn;
5156 MUTEX_ENTER(&call->lock);
5157 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5158 if (event == call->keepAliveEvent)
5159 call->keepAliveEvent = (struct rxevent *) 0;
5162 #ifdef RX_ENABLE_LOCKS
5163 if(rxi_CheckCall(call, 0)) {
5164 MUTEX_EXIT(&call->lock);
5167 #else /* RX_ENABLE_LOCKS */
5168 if (rxi_CheckCall(call)) return;
5169 #endif /* RX_ENABLE_LOCKS */
5171 /* Don't try to keep alive dallying calls */
5172 if (call->state == RX_STATE_DALLY) {
5173 MUTEX_EXIT(&call->lock);
5178 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5179 /* Don't try to send keepalives if there is unacknowledged data */
5180 /* the rexmit code should be good enough, this little hack
5181 * doesn't quite work XXX */
5182 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5184 rxi_ScheduleKeepAliveEvent(call);
5185 MUTEX_EXIT(&call->lock);
5189 void rxi_ScheduleKeepAliveEvent(call)
5190 register struct rx_call *call;
5192 if (!call->keepAliveEvent) {
5194 clock_GetTime(&when);
5195 when.sec += call->conn->secondsUntilPing;
5196 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5197 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5201 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5202 void rxi_KeepAliveOn(call)
5203 register struct rx_call *call;
5205 /* Pretend last packet received was received now--i.e. if another
5206 * packet isn't received within the keep alive time, then the call
5207 * will die; Initialize last send time to the current time--even
5208 * if a packet hasn't been sent yet. This will guarantee that a
5209 * keep-alive is sent within the ping time */
5210 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5211 rxi_ScheduleKeepAliveEvent(call);
5214 /* This routine is called to send connection abort messages
5215 * that have been delayed to throttle looping clients. */
5216 void rxi_SendDelayedConnAbort(event, conn, dummy)
5217 struct rxevent *event;
5218 register struct rx_connection *conn;
5222 struct rx_packet *packet;
5224 MUTEX_ENTER(&conn->conn_data_lock);
5225 conn->delayedAbortEvent = (struct rxevent *) 0;
5226 error = htonl(conn->error);
5228 MUTEX_EXIT(&conn->conn_data_lock);
5229 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5231 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5232 RX_PACKET_TYPE_ABORT, (char *)&error,
5234 rxi_FreePacket(packet);
5238 /* This routine is called to send call abort messages
5239 * that have been delayed to throttle looping clients. */
5240 void rxi_SendDelayedCallAbort(event, call, dummy)
5241 struct rxevent *event;
5242 register struct rx_call *call;
5246 struct rx_packet *packet;
5248 MUTEX_ENTER(&call->lock);
5249 call->delayedAbortEvent = (struct rxevent *) 0;
5250 error = htonl(call->error);
5252 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5254 packet = rxi_SendSpecial(call, call->conn, packet,
5255 RX_PACKET_TYPE_ABORT, (char *)&error,
5257 rxi_FreePacket(packet);
5259 MUTEX_EXIT(&call->lock);
5262 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5263 * seconds) to ask the client to authenticate itself. The routine
5264 * issues a challenge to the client, which is obtained from the
5265 * security object associated with the connection */
5266 void rxi_ChallengeEvent(event, conn, dummy)
5267 struct rxevent *event;
5268 register struct rx_connection *conn;
5271 conn->challengeEvent = (struct rxevent *) 0;
5272 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5273 register struct rx_packet *packet;
5275 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5277 /* If there's no packet available, do this later. */
5278 RXS_GetChallenge(conn->securityObject, conn, packet);
5279 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5280 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5281 rxi_FreePacket(packet);
5283 clock_GetTime(&when);
5284 when.sec += RX_CHALLENGE_TIMEOUT;
5285 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5289 /* Call this routine to start requesting the client to authenticate
5290 * itself. This will continue until authentication is established,
5291 * the call times out, or an invalid response is returned. The
5292 * security object associated with the connection is asked to create
5293 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5294 * defined earlier. */
5295 void rxi_ChallengeOn(conn)
5296 register struct rx_connection *conn;
5298 if (!conn->challengeEvent) {
5299 RXS_CreateChallenge(conn->securityObject, conn);
5300 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5305 /* Compute round trip time of the packet provided, in *rttp.
5308 /* rxi_ComputeRoundTripTime is called with peer locked. */
5309 void rxi_ComputeRoundTripTime(p, sentp, peer)
5310 register struct clock *sentp; /* may be null */
5311 register struct rx_peer *peer; /* may be null */
5312 register struct rx_packet *p;
5314 struct clock thisRtt, *rttp = &thisRtt;
5316 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5317 /* making year 2038 bugs to get this running now - stroucki */
5318 struct timeval temptime;
5320 register int rtt_timeout;
5322 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5323 /* yet again. This was the worst Heisenbug of the port - stroucki */
5324 clock_GetTime(&temptime);
5325 rttp->sec=(afs_int32)temptime.tv_sec;
5326 rttp->usec=(afs_int32)temptime.tv_usec;
5328 clock_GetTime(rttp);
5330 if (clock_Lt(rttp, sentp)) {
5332 return; /* somebody set the clock back, don't count this time. */
5334 clock_Sub(rttp, sentp);
5335 MUTEX_ENTER(&rx_stats_mutex);
5336 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5337 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5338 if (rttp->sec > 60) {
5339 MUTEX_EXIT(&rx_stats_mutex);
5340 return; /* somebody set the clock ahead */
5342 rx_stats.maxRtt = *rttp;
5344 clock_Add(&rx_stats.totalRtt, rttp);
5345 rx_stats.nRttSamples++;
5346 MUTEX_EXIT(&rx_stats_mutex);
5348 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5350 /* Apply VanJacobson round-trip estimations */
5355 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5356 * srtt is stored as fixed point with 3 bits after the binary
5357 * point (i.e., scaled by 8). The following magic is
5358 * equivalent to the smoothing algorithm in rfc793 with an
5359 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5360 * srtt*8 = srtt*8 + rtt - srtt
5361 * srtt = srtt + rtt/8 - srtt/8
5364 delta = MSEC(rttp) - (peer->rtt >> 3);
5368 * We accumulate a smoothed rtt variance (actually, a smoothed
5369 * mean difference), then set the retransmit timer to smoothed
5370 * rtt + 4 times the smoothed variance (was 2x in van's original
5371 * paper, but 4x works better for me, and apparently for him as
5373 * rttvar is stored as
5374 * fixed point with 2 bits after the binary point (scaled by
5375 * 4). The following is equivalent to rfc793 smoothing with
5376 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5377 * replaces rfc793's wired-in beta.
5378 * dev*4 = dev*4 + (|actual - expected| - dev)
5384 delta -= (peer->rtt_dev >> 2);
5385 peer->rtt_dev += delta;
5388 /* I don't have a stored RTT so I start with this value. Since I'm
5389 * probably just starting a call, and will be pushing more data down
5390 * this, I expect congestion to increase rapidly. So I fudge a
5391 * little, and I set deviance to half the rtt. In practice,
5392 * deviance tends to approach something a little less than
5393 * half the smoothed rtt. */
5394 peer->rtt = (MSEC(rttp) << 3) + 8;
5395 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5397 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5398 * the other of these connections is usually in a user process, and can
5399 * be switched and/or swapped out. So on fast, reliable networks, the
5400 * timeout would otherwise be too short.
5402 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5403 clock_Zero(&(peer->timeout));
5404 clock_Addmsec(&(peer->timeout), rtt_timeout);
5406 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5407 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5408 (peer->timeout.sec),(peer->timeout.usec)) );
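/* A minimal user-space sketch of the same fixed-point update (names are
 * local to this sketch; srtt8 is peer->rtt, scaled by 8, and mdev4 is
 * peer->rtt_dev, scaled by 4, both in milliseconds):
 *
 *   static void update_rtt(int sample_ms, int *srtt8, int *mdev4)
 *   {
 *       int delta = sample_ms - (*srtt8 >> 3);  // error vs. smoothed rtt
 *       *srtt8 += delta;                        // srtt += error/8, scaled
 *       if (delta < 0) delta = -delta;
 *       delta -= (*mdev4 >> 2);                 // |error| vs. smoothed dev
 *       *mdev4 += delta;                        // mdev += diff/4, scaled
 *   }
 *
 * with timeout_ms = (*srtt8 >> 3) + *mdev4 + 350 as computed above.
 * Seeded from a first 100ms sample (srtt8 = 808, mdev4 = 202), further
 * samples of 100ms and 180ms give timeouts of 603ms and 655ms. */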
5412 /* Find all server connections that have not been active for a long time, and destroy them. */
5414 void rxi_ReapConnections()
5417 clock_GetTime(&now);
5419 /* Find server connection structures that haven't been used for
5420 * greater than rx_idleConnectionTime */
5421 { struct rx_connection **conn_ptr, **conn_end;
5422 int i, havecalls = 0;
5423 MUTEX_ENTER(&rx_connHashTable_lock);
5424 for (conn_ptr = &rx_connHashTable[0],
5425 conn_end = &rx_connHashTable[rx_hashTableSize];
5426 conn_ptr < conn_end; conn_ptr++) {
5427 struct rx_connection *conn, *next;
5428 struct rx_call *call;
5432 for (conn = *conn_ptr; conn; conn = next) {
5433 /* XXX -- Shouldn't the connection be locked? */
5436 for(i=0;i<RX_MAXCALLS;i++) {
5437 call = conn->call[i];
5440 MUTEX_ENTER(&call->lock);
5441 #ifdef RX_ENABLE_LOCKS
5442 result = rxi_CheckCall(call, 1);
5443 #else /* RX_ENABLE_LOCKS */
5444 result = rxi_CheckCall(call);
5445 #endif /* RX_ENABLE_LOCKS */
5446 MUTEX_EXIT(&call->lock);
5448 /* If CheckCall freed the call, it might
5449 * have destroyed the connection as well,
5450 * which screws up the linked lists.
5456 if (conn->type == RX_SERVER_CONNECTION) {
5457 /* This only actually destroys the connection if
5458 * there are no outstanding calls */
5459 MUTEX_ENTER(&conn->conn_data_lock);
5460 if (!havecalls && !conn->refCount &&
5461 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5462 conn->refCount++; /* it will be decr in rx_DestroyConn */
5463 MUTEX_EXIT(&conn->conn_data_lock);
5464 #ifdef RX_ENABLE_LOCKS
5465 rxi_DestroyConnectionNoLock(conn);
5466 #else /* RX_ENABLE_LOCKS */
5467 rxi_DestroyConnection(conn);
5468 #endif /* RX_ENABLE_LOCKS */
5470 #ifdef RX_ENABLE_LOCKS
5472 MUTEX_EXIT(&conn->conn_data_lock);
5474 #endif /* RX_ENABLE_LOCKS */
5478 #ifdef RX_ENABLE_LOCKS
5479 while (rx_connCleanup_list) {
5480 struct rx_connection *conn;
5481 conn = rx_connCleanup_list;
5482 rx_connCleanup_list = rx_connCleanup_list->next;
5483 MUTEX_EXIT(&rx_connHashTable_lock);
5484 rxi_CleanupConnection(conn);
5485 MUTEX_ENTER(&rx_connHashTable_lock);
5487 MUTEX_EXIT(&rx_connHashTable_lock);
5488 #endif /* RX_ENABLE_LOCKS */
5491 /* Find any peer structures that haven't been used (haven't had an
5492 * associated connection) for greater than rx_idlePeerTime */
5493 { struct rx_peer **peer_ptr, **peer_end;
5495 MUTEX_ENTER(&rx_rpc_stats);
5496 MUTEX_ENTER(&rx_peerHashTable_lock);
5497 for (peer_ptr = &rx_peerHashTable[0],
5498 peer_end = &rx_peerHashTable[rx_hashTableSize];
5499 peer_ptr < peer_end; peer_ptr++) {
5500 struct rx_peer *peer, *next, *prev;
5501 for (prev = peer = *peer_ptr; peer; peer = next) {
5503 code = MUTEX_TRYENTER(&peer->peer_lock);
5504 if ((code) && (peer->refCount == 0)
5505 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5506 rx_interface_stat_p rpc_stat, nrpc_stat;
5508 MUTEX_EXIT(&peer->peer_lock);
5509 MUTEX_DESTROY(&peer->peer_lock);
5510 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5511 rx_interface_stat)) {
5512 unsigned int num_funcs;
5513 if (!rpc_stat) break;
5514 queue_Remove(&rpc_stat->queue_header);
5515 queue_Remove(&rpc_stat->all_peers);
5516 num_funcs = rpc_stat->stats[0].func_total;
5517 space = sizeof(rx_interface_stat_t) +
5518 rpc_stat->stats[0].func_total *
5519 sizeof(rx_function_entry_v1_t);
5521 rxi_Free(rpc_stat, space);
5522 rxi_rpc_peer_stat_cnt -= num_funcs;
5525 MUTEX_ENTER(&rx_stats_mutex);
5526 rx_stats.nPeerStructs--;
5527 MUTEX_EXIT(&rx_stats_mutex);
5528 if (prev == *peer_ptr) {
5537 MUTEX_EXIT(&peer->peer_lock);
5543 MUTEX_EXIT(&rx_peerHashTable_lock);
5544 MUTEX_EXIT(&rx_rpc_stats);
5547 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5548 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5549 GC, just below. Really, we shouldn't have to keep moving packets from
5550 one place to another, but instead ought to always know if we can
5551 afford to hold onto a packet in its particular use. */
5552 MUTEX_ENTER(&rx_freePktQ_lock);
5553 if (rx_waitingForPackets) {
5554 rx_waitingForPackets = 0;
5555 #ifdef RX_ENABLE_LOCKS
5556 CV_BROADCAST(&rx_waitingForPackets_cv);
5558 osi_rxWakeup(&rx_waitingForPackets);
5561 MUTEX_EXIT(&rx_freePktQ_lock);
5563 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5564 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5568 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5569 * rx.h is sort of strange, this is better. This is called with a security
5570 * object before it is discarded. Each connection using a security object has
5571 * its own refcount to the object, so it won't actually be freed until the last
5572 * connection is destroyed.
5574 * This is the only rxs module call. A hold could also be written but no one needs it yet. */
5577 int rxs_Release (aobj)
5578 struct rx_securityClass *aobj;
5580 return RXS_Close (aobj);
5584 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5585 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5586 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5587 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5589 /* Adjust our estimate of the transmission rate to this peer, given
5590 * that the packet p was just acked. We can adjust peer->timeout and
5591 * call->twind. Pragmatically, this is called
5592 * only with packets of maximal length.
5593 * Called with peer and call locked.
5596 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5597 register struct rx_peer *peer;
5598 register struct rx_call *call;
5599 struct rx_packet *p, *ackp;
5602 afs_int32 xferSize, xferMs;
5603 register afs_int32 minTime;
5606 /* Count down packets */
5607 if (peer->rateFlag > 0) peer->rateFlag--;
5608 /* Do nothing until we're enabled */
5609 if (peer->rateFlag != 0) return;
5610 if (!call->conn) return;
5612 /* Count only when the ack seems legitimate */
5613 switch (ackReason) {
5614 case RX_ACK_REQUESTED:
5615 xferSize = p->length + RX_HEADER_SIZE +
5616 call->conn->securityMaxTrailerSize;
5620 case RX_ACK_PING_RESPONSE:
5621 if (p) /* want the response to ping-request, not data send */
5623 clock_GetTime(&newTO);
5624 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5625 clock_Sub(&newTO, &call->pingRequestTime);
5626 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5630 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5637 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5638 ntohl(peer->host), ntohs(peer->port),
5639 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5640 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5643 /* Track only packets that are big enough. */
5644 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5648 /* absorb RTT data (in milliseconds) for these big packets */
5649 if (peer->smRtt == 0) {
5650 peer->smRtt = xferMs;
5652 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5653 if (!peer->smRtt) peer->smRtt = 1;
5656 if (peer->countDown) {
5660 peer->countDown = 10; /* recalculate only every so often */
5662 /* In practice, we can measure only the RTT for full packets,
5663 * because of the way Rx acks the data that it receives. (If it's
5664 * smaller than a full packet, it often gets implicitly acked
5665 * either by the call response (from a server) or by the next call
5666 * (from a client), and either case confuses transmission times
5667 * with processing times.) Therefore, replace the above
5668 * more-sophisticated processing with a simpler version, where the
5669 * smoothed RTT is kept for full-size packets, and the time to
5670 * transmit a windowful of full-size packets is simply RTT *
5671 * windowSize. Again, we take two steps:
5672 - ensure the timeout is large enough for a single packet's RTT;
5673 - ensure that the window is small enough to fit in the desired timeout.*/
5675 /* First, the timeout check. */
5676 minTime = peer->smRtt;
5677 /* Get a reasonable estimate for a timeout period */
5679 newTO.sec = minTime / 1000;
5680 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5682 /* Increase the timeout period so that we can always do at least
5683 * one packet exchange */
5684 if (clock_Gt(&newTO, &peer->timeout)) {
5686 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5687 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5688 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5691 peer->timeout = newTO;
5694 /* Now, get an estimate for the transmit window size. */
5695 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5696 /* Now, convert to the number of full packets that could fit in a
5697 * reasonable fraction of that interval */
5698 minTime /= (peer->smRtt << 1);
5699 xferSize = minTime; /* (make a copy) */
5701 /* Now clamp the size to reasonable bounds. */
5702 if (minTime <= 1) minTime = 1;
5703 else if (minTime > rx_Window) minTime = rx_Window;
5704 /* if (minTime != peer->maxWindow) {
5705 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5706 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5707 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5708 peer->packetSize));
5709 peer->maxWindow = minTime;
5710 elide... call->twind = minTime;
5711 } */
5714 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5715 * Discern this by calculating the timeout necessary for rx_Window
5716 * packets. */
5717 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5718 /* calculate estimate for transmission interval in milliseconds */
5719 minTime = rx_Window * peer->smRtt;
5720 if (minTime < 1000) {
5721 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5722 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5723 peer->timeout.usec, peer->smRtt,
5724 peer->packetSize));
5726 newTO.sec = 0; /* cut back on timeout by half a second */
5727 newTO.usec = 500000;
5728 clock_Sub(&peer->timeout, &newTO);
5729 }
5730 }
5733 } /* end of rxi_ComputeRate */
5734 #endif /* ADAPT_WINDOW */
5742 /* Don't call this debugging routine directly; use dpf */
5743 void
5744 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5745 a11, a12, a13, a14, a15)
5746 char *format;
5747 {
5748 struct clock now;
5749 clock_GetTime(&now);
5750 fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5751 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5752 putc('\n', rx_Log);
5753 }
5758 * This function is used to process the rx_stats structure that is local
5759 * to a process as well as an rx_stats structure received from a remote
5760 * process (via rxdebug).  Therefore, it needs to do minimal version
5761 * checking.
5762 */
5763 void rx_PrintTheseStats (file, s, size, freePackets, version)
5764 FILE *file;
5765 struct rx_stats *s;
5766 int size; /* some idea of version control */
5767 afs_int32 freePackets;
5768 char version;
5769 {
5770 int i;
5772 if (size != sizeof(struct rx_stats)) {
5773 fprintf(file,
5774 "Unexpected size of stats structure: was %d, expected %d\n",
5775 size, (int) sizeof(struct rx_stats));
5776 return;
5777 }
5778 fprintf(file,
5779 "rx stats: free packets %d, allocs %d, ",
5780 freePackets,
5781 s->packetRequests);
5783 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5784 fprintf(file,
5785 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5786 s->receivePktAllocFailures,
5787 s->receiveCbufPktAllocFailures,
5788 s->sendPktAllocFailures,
5789 s->sendCbufPktAllocFailures,
5790 s->specialPktAllocFailures);
5791 } else {
5792 fprintf(file,
5793 "alloc-failures(rcv %d,send %d,ack %d)\n",
5794 s->receivePktAllocFailures,
5795 s->sendPktAllocFailures,
5796 s->specialPktAllocFailures);
5797 }
5801 "bogusReads %d (last from host %x), "
5807 s->bogusPacketOnRead,
5810 s->noPacketBuffersOnRead,
5814 fprintf(file, " packets read: ");
5815 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5816 fprintf(file,
5817 "%s %d ",
5818 rx_packetTypes[i],
5819 s->packetsRead[i]);
5820 }
5821 fprintf(file, "\n");
5824 " other read counters: data %d, "
5832 s->spuriousPacketsRead,
5833 s->ignorePacketDally);
5835 fprintf(file, " packets sent: ");
5836 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5837 fprintf(file,
5838 "%s %d ",
5839 rx_packetTypes[i],
5840 s->packetsSent[i]);
5841 }
5842 fprintf(file, "\n");
5845 " other send counters: ack %d, "
5846 "data %d (not resends), "
5847 "resends %d, "
5848 "pushed %d, "
5849 "acked&ignored %d\n",
5850 s->ackPacketsSent,
5851 s->dataPacketsSent,
5852 s->dataPacketsReSent,
5853 s->dataPacketsPushed,
5854 s->ignoreAckedPacket);
5857 " \t(these should be small) sendFailed %d, "
5858 "fatalErrors %d\n",
5859 (int) s->netSendFailures,
5860 (int) s->fatalErrors);
5862 if (s->nRttSamples) {
5863 fprintf(file,
5864 " Average rtt is %0.3f, with %d samples\n",
5865 clock_Float(&s->totalRtt)/s->nRttSamples,
5866 s->nRttSamples);
5868 fprintf(file,
5869 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5870 clock_Float(&s->minRtt),
5871 clock_Float(&s->maxRtt));
5872 }
5874 fprintf(file,
5875 " %d server connections, "
5876 "%d client connections, "
5877 "%d peer structs, "
5878 "%d call structs, "
5879 "%d free call structs\n",
5880 s->nServerConns,
5881 s->nClientConns,
5882 s->nPeerStructs,
5883 s->nCallStructs,
5884 s->nFreeCallStructs);
5886 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5887 fprintf(file,
5888 " %d clock updates\n",
5889 clock_nUpdates);
5890 #endif
5892 }
5894 /* for backward compatibility */
5895 void rx_PrintStats(file)
5896 FILE *file;
5897 {
5898 MUTEX_ENTER(&rx_stats_mutex);
5899 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5900 MUTEX_EXIT(&rx_stats_mutex);
5901 }
5903 void rx_PrintPeerStats(file, peer)
5904 FILE *file;
5905 struct rx_peer *peer;
5906 {
5907 fprintf(file,
5908 "Peer %x.%d.  "
5909 "Burst size %d, "
5910 "burst wait %u.%d.\n",
5911 ntohl(peer->host),
5912 (int) peer->port,
5913 (int) peer->burstSize,
5914 (int) peer->burstWait.sec,
5915 (int) peer->burstWait.usec);
5919 "retry time %u.%06d, "
5923 (int) peer->timeout.sec,
5924 (int) peer->timeout.usec,
5930 "max in packet skew %d, "
5931 "max out packet skew %d\n",
5932 peer->ifMTU,
5933 (int) peer->inPacketSkew,
5934 (int) peer->outPacketSkew);
5935 }
5937 #ifdef AFS_PTHREAD_ENV
5939 * This mutex protects the following static variables:
5940 * counter
5941 */
5943 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
5944 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
5946 #define LOCK_RX_DEBUG
5947 #define UNLOCK_RX_DEBUG
5948 #endif /* AFS_PTHREAD_ENV */
5950 static int MakeDebugCall(
5951 int socket,
5952 afs_uint32 remoteAddr,
5953 afs_uint16 remotePort,
5954 u_char type,
5955 void *inputData,
5956 size_t inputLength,
5957 void *outputData,
5958 size_t outputLength
5959 )
5960 {
5961 static afs_int32 counter = 100;
5962 afs_uint32 endTime;
5963 struct rx_header theader;
5964 char tbuffer[1500];
5965 register afs_int32 code;
5966 struct timeval tv;
5967 struct sockaddr_in taddr, faddr;
5968 int faddrLen;
5969 fd_set imask;
5970 register char *tp;
5972 endTime = time(0) + 20; /* try for 20 seconds */
5976 tp = &tbuffer[sizeof(struct rx_header)];
5977 taddr.sin_family = AF_INET;
5978 taddr.sin_port = remotePort;
5979 taddr.sin_addr.s_addr = remoteAddr;
5981 memset(&theader, 0, sizeof(theader));
5982 theader.epoch = htonl(999);
5983 theader.cid = 0;
5984 theader.callNumber = htonl(counter);
5985 theader.seq = 0;
5986 theader.serial = 0;
5987 theader.type = type;
5988 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
5989 theader.serviceId = 0;
5991 memcpy(tbuffer, &theader, sizeof(theader));
5992 memcpy(tp, inputData, inputLength);
5993 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
5994 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
5996 /* see if there's a packet available */
5997 FD_ZERO(&imask);
5998 FD_SET(socket, &imask);
5999 tv.tv_sec = 1;
6000 tv.tv_usec = 0;
6001 code = select(socket+1, &imask, 0, 0, &tv);
6002 if (code > 0) {
6003 /* now receive a packet */
6004 faddrLen = sizeof(struct sockaddr_in);
6005 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6006 (struct sockaddr *) &faddr, &faddrLen);
6008 memcpy(&theader, tbuffer, sizeof(struct rx_header));
6009 if (counter == ntohl(theader.callNumber)) break;
6010 }
6012 /* see if we've timed out */
6013 if (endTime < time(0)) return -1;
6014 }
6015 code -= sizeof(struct rx_header);
6016 if (code > outputLength) code = outputLength;
6017 memcpy(outputData, tp, code);
6018 return code;
6019 }
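/* Summary of the exchange above: MakeDebugCall re-sends a single UDP
 * packet, an rx header carrying epoch 999 and a fresh call number
 * followed by the request body, and matches replies on that call
 * number.  The one-second select() timeout lets the 20-second deadline
 * be rechecked between retransmissions. */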
6021 afs_int32 rx_GetServerDebug(
6022 int socket,
6023 afs_uint32 remoteAddr,
6024 afs_uint16 remotePort,
6025 struct rx_debugStats *stat,
6026 afs_uint32 *supportedValues
6029 struct rx_debugIn in;
6032 *supportedValues = 0;
6033 in.type = htonl(RX_DEBUGI_GETSTATS);
6034 in.index = 0;
6036 rc = MakeDebugCall(socket,
6037 remoteAddr,
6038 remotePort,
6039 RX_PACKET_TYPE_DEBUG,
6040 &in,
6041 sizeof(in),
6042 stat,
6043 sizeof(*stat));
6045 /*
6046 * If the call was successful, fix up the version and indicate
6047 * what contents of the stat structure are valid.
6048 * Also do net to host conversion of fields here.
6049 */
6050 if (rc >= 0) {
6051 stat->version = ntohl(stat->version);
6052 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6053 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6055 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6056 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6058 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6059 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6061 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6062 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6064 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6065 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6067 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6068 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6070 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6071 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6074 stat->nFreePackets = ntohl(stat->nFreePackets);
6075 stat->packetReclaims = ntohl(stat->packetReclaims);
6076 stat->callsExecuted = ntohl(stat->callsExecuted);
6077 stat->nWaiting = ntohl(stat->nWaiting);
6078 stat->idleThreads = ntohl(stat->idleThreads);
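#if 0
/* Hedged usage sketch, not part of the original source: query a
 * server's debug stats from a user-space tool.  The loopback address
 * and port 7000 are illustrative only; both must be passed in network
 * byte order, since MakeDebugCall copies them straight into the
 * sockaddr_in. */
static void example_GetServerDebug(void)
{
    struct rx_debugStats tstats;
    afs_uint32 supported;
    int sock = socket(AF_INET, SOCK_DGRAM, 0);

    if (sock >= 0 &&
        rx_GetServerDebug(sock, htonl(0x7f000001), htons(7000),
                          &tstats, &supported) >= 0) {
        /* tstats fields are now in host byte order and 'supported'
         * holds the RX_SERVER_DEBUG_* capability bits. */
    }
}
#endif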
6084 afs_int32 rx_GetServerStats(
6085 int socket,
6086 afs_uint32 remoteAddr,
6087 afs_uint16 remotePort,
6088 struct rx_stats *stat,
6089 afs_uint32 *supportedValues
6092 struct rx_debugIn in;
6093 afs_int32 *lp = (afs_int32 *) stat;
6098 * supportedValues is currently unused, but added to allow future
6099 * versioning of this function.
6102 *supportedValues = 0;
6103 in.type = htonl(RX_DEBUGI_RXSTATS);
6105 memset(stat, 0, sizeof(*stat));
6107 rc = MakeDebugCall(socket,
6108 remoteAddr,
6109 remotePort,
6110 RX_PACKET_TYPE_DEBUG,
6111 &in,
6112 sizeof(in),
6113 stat,
6114 sizeof(*stat));
6116 if (rc >= 0) {
6118 /*
6119 * Do net to host conversion here
6120 */
6122 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6123 *lp = ntohl(*lp);
6124 }
6130 afs_int32 rx_GetServerVersion(
6131 int socket,
6132 afs_uint32 remoteAddr,
6133 afs_uint16 remotePort,
6134 size_t version_length,
6135 char *version
6136 )
6137 {
6138 char a[1] = {0};
6139 return MakeDebugCall(socket,
6140 remoteAddr,
6141 remotePort,
6142 RX_PACKET_TYPE_VERSION,
6143 a,
6144 1,
6145 version,
6146 version_length);
6147 }
6149 afs_int32 rx_GetServerConnections(
6150 int socket,
6151 afs_uint32 remoteAddr,
6152 afs_uint16 remotePort,
6153 afs_int32 *nextConnection,
6154 int allConnections,
6155 afs_uint32 debugSupportedValues,
6156 struct rx_debugConn *conn,
6157 afs_uint32 *supportedValues
6160 struct rx_debugIn in;
6165 * supportedValues is currently unused, but added to allow future
6166 * versioning of this function.
6169 *supportedValues = 0;
6170 if (allConnections) {
6171 in.type = htonl(RX_DEBUGI_GETALLCONN);
6172 } else {
6173 in.type = htonl(RX_DEBUGI_GETCONN);
6174 }
6175 in.index = htonl(*nextConnection);
6176 memset(conn, 0, sizeof(*conn));
6178 rc = MakeDebugCall(socket,
6179 remoteAddr,
6180 remotePort,
6181 RX_PACKET_TYPE_DEBUG,
6182 &in,
6183 sizeof(in),
6184 conn,
6185 sizeof(*conn));
6187 if (rc >= 0) {
6188 *nextConnection += 1;
6191 * Convert old connection format to new structure.
6194 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6195 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6196 #define MOVEvL(a) (conn->a = vL->a)
6198 /* any old or unrecognized version... */
6199 for (i=0;i<RX_MAXCALLS;i++) {
6200 MOVEvL(callState[i]);
6201 MOVEvL(callMode[i]);
6202 MOVEvL(callFlags[i]);
6203 MOVEvL(callOther[i]);
6204 }
6205 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6206 MOVEvL(secStats.type);
6207 MOVEvL(secStats.level);
6208 MOVEvL(secStats.flags);
6209 MOVEvL(secStats.expires);
6210 MOVEvL(secStats.packetsReceived);
6211 MOVEvL(secStats.packetsSent);
6212 MOVEvL(secStats.bytesReceived);
6213 MOVEvL(secStats.bytesSent);
6214 }
6215 }
6217 /*
6218 * Do net to host conversion here
6219 * NOTE:
6220 * I don't convert host or port since we are most likely
6221 * going to want these in NBO.
6222 */
6223 conn->cid = ntohl(conn->cid);
6224 conn->serial = ntohl(conn->serial);
6225 for(i=0;i<RX_MAXCALLS;i++) {
6226 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6228 conn->error = ntohl(conn->error);
6229 conn->secStats.flags = ntohl(conn->secStats.flags);
6230 conn->secStats.expires = ntohl(conn->secStats.expires);
6231 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6232 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6233 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6234 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6235 conn->epoch = ntohl(conn->epoch);
6236 conn->natMTU = ntohl(conn->natMTU);
6242 afs_int32 rx_GetServerPeers(
6243 int socket,
6244 afs_uint32 remoteAddr,
6245 afs_uint16 remotePort,
6246 afs_int32 *nextPeer,
6247 afs_uint32 debugSupportedValues,
6248 struct rx_debugPeer *peer,
6249 afs_uint32 *supportedValues
6252 struct rx_debugIn in;
6256 * supportedValues is currently unused, but added to allow future
6257 * versioning of this function.
6260 *supportedValues = 0;
6261 in.type = htonl(RX_DEBUGI_GETPEER);
6262 in.index = htonl(*nextPeer);
6263 memset(peer, 0, sizeof(*peer));
6265 rc = MakeDebugCall(socket,
6266 remoteAddr,
6267 remotePort,
6268 RX_PACKET_TYPE_DEBUG,
6269 &in,
6270 sizeof(in),
6271 peer,
6272 sizeof(*peer));
6274 if (rc >= 0) {
6277 /*
6278 * Do net to host conversion here
6279 * NOTE:
6280 * I don't convert host or port since we are most likely
6281 * going to want these in NBO.
6282 */
6283 peer->ifMTU = ntohs(peer->ifMTU);
6284 peer->idleWhen = ntohl(peer->idleWhen);
6285 peer->refCount = ntohs(peer->refCount);
6286 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6287 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6288 peer->rtt = ntohl(peer->rtt);
6289 peer->rtt_dev = ntohl(peer->rtt_dev);
6290 peer->timeout.sec = ntohl(peer->timeout.sec);
6291 peer->timeout.usec = ntohl(peer->timeout.usec);
6292 peer->nSent = ntohl(peer->nSent);
6293 peer->reSends = ntohl(peer->reSends);
6294 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6295 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6296 peer->rateFlag = ntohl(peer->rateFlag);
6297 peer->natMTU = ntohs(peer->natMTU);
6298 peer->maxMTU = ntohs(peer->maxMTU);
6299 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6300 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6301 peer->MTU = ntohs(peer->MTU);
6302 peer->cwind = ntohs(peer->cwind);
6303 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6304 peer->congestSeq = ntohs(peer->congestSeq);
6305 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6306 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6307 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6308 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
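#if 0
/* Hedged usage sketch, not part of the original source: enumerate a
 * server's peers rxdebug-style.  'sock', 'host', 'port' and
 * 'debugSupportedValues' are assumed to be set up as for
 * rx_GetServerDebug above; the 0xffffffff sentinel check mirrors the
 * rxdebug client's end-of-table test. */
afs_int32 index = 0;
afs_uint32 supported;
struct rx_debugPeer tpeer;

while (rx_GetServerPeers(sock, host, port, &index,
                         debugSupportedValues, &tpeer, &supported) >= 0) {
    if (tpeer.host == 0xffffffff)
        break;
    /* ... print or accumulate tpeer here ... */
}
#endif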
6313 #endif /* RXDEBUG */
6315 void shutdown_rx(void)
6316 {
6317 struct rx_serverQueueEntry *np;
6319 register struct rx_call *call;
6320 register struct rx_serverQueueEntry *sq;
6323 if (rxinit_status == 1) {
6324 UNLOCK_RX_INIT
6325 return; /* Already shutdown. */
6326 }
6330 #ifndef AFS_PTHREAD_ENV
6331 FD_ZERO(&rx_selectMask);
6332 #endif /* AFS_PTHREAD_ENV */
6333 rxi_dataQuota = RX_MAX_QUOTA;
6334 #ifndef AFS_PTHREAD_ENV
6336 #endif /* AFS_PTHREAD_ENV */
6339 #ifndef AFS_PTHREAD_ENV
6340 #ifndef AFS_USE_GETTIMEOFDAY
6342 #endif /* AFS_USE_GETTIMEOFDAY */
6343 #endif /* AFS_PTHREAD_ENV */
6345 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6346 call = queue_First(&rx_freeCallQueue, rx_call);
6347 queue_Remove(call);
6348 rxi_Free(call, sizeof(struct rx_call));
6349 }
6351 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6352 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6353 queue_Remove(sq);
6358 struct rx_peer **peer_ptr, **peer_end;
6359 for (peer_ptr = &rx_peerHashTable[0],
6360 peer_end = &rx_peerHashTable[rx_hashTableSize];
6361 peer_ptr < peer_end; peer_ptr++) {
6362 struct rx_peer *peer, *next;
6363 for (peer = *peer_ptr; peer; peer = next) {
6364 rx_interface_stat_p rpc_stat, nrpc_stat;
6365 next = peer->next;
6366 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6367 rx_interface_stat)) {
6368 unsigned int num_funcs;
6369 if (!rpc_stat) break;
6370 queue_Remove(&rpc_stat->queue_header);
6371 queue_Remove(&rpc_stat->all_peers);
6372 num_funcs = rpc_stat->stats[0].func_total;
6373 space = sizeof(rx_interface_stat_t) +
6374 rpc_stat->stats[0].func_total *
6375 sizeof(rx_function_entry_v1_t);
6377 rxi_Free(rpc_stat, space);
6378 MUTEX_ENTER(&rx_rpc_stats);
6379 rxi_rpc_peer_stat_cnt -= num_funcs;
6380 MUTEX_EXIT(&rx_rpc_stats);
6381 }
6382 rxi_FreePeer(peer);
6384 MUTEX_ENTER(&rx_stats_mutex);
6385 rx_stats.nPeerStructs--;
6386 MUTEX_EXIT(&rx_stats_mutex);
6390 for (i = 0; i<RX_MAX_SERVICES; i++) {
6391 if (rx_services[i])
6392 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6393 }
6394 for (i = 0; i < rx_hashTableSize; i++) {
6395 register struct rx_connection *tc, *ntc;
6396 MUTEX_ENTER(&rx_connHashTable_lock);
6397 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6398 ntc = tc->next;
6399 for (j = 0; j < RX_MAXCALLS; j++) {
6400 if (tc->call[j]) {
6401 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6402 }
6403 }
6404 rxi_Free(tc, sizeof(*tc));
6405 }
6406 MUTEX_EXIT(&rx_connHashTable_lock);
6409 MUTEX_ENTER(&freeSQEList_lock);
6411 while ((np = rx_FreeSQEList)) {
6412 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6413 MUTEX_DESTROY(&np->lock);
6414 rxi_Free(np, sizeof(*np));
6415 }
6417 MUTEX_EXIT(&freeSQEList_lock);
6418 MUTEX_DESTROY(&freeSQEList_lock);
6419 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6420 MUTEX_DESTROY(&rx_connHashTable_lock);
6421 MUTEX_DESTROY(&rx_peerHashTable_lock);
6422 MUTEX_DESTROY(&rx_serverPool_lock);
6424 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6425 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6427 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6428 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6430 rxi_FreeAllPackets();
6432 MUTEX_ENTER(&rx_stats_mutex);
6433 rxi_dataQuota = RX_MAX_QUOTA;
6434 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6435 MUTEX_EXIT(&rx_stats_mutex);
6441 #ifdef RX_ENABLE_LOCKS
6442 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6444 if (!MUTEX_ISMINE(lockaddr))
6445 osi_Panic("Lock not held: %s", msg);
6447 #endif /* RX_ENABLE_LOCKS */
6452 * Routines to implement connection specific data.
6455 int rx_KeyCreate(rx_destructor_t rtn)
6456 {
6457 int key;
6458 MUTEX_ENTER(&rxi_keyCreate_lock);
6459 key = rxi_keyCreate_counter++;
6460 rxi_keyCreate_destructor = (rx_destructor_t *)
6461 realloc((void *)rxi_keyCreate_destructor,
6462 (key+1) * sizeof(rx_destructor_t));
6463 rxi_keyCreate_destructor[key] = rtn;
6464 MUTEX_EXIT(&rxi_keyCreate_lock);
6465 return key;
6466 }
6468 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6469 {
6470 int i;
6471 MUTEX_ENTER(&conn->conn_data_lock);
6472 if (!conn->specific) {
6473 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6474 for (i = 0 ; i < key ; i++)
6475 conn->specific[i] = NULL;
6476 conn->nSpecific = key+1;
6477 conn->specific[key] = ptr;
6478 } else if (key >= conn->nSpecific) {
6479 conn->specific = (void **)
6480 realloc(conn->specific,(key+1)*sizeof(void *));
6481 for (i = conn->nSpecific ; i < key ; i++)
6482 conn->specific[i] = NULL;
6483 conn->nSpecific = key+1;
6484 conn->specific[key] = ptr;
6485 } else {
6486 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6487 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6488 conn->specific[key] = ptr;
6489 }
6490 MUTEX_EXIT(&conn->conn_data_lock);
6491 }
6493 void *rx_GetSpecific(struct rx_connection *conn, int key)
6494 {
6495 void *ptr;
6496 MUTEX_ENTER(&conn->conn_data_lock);
6497 if (key >= conn->nSpecific)
6498 ptr = NULL;
6499 else
6500 ptr = conn->specific[key];
6501 MUTEX_EXIT(&conn->conn_data_lock);
6502 return ptr;
6503 }
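#if 0
/* Hedged usage sketch, not part of the original source: attach
 * per-connection private data.  The destructor registered with
 * rx_KeyCreate runs when rx_SetSpecific overwrites an existing value
 * for the same key (see above). */
static void free_my_data(void *p) { free(p); }

static void example_specific(struct rx_connection *conn)
{
    int key = rx_KeyCreate(free_my_data);
    int *val = (int *) malloc(sizeof(int));
    *val = 42;
    rx_SetSpecific(conn, key, val);
    val = (int *) rx_GetSpecific(conn, key);
}
#endif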
6505 #endif /* !KERNEL */
6508 * processStats is a queue used to store the statistics for the local
6509 * process. Its contents are similar to the contents of the rpcStats
6510 * queue on a rx_peer structure, but the actual data stored within
6511 * this queue contains totals across the lifetime of the process (assuming
6512 * the stats have not been reset) - unlike the per peer structures
6513 * which can come and go based upon the peer lifetime.
6516 static struct rx_queue processStats = {&processStats,&processStats};
6519 * peerStats is a queue used to store the statistics for all peer structs.
6520 * Its contents are the union of all the peer rpcStats queues.
6523 static struct rx_queue peerStats = {&peerStats,&peerStats};
6526 * rxi_monitor_processStats is used to turn process wide stat collection
6527 * on and off
6528 */
6530 static int rxi_monitor_processStats = 0;
6533 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
6536 static int rxi_monitor_peerStats = 0;
6539 * rxi_AddRpcStat - given all of the information for a particular rpc
6540 * call, create (if needed) and update the stat totals for the rpc.
6544 * IN stats - the queue of stats that will be updated with the new value
6546 * IN rxInterface - a unique number that identifies the rpc interface
6548 * IN currentFunc - the index of the function being invoked
6550 * IN totalFunc - the total number of functions in this interface
6552 * IN queueTime - the amount of time this function waited for a thread
6554 * IN execTime - the amount of time this function invocation took to execute
6556 * IN bytesSent - the number of bytes sent by this invocation
6558 * IN bytesRcvd - the number of bytes received by this invocation
6560 * IN isServer - if true, this invocation was made to a server
6562 * IN remoteHost - the ip address of the remote host
6564 * IN remotePort - the port of the remote host
6566 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6568 * INOUT counter - if a new stats structure is allocated, the counter will
6569 * be updated with the new number of allocated stat structures
6576 static int rxi_AddRpcStat(
6577 struct rx_queue *stats,
6578 afs_uint32 rxInterface,
6579 afs_uint32 currentFunc,
6580 afs_uint32 totalFunc,
6581 struct clock *queueTime,
6582 struct clock *execTime,
6583 afs_hyper_t *bytesSent,
6584 afs_hyper_t *bytesRcvd,
6585 int isServer,
6586 afs_uint32 remoteHost,
6587 afs_uint32 remotePort,
6588 int addToPeerList,
6589 unsigned int *counter)
6592 rx_interface_stat_p rpc_stat, nrpc_stat;
6595 * See if there's already a structure for this interface
6598 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6599 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6600 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6604 * Didn't find a match so allocate a new structure and add it to the
6608 if (queue_IsEnd(stats, rpc_stat) ||
6609 (rpc_stat == NULL) ||
6610 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6611 (rpc_stat->stats[0].remote_is_server != isServer)) {
6615 space = sizeof(rx_interface_stat_t) + totalFunc *
6616 sizeof(rx_function_entry_v1_t);
6618 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6619 if (rpc_stat == NULL) {
6623 *counter += totalFunc;
6624 for(i=0;i<totalFunc;i++) {
6625 rpc_stat->stats[i].remote_peer = remoteHost;
6626 rpc_stat->stats[i].remote_port = remotePort;
6627 rpc_stat->stats[i].remote_is_server = isServer;
6628 rpc_stat->stats[i].interfaceId = rxInterface;
6629 rpc_stat->stats[i].func_total = totalFunc;
6630 rpc_stat->stats[i].func_index = i;
6631 hzero(rpc_stat->stats[i].invocations);
6632 hzero(rpc_stat->stats[i].bytes_sent);
6633 hzero(rpc_stat->stats[i].bytes_rcvd);
6634 rpc_stat->stats[i].queue_time_sum.sec = 0;
6635 rpc_stat->stats[i].queue_time_sum.usec = 0;
6636 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6637 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6638 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6639 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6640 rpc_stat->stats[i].queue_time_max.sec = 0;
6641 rpc_stat->stats[i].queue_time_max.usec = 0;
6642 rpc_stat->stats[i].execution_time_sum.sec = 0;
6643 rpc_stat->stats[i].execution_time_sum.usec = 0;
6644 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6645 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6646 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6647 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6648 rpc_stat->stats[i].execution_time_max.sec = 0;
6649 rpc_stat->stats[i].execution_time_max.usec = 0;
6651 queue_Prepend(stats, rpc_stat);
6652 if (addToPeerList) {
6653 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6654 }
6655 }
6657 /*
6658 * Increment the stats for this function
6661 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6662 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6663 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6664 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6665 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6666 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6667 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6669 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6670 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6672 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6673 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6674 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6675 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6677 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6678 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6686 * rx_IncrementTimeAndCount - increment the times and count for a particular
6691 * IN peer - the peer who invoked the rpc
6693 * IN rxInterface - a unique number that identifies the rpc interface
6695 * IN currentFunc - the index of the function being invoked
6697 * IN totalFunc - the total number of functions in this interface
6699 * IN queueTime - the amount of time this function waited for a thread
6701 * IN execTime - the amount of time this function invocation took to execute
6703 * IN bytesSent - the number of bytes sent by this invocation
6705 * IN bytesRcvd - the number of bytes received by this invocation
6707 * IN isServer - if true, this invocation was made to a server
6714 void rx_IncrementTimeAndCount(
6715 struct rx_peer *peer,
6716 afs_uint32 rxInterface,
6717 afs_uint32 currentFunc,
6718 afs_uint32 totalFunc,
6719 struct clock *queueTime,
6720 struct clock *execTime,
6721 afs_hyper_t *bytesSent,
6722 afs_hyper_t *bytesRcvd,
6723 int isServer)
6724 {
6726 MUTEX_ENTER(&rx_rpc_stats);
6727 MUTEX_ENTER(&peer->peer_lock);
6729 if (rxi_monitor_peerStats) {
6730 rxi_AddRpcStat(&peer->rpcStats,
6742 &rxi_rpc_peer_stat_cnt);
6745 if (rxi_monitor_processStats) {
6746 rxi_AddRpcStat(&processStats,
6758 &rxi_rpc_process_stat_cnt);
6761 MUTEX_EXIT(&peer->peer_lock);
6762 MUTEX_EXIT(&rx_rpc_stats);
6767 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6771 * IN callerVersion - the rpc stat version of the caller.
6773 * IN count - the number of entries to marshall.
6775 * IN stats - pointer to stats to be marshalled.
6777 * OUT ptr - Where to store the marshalled data.
6783 void rx_MarshallProcessRPCStats(
6784 afs_uint32 callerVersion,
6785 afs_uint32 count,
6786 rx_function_entry_v1_t *stats,
6787 afs_uint32 **ptrP)
6788 {
6789 int i;
6790 afs_uint32 *ptr;
6793 * We only support the first version
6795 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6796 *(ptr++) = stats->remote_peer;
6797 *(ptr++) = stats->remote_port;
6798 *(ptr++) = stats->remote_is_server;
6799 *(ptr++) = stats->interfaceId;
6800 *(ptr++) = stats->func_total;
6801 *(ptr++) = stats->func_index;
6802 *(ptr++) = hgethi(stats->invocations);
6803 *(ptr++) = hgetlo(stats->invocations);
6804 *(ptr++) = hgethi(stats->bytes_sent);
6805 *(ptr++) = hgetlo(stats->bytes_sent);
6806 *(ptr++) = hgethi(stats->bytes_rcvd);
6807 *(ptr++) = hgetlo(stats->bytes_rcvd);
6808 *(ptr++) = stats->queue_time_sum.sec;
6809 *(ptr++) = stats->queue_time_sum.usec;
6810 *(ptr++) = stats->queue_time_sum_sqr.sec;
6811 *(ptr++) = stats->queue_time_sum_sqr.usec;
6812 *(ptr++) = stats->queue_time_min.sec;
6813 *(ptr++) = stats->queue_time_min.usec;
6814 *(ptr++) = stats->queue_time_max.sec;
6815 *(ptr++) = stats->queue_time_max.usec;
6816 *(ptr++) = stats->execution_time_sum.sec;
6817 *(ptr++) = stats->execution_time_sum.usec;
6818 *(ptr++) = stats->execution_time_sum_sqr.sec;
6819 *(ptr++) = stats->execution_time_sum_sqr.usec;
6820 *(ptr++) = stats->execution_time_min.sec;
6821 *(ptr++) = stats->execution_time_min.usec;
6822 *(ptr++) = stats->execution_time_max.sec;
6823 *(ptr++) = stats->execution_time_max.usec;
6824 }
6825 *ptrP = ptr;
6826 }
6828 /*
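/* Layout note: each entry above marshalls to a fixed run of 28
 * afs_uint32 words: six identification words, three 64-bit counters
 * split into high/low halves, and eight struct clock values split into
 * sec/usec pairs.  Assuming rx_function_entry_v1_t carries no padding,
 * that is exactly one rx_function_entry_v1_t per entry. */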
6829 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6834 * IN callerVersion - the rpc stat version of the caller
6836 * OUT myVersion - the rpc stat version of this function
6838 * OUT clock_sec - local time seconds
6840 * OUT clock_usec - local time microseconds
6842 * OUT allocSize - the number of bytes allocated to contain stats
6844 * OUT statCount - the number stats retrieved from this process.
6846 * OUT stats - the actual stats retrieved from this process.
6850 * If successful, *stats will be non-NULL.
6853 int rx_RetrieveProcessRPCStats(
6854 afs_uint32 callerVersion,
6855 afs_uint32 *myVersion,
6856 afs_uint32 *clock_sec,
6857 afs_uint32 *clock_usec,
6858 size_t *allocSize,
6859 afs_uint32 *statCount,
6860 afs_uint32 **stats)
6861 {
6870 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6873 * Check to see if stats are enabled
6876 MUTEX_ENTER(&rx_rpc_stats);
6877 if (!rxi_monitor_processStats) {
6878 MUTEX_EXIT(&rx_rpc_stats);
6882 clock_GetTime(&now);
6883 *clock_sec = now.sec;
6884 *clock_usec = now.usec;
6887 * Allocate the space based upon the caller version
6889 * If the client is at an older version than we are,
6890 * we return the statistic data in the older data format, but
6891 * we still return our version number so the client knows we
6892 * are maintaining more data than it can retrieve.
6895 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6896 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6897 *statCount = rxi_rpc_process_stat_cnt;
6900 * This can't happen yet, but in the future version changes
6901 * can be handled by adding additional code here
6905 if (space > (size_t) 0) {
6906 *allocSize = space;
6907 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6909 if (ptr != NULL) {
6910 rx_interface_stat_p rpc_stat, nrpc_stat;
6913 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6914 rx_interface_stat)) {
6916 * Copy the data based upon the caller version
6918 rx_MarshallProcessRPCStats(callerVersion,
6919 rpc_stat->stats[0].func_total,
6920 rpc_stat->stats, &ptr);
6926 MUTEX_EXIT(&rx_rpc_stats);
6931 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6935 * IN callerVersion - the rpc stat version of the caller
6937 * OUT myVersion - the rpc stat version of this function
6939 * OUT clock_sec - local time seconds
6941 * OUT clock_usec - local time microseconds
6943 * OUT allocSize - the number of bytes allocated to contain stats
6945 * OUT statCount - the number of stats retrieved from the individual
6948 * OUT stats - the actual stats retrieved from the individual peer structures.
6952 * If successful, *stats will be non-NULL.
6955 int rx_RetrievePeerRPCStats(
6956 afs_uint32 callerVersion,
6957 afs_uint32 *myVersion,
6958 afs_uint32 *clock_sec,
6959 afs_uint32 *clock_usec,
6960 size_t *allocSize,
6961 afs_uint32 *statCount,
6972 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6975 * Check to see if stats are enabled
6978 MUTEX_ENTER(&rx_rpc_stats);
6979 if (!rxi_monitor_peerStats) {
6980 MUTEX_EXIT(&rx_rpc_stats);
6984 clock_GetTime(&now);
6985 *clock_sec = now.sec;
6986 *clock_usec = now.usec;
6989 * Allocate the space based upon the caller version
6991 * If the client is at an older version than we are,
6992 * we return the statistic data in the older data format, but
6993 * we still return our version number so the client knows we
6994 * are maintaining more data than it can retrieve.
6997 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6998 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
6999 *statCount = rxi_rpc_peer_stat_cnt;
7002 * This can't happen yet, but in the future version changes
7003 * can be handled by adding additional code here
7007 if (space > (size_t) 0) {
7008 *allocSize = space;
7009 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7011 if (ptr != NULL) {
7012 rx_interface_stat_p rpc_stat, nrpc_stat;
7015 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7016 rx_interface_stat)) {
7018 * We have to fix the offset of rpc_stat since we are
7019 * keeping this structure on two rx_queues. The rx_queue
7020 * package assumes that the rx_queue member is the first
7021 * member of the structure. That is, rx_queue assumes that
7022 * any one item is only on one queue at a time. We are
7023 * breaking that assumption and so we have to do a little
7024 * math to fix our pointers.
7025 */
7026 char *fix_offset;
7027 fix_offset = (char *) rpc_stat;
7028 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7029 rpc_stat = (rx_interface_stat_p) fix_offset;
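/* Illustration with hypothetical offsets: if all_peers sits 16 bytes
 * into rx_interface_stat_t, queue_Scan hands back (start of struct
 * + 16); subtracting offsetof(rx_interface_stat_t, all_peers) == 16
 * recovers the true start of the structure, the classic container-of
 * computation. */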
7032 * Copy the data based upon the caller version
7034 rx_MarshallProcessRPCStats(callerVersion,
7035 rpc_stat->stats[0].func_total,
7036 rpc_stat->stats, &ptr);
7042 MUTEX_EXIT(&rx_rpc_stats);
7047 * rx_FreeRPCStats - free memory allocated by
7048 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7052 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7053 * rx_RetrievePeerRPCStats
7055 * IN allocSize - the number of bytes in stats.
7062 void rx_FreeRPCStats(
7063 afs_uint32 *stats,
7064 size_t allocSize)
7065 {
7066 rxi_Free(stats, allocSize);
7067 }
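#if 0
/* Hedged usage sketch, not part of the original source: snapshot this
 * process's rpc stats and release the buffer again. */
static void example_dump_stats(void)
{
    afs_uint32 myVer, sec, usec, count;
    size_t alloc;
    afs_uint32 *stats;

    rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVer,
                               &sec, &usec, &alloc, &count, &stats);
    if (stats != NULL) {
        /* ... walk 'count' marshalled entries ... */
        rx_FreeRPCStats(stats, alloc);
    }
}
#endif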
7070 * rx_queryProcessRPCStats - see if process rpc stat collection is
7071 * currently enabled.
7077 * Returns 0 if stats are not enabled; != 0 otherwise
7080 int rx_queryProcessRPCStats()
7081 {
7082 int rc;
7083 MUTEX_ENTER(&rx_rpc_stats);
7084 rc = rxi_monitor_processStats;
7085 MUTEX_EXIT(&rx_rpc_stats);
7086 return rc;
7087 }
7090 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7096 * Returns 0 if stats are not enabled; != 0 otherwise
7099 int rx_queryPeerRPCStats()
7100 {
7101 int rc;
7102 MUTEX_ENTER(&rx_rpc_stats);
7103 rc = rxi_monitor_peerStats;
7104 MUTEX_EXIT(&rx_rpc_stats);
7105 return rc;
7106 }
7109 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7118 void rx_enableProcessRPCStats()
7119 {
7120 MUTEX_ENTER(&rx_rpc_stats);
7121 rx_enable_stats = 1;
7122 rxi_monitor_processStats = 1;
7123 MUTEX_EXIT(&rx_rpc_stats);
7124 }
7127 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7136 void rx_enablePeerRPCStats()
7137 {
7138 MUTEX_ENTER(&rx_rpc_stats);
7139 rx_enable_stats = 1;
7140 rxi_monitor_peerStats = 1;
7141 MUTEX_EXIT(&rx_rpc_stats);
7142 }
7145 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7154 void rx_disableProcessRPCStats()
7155 {
7156 rx_interface_stat_p rpc_stat, nrpc_stat;
7157 size_t space;
7159 MUTEX_ENTER(&rx_rpc_stats);
7162 * Turn off process statistics and if peer stats is also off, turn
7163 * off everything
7164 */
7166 rxi_monitor_processStats = 0;
7167 if (rxi_monitor_peerStats == 0) {
7168 rx_enable_stats = 0;
7169 }
7171 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7172 unsigned int num_funcs = 0;
7173 if (!rpc_stat) break;
7174 queue_Remove(rpc_stat);
7175 num_funcs = rpc_stat->stats[0].func_total;
7176 space = sizeof(rx_interface_stat_t) +
7177 rpc_stat->stats[0].func_total *
7178 sizeof(rx_function_entry_v1_t);
7180 rxi_Free(rpc_stat, space);
7181 rxi_rpc_process_stat_cnt -= num_funcs;
7182 }
7183 MUTEX_EXIT(&rx_rpc_stats);
7187 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7196 void rx_disablePeerRPCStats()
7197 {
7198 struct rx_peer **peer_ptr, **peer_end;
7199 int code;
7201 MUTEX_ENTER(&rx_rpc_stats);
7204 * Turn off peer statistics and if process stats is also off, turn
7205 * off everything
7206 */
7208 rxi_monitor_peerStats = 0;
7209 if (rxi_monitor_processStats == 0) {
7210 rx_enable_stats = 0;
7211 }
7213 MUTEX_ENTER(&rx_peerHashTable_lock);
7214 for (peer_ptr = &rx_peerHashTable[0],
7215 peer_end = &rx_peerHashTable[rx_hashTableSize];
7216 peer_ptr < peer_end; peer_ptr++) {
7217 struct rx_peer *peer, *next, *prev;
7218 for (prev = peer = *peer_ptr; peer; peer = next) {
7219 next = peer->next;
7220 code = MUTEX_TRYENTER(&peer->peer_lock);
7221 if (code) {
7222 rx_interface_stat_p rpc_stat, nrpc_stat;
7224 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7225 rx_interface_stat)) {
7226 unsigned int num_funcs = 0;
7227 if (!rpc_stat) break;
7228 queue_Remove(&rpc_stat->queue_header);
7229 queue_Remove(&rpc_stat->all_peers);
7230 num_funcs = rpc_stat->stats[0].func_total;
7231 space = sizeof(rx_interface_stat_t) +
7232 rpc_stat->stats[0].func_total *
7233 sizeof(rx_function_entry_v1_t);
7235 rxi_Free(rpc_stat, space);
7236 rxi_rpc_peer_stat_cnt -= num_funcs;
7238 MUTEX_EXIT(&peer->peer_lock);
7239 if (prev == *peer_ptr) {
7251 MUTEX_EXIT(&rx_peerHashTable_lock);
7252 MUTEX_EXIT(&rx_rpc_stats);
7256 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7261 * IN clearFlag - flag indicating which stats to clear
7268 void rx_clearProcessRPCStats(
7269 afs_uint32 clearFlag)
7270 {
7271 rx_interface_stat_p rpc_stat, nrpc_stat;
7273 MUTEX_ENTER(&rx_rpc_stats);
7275 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7276 unsigned int num_funcs = 0, i;
7277 num_funcs = rpc_stat->stats[0].func_total;
7278 for(i=0;i<num_funcs;i++) {
7279 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7280 hzero(rpc_stat->stats[i].invocations);
7282 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7283 hzero(rpc_stat->stats[i].bytes_sent);
7285 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7286 hzero(rpc_stat->stats[i].bytes_rcvd);
7288 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7289 rpc_stat->stats[i].queue_time_sum.sec = 0;
7290 rpc_stat->stats[i].queue_time_sum.usec = 0;
7292 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7293 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7294 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7296 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7297 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7298 rpc_stat->stats[i].queue_time_min.usec= 9999999;
7300 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7301 rpc_stat->stats[i].queue_time_max.sec = 0;
7302 rpc_stat->stats[i].queue_time_max.usec = 0;
7304 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7305 rpc_stat->stats[i].execution_time_sum.sec = 0;
7306 rpc_stat->stats[i].execution_time_sum.usec = 0;
7308 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7309 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7310 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7312 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7313 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7314 rpc_stat->stats[i].execution_time_min.usec= 9999999;
7316 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7317 rpc_stat->stats[i].execution_time_max.sec = 0;
7318 rpc_stat->stats[i].execution_time_max.usec = 0;
7319 }
7320 }
7321 }
7323 MUTEX_EXIT(&rx_rpc_stats);
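#if 0
/* Hedged example, not part of the original source: reset only the
 * byte counters, leaving the timing statistics intact. */
rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_BYTES_SENT |
                        AFS_RX_STATS_CLEAR_BYTES_RCVD);
#endif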
7327 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7332 * IN clearFlag - flag indicating which stats to clear
7339 void rx_clearPeerRPCStats(
7340 afs_uint32 clearFlag)
7341 {
7342 rx_interface_stat_p rpc_stat, nrpc_stat;
7344 MUTEX_ENTER(&rx_rpc_stats);
7346 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7347 unsigned int num_funcs = 0, i;
7350 * We have to fix the offset of rpc_stat since we are
7351 * keeping this structure on two rx_queues. The rx_queue
7352 * package assumes that the rx_queue member is the first
7353 * member of the structure. That is, rx_queue assumes that
7354 * any one item is only on one queue at a time. We are
7355 * breaking that assumption and so we have to do a little
7356 * math to fix our pointers.
7357 */
7358 char *fix_offset;
7359 fix_offset = (char *) rpc_stat;
7360 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7361 rpc_stat = (rx_interface_stat_p) fix_offset;
7363 num_funcs = rpc_stat->stats[0].func_total;
7364 for(i=0;i<num_funcs;i++) {
7365 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7366 hzero(rpc_stat->stats[i].invocations);
7368 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7369 hzero(rpc_stat->stats[i].bytes_sent);
7371 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7372 hzero(rpc_stat->stats[i].bytes_rcvd);
7374 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7375 rpc_stat->stats[i].queue_time_sum.sec = 0;
7376 rpc_stat->stats[i].queue_time_sum.usec = 0;
7378 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7379 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7380 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7382 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7383 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7384 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7386 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7387 rpc_stat->stats[i].queue_time_max.sec = 0;
7388 rpc_stat->stats[i].queue_time_max.usec = 0;
7390 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7391 rpc_stat->stats[i].execution_time_sum.sec = 0;
7392 rpc_stat->stats[i].execution_time_sum.usec = 0;
7394 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7395 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7396 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7398 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7399 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7400 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7402 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7403 rpc_stat->stats[i].execution_time_max.sec = 0;
7404 rpc_stat->stats[i].execution_time_max.usec = 0;
7405 }
7406 }
7407 }
7409 MUTEX_EXIT(&rx_rpc_stats);
7413 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7414 * is authorized to enable/disable/clear RX statistics.
7416 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7418 void rx_SetRxStatUserOk(
7419 int (*proc)(struct rx_call *call))
7420 {
7421 rxi_rxstat_userok = proc;
7422 }
7424 int rx_RxStatUserOk(
7425 struct rx_call *call)
7426 {
7427 if (!rxi_rxstat_userok)
7428 return 0;
7429 return rxi_rxstat_userok(call);
7430 }