/*
 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

/* RX:  Extended Remote Procedure Call */
#ifdef KERNEL
#include "../afs/param.h"
#include <afsconfig.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"

#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#ifdef AFS_OSF_ENV
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#endif /* AFS_LINUX20_ENV */
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#endif
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#endif
#include "../afsint/afsint.h"
#ifdef AFS_ALPHA_ENV
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define	AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define	AFSOP_STOP_AFS		211	/* Stop AFS process */
#define	AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#ifdef AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
#else /* KERNEL */
# include <afs/param.h>
# include <afsconfig.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
#endif /* KERNEL */
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;

#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 */

extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0) == 0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0) == 0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0) == 0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0) == 0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}

pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
	assert(pthread_once(&rx_once_init, rxi_InitPthread) == 0);
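
/* Illustrative sketch (added commentary, not from the original source):
 * any exported entry point that may run before the library is otherwise
 * initialized is expected to invoke INIT_PTHREAD_LOCKS first, so that
 * pthread_once runs rxi_InitPthread exactly once no matter which thread
 * enters the library first:
 *
 *	void rx_SomeEntryPoint(void)	-- hypothetical entry point
 *	{
 *	    INIT_PTHREAD_LOCKS
 *	    ... epoch_mutex, rx_init_mutex, etc. are now usable ...
 *	}
 */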
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue; see the sketch following the queue-lock macros
 * below.
 */
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif

/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit
** window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * levels:
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Almost any other lock - these are all independent of each other:
 * rx_freeCallQueue_lock
 * rx_connHashTable_lock
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer_lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 * field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and so has no
 * lock protecting this field.  The multihomed client delta introduced
 * an RX code change: the peer field in the connection structure now names
 * the remote interface from which the last packet for this connection
 * was sent out.  This may become an issue if further changes are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
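
/* Illustrative sketch of the queue-lock protocol described above (added
 * commentary, not from the original source; some_queue/some_queue_lock are
 * hypothetical names):
 *
 *	MUTEX_ENTER(&some_queue_lock);
 *	MUTEX_ENTER(&call->lock);
 *	queue_Append(&some_queue, call);
 *	SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
 *	MUTEX_EXIT(&call->lock);
 *	MUTEX_EXIT(&some_queue_lock);
 *
 * rxi_ResetCall can then read call->call_queue_lock (under call->lock) to
 * learn which queue lock it must acquire before removing the call, and
 * CLEAR_CALL_QUEUE_LOCK is applied once the call leaves the queue.
 */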
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex) == 0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex) == 0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
  afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
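
/* Example (added sketch, not from the original source): a security module
 * such as rxkad would typically install a suitably random epoch once at
 * startup; the time-based seed below is only a stand-in for a real RNG:
 *
 *	afs_uint32 e = (afs_uint32) time(0) | 0x80000000;
 *	rx_SetEpoch(e);
 */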
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * failure. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex) == 0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex) == 0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif /* AFS_PTHREAD_ENV */
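
/* Typical startup (added sketch, not from the original source): passing
 * port 0 lets the kernel pick an ephemeral UDP port:
 *
 *	if (rx_Init(0) != 0)
 *	    exit(1);		-- hypothetical error handling
 */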
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;
    struct timeval tv;
    struct clock clock_now;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    LOCK_RX_INIT
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	UNLOCK_RX_INIT
	return tmp_status; /* Already started; return previous error code. */
    }
#ifdef AFS_NT40_ENV
    if (afs_winsockInit() < 0)
	return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	UNLOCK_RX_INIT
	return RX_ADDRINUSE;
    }
#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT, 0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
#ifndef KERNEL
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
#endif /* !KERNEL */
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER - 10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
    bzero(htable, rx_hashTableSize * sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *));	     /* XXXXX */
    bzero(ptable, rx_hashTableSize * sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    clock_GetTime(&clock_now);
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    {
	struct sockaddr_in addr;
	int addrlen = sizeof(addr);
	if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	    UNLOCK_RX_INIT
	    return -1;
	}
	rx_port = addr.sin_port;
    }
#endif

    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);	/* Start time of this package; rxkad
				 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);

    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
/* Called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
    register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}
static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}

#else /* RX_ENABLE_LOCKS */
static int QuotaOK(aservice)
    register struct rx_service *aservice; {
    int rc = 0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
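
/* Worked example of the quota rule above (illustrative numbers): a service
 * with maxProcs = 4 and nRequestsRunning = 4 is refused outright; with
 * minProcs = 2 and nRequestsRunning = 1 it is admitted unconditionally;
 * otherwise it is admitted only while rxi_availProcs > rxi_minDeficit,
 * i.e. while taking one more thread still leaves every other service able
 * to reach its own minProcs.  Under RX_ENABLE_LOCKS each successful
 * QuotaOK() reserves the thread and must be paired with a later
 * ReturnToServerPool() on the same service. */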
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;
    int diff;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i < nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
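
/* Worked example (illustrative numbers): two services with
 * (minProcs, maxProcs) of (2, 5) and (3, 4) give nProcs = 2 + 3 = 5 and
 * maxdiff = max(5 - 2, 4 - 3) = 3, so 8 procs are wanted in total; with
 * nExistingProcs = 1, the loop above starts 7 new server procs. */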
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool. */
void rx_StartServer(donateMe)
    int donateMe;
{
    register struct rx_service *service;
    register int i, nProcs = 0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) {
#ifdef AFS_PTHREAD_ENV
	pid_t pid;
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	PROCESS pid;
	LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
#ifndef AFS_NT40_ENV
	{
	    char name[32];
	    sprintf(name, "srv_%d", ++nProcs);
	    if (registerProgram)
		(*registerProgram)(pid, name);
	}
#endif /* AFS_NT40_ENV */
	rx_ServerProc();	/* Never returns */
    }
}
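
/* Usage sketch (added commentary, not from the original source): a server
 * that has finished installing its services usually donates its own
 * thread, so rx_StartServer never returns:
 *
 *	rx_StartServer(1);	-- this thread becomes a worker
 *
 * Passing donateMe = 0 instead returns control to the caller once the
 * worker procs have been started. */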
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	/* Server host */
    u_short sport;		/* Server port */
    u_short sservice;		/* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch,
			  RX_CLIENT_CONNECTION);

    conn->refCount++;	/* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
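
/* Client-side usage sketch (added, not from the original source; the
 * address, port and service id are placeholders, and rxnull supplies the
 * stock "null" security class):
 *
 *	struct rx_securityClass *so = rxnull_NewClientSecurityObject();
 *	struct rx_connection *conn =
 *	    rx_NewConnection(htonl(0x7f000001),	-- 127.0.0.1
 *			     htons(7000),	-- placeholder port
 *			     1,			-- placeholder service id
 *			     so, 0);
 */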
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead / 6;
}
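
/* Example (illustrative arithmetic): rx_SetConnDeadTime(conn, 50) yields
 * secondsUntilDead = 50 and secondsUntilPing = 50 / 6 = 8, so roughly six
 * keepalives can be lost before the connection times out; any value below
 * 6 is clamped to 6, giving a one-second ping interval. */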
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    register int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	for (i = 0; i < conn->nSpecific; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;
    conn->nSpecific = 0;

#ifdef RX_ENABLE_LOCKS
    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);
#endif

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    register int i;
    struct rx_peer *peer = conn->peer;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    rxi_AckAll((struct rxevent *)0, call, 0);
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }

#ifdef RX_ENABLE_LOCKS
    if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	MUTEX_EXIT(&conn->conn_data_lock);
    }
    else {
	/* Someone is accessing a packet right now. */
	havecalls = 1;
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
					   conn->epoch, conn->type)];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if (rxLastConn == conn)
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
	rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    rxi_DestroyConnection(conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf(("rx_NewCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
	for (i = 0; i < RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    }
	    else {
		call = rxi_NewCall(conn, i);
		MUTEX_ENTER(&call->lock);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i])) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		return 1;
	    }
	}
    }
    return 0;
}

int rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i] + 1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

int rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * if the service cannot be created. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
	      nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;	/* Name for identification purposes (e.g. the
			 * service name might be used for probing for
			 * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }

    tservice = rxi_AllocService();

    for (i = 0; i < RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    rx_services[i] = service;	/* not visible until now */
	    return service;
	}
    }

    /* Memory shortage; fail */
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
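
/* Server-side usage sketch (added, not from the original source;
 * ExecuteRequest and the port/service numbers are placeholders, and
 * rxnull supplies the stock "null" security class):
 *
 *	struct rx_securityClass *sc[1];
 *	struct rx_service *svc;
 *	sc[0] = rxnull_NewServerSecurityObject();
 *	svc = rx_NewService(htons(7000), 1, "demo", sc, 1, ExecuteRequest);
 *	if (svc) {
 *	    svc->minProcs = 2;	-- tune thread quotas before starting
 *	    svc->maxProcs = 4;
 *	    rx_StartServer(1);
 *	}
 */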
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
    int threadID;
    struct rx_call *newcall;
    osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = (struct rx_call *) 0;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting (typically smooth shutdown) then do not
	 * allow any new calls.
	 */
	if (rx_tranquil && (call != NULL)) {
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	}

	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/* meltdown:
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 *     it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *)
	    rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call.  If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}

	if (call) {
	    queue_Remove(call);
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);

	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call *)0;
		continue;
	    }
	    call->flags &= (~RX_CALL_WAIT_PROC);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting--;
	    MUTEX_EXIT(&rx_stats_mutex);
	    rxi_ServerThreadSelectingCall = 0;
	    CV_SIGNAL(&rx_serverPool_cv);
	    MUTEX_EXIT(&rx_serverPool_lock);
	    break;
	}
	else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;
    SPLVAR;

    NETPRI;
    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *)
	    rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call.  If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
		if (call) break;
	    }
	}
    }

    if (call) {
	queue_Remove(call);
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	/* MUTEX_EXIT(&call->lock); */
    }
    else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		USERPRI;
		return (struct rx_call *)0;
	    }
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    USERPRI;
    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the
 * call or the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;	/* Temporary packet pointer */
    register struct rx_packet *nxp;	/* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else {			/* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call.  The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure.  We don't want to signal until
	 * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    }
    else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }
    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
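
/* Round-trip usage sketch (added, not from the original source; inbuf,
 * outbuf and the byte counts are placeholders):
 *
 *	struct rx_call *call = rx_NewCall(conn);
 *	afs_int32 err;
 *	rx_Write(call, inbuf, inlen);	-- marshal the request
 *	rx_Read(call, outbuf, outlen);	-- collect the reply
 *	err = rx_EndCall(call, 0);	-- 0: no local error to report
 */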
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
	return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;	/* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);
	CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
	CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
	CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nCallStructs++;
	MUTEX_EXIT(&rx_stats_mutex);
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
	queue_Init(&call->iovq);
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
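
/* Example of the call-number retention noted above (illustrative): if
 * channel 3 of a connection last ran call number 17, a later
 * rxi_NewCall(conn, 3) re-binds call->callNumber to the same slot, so the
 * next call keeps counting from 17 rather than restarting; only a channel
 * whose counter is still 0 (never used) is bumped to start at 1. */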
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock;	/* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
	(*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
	queue_Prepend(&rx_freeCallQueue, call);
    else
	queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);

    /* Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections).  Only do this, however, if there are no
     * outstanding calls.  Note that for fine grain locking, there appears
     * to be a deadlock in that rxi_FreeCall has a call locked and
     * DestroyConnectionNoLock locks each call in the conn.  But note a
     * few lines up where we have removed this call from the conn.
     * If someone else destroys a connection, they either have no
     * call lock held or are going through this section of code.
     */
    if (conn->flags & RX_CONN_DESTROY_ME) {
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->refCount++;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	if (haveCTLock)
	    rxi_DestroyConnectionNoLock(conn);
	else
	    rxi_DestroyConnection(conn);
#else /* RX_ENABLE_LOCKS */
	rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
    }
}
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
char *rxi_Alloc(size)
register size_t size;
register char *p;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
/* Grab the AFS filesystem lock. See afs/osi.h for the lock ... */
int glockOwner = ISAFS_GLOCK();

MUTEX_ENTER(&rx_stats_mutex);
rxi_Alloccnt++; rxi_Allocsize += size;
MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
if (size > AFS_SMALLOCSIZ) {
p = (char *) osi_AllocMediumSpace(size);
} else
p = (char *) osi_AllocSmall(size, 1);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
p = (char *) osi_Alloc(size);
if (!p) osi_Panic("rxi_Alloc error");

void rxi_Free(addr, size)
char *addr;
register size_t size;

#if defined(AFS_AIX41_ENV) && defined(KERNEL)
/* Grab the AFS filesystem lock. See afs/osi.h for the lock ... */
int glockOwner = ISAFS_GLOCK();

MUTEX_ENTER(&rx_stats_mutex);
rxi_Alloccnt--; rxi_Allocsize -= size;
MUTEX_EXIT(&rx_stats_mutex);
#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
if (size > AFS_SMALLOCSIZ)
osi_FreeMediumSpace(addr);
else
osi_FreeSmall(addr);
#if defined(AFS_AIX41_ENV) && defined(KERNEL)
osi_Free(addr, size);
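/* Added note (not from the original source): callers must pass rxi_Free the
 * same size they gave rxi_Alloc, since size both selects the small/medium
 * arena above and keeps rxi_Alloccnt/rxi_Allocsize balanced, e.g.
 *
 *	p = rxi_Alloc(sizeof(struct rx_call));
 *	...
 *	rxi_Free(p, sizeof(struct rx_call));
 */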
/* Find the peer process represented by the supplied (host,port)
 * combination. If there is no appropriate active peer structure, a
 * new one will be allocated and initialized.
 * The origPeer, if set, is a pointer to a peer structure on which the
 * refcount will be decremented. This is used to replace the peer
 * structure hanging off a connection structure */
struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
register afs_uint32 host;
register u_short port;
struct rx_peer *origPeer;
register struct rx_peer *pp;

hashIndex = PEER_HASH(host, port);
MUTEX_ENTER(&rx_peerHashTable_lock);
for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
if ((pp->host == host) && (pp->port == port)) break;

pp = rxi_AllocPeer();	/* This bzero's *pp */
pp->host = host;	/* set here or in InitPeerParams is zero */

MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
queue_Init(&pp->congestionQueue);
queue_Init(&pp->rpcStats);
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
rxi_InitPeerParams(pp);
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.nPeerStructs++;
MUTEX_EXIT(&rx_stats_mutex);

origPeer->refCount--;
MUTEX_EXIT(&rx_peerHashTable_lock);
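/* Illustrative lookup (added, not from the original source): a caller with a
 * host and port already in network order can obtain a referenced peer with
 *
 *	peer = rxi_FindPeer(host, port, (struct rx_peer *) 0, 1);
 *
 * passing create == 0 instead should make this a pure lookup that yields 0
 * when no matching peer structure exists. */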
/* Find the connection at (host, port) started at epoch, and with the
 * given connection id. Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired. In both cases, (host, port) specify the
 * peer's (host, port) pair. Client connections are not made
 * automatically by this routine. The parameter socket gives the
 * socket descriptor on which the packet was received. This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId). Finally, the securityIndex
 * parameter must match the existing index for the connection. If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service */
struct rx_connection *
rxi_FindConnection(socket, host, port, serviceId, cid,
		   epoch, type, securityIndex)
register afs_int32 host;
register u_short port;
u_int securityIndex;
int hashindex, flag;
register struct rx_connection *conn;
struct rx_peer *peer;

hashindex = CONN_HASH(host, port, cid, epoch, type);
MUTEX_ENTER(&rx_connHashTable_lock);
rxLastConn ? (conn = rxLastConn, flag = 0) :
	     (conn = rx_connHashTable[hashindex], flag = 1);
if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
    && (epoch == conn->epoch)) {
register struct rx_peer *pp = conn->peer;
if (securityIndex != conn->securityIndex) {
/* this isn't supposed to happen, but someone could forge a packet
 * like this, and there seems to be some CM bug that makes this
 * happen from time to time -- in which case, the fileserver ... */
MUTEX_EXIT(&rx_connHashTable_lock);
return (struct rx_connection *) 0;

/* epoch's high order bits mean route for security reasons only on
 * the cid, not the host and port fields. */
if (conn->epoch & 0x80000000) break;
if (((type == RX_CLIENT_CONNECTION)
     || (pp->host == host)) && (pp->port == port))
break;

/* the connection rxLastConn that was used the last time is not the
** one we are looking for now. Hence, start searching in the hash */
conn = rx_connHashTable[hashindex];
struct rx_service *service;
if (type == RX_CLIENT_CONNECTION) {
MUTEX_EXIT(&rx_connHashTable_lock);
return (struct rx_connection *) 0;

service = rxi_FindService(socket, serviceId);
if (!service || (securityIndex >= service->nSecurityObjects)
    || (service->securityObjects[securityIndex] == 0)) {
MUTEX_EXIT(&rx_connHashTable_lock);
return (struct rx_connection *) 0;

conn = rxi_AllocConnection();	/* This bzero's the connection */
MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
	   MUTEX_DEFAULT, 0);
MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
	   MUTEX_DEFAULT, 0);
CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
conn->next = rx_connHashTable[hashindex];
rx_connHashTable[hashindex] = conn;
peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
conn->type = RX_SERVER_CONNECTION;
conn->lastSendTime = clock_Sec();	/* don't GC immediately */
conn->epoch = epoch;
conn->cid = cid & RX_CIDMASK;
/* conn->serial = conn->lastSerial = 0; */
/* conn->timeout = 0; */
conn->ackRate = RX_FAST_ACK_RATE;
conn->service = service;
conn->serviceId = serviceId;
conn->securityIndex = securityIndex;
conn->securityObject = service->securityObjects[securityIndex];
conn->nSpecific = 0;
conn->specific = NULL;
rx_SetConnDeadTime(conn, service->connDeadTime);
/* Notify security object of the new connection */
RXS_NewConnection(conn->securityObject, conn);
/* XXXX Connection timeout? */
if (service->newConnProc) (*service->newConnProc)(conn);
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.nServerConns++;
MUTEX_EXIT(&rx_stats_mutex);

/* Ensure that the peer structure is set up in such a way that
** replies in this connection go back to that remote interface
** from which the last packet was sent out. If this packet's
** source IP address does not match the peer struct for this conn,
** then drop the refCount on conn->peer and get a new peer structure.
** We can check the host,port field in the peer structure without the
** rx_peerHashTable_lock because the peer structure has its refCount
** incremented and the only time the host,port in the peer struct gets
** updated is when the peer structure is created. */
if (conn->peer->host == host)
peer = conn->peer;	/* no change to the peer structure */
else
peer = rxi_FindPeer(host, port, conn->peer, 1);

MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

rxLastConn = conn;	/* store this connection as the last conn used */
MUTEX_EXIT(&rx_connHashTable_lock);
/* There are two packet tracing routines available for testing and monitoring
 * Rx. One is called just after every packet is received and the other is
 * called just before every packet is sent. Received packets have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address. Both can be modified. The return value, if
 * non-zero, indicates that the packet should be dropped. */

int (*rx_justReceived)() = 0;
int (*rx_almostSent)() = 0;
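/* Example tracer (an illustrative sketch, not part of Rx; "badHost" is a
 * hypothetical variable holding an address in network order):
 *
 *	static int my_tracer(np, addr)
 *	struct rx_packet *np;
 *	struct sockaddr_in *addr;
 *	{
 *	    return addr->sin_addr.s_addr == badHost;  (non-zero means drop)
 *	}
 *
 * Installing it is a single assignment: rx_justReceived = my_tracer; */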
/* A packet has been received off the interface. Np is the packet, socket is
 * the socket number it was received from (useful in determining which service
 * this packet corresponds to), and (host, port) reflect the host,port of the
 * sender. This call returns the packet to the caller if it is finished with
 * it, rather than de-allocating it, just as a small performance hack */

struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
register struct rx_packet *np;
struct rx_call **newcallp;
register struct rx_call *call;
register struct rx_connection *conn;
afs_uint32 currentCallNumber;
struct rx_packet *tnp;

/* We don't print out the packet until now because (1) the time may not be
 * accurate enough until now in the lwp implementation (rx_Listener only gets
 * the time after the packet is read) and (2) from a protocol point of view,
 * this is the first time the packet has been seen */
packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
    ? rx_packetTypes[np->header.type-1] : "*UNKNOWN*";
dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
     np->header.serial, packetType, host, port, np->header.serviceId,
     np->header.epoch, np->header.cid, np->header.callNumber,
     np->header.seq, np->header.flags, np));

if (np->header.type == RX_PACKET_TYPE_VERSION) {
return rxi_ReceiveVersionPacket(np, socket, host, port, 1);

if (np->header.type == RX_PACKET_TYPE_DEBUG) {
return rxi_ReceiveDebugPacket(np, socket, host, port, 1);

/* If an input tracer function is defined, call it with the packet and
 * network address. Note this function may modify its arguments. */
if (rx_justReceived) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = port;
addr.sin_addr.s_addr = host;
#if defined(AFS_OSF_ENV) && defined(_KERNEL)
addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
drop = (*rx_justReceived) (np, &addr);
/* drop packet if return value is non-zero */
if (drop) return np;
port = addr.sin_port;	/* in case fcn changed addr */
host = addr.sin_addr.s_addr;

/* If packet was not sent by the client, then *we* must be the client */
type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
    ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

/* Find the connection (or fabricate one, if we're the server & if
 * necessary) associated with this packet */
conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
			  np->header.cid, np->header.epoch, type,
			  np->header.securityIndex);
/* If no connection found or fabricated, just ignore the packet.
 * (An argument could be made for sending an abort packet for ...) */

MUTEX_ENTER(&conn->conn_data_lock);
if (conn->maxSerial < np->header.serial)
conn->maxSerial = np->header.serial;
MUTEX_EXIT(&conn->conn_data_lock);

/* If the connection is in an error state, send an abort packet and ignore
 * the incoming packet */
/* Don't respond to an abort packet--we don't want loops! */
MUTEX_ENTER(&conn->conn_data_lock);
if (np->header.type != RX_PACKET_TYPE_ABORT)
np = rxi_SendConnectionAbort(conn, np, 1, 0);
MUTEX_EXIT(&conn->conn_data_lock);

/* Check for connection-only requests (i.e. not call specific). */
if (np->header.callNumber == 0) {
switch (np->header.type) {
case RX_PACKET_TYPE_ABORT:
/* What if the supplied error is zero? */
rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);
case RX_PACKET_TYPE_CHALLENGE:
tnp = rxi_ReceiveChallengePacket(conn, np, 1);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);
case RX_PACKET_TYPE_RESPONSE:
tnp = rxi_ReceiveResponsePacket(conn, np, 1);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);
case RX_PACKET_TYPE_PARAMS:
case RX_PACKET_TYPE_PARAMS+1:
case RX_PACKET_TYPE_PARAMS+2:
/* ignore these packet types for now */
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

/* Should not reach here, unless the peer is broken: send an
 * abort packet */
rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
MUTEX_ENTER(&conn->conn_data_lock);
tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
MUTEX_EXIT(&conn->conn_data_lock);
channel = np->header.cid & RX_CHANNELMASK;
call = conn->call[channel];
#ifdef RX_ENABLE_LOCKS
if (call) {
MUTEX_ENTER(&call->lock);
/* Test to see if call struct is still attached to conn. */
if (call != conn->call[channel]) {
MUTEX_EXIT(&call->lock);
if (type == RX_SERVER_CONNECTION) {
call = conn->call[channel];
/* If we started with no call attached and there is one now,
 * another thread is also running this routine and has gotten
 * the connection channel. We should drop this packet in the tests
 * below. If there was a call on this connection and it's now
 * gone, then we'll be making a new call below.
 * If there was previously a call and it's now different then
 * the old call was freed and another thread running this routine
 * has created a call on this channel. One of these two threads
 * has a packet for the old call and the code below handles those
 * cases. */
MUTEX_ENTER(&call->lock);

/* This packet can't be for this call. If the new call address is
 * 0 then no call is running on this channel. If there is a call
 * then, since this is a client connection we're getting data for
 * it must be for the previous call. */
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.spuriousPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

currentCallNumber = conn->callNumber[channel];

if (type == RX_SERVER_CONNECTION) {	/* We're the server */
if (np->header.callNumber < currentCallNumber) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.spuriousPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

call = rxi_NewCall(conn, channel);
MUTEX_ENTER(&call->lock);
*call->callNumber = np->header.callNumber;
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
hzero(call->bytesSent);
hzero(call->bytesRcvd);
rxi_KeepAliveOn(call);
else if (np->header.callNumber != currentCallNumber) {
/* Wait until the transmit queue is idle before deciding
 * whether to reset the current call. Chances are that the
 * call will be in either DALLY or HOLD state once the TQ_BUSY
 * flag is cleared. */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
while ((call->state == RX_STATE_ACTIVE) &&
       (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set
 * the error condition in this call, so that it terminates as
 * quickly as possible */
if (call->state == RX_STATE_ACTIVE) {
struct rx_packet *tp;

rxi_CallError(call, RX_CALL_DEAD);
tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

rxi_ResetCall(call, 0);
*call->callNumber = np->header.callNumber;
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
hzero(call->bytesSent);
hzero(call->bytesRcvd);
/*
 * If the number of queued calls exceeds the overload
 * threshold then abort this call. */
if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
struct rx_packet *tp;

rxi_CallError(call, rx_BusyError);
tp = rxi_SendCallAbort(call, np, 1, 0);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

rxi_KeepAliveOn(call);
/* Continuing call; do nothing here. */
} else {	/* we're the client */
/* Ignore all incoming acknowledgements for calls in DALLY state */
if (call && (call->state == RX_STATE_DALLY)
    && (np->header.type == RX_PACKET_TYPE_ACK)) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.ignorePacketDally++;
MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

/* Ignore anything that's not relevant to the current call. If there
 * isn't a current call, then no packet is relevant. */
if (!call || (np->header.callNumber != currentCallNumber)) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.spuriousPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

/* If the service security object index stamped in the packet does not
 * match the connection's security index, ignore the packet */
if (np->header.securityIndex != conn->securityIndex) {
#ifdef RX_ENABLE_LOCKS
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

/* If we're receiving the response, then all transmit packets are
 * implicitly acknowledged. Get rid of them. */
if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we must release the global rx lock when
 * sending packets (osi_NetSend) we drop all acks while we're
 * traversing the tq in rxi_Start sending packets out because
 * packets may move to the freePacketQueue as a result of being here!
 * So we drop these packets until we're safely out of the
 * traversal. Really ugly!
 * For fine grain RX locking, we set the acked field in the
 * packets and let rxi_Start remove them from the transmit queue. */
if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
rxi_SetAcksInTransmitQueue(call);
return np;	/* xmitting; drop packet */

rxi_ClearTransmitQueue(call, 0);

#else /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_ClearTransmitQueue(call, 0);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (np->header.type == RX_PACKET_TYPE_ACK) {
/* now check to see if this is an ack packet acknowledging that the
 * server actually *lost* some hard-acked data. If this happens we
 * ignore this packet, as it may indicate that the server restarted in
 * the middle of a call. It is also possible that this is an old ack
 * packet. We don't abort the connection in this case, because this
 * *might* just be an old ack packet. The right way to detect a server
 * restart in the midst of a call is to notice that the server epoch
 * has changed. */
/* XXX I'm not sure this is exactly right, since tfirst **IS**
 * XXX unacknowledged. I think that this is off-by-one, but
 * XXX I don't dare change it just yet, since it will
 * XXX interact badly with the server-restart detection
 * XXX code in receiveackpacket. */
if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.spuriousPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);

} /* else not a data packet */

osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
/* Set remote user defined status from packet */
call->remoteStatus = np->header.userStatus;

/* Note the gap between the expected next packet and the actual
 * packet that arrived, when the new packet has a smaller serial number
 * than expected. Rioses frequently reorder packets all by themselves,
 * so this will be quite important with very large window sizes.
 * Skew is checked against 0 here to avoid any dependence on the type of
 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
 * true.
 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
 * see CalculateRoundTripTime for an example of how to keep smoothed values.
 * I think using a beta of 1/8 is probably appropriate. 93.04.21 */
MUTEX_ENTER(&conn->conn_data_lock);
skew = conn->lastSerial - np->header.serial;
conn->lastSerial = np->header.serial;
MUTEX_EXIT(&conn->conn_data_lock);
register struct rx_peer *peer;
peer = conn->peer;
if (skew > peer->inPacketSkew) {
dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
peer->inPacketSkew = skew;
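/* Sketch of the smoothed update suggested above (added, assuming the
 * mentioned beta of 1/8; not part of the original code):
 *
 *	peer->inPacketSkew = (7 * peer->inPacketSkew + skew) / 8;
 *
 * which decays old reordering events instead of remembering only the
 * worst skew ever observed. */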
/* Now do packet type-specific processing */
switch (np->header.type) {
case RX_PACKET_TYPE_DATA:
np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
			   tnop, newcallp);
case RX_PACKET_TYPE_ACK:
/* Respond immediately to ack packets requesting acknowledgement
 * (ping packets) */
if (np->header.flags & RX_REQUEST_ACK) {
if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);

np = rxi_ReceiveAckPacket(call, np, 1);
case RX_PACKET_TYPE_ABORT:
/* An abort packet: reset the connection, passing the error up to
 * the user */
/* What if error is zero? */
rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
case RX_PACKET_TYPE_BUSY:

case RX_PACKET_TYPE_ACKALL:
/* All packets acknowledged, so we can drop all packets previously
 * readied for sending */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we can't release the global rx lock when
 * sending packets (osi_NetSend) we drop all ack pkts while we're
 * traversing the tq in rxi_Start sending packets out because
 * packets may move to the freePacketQueue as a result of being
 * here! So we drop these packets until we're safely out of the
 * traversal. Really ugly!
 * For fine grain RX locking, we set the acked field in the packets
 * and let rxi_Start remove the packets from the transmit queue. */
if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
rxi_SetAcksInTransmitQueue(call);
#else /* RX_ENABLE_LOCKS */
return np;	/* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_ClearTransmitQueue(call, 0);

/* Should not reach here, unless the peer is broken: send an abort
 * packet */
rxi_CallError(call, RX_PROTOCOL_ERROR);
np = rxi_SendCallAbort(call, np, 1, 0);

/* Note when this last legitimate packet was received, for keep-alive
 * processing. Note, we delay getting the time until now in the hope that
 * the packet will be delivered to the user before any get time is required
 * (if not, then the time won't actually be re-evaluated here). */
call->lastReceiveTime = clock_Sec();
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_data_lock);
/* return true if this is an "interesting" connection from the point of view
   of someone trying to debug the system */
int rxi_IsConnInteresting(struct rx_connection *aconn)
register struct rx_call *tcall;

if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))

for (i = 0; i < RX_MAXCALLS; i++) {
tcall = aconn->call[i];
if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))

/* if this is one of the last few packets AND it wouldn't be used by the
   receiving call to immediately satisfy a read request, then drop it on
   the floor, since accepting it might prevent a lock-holding thread from
   making progress in its reading. If a call has been cleared while in
   the precall state then ignore all subsequent packets until the call
   is assigned to a thread. */

static TooLow(ap, acall)
struct rx_call *acall;
struct rx_packet *ap; {

MUTEX_ENTER(&rx_stats_mutex);
if (((ap->header.seq != 1) &&
     (acall->flags & RX_CALL_CLEARED) &&
     (acall->state == RX_STATE_PRECALL)) ||
    ((rx_nFreePackets < rxi_dataQuota+2) &&
     !((ap->header.seq < acall->rnext+rx_initSendWindow)
       && (acall->flags & RX_CALL_READER_WAIT)))) {

MUTEX_EXIT(&rx_stats_mutex);
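/* Summary (added comment): TooLow drops a packet in two situations: (a) it
 * is a non-first packet for a call that was cleared while still in PRECALL,
 * or (b) fewer than rxi_dataQuota + 2 packet buffers remain free and this
 * packet would not immediately satisfy a waiting reader. */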
/* try to attach call, if authentication is complete */
static void TryAttach(acall, socket, tnop, newcallp)
register struct rx_call *acall;
register osi_socket socket;
register struct rx_call **newcallp; {
register struct rx_connection *conn = acall->conn;

if ((conn->type == RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
/* Don't attach until we have any req'd. authentication. */
if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
rxi_AttachServerProc(acall, socket, tnop, newcallp);
/* Note: this does not necessarily succeed; there
 * may not be any proc available */
else
rxi_ChallengeOn(acall->conn);
/* A data packet has been received off the interface. This packet is
 * appropriate to the call (the call is in the right state, etc.). This
 * routine can return a packet to the caller, for re-use */

struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
					port, tnop, newcallp)
register struct rx_call *call;
register struct rx_packet *np;
struct rx_call **newcallp;
afs_uint32 seq, serial, flags;
struct rx_packet *tnp;

MUTEX_ENTER(&rx_stats_mutex);
rx_stats.dataPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);

/* If there are no packet buffers, drop this new packet, unless we can find
 * packet buffers from inactive calls */
(rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
MUTEX_ENTER(&rx_freePktQ_lock);
rxi_NeedMorePackets = TRUE;
MUTEX_EXIT(&rx_freePktQ_lock);
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.noPacketBuffersOnRead++;
MUTEX_EXIT(&rx_stats_mutex);
call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
dpf (("packet %x dropped on receipt - quota problems", np));

rxi_ClearReceiveQueue(call);
clock_GetTime(&when);
clock_Add(&when, &rx_softAckDelay);
if (!call->delayedAckEvent ||
    clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
rxevent_Cancel(call->delayedAckEvent, call,
	       RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
				     call, 0);

/* we've damaged this call already, might as well do it in. */

/*
 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
 * packet is one of several packets transmitted as a single
 * datagram. Do not send any soft or hard acks until all packets
 * in a jumbogram have been processed. Send negative acks right away. */
for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
/* tnp is non-null when there are more packets in the
 * current jumbogram */

seq = np->header.seq;
serial = np->header.serial;
flags = np->header.flags;

/* If the call is in an error state, send an abort message */
if (call->error)
return rxi_SendCallAbort(call, np, istack, 0);

/* The RX_JUMBO_PACKET is set in all but the last packet in each
 * AFS 3.5 jumbogram. */
if (flags & RX_JUMBO_PACKET) {
tnp = rxi_SplitJumboPacket(np, host, port, isFirst);

if (np->header.spare != 0) {
MUTEX_ENTER(&call->conn->conn_data_lock);
call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
MUTEX_EXIT(&call->conn->conn_data_lock);

/* The usual case is that this is the expected next packet */
if (seq == call->rnext) {

/* Check to make sure it is not a duplicate of one already queued */
if (queue_IsNotEmpty(&call->rq)
    && queue_First(&call->rq, rx_packet)->header.seq == seq) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.dupPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
dpf (("packet %x dropped on receipt - duplicate", np));
rxevent_Cancel(call->delayedAckEvent, call,
	       RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, seq, serial,
		 flags, RX_ACK_DUPLICATE, istack);

/* It's the next packet. Stick it on the receive queue
 * for this call. Set newPackets to make sure we wake
 * the reader once all packets have been processed */
queue_Prepend(&call->rq, np);
np = NULL;	/* We can't use this anymore */
/* If an ack is requested then set a flag to make sure we
 * send an acknowledgement for this packet */
if (flags & RX_REQUEST_ACK) {
ackNeeded = 1;

/* Keep track of whether we have received the last packet */
if (flags & RX_LAST_PACKET) {
call->flags |= RX_CALL_HAVE_LAST;

/* Check whether we have all of the packets for this call */
if (call->flags & RX_CALL_HAVE_LAST) {
afs_uint32 tseq;		/* temporary sequence number */
struct rx_packet *tp;		/* Temporary packet pointer */
struct rx_packet *nxp;		/* Next pointer, for queue_Scan */

for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
call->flags |= RX_CALL_RECEIVE_DONE;

/* Provide asynchronous notification for those who want it
 * (e.g. multi rx) */
if (call->arrivalProc) {
(*call->arrivalProc)(call, call->arrivalProcHandle,
		     call->arrivalProcArg);
call->arrivalProc = (VOID (*)()) 0;

/* Update last packet received */
call->rprev = seq;

/* If there is no server process serving this call, grab
 * one, if available. We only need to do this once. If a
 * server thread is available, this thread becomes a server
 * thread and the server thread becomes a listener thread. */
TryAttach(call, socket, tnop, newcallp);
/* This is not the expected next packet. */

/* Determine whether this is a new or old packet, and if it's
 * a new one, whether it fits into the current receive window.
 * Also figure out whether the packet was delivered in sequence.
 * We use the prev variable to determine whether the new packet
 * is the successor of its immediate predecessor in the
 * receive queue, and the missing flag to determine whether
 * any of this packet's predecessors are missing. */

afs_uint32 prev;		/* "Previous packet" sequence number */
struct rx_packet *tp;		/* Temporary packet pointer */
struct rx_packet *nxp;		/* Next pointer, for queue_Scan */
int missing;			/* Are any predecessors missing? */

/* If the new packet's sequence number has been sent to the
 * application already, then this is a duplicate */
if (seq < call->rnext) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.dupPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
rxevent_Cancel(call->delayedAckEvent, call,
	       RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, seq, serial,
		 flags, RX_ACK_DUPLICATE, istack);

/* If the sequence number is greater than what can be
 * accommodated by the current window, then send a negative
 * acknowledge and drop the packet */
if ((call->rnext + call->rwind) <= seq) {
rxevent_Cancel(call->delayedAckEvent, call,
	       RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, seq, serial,
		 flags, RX_ACK_EXCEEDS_WINDOW, istack);

/* Look for the packet in the queue of old received packets */
for (prev = call->rnext - 1, missing = 0,
     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
/* Check for duplicate packet */
if (seq == tp->header.seq) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.dupPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
rxevent_Cancel(call->delayedAckEvent, call,
	       RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, seq, serial,
		 flags, RX_ACK_DUPLICATE, istack);

/* If we find a higher sequence packet, break out and
 * insert the new packet here. */
if (seq < tp->header.seq) break;
/* Check for missing packet */
if (tp->header.seq != prev+1) {
missing = 1;

prev = tp->header.seq;

/* Keep track of whether we have received the last packet. */
if (flags & RX_LAST_PACKET) {
call->flags |= RX_CALL_HAVE_LAST;

/* It's within the window: add it to the receive queue.
 * tp is left by the previous loop either pointing at the
 * packet before which to insert the new packet, or at the
 * queue head if the queue is empty or the packet should be
 * appended. */
queue_InsertBefore(tp, np);

/* Check whether we have all of the packets for this call */
if ((call->flags & RX_CALL_HAVE_LAST)
    && !(call->flags & RX_CALL_RECEIVE_DONE)) {
afs_uint32 tseq;		/* temporary sequence number */

for (tseq = call->rnext,
     queue_Scan(&call->rq, tp, nxp, rx_packet)) {
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
call->flags |= RX_CALL_RECEIVE_DONE;

/* We need to send an ack if the packet is out of sequence,
 * or if an ack was requested by the peer. */
if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
/* Acknowledge the last packet for each call */
if (flags & RX_LAST_PACKET) {

/*
 * If the receiver is waiting for an iovec, fill the iovec
 * using the data from the receive queue */
if (call->flags & RX_CALL_IOVEC_WAIT) {
didHardAck = rxi_FillReadVec(call, seq, serial, flags);
/* the call may have been aborted */

/* Wakeup the reader if any */
if ((call->flags & RX_CALL_READER_WAIT) &&
    (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
     (call->iovNext >= call->iovMax) ||
     (call->flags & RX_CALL_RECEIVE_DONE))) {
call->flags &= ~RX_CALL_READER_WAIT;
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&call->cv_rq);
#else
osi_rxWakeup(&call->rq);
#endif

/*
 * Send an ack when requested by the peer, or once every
 * rxi_SoftAckRate packets until the last packet has been
 * received. Always send a soft ack for the last packet in
 * the server's reply. */
if (ackNeeded) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, seq, serial, flags,
		 RX_ACK_REQUESTED, istack);
} else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, seq, serial, flags,
		 RX_ACK_IDLE, istack);
} else if (call->nSoftAcks) {
clock_GetTime(&when);
if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
clock_Add(&when, &rx_lastAckDelay);
} else {
clock_Add(&when, &rx_softAckDelay);
}
if (!call->delayedAckEvent ||
    clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
rxevent_Cancel(call->delayedAckEvent, call,
	       RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
				     call, 0);

} else if (call->flags & RX_CALL_RECEIVE_DONE) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
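/* Ack cadence summary (added comment): a peer-requested ack goes out
 * immediately as RX_ACK_REQUESTED; after more than rxi_SoftAckRate
 * unacknowledged packets an RX_ACK_IDLE is sent; otherwise a delayed ack is
 * scheduled for rx_softAckDelay (rx_lastAckDelay for the last packet of a
 * server reply), and a fully received call acks without further delay. */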
static void rxi_ComputeRate();

/* The real smarts of the whole thing. */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
register struct rx_call *call;
struct rx_packet *np;
struct rx_ackPacket *ap;
register struct rx_packet *tp;
register struct rx_packet *nxp;	/* Next packet pointer for queue_Scan */
register struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;

/* because there are CM's that are bogus, sending weird values for this. */
afs_uint32 skew = 0;

int newAckCount = 0;
u_short maxMTU = 0;	/* Set if peer supports AFS 3.4a jumbo datagrams */
int maxDgramPackets = 0;	/* Set if peer supports AFS 3.5 jumbo datagrams */

MUTEX_ENTER(&rx_stats_mutex);
rx_stats.ackPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
ap = (struct rx_ackPacket *) rx_DataOf(np);
nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
if (nbytes < 0)
return np;	/* truncated ack packet */

/* depends on ack packet struct */
nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
first = ntohl(ap->firstPacket);
serial = ntohl(ap->serial);
/* temporarily disabled -- needs to degrade over time
 * skew = ntohs(ap->maxSkew); */

/* Ignore ack packets received out of order */
if (first < call->tfirst) {
return np;

if (np->header.flags & RX_SLOW_START_OK) {
call->flags |= RX_CALL_SLOW_START_OK;

if (rx_Log) {
fprintf(rx_Log,
"RACK: reason %x previous %u seq %u serial %u skew %d first %u",
ap->reason, ntohl(ap->previousPacket),
(unsigned int) np->header.seq, (unsigned int) serial,
(unsigned int) skew, ntohl(ap->firstPacket));
for (offset = 0; offset < nAcks; offset++)
putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', rx_Log);

/* if a server connection has been re-created, it doesn't remember what
 * serial # it was up to. An ack will tell us, since the serial field
 * contains the largest serial received by the other side */
MUTEX_ENTER(&conn->conn_data_lock);
if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
conn->serial = serial+1;

MUTEX_EXIT(&conn->conn_data_lock);

/* Update the outgoing packet skew value to the latest value of
 * the peer's incoming packet skew value. The ack packet, of
 * course, could arrive out of order, but that won't affect things
 * much */
MUTEX_ENTER(&peer->peer_lock);
peer->outPacketSkew = skew;

/* Check for packets that no longer need to be transmitted, and
 * discard them. This only applies to packets positively
 * acknowledged as having been sent to the peer's upper level.
 * All other packets must be retained. So only packets with
 * sequence numbers < ap->firstPacket are candidates. */
for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
if (tp->header.seq >= first) break;
call->tfirst = tp->header.seq + 1;
if (tp->header.serial == serial) {
/* Use RTT if not delayed by client. */
if (ap->reason != RX_ACK_DELAY)
rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
rxi_ComputeRate(peer, call, tp, np, ap->reason);
}
else if (tp->firstSerial == serial) {
/* Use RTT if not delayed by client. */
if (ap->reason != RX_ACK_DELAY)
rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
rxi_ComputeRate(peer, call, tp, np, ap->reason);
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we have to release the global rx lock when sending
 * packets (osi_NetSend) we drop all acks while we're traversing the tq
 * in rxi_Start sending packets out because packets may move to the
 * freePacketQueue as a result of being here! So we drop these packets until
 * we're safely out of the traversal. Really ugly!
 * To make it even uglier, if we're using fine grain locking, we can
 * set the ack bits in the packets and have rxi_Start remove the packets
 * when it's done transmitting. */
if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_FreePacket(tp);	/* rxi_FreePacket mustn't wake up anyone, preemptively. */

/* Give rate detector a chance to respond to ping requests */
if (ap->reason == RX_ACK_PING_RESPONSE) {
rxi_ComputeRate(peer, call, 0, np, ap->reason);

/* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
/* Now go through explicit acks/nacks and record the results in
 * the waiting packets. These are packets that can't be released
 * yet, even with a positive acknowledge. This positive
 * acknowledge only means the packet has been received by the
 * peer, not that it will be retained long enough to be sent to
 * the peer's upper level. In addition, reset the transmit timers
 * of any missing packets (those packets that must be missing
 * because this packet was out of sequence) */

call->nSoftAcked = 0;
for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
/* Update round trip time if the ack was stimulated on receipt
 * of this packet */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
if (tp->header.seq >= first) {
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (tp->header.serial == serial) {
/* Use RTT if not delayed by client. */
if (ap->reason != RX_ACK_DELAY)
rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
rxi_ComputeRate(peer, call, tp, np, ap->reason);
}
else if ((tp->firstSerial == serial)) {
/* Use RTT if not delayed by client. */
if (ap->reason != RX_ACK_DELAY)
rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
rxi_ComputeRate(peer, call, tp, np, ap->reason);
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

/* Set the acknowledge flag per packet based on the
 * information in the ack packet. An acknowledged packet can
 * be downgraded when the server has discarded a packet it
 * soacked previously, or when an ack packet is received
 * out of sequence. */
if (tp->header.seq < first) {
/* Implicit ack information */

else if (tp->header.seq < first + nAcks) {
/* Explicit ack information: set it in the packet appropriately */
if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {

/* If packet isn't yet acked, and it has been transmitted at least
 * once, reset retransmit time using latest timeout
 * ie, this should readjust the retransmit timer for all outstanding
 * packets... So we don't just retransmit when we should know better */
if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
tp->retryTime = tp->timeSent;
clock_Add(&tp->retryTime, &peer->timeout);
/* shift by eight because one quarter-sec ~ 256 milliseconds */
clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
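/* Worked example (added comment): if peer->timeout is one second and
 * tp->backoff is 3, the packet's new retryTime becomes
 * timeSent + 1000 msec + (3 << 8) msec = timeSent + 1768 msec. */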
/* If the window has been extended by this acknowledge packet,
 * then wakeup a sender waiting in alloc for window space, or try
 * sending packets now, if he's been sitting on packets due to
 * lack of window space */
if (call->tnext < (call->tfirst + call->twind)) {
#ifdef RX_ENABLE_LOCKS
CV_SIGNAL(&call->cv_twind);
#else
if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
osi_rxWakeup(&call->twind);
}
#endif
if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;

/* if the ack packet has a receivelen field hanging off it,
 * update our state */
if (np->length >= rx_AckDataSize(ap->nAcks) + sizeof(afs_int32)) {

/* If the ack packet has a "recommended" size that is less than
 * what I am using now, reduce my size to match */
rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
	      sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);
peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));

/* Get the maximum packet size to send to this peer */
rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
	      &tSize);
tSize = (afs_uint32) ntohl(tSize);
tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);

/* sanity check - peer might have restarted with different params.
 * If peer says "send less", dammit, send less... Peer should never
 * be unable to accept packets of the size that prior AFS versions would
 * send without asking. */
if (peer->maxMTU != tSize) {
peer->maxMTU = tSize;
peer->MTU = MIN(tSize, peer->MTU);
call->MTU = MIN(call->MTU, tSize);
if (np->length == rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32)) {
rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32),
	      sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);	/* peer's receive window, if it's */
if (tSize < call->twind) {		/* smaller than our send */
call->twind = tSize;			/* window, we must send less... */
call->ssthresh = MIN(call->twind, call->ssthresh);
}

/* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
 * network MTU confused with the loopback MTU. Calculate the
 * maximum MTU here for use in the slow start code below. */
maxMTU = peer->maxMTU;
/* Did peer restart with older RX version? */
if (peer->maxDgramPackets > 1) {
peer->maxDgramPackets = 1;

} else if (np->length >= rx_AckDataSize(ap->nAcks) + 4*sizeof(afs_int32)) {
rx_packetread(np, rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32),
	      sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);
/*
 * As of AFS 3.5 we set the send window to match the receive window. */
if (tSize < call->twind) {
call->twind = tSize;
call->ssthresh = MIN(call->twind, call->ssthresh);
} else if (tSize > call->twind) {
call->twind = tSize;
}

/*
 * As of AFS 3.5, a jumbogram is more than one fixed size
 * packet transmitted in a single UDP datagram. If the remote
 * MTU is smaller than our local MTU then never send a datagram
 * larger than the natural MTU. */
rx_packetread(np, rx_AckDataSize(ap->nAcks) + 3*sizeof(afs_int32),
	      sizeof(afs_int32), &tSize);
maxDgramPackets = (afs_uint32) ntohl(tSize);
maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
maxDgramPackets = MIN(maxDgramPackets,
		      (int)(peer->ifDgramPackets));
maxDgramPackets = MIN(maxDgramPackets, tSize);
if (maxDgramPackets > 1) {
peer->maxDgramPackets = maxDgramPackets;
call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
} else {
peer->maxDgramPackets = 1;
call->MTU = peer->natMTU;
}
} else if (peer->maxDgramPackets > 1) {
/* Restarted with lower version of RX */
peer->maxDgramPackets = 1;

} else if (peer->maxDgramPackets > 1 ||
	   peer->maxMTU != OLD_MAX_PACKET_SIZE) {
/* Restarted with lower version of RX */
peer->maxMTU = OLD_MAX_PACKET_SIZE;
peer->natMTU = OLD_MAX_PACKET_SIZE;
peer->MTU = OLD_MAX_PACKET_SIZE;
peer->maxDgramPackets = 1;
peer->nDgramPackets = 1;

call->MTU = OLD_MAX_PACKET_SIZE;
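/* Layout of the ack trailer parsed above (added summary; the field names are
 * descriptive only, reconstructed from the rx_packetread offsets): after the
 * rx_AckDataSize(nAcks) ack body come up to four afs_int32s, all in network
 * order:
 *
 *	word 0: peer's maximum send size
 *	word 1: peer's natural/interface MTU
 *	word 2: peer's receive window
 *	word 3: peer's maximum packets per jumbogram (AFS 3.5 only)
 *
 * A three-word trailer marks an AFS 3.4a peer; a four-word trailer marks an
 * AFS 3.5 peer that may accept jumbograms. */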
/*
 * Calculate how many datagrams were successfully received after
 * the first missing packet and adjust the negative ack counter
 * accordingly. */
nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
if (call->nNacks < nNacked) {
call->nNacks = nNacked;

if (call->flags & RX_CALL_FAST_RECOVER) {
call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);

call->flags &= ~RX_CALL_FAST_RECOVER;
call->cwind = call->nextCwind;
call->nextCwind = 0;

call->nCwindAcks = 0;
}
else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
/* Three negative acks in a row trigger congestion recovery */
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
MUTEX_EXIT(&peer->peer_lock);
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
/* someone else is waiting to start recovery */

call->flags |= RX_CALL_FAST_RECOVER_WAIT;
while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
}
MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
call->flags |= RX_CALL_FAST_RECOVER;
call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
		  rx_maxSendWindow);
call->nDgramPackets = MAX(2, (int)call->nDgramPackets) >> 1;
call->nextCwind = call->ssthresh;

peer->MTU = call->MTU;
peer->cwind = call->nextCwind;
peer->nDgramPackets = call->nDgramPackets;

call->congestSeq = peer->congestSeq;
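/* Worked example (added comment): if cwind == 16 and twind == 32 when
 * recovery triggers, ssthresh becomes MAX(4, MIN(16, 32)) >> 1 == 8, cwind
 * is pulled back to MIN(8 + rx_nackThreshold, rx_maxSendWindow), the
 * datagram size is halved, and the window regrows from nextCwind == 8. */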
/* Reset the resend times on the packets that were nacked
 * so we will retransmit as soon as the window permits */
for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
clock_Zero(&tp->retryTime);
} else if (tp->acked) {

/* If cwind is smaller than ssthresh, then increase
 * the window one packet for each ack we receive (exponential
 * growth).
 * If cwind is greater than or equal to ssthresh then increase
 * the congestion window by one packet for each cwind acks we
 * receive (linear growth). */
if (call->cwind < call->ssthresh) {
call->cwind = MIN((int)call->ssthresh,
		  (int)(call->cwind + newAckCount));
call->nCwindAcks = 0;
} else {
call->nCwindAcks += newAckCount;
if (call->nCwindAcks >= call->cwind) {
call->nCwindAcks = 0;
call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
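/* Worked example (added comment): with ssthresh == 8, a call in slow start
 * at cwind == 4 that gets newAckCount == 2 moves to cwind == 6; once cwind
 * reaches ssthresh, a full window of acks (nCwindAcks >= cwind) is needed
 * to add a single packet, i.e. exponential growth below ssthresh and linear
 * growth above it. */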
/*
 * If we have received several acknowledgements in a row then
 * it is time to increase the size of our datagrams */
if ((int)call->nAcks > rx_nDgramThreshold) {
if (peer->maxDgramPackets > 1) {
if (call->nDgramPackets < peer->maxDgramPackets) {
call->nDgramPackets++;

call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
} else if (call->MTU < peer->maxMTU) {
call->MTU += peer->natMTU;
call->MTU = MIN(call->MTU, peer->maxMTU);

MUTEX_EXIT(&peer->peer_lock);	/* rxi_Start will lock peer. */

/* Servers need to hold the call until all response packets have
 * been acknowledged. Soft acks are good enough since clients
 * are not allowed to clear their receive queues. */
if (call->state == RX_STATE_HOLD &&
    call->tfirst + call->nSoftAcked >= call->tnext) {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
} else if (!queue_IsEmpty(&call->tq)) {
rxi_Start(0, call, istack);
/* Received a response to a challenge packet */
struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
register struct rx_connection *conn;
register struct rx_packet *np;

/* Ignore the packet if we're the client */
if (conn->type == RX_CLIENT_CONNECTION) return np;

/* If already authenticated, ignore the packet (it's probably a retry) */
if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
return np;

/* Otherwise, have the security object evaluate the response packet */
error = RXS_CheckResponse(conn->securityObject, conn, np);
if (error) {
/* If the response is invalid, reset the connection, sending
 * an abort to the peer */
rxi_ConnectionError(conn, error);
MUTEX_ENTER(&conn->conn_data_lock);
np = rxi_SendConnectionAbort(conn, np, istack, 0);
MUTEX_EXIT(&conn->conn_data_lock);

/* If the response is valid, any calls waiting to attach
 * servers can now do so */
for (i = 0; i < RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
MUTEX_ENTER(&call->lock);
if (call->state == RX_STATE_PRECALL)
rxi_AttachServerProc(call, -1, NULL, NULL);
MUTEX_EXIT(&call->lock);

/* A client has received an authentication challenge: the security
 * object is asked to cough up a respectable response packet to send
 * back to the server. The server is responsible for retrying the
 * challenge if it fails to get a response. */

struct rx_packet *
rxi_ReceiveChallengePacket(conn, np, istack)
register struct rx_connection *conn;
register struct rx_packet *np;

/* Ignore the challenge if we're the server */
if (conn->type == RX_SERVER_CONNECTION) return np;

/* Ignore the challenge if the connection is otherwise idle; someone's
 * trying to use us as an oracle. */
if (!rxi_HasActiveCalls(conn)) return np;

/* Send the security object the challenge packet. It is expected to fill
 * in the response. */
error = RXS_GetResponse(conn->securityObject, conn, np);

/* If the security object is unable to return a valid response, reset the
 * connection and send an abort to the peer. Otherwise send the response
 * packet to the peer connection. */
if (error) {
rxi_ConnectionError(conn, error);
MUTEX_ENTER(&conn->conn_data_lock);
np = rxi_SendConnectionAbort(conn, np, istack, 0);
MUTEX_EXIT(&conn->conn_data_lock);
} else {
np = rxi_SendSpecial((struct rx_call *)0, conn, np,
		     RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3767 /* Find an available server process to service the current request in
3768 * the given call structure. If one isn't available, queue up this
3769 * call so it eventually gets one */
3771 rxi_AttachServerProc(call, socket, tnop, newcallp)
3772 register struct rx_call *call;
3773 register osi_socket socket;
3775 register struct rx_call **newcallp;
3777 register struct rx_serverQueueEntry *sq;
3778 register struct rx_service *service = call->conn->service;
3779 #ifdef RX_ENABLE_LOCKS
3780 register int haveQuota = 0;
3781 #endif /* RX_ENABLE_LOCKS */
3782 /* May already be attached */
3783 if (call->state == RX_STATE_ACTIVE) return;
3785 MUTEX_ENTER(&rx_serverPool_lock);
3786 #ifdef RX_ENABLE_LOCKS
3787 while(rxi_ServerThreadSelectingCall) {
3788 MUTEX_EXIT(&call->lock);
3789 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3790 MUTEX_EXIT(&rx_serverPool_lock);
3791 MUTEX_ENTER(&call->lock);
3792 MUTEX_ENTER(&rx_serverPool_lock);
3793 /* Call may have been attached */
3794 if (call->state == RX_STATE_ACTIVE) return;
3797 haveQuota = QuotaOK(service);
3798 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3799 /* If there are no processes available to service this call,
3800 * put the call on the incoming call queue (unless it's
3801 * already on the queue).
3804 ReturnToServerPool(service);
3805 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3806 call->flags |= RX_CALL_WAIT_PROC;
3807 MUTEX_ENTER(&rx_stats_mutex);
3809 MUTEX_EXIT(&rx_stats_mutex);
3810 rxi_calltrace(RX_CALL_ARRIVAL, call);
3811 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3812 queue_Append(&rx_incomingCallQueue, call);
3815 #else /* RX_ENABLE_LOCKS */
3816 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3817 /* If there are no processes available to service this call,
3818 * put the call on the incoming call queue (unless it's
3819 * already on the queue).
3821 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3822 call->flags |= RX_CALL_WAIT_PROC;
3824 rxi_calltrace(RX_CALL_ARRIVAL, call);
3825 queue_Append(&rx_incomingCallQueue, call);
3828 #endif /* RX_ENABLE_LOCKS */
3830 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3832 /* If hot threads are enabled, and both newcallp and sq->socketp
3833 * are non-null, then this thread will process the call, and the
3834 * idle server thread will start listening on this thread's socket.
3837 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3840 *sq->socketp = socket;
3841 clock_GetTime(&call->startTime);
3842 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3846 if (call->flags & RX_CALL_WAIT_PROC) {
3847 /* Conservative: I don't think this should happen */
3848 call->flags &= ~RX_CALL_WAIT_PROC;
3849 MUTEX_ENTER(&rx_stats_mutex);
3851 MUTEX_EXIT(&rx_stats_mutex);
3854 call->state = RX_STATE_ACTIVE;
3855 call->mode = RX_MODE_RECEIVING;
3856 if (call->flags & RX_CALL_CLEARED) {
3857 /* send an ack now to start the packet flow up again */
3858 call->flags &= ~RX_CALL_CLEARED;
3859 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3861 #ifdef RX_ENABLE_LOCKS
3864 service->nRequestsRunning++;
3865 if (service->nRequestsRunning <= service->minProcs)
3871 MUTEX_EXIT(&rx_serverPool_lock);
3874 /* Delay the sending of an acknowledge event for a short while, while
3875 * a new call is being prepared (in the case of a client) or a reply
3876 * is being prepared (in the case of a server). Rather than sending
3877 * an ack packet, an ACKALL packet is sent. */
3878 void rxi_AckAll(event, call, dummy)
3879 struct rxevent *event;
3880 register struct rx_call *call;
3883 #ifdef RX_ENABLE_LOCKS
3885 MUTEX_ENTER(&call->lock);
3886 call->delayedAckEvent = (struct rxevent *) 0;
3887 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3889 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3890 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3892 MUTEX_EXIT(&call->lock);
3893 #else /* RX_ENABLE_LOCKS */
3894 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3895 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3896 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3897 #endif /* RX_ENABLE_LOCKS */
3900 void rxi_SendDelayedAck(event, call, dummy)
3901 struct rxevent *event;
3902 register struct rx_call *call;
3905 #ifdef RX_ENABLE_LOCKS
3907 MUTEX_ENTER(&call->lock);
3908 if (event == call->delayedAckEvent)
3909 call->delayedAckEvent = (struct rxevent *) 0;
3910 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3912 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3914 MUTEX_EXIT(&call->lock);
3915 #else /* RX_ENABLE_LOCKS */
3916 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3917 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3918 #endif /* RX_ENABLE_LOCKS */
3922 #ifdef RX_ENABLE_LOCKS
3923 /* Mark all packets in the transmit queue as acknowledged. rxi_Start will deal with
3924 * clearing them out.
3926 static void rxi_SetAcksInTransmitQueue(call)
3927 register struct rx_call *call;
3929 register struct rx_packet *p, *tp;
3932 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3939 call->flags |= RX_CALL_TQ_CLEARME;
3940 call->flags |= RX_CALL_TQ_SOME_ACKED;
3943 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3944 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
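/* implicitly acknowledge all data already sent */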
3945 call->tfirst = call->tnext;
3946 call->nSoftAcked = 0;
3948 if (call->flags & RX_CALL_FAST_RECOVER) {
3949 call->flags &= ~RX_CALL_FAST_RECOVER;
3950 call->cwind = call->nextCwind;
3951 call->nextCwind = 0;
3954 CV_SIGNAL(&call->cv_twind);
3956 #endif /* RX_ENABLE_LOCKS */
3958 /* Clear out the transmit queue for the current call (all packets have
3959 * been received by peer) */
3960 void rxi_ClearTransmitQueue(call, force)
3961 register struct rx_call *call;
3964 register struct rx_packet *p, *tp;
3966 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3967 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3969 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3976 call->flags |= RX_CALL_TQ_CLEARME;
3977 call->flags |= RX_CALL_TQ_SOME_ACKED;
3980 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3981 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3987 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3988 call->flags &= ~RX_CALL_TQ_CLEARME;
3990 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3992 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3993 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3994 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3995 call->nSoftAcked = 0;
3997 if (call->flags & RX_CALL_FAST_RECOVER) {
3998 call->flags &= ~RX_CALL_FAST_RECOVER;
3999 call->cwind = call->nextCwind;
4002 #ifdef RX_ENABLE_LOCKS
4003 CV_SIGNAL(&call->cv_twind);
4005 osi_rxWakeup(&call->twind);
4009 void rxi_ClearReceiveQueue(call)
4010 register struct rx_call *call;
4012 register struct rx_packet *p, *tp;
4013 if (queue_IsNotEmpty(&call->rq)) {
4014 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4019 rx_packetReclaims++;
4021 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4023 if (call->state == RX_STATE_PRECALL) {
4024 call->flags |= RX_CALL_CLEARED;
4028 /* Send an abort packet for the specified call */
4029 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4030 register struct rx_call *call;
4031 struct rx_packet *packet;
4041 /* Clients should never delay abort messages */
4042 if (rx_IsClientConn(call->conn))
4045 if (call->abortCode != call->error) {
4046 call->abortCode = call->error;
4047 call->abortCount = 0;
4050 if (force || rxi_callAbortThreshhold == 0 ||
4051 call->abortCount < rxi_callAbortThreshhold) {
4052 if (call->delayedAbortEvent) {
4053 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4055 error = htonl(call->error);
4057 packet = rxi_SendSpecial(call, call->conn, packet,
4058 RX_PACKET_TYPE_ABORT, (char *)&error,
4059 sizeof(error), istack);
4060 } else if (!call->delayedAbortEvent) {
4061 clock_GetTime(&when);
4062 clock_Addmsec(&when, rxi_callAbortDelay);
4063 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4064 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4070 /* Send an abort packet for the specified connection. Packet is an
4071 * optional pointer to a packet that can be used to send the abort.
4072 * Once the number of abort messages reaches the threshold, an
4073 * event is scheduled to send the abort. Setting the force flag
4074 * overrides sending delayed abort messages.
4076 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4077 * to send the abort packet.
4079 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4080 register struct rx_connection *conn;
4081 struct rx_packet *packet;
4091 /* Clients should never delay abort messages */
4092 if (rx_IsClientConn(conn))
4095 if (force || rxi_connAbortThreshhold == 0 ||
4096 conn->abortCount < rxi_connAbortThreshhold) {
4097 if (conn->delayedAbortEvent) {
4098 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4100 error = htonl(conn->error);
4102 MUTEX_EXIT(&conn->conn_data_lock);
4103 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4104 RX_PACKET_TYPE_ABORT, (char *)&error,
4105 sizeof(error), istack);
4106 MUTEX_ENTER(&conn->conn_data_lock);
4107 } else if (!conn->delayedAbortEvent) {
4108 clock_GetTime(&when);
4109 clock_Addmsec(&when, rxi_connAbortDelay);
4110 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4116 /* Associate an error with all of the calls owned by a connection. Called
4117 * with error non-zero. This is only for really fatal things, like
4118 * bad authentication responses. The connection itself is set in
4119 * error at this point, so that future packets received will be discarded. */
4121 void rxi_ConnectionError(conn, error)
4122 register struct rx_connection *conn;
4123 register afs_int32 error;
4127 if (conn->challengeEvent)
4128 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4129 for (i=0; i<RX_MAXCALLS; i++) {
4130 struct rx_call *call = conn->call[i];
4132 MUTEX_ENTER(&call->lock);
4133 rxi_CallError(call, error);
4134 MUTEX_EXIT(&call->lock);
4137 conn->error = error;
4138 MUTEX_ENTER(&rx_stats_mutex);
4139 rx_stats.fatalErrors++;
4140 MUTEX_EXIT(&rx_stats_mutex);
4144 void rxi_CallError(call, error)
4145 register struct rx_call *call;
4148 if (call->error) error = call->error;
4149 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4150 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4151 rxi_ResetCall(call, 0);
4154 rxi_ResetCall(call, 0);
4156 call->error = error;
4157 call->mode = RX_MODE_ERROR;
4160 /* Reset various fields in a call structure, and wakeup waiting
4161 * processes. Some fields aren't changed: state & mode are not
4162 * touched (these must be set by the caller), and bufptr, nLeft, and
4163 * nFree are not reset, since these fields are manipulated by
4164 * unprotected macros, and may only be reset by non-interrupting code.
4167 /* this code requires that call->conn be set properly as a pre-condition. */
4168 #endif /* ADAPT_WINDOW */
4170 void rxi_ResetCall(call, newcall)
4171 register struct rx_call *call;
4172 register int newcall;
4175 register struct rx_peer *peer;
4176 struct rx_packet *packet;
4178 /* Notify anyone who is waiting for asynchronous packet arrival */
4179 if (call->arrivalProc) {
4180 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4181 call->arrivalProc = (VOID (*)()) 0;
4184 if (call->delayedAbortEvent) {
4185 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4186 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4188 rxi_SendCallAbort(call, packet, 0, 1);
4189 rxi_FreePacket(packet);
4194 * Update the peer with the congestion information in this call
4195 * so other calls on this connection can pick up where this call
4196 * left off. If the congestion sequence numbers don't match then
4197 * another call experienced a retransmission.
4199 peer = call->conn->peer;
4200 MUTEX_ENTER(&peer->peer_lock);
4202 if (call->congestSeq == peer->congestSeq) {
4203 peer->cwind = MAX(peer->cwind, call->cwind);
4204 peer->MTU = MAX(peer->MTU, call->MTU);
4205 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4208 call->abortCode = 0;
4209 call->abortCount = 0;
4211 if (peer->maxDgramPackets > 1) {
4212 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4214 call->MTU = peer->MTU;
4216 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4217 call->ssthresh = rx_maxSendWindow;
4218 call->nDgramPackets = peer->nDgramPackets;
4219 call->congestSeq = peer->congestSeq;
4220 MUTEX_EXIT(&peer->peer_lock);
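/* Snapshot the flags; the wakeups and queue checks below act on the
 * values the call had on entry */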
4222 flags = call->flags;
4223 rxi_ClearReceiveQueue(call);
4224 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4225 if (call->flags & RX_CALL_TQ_BUSY) {
4226 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4227 call->flags |= (flags & RX_CALL_TQ_WAIT);
4229 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4231 rxi_ClearTransmitQueue(call, 0);
4232 queue_Init(&call->tq);
4235 queue_Init(&call->rq);
4237 call->rwind = rx_initReceiveWindow;
4238 call->twind = rx_initSendWindow;
4239 call->nSoftAcked = 0;
4240 call->nextCwind = 0;
4243 call->nCwindAcks = 0;
4244 call->nSoftAcks = 0;
4245 call->nHardAcks = 0;
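/* Packet sequence numbers in both directions restart at 1 for the new call */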
4247 call->tfirst = call->rnext = call->tnext = 1;
4249 call->lastAcked = 0;
4250 call->localStatus = call->remoteStatus = 0;
4252 if (flags & RX_CALL_READER_WAIT) {
4253 #ifdef RX_ENABLE_LOCKS
4254 CV_BROADCAST(&call->cv_rq);
4256 osi_rxWakeup(&call->rq);
4259 if (flags & RX_CALL_WAIT_PACKETS) {
4260 MUTEX_ENTER(&rx_freePktQ_lock);
4261 rxi_PacketsUnWait(); /* XXX */
4262 MUTEX_EXIT(&rx_freePktQ_lock);
4265 #ifdef RX_ENABLE_LOCKS
4266 CV_SIGNAL(&call->cv_twind);
4268 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4269 osi_rxWakeup(&call->twind);
4272 #ifdef RX_ENABLE_LOCKS
4273 /* The following ensures that we don't mess with any queue while some
4274 * other thread might also be doing so. The call_queue_lock field is
4275 * only modified under the call lock. If the call is in the process
4276 * of being removed from a queue, the call is not locked until the
4277 * queue lock is dropped and only then is the call_queue_lock field
4278 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4279 * Note that any other routine which removes a call from a queue has to
4280 * obtain the queue lock before examining the queue and removing the call.
4282 if (call->call_queue_lock) {
4283 MUTEX_ENTER(call->call_queue_lock);
4284 if (queue_IsOnQueue(call)) {
4286 if (flags & RX_CALL_WAIT_PROC) {
4287 MUTEX_ENTER(&rx_stats_mutex);
4289 MUTEX_EXIT(&rx_stats_mutex);
4292 MUTEX_EXIT(call->call_queue_lock);
4293 CLEAR_CALL_QUEUE_LOCK(call);
4295 #else /* RX_ENABLE_LOCKS */
4296 if (queue_IsOnQueue(call)) {
4298 if (flags & RX_CALL_WAIT_PROC)
4301 #endif /* RX_ENABLE_LOCKS */
4303 rxi_KeepAliveOff(call);
4304 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4307 /* Send an acknowledge for the indicated packet (seq,serial) of the
4308 * indicated call, for the indicated reason (reason). This
4309 * acknowledge will specifically acknowledge receiving the packet, and
4310 * will also specify which other packets for this call have been
4311 * received. This routine returns the packet that was used to the
4312 * caller. The caller is responsible for freeing it or re-using it.
4313 * This acknowledgement also returns the highest sequence number
4314 * actually read out by the higher level to the sender; the sender
4315 * promises to keep around packets that have not been read by the
4316 * higher level yet (unless, of course, the sender decides to abort
4317 * the call altogether). Any of p, seq, serial, pflags, or reason may
4318 * be set to zero without ill effect. That is, if they are zero, they
4319 * will not convey any information.
4320 * NOW there is a trailer field, after the ack where it will safely be
4321 * ignored by mundanes, which indicates the maximum size packet this
4322 * host can swallow. */
4323 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4324 register struct rx_call *call;
4325 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4326 int seq; /* Sequence number of the packet we are acking */
4327 int serial; /* Serial number of the packet */
4328 int pflags; /* Flags field from packet header */
4329 int reason; /* Reason an acknowledge was prompted */
4332 struct rx_ackPacket *ap;
4333 register struct rx_packet *rqp;
4334 register struct rx_packet *nxp; /* For queue_Scan */
4335 register struct rx_packet *p;
4340 * Open the receive window once a thread starts reading packets
4342 if (call->rnext > 1) {
4343 call->rwind = rx_maxReceiveWindow;
4346 call->nHardAcks = 0;
4347 call->nSoftAcks = 0;
4348 if (call->rnext > call->lastAcked)
4349 call->lastAcked = call->rnext;
4353 rx_computelen(p, p->length); /* reset length, you never know */
4354 } /* where that's been... */
4356 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4357 /* We won't send the ack, but don't panic. */
4358 return optionalPacket;
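/* Make sure the packet has room for the ack bitmap over a full receive
 * window plus the four afs_int32 trailer words (maxMTU, ifMTU, rwind,
 * ifDgramPackets) appended below */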
4361 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4363 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4364 if (!optionalPacket) rxi_FreePacket(p);
4365 return optionalPacket;
4367 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4368 if (rx_Contiguous(p)<templ) {
4369 if (!optionalPacket) rxi_FreePacket(p);
4370 return optionalPacket;
4372 } /* MTUXXX failing to send an ack is very serious. We should */
4373 /* try as hard as possible to send even a partial ack; it's */
4374 /* better than nothing. */
4376 ap = (struct rx_ackPacket *) rx_DataOf(p);
4377 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4378 ap->reason = reason;
4380 /* The skew computation used to be bogus, I think it's better now. */
4381 /* We should start paying attention to skew. XXX */
4382 ap->serial = htonl(call->conn->maxSerial);
4383 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4385 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4386 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4388 /* No fear of running out of room in the ack packet here, because there can be at most
4389 * one window full of unacknowledged packets. The window size must be constrained
4390 * to be less than the maximum ack size, of course. Also, an ack should always
4391 * fit into a single packet -- it should not ever be fragmented. */
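/* Walk the receive queue in sequence order, recording a NACK for each
 * missing sequence number and an ACK for each packet actually present */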
4392 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4393 if (!rqp || !call->rq.next
4394 || (rqp->header.seq > (call->rnext + call->rwind))) {
4395 if (!optionalPacket) rxi_FreePacket(p);
4396 rxi_CallError(call, RX_CALL_DEAD);
4397 return optionalPacket;
4400 while (rqp->header.seq > call->rnext + offset)
4401 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4402 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4404 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4405 if (!optionalPacket) rxi_FreePacket(p);
4406 rxi_CallError(call, RX_CALL_DEAD);
4407 return optionalPacket;
4412 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4414 /* these are new for AFS 3.3 */
4415 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4416 templ = htonl(templ);
4417 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4418 templ = htonl(call->conn->peer->ifMTU);
4419 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4421 /* new for AFS 3.4 */
4422 templ = htonl(call->rwind);
4423 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4425 /* new for AFS 3.5 */
4426 templ = htonl(call->conn->peer->ifDgramPackets);
4427 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4429 p->header.serviceId = call->conn->serviceId;
4430 p->header.cid = (call->conn->cid | call->channel);
4431 p->header.callNumber = *call->callNumber;
4432 p->header.seq = seq;
4433 p->header.securityIndex = call->conn->securityIndex;
4434 p->header.epoch = call->conn->epoch;
4435 p->header.type = RX_PACKET_TYPE_ACK;
4436 p->header.flags = RX_SLOW_START_OK;
4437 if (reason == RX_ACK_PING) {
4438 p->header.flags |= RX_REQUEST_ACK;
4440 clock_GetTime(&call->pingRequestTime);
4443 if (call->conn->type == RX_CLIENT_CONNECTION)
4444 p->header.flags |= RX_CLIENT_INITIATED;
4448 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4449 ap->reason, ntohl(ap->previousPacket),
4450 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4452 for (offset = 0; offset < ap->nAcks; offset++)
4453 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4460 register int i, nbytes = p->length;
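/* Shrink the final iovec so only p->length bytes go out on the wire;
 * the saved length is restored right after the send */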
4462 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4463 if (nbytes <= p->wirevec[i].iov_len) {
4464 register int savelen, saven;
4466 savelen = p->wirevec[i].iov_len;
4468 p->wirevec[i].iov_len = nbytes;
4470 rxi_Send(call, p, istack);
4471 p->wirevec[i].iov_len = savelen;
4475 else nbytes -= p->wirevec[i].iov_len;
4478 MUTEX_ENTER(&rx_stats_mutex);
4479 rx_stats.ackPacketsSent++;
4480 MUTEX_EXIT(&rx_stats_mutex);
4481 if (!optionalPacket) rxi_FreePacket(p);
4482 return optionalPacket; /* Return packet for re-use by caller */
4485 /* Send all of the packets in the list in single datagram */
4486 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4487 struct rx_call *call;
4488 struct rx_packet **list;
4493 struct clock *retryTime;
4499 struct rx_connection *conn = call->conn;
4500 struct rx_peer *peer = conn->peer;
4502 MUTEX_ENTER(&peer->peer_lock);
4504 if (resending) peer->reSends += len;
4505 MUTEX_ENTER(&rx_stats_mutex);
4506 rx_stats.dataPacketsSent += len;
4507 MUTEX_EXIT(&rx_stats_mutex);
4508 MUTEX_EXIT(&peer->peer_lock);
4510 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4514 /* Set the packet flags and schedule the resend events */
4515 /* Only request an ack for the last packet in the list */
4516 for (i = 0 ; i < len ; i++) {
4517 list[i]->retryTime = *retryTime;
4518 if (list[i]->header.serial) {
4519 /* Exponentially backoff retry times */
4520 if (list[i]->backoff < MAXBACKOFF) {
4521 /* so it can't stay == 0 */
4522 list[i]->backoff = (list[i]->backoff << 1) + 1;
4524 else list[i]->backoff++;
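/* The added delay is backoff * 256 ms, so successive retransmissions
 * of a packet back off by 256, 768, 1792, ... ms */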
4525 clock_Addmsec(&(list[i]->retryTime),
4526 ((afs_uint32) list[i]->backoff) << 8);
4529 /* Wait a little extra for the ack on the last packet */
4530 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4531 clock_Addmsec(&(list[i]->retryTime), 400);
4534 /* Record the time sent */
4535 list[i]->timeSent = *now;
4537 /* Ask for an ack on retransmitted packets, on every other packet
4538 * if the peer doesn't support slow start. Ask for an ack on every
4539 * packet until the congestion window reaches the ack rate. */
4540 if (list[i]->header.serial) {
4542 MUTEX_ENTER(&rx_stats_mutex);
4543 rx_stats.dataPacketsReSent++;
4544 MUTEX_EXIT(&rx_stats_mutex);
4546 /* improved RTO calculation- not Karn */
4547 list[i]->firstSent = *now;
4549 && (call->cwind <= (u_short)(conn->ackRate+1)
4550 || (!(call->flags & RX_CALL_SLOW_START_OK)
4551 && (list[i]->header.seq & 1)))) {
4556 MUTEX_ENTER(&peer->peer_lock);
4558 if (resending) peer->reSends++;
4559 MUTEX_ENTER(&rx_stats_mutex);
4560 rx_stats.dataPacketsSent++;
4561 MUTEX_EXIT(&rx_stats_mutex);
4562 MUTEX_EXIT(&peer->peer_lock);
4564 /* Tag this packet as not being the last in this group,
4565 * for the receiver's benefit */
4566 if (i < len-1 || moreFlag) {
4567 list[i]->header.flags |= RX_MORE_PACKETS;
4570 /* Install the new retransmit time for the packet, and
4571 * record the time sent */
4572 list[i]->timeSent = *now;
4576 list[len-1]->header.flags |= RX_REQUEST_ACK;
4579 /* Since we're about to send a data packet to the peer, it's
4580 * safe to nuke any scheduled end-of-packets ack */
4581 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
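/* Hold a reference on the call so it can't be freed while call->lock
 * is dropped for the actual send */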
4583 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4584 MUTEX_EXIT(&call->lock);
4586 rxi_SendPacketList(conn, list, len, istack);
4588 rxi_SendPacket(conn, list[0], istack);
4590 MUTEX_ENTER(&call->lock);
4591 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4593 /* Update last send time for this call (for keep-alive
4594 * processing), and for the connection (so that we can discover
4595 * idle connections) */
4596 conn->lastSendTime = call->lastSendTime = clock_Sec();
4599 /* When sending packets we need to follow these rules:
4600 * 1. Never send more than maxDgramPackets in a jumbogram.
4601 * 2. Never send a packet with more than two iovecs in a jumbogram.
4602 * 3. Never send a retransmitted packet in a jumbogram.
4603 * 4. Never send more than cwind/4 packets in a jumbogram
4604 * We always keep the last list we should have sent so we
4605 * can set the RX_MORE_PACKETS flags correctly.
4607 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4608 struct rx_call *call;
4609 struct rx_packet **list;
4613 struct clock *retryTime;
4616 int i, cnt, lastCnt = 0;
4617 struct rx_packet **listP, **lastP = 0;
4618 struct rx_peer *peer = call->conn->peer;
4619 int morePackets = 0;
4621 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4622 /* Does the current packet force us to flush the current list? */
4624 && (list[i]->header.serial
4626 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4628 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4629 /* If the call enters an error state stop sending, or if
4630 * we entered congestion recovery mode, stop sending */
4631 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4639 /* Add the current packet to the list if it hasn't been acked.
4640 * Otherwise adjust the list pointer to skip the current packet. */
4641 if (!list[i]->acked) {
4643 /* Do we need to flush the list? */
4644 if (cnt >= (int)peer->maxDgramPackets
4645 || cnt >= (int)call->nDgramPackets
4646 || cnt >= (int)call->cwind
4647 || list[i]->header.serial
4648 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4650 rxi_SendList(call, lastP, lastCnt, istack, 1,
4651 now, retryTime, resending);
4652 /* If the call enters an error state stop sending, or if
4653 * we entered congestion recovery mode, stop sending */
4654 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4664 osi_Panic("rxi_SendList error");
4670 /* Send the whole list when the call is in receive mode, when
4671 * the call is in eof mode, when we are in fast recovery mode,
4672 * and when we have the last packet */
4673 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4674 || call->mode == RX_MODE_RECEIVING
4675 || call->mode == RX_MODE_EOF
4676 || (call->flags & RX_CALL_FAST_RECOVER)) {
4677 /* Check for the case where the current list contains
4678 * an acked packet. Since we always send retransmissions
4679 * in a separate packet, we only need to check the first
4680 * packet in the list */
4681 if (cnt > 0 && !listP[0]->acked) {
4685 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4686 now, retryTime, resending);
4687 /* If the call enters an error state stop sending, or if
4688 * we entered congestion recovery mode, stop sending */
4689 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4693 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4695 } else if (lastCnt > 0) {
4696 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4700 #ifdef RX_ENABLE_LOCKS
4701 /* Call rxi_Start, below, but with the call lock held. */
4702 void rxi_StartUnlocked(event, call, istack)
4703 struct rxevent *event;
4704 register struct rx_call *call;
4707 MUTEX_ENTER(&call->lock);
4708 rxi_Start(event, call, istack);
4709 MUTEX_EXIT(&call->lock);
4711 #endif /* RX_ENABLE_LOCKS */
4713 /* This routine is called when new packets are readied for
4714 * transmission and when retransmission may be necessary, or when the
4715 * transmission window or burst count is favourable. This should be
4716 * better optimized for new packets, the usual case, now that we've
4717 * got rid of queues of send packets. XXXXXXXXXXX */
4718 void rxi_Start(event, call, istack)
4719 struct rxevent *event;
4720 register struct rx_call *call;
4723 struct rx_packet *p;
4724 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4725 struct rx_peer *peer = call->conn->peer;
4726 struct clock now, retryTime;
4730 struct rx_packet **xmitList;
4733 /* If rxi_Start is being called as a result of a resend event,
4734 * then make sure that the event pointer is removed from the call
4735 * structure, since there is no longer a per-call retransmission
4737 if (event && event == call->resendEvent) {
4738 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4739 call->resendEvent = NULL;
4741 if (queue_IsEmpty(&call->tq)) {
4745 /* Timeouts trigger congestion recovery */
4746 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4747 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4748 /* someone else is waiting to start recovery */
4751 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4752 while (call->flags & RX_CALL_TQ_BUSY) {
4753 call->flags |= RX_CALL_TQ_WAIT;
4754 #ifdef RX_ENABLE_LOCKS
4755 CV_WAIT(&call->cv_tq, &call->lock);
4756 #else /* RX_ENABLE_LOCKS */
4757 osi_rxSleep(&call->tq);
4758 #endif /* RX_ENABLE_LOCKS */
4760 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4761 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4762 call->flags |= RX_CALL_FAST_RECOVER;
4763 if (peer->maxDgramPackets > 1) {
4764 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4766 call->MTU = MIN(peer->natMTU, peer->maxMTU);
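/* Multiplicative decrease: ssthresh drops to half the effective
 * window, but never below 2 */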
4768 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4769 call->nDgramPackets = 1;
4771 call->nextCwind = 1;
4774 MUTEX_ENTER(&peer->peer_lock);
4775 peer->MTU = call->MTU;
4776 peer->cwind = call->cwind;
4777 peer->nDgramPackets = 1;
4779 call->congestSeq = peer->congestSeq;
4780 MUTEX_EXIT(&peer->peer_lock);
4781 /* Clear retry times on packets. Otherwise, it's possible for
4782 * some packets in the queue to force resends at rates faster
4783 * than recovery rates.
4785 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4787 clock_Zero(&p->retryTime);
4792 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4793 MUTEX_ENTER(&rx_stats_mutex);
4794 rx_tq_debug.rxi_start_in_error ++;
4795 MUTEX_EXIT(&rx_stats_mutex);
4800 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4801 /* Get clock to compute the re-transmit time for any packets
4802 * in this burst. Note, if we back off, it's reasonable to
4803 * back off all of the packets in the same manner, even if
4804 * some of them have been retransmitted more times than more
4805 * recent additions */
4806 clock_GetTime(&now);
4807 retryTime = now; /* initialize before use */
4808 MUTEX_ENTER(&peer->peer_lock);
4809 clock_Add(&retryTime, &peer->timeout);
4810 MUTEX_EXIT(&peer->peer_lock);
4812 /* Send (or resend) any packets that need it, subject to
4813 * window restrictions and congestion burst control
4814 * restrictions. Ask for an ack on the last packet sent in
4815 * this burst. For now, we're relying upon the window being
4816 * considerably bigger than the largest number of packets that
4817 * are typically sent at once by one initial call to
4818 * rxi_Start. This is probably bogus (perhaps we should ask
4819 * for an ack when we're half way through the current
4820 * window?). Also, for non file transfer applications, this
4821 * may end up asking for an ack for every packet. Bogus. XXXX
4824 * But check whether we're here recursively, and let the other guy
4827 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4828 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4829 call->flags |= RX_CALL_TQ_BUSY;
4831 call->flags &= ~RX_CALL_NEED_START;
4832 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4834 maxXmitPackets = MIN(call->twind, call->cwind);
4835 xmitList = (struct rx_packet **)
4836 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4837 if (xmitList == NULL)
4838 osi_Panic("rxi_Start, failed to allocate xmit list");
4839 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4840 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4841 /* We shouldn't be sending packets if a thread is waiting
4842 * to initiate congestion recovery */
4845 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4846 /* Only send one packet during fast recovery */
4849 if ((p->header.flags == RX_FREE_PACKET) ||
4850 (!queue_IsEnd(&call->tq, nxp)
4851 && (nxp->header.flags == RX_FREE_PACKET)) ||
4852 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4853 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4854 osi_Panic("rxi_Start: xmit queue clobbered");
4857 MUTEX_ENTER(&rx_stats_mutex);
4858 rx_stats.ignoreAckedPacket++;
4859 MUTEX_EXIT(&rx_stats_mutex);
4860 continue; /* Ignore this packet if it has been acknowledged */
4863 /* Turn off all flags except these ones, which are the same
4864 * on each transmission */
4865 p->header.flags &= RX_PRESET_FLAGS;
4867 if (p->header.seq >= call->tfirst +
4868 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4869 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4870 /* Note: if we're waiting for more window space, we can
4871 * still send retransmits; hence we don't return here, but
4872 * break out to schedule a retransmit event */
4873 dpf(("call %d waiting for window", *(call->callNumber)));
4877 /* Transmit the packet if it needs to be sent. */
4878 if (!clock_Lt(&now, &p->retryTime)) {
4879 if (nXmitPackets == maxXmitPackets) {
4880 osi_Panic("rxi_Start: xmit list overflowed");
4882 xmitList[nXmitPackets++] = p;
4886 /* xmitList now hold pointers to all of the packets that are
4887 * ready to send. Now we loop to send the packets */
4888 if (nXmitPackets > 0) {
4889 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4890 &now, &retryTime, resending);
4892 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4894 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4896 * TQ references no longer protected by this flag; they must remain
4897 * protected by the global lock.
4899 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4900 call->flags &= ~RX_CALL_TQ_BUSY;
4901 if (call->flags & RX_CALL_TQ_WAIT) {
4902 call->flags &= ~RX_CALL_TQ_WAIT;
4903 #ifdef RX_ENABLE_LOCKS
4904 CV_BROADCAST(&call->cv_tq);
4905 #else /* RX_ENABLE_LOCKS */
4906 osi_rxWakeup(&call->tq);
4907 #endif /* RX_ENABLE_LOCKS */
4912 /* We went into the error state while sending packets. Now is
4913 * the time to reset the call. This will also inform the using
4914 * process that the call is in an error state.
4916 MUTEX_ENTER(&rx_stats_mutex);
4917 rx_tq_debug.rxi_start_aborted ++;
4918 MUTEX_EXIT(&rx_stats_mutex);
4919 call->flags &= ~RX_CALL_TQ_BUSY;
4920 if (call->flags & RX_CALL_TQ_WAIT) {
4921 call->flags &= ~RX_CALL_TQ_WAIT;
4922 #ifdef RX_ENABLE_LOCKS
4923 CV_BROADCAST(&call->cv_tq);
4924 #else /* RX_ENABLE_LOCKS */
4925 osi_rxWakeup(&call->tq);
4926 #endif /* RX_ENABLE_LOCKS */
4928 rxi_CallError(call, call->error);
4931 #ifdef RX_ENABLE_LOCKS
4932 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4933 register int missing;
4934 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4935 /* Some packets have received acks. If they all have, we can clear
4936 * the transmit queue.
4938 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4939 if (p->header.seq < call->tfirst && p->acked) {
4947 call->flags |= RX_CALL_TQ_CLEARME;
4949 #endif /* RX_ENABLE_LOCKS */
4950 /* Don't bother doing retransmits if the TQ is cleared. */
4951 if (call->flags & RX_CALL_TQ_CLEARME) {
4952 rxi_ClearTransmitQueue(call, 1);
4954 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4957 /* Always post a resend event, if there is anything in the
4958 * queue, and resend is possible. There should be at least
4959 * one unacknowledged packet in the queue ... otherwise none
4960 * of these packets should be on the queue in the first place.
4962 if (call->resendEvent) {
4963 /* Cancel the existing event and post a new one */
4964 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4967 /* The retry time is the retry time on the first unacknowledged
4968 * packet inside the current window */
4969 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4970 /* Don't set timers for packets outside the window */
4971 if (p->header.seq >= call->tfirst + call->twind) {
4975 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4977 retryTime = p->retryTime;
4982 /* Post a new event to re-run rxi_Start when retries may be needed */
4983 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4984 #ifdef RX_ENABLE_LOCKS
4985 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4986 call->resendEvent = rxevent_Post(&retryTime,
4988 (char *)call, istack);
4989 #else /* RX_ENABLE_LOCKS */
4990 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4991 (char *)call, (void*)(long)istack);
4992 #endif /* RX_ENABLE_LOCKS */
4995 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4996 } while (call->flags & RX_CALL_NEED_START);
4998 * TQ references no longer protected by this flag; they must remain
4999 * protected by the global lock.
5001 call->flags &= ~RX_CALL_TQ_BUSY;
5002 if (call->flags & RX_CALL_TQ_WAIT) {
5003 call->flags &= ~RX_CALL_TQ_WAIT;
5004 #ifdef RX_ENABLE_LOCKS
5005 CV_BROADCAST(&call->cv_tq);
5006 #else /* RX_ENABLE_LOCKS */
5007 osi_rxWakeup(&call->tq);
5008 #endif /* RX_ENABLE_LOCKS */
5011 call->flags |= RX_CALL_NEED_START;
5013 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5015 if (call->resendEvent) {
5016 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5021 /* Also adjusts the keep alive parameters for the call, to reflect
5022 * that we have just sent a packet (so keep alives aren't sent
5024 void rxi_Send(call, p, istack)
5025 register struct rx_call *call;
5026 register struct rx_packet *p;
5029 register struct rx_connection *conn = call->conn;
5031 /* Stamp each packet with the user supplied status */
5032 p->header.userStatus = call->localStatus;
5034 /* Allow the security object controlling this call's security to
5035 * make any last-minute changes to the packet */
5036 RXS_SendPacket(conn->securityObject, call, p);
5038 /* Since we're about to send SOME sort of packet to the peer, it's
5039 * safe to nuke any scheduled end-of-packets ack */
5040 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5042 /* Actually send the packet, filling in more connection-specific fields */
5043 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5044 MUTEX_EXIT(&call->lock);
5045 rxi_SendPacket(conn, p, istack);
5046 MUTEX_ENTER(&call->lock);
5047 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5049 /* Update last send time for this call (for keep-alive
5050 * processing), and for the connection (so that we can discover
5051 * idle connections) */
5052 conn->lastSendTime = call->lastSendTime = clock_Sec();
5056 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5057 * that things are fine. Also called periodically to guarantee that nothing
5058 * falls through the cracks (e.g. (error + dally) connections have keepalive
5059 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5062 #ifdef RX_ENABLE_LOCKS
5063 int rxi_CheckCall(call, haveCTLock)
5064 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5065 #else /* RX_ENABLE_LOCKS */
5066 int rxi_CheckCall(call)
5067 #endif /* RX_ENABLE_LOCKS */
5068 register struct rx_call *call;
5070 register struct rx_connection *conn = call->conn;
5071 register struct rx_service *tservice;
5073 afs_uint32 deadTime;
5075 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5076 if (call->flags & RX_CALL_TQ_BUSY) {
5077 /* Call is active and will be reset by rxi_Start if it's
5078 * in an error state.
5083 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5084 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5085 ((afs_uint32)conn->peer->rtt >> 3) +
5086 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
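/* Example: secondsUntilDead 12, rtt 64 (= 8 ms scaled by 8), rtt_dev 8
 * (= 2 ms scaled by 4) gives (12288 + 8 + 16 + 1023) >> 10 = 13 seconds */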
5088 /* These are computed to the second (+- 1 second). But that's
5089 * good enough for these values, which should be a significant
5090 * number of seconds. */
5091 if (now > (call->lastReceiveTime + deadTime)) {
5092 if (call->state == RX_STATE_ACTIVE) {
5093 rxi_CallError(call, RX_CALL_DEAD);
5097 #ifdef RX_ENABLE_LOCKS
5098 /* Cancel pending events */
5099 rxevent_Cancel(call->delayedAckEvent, call,
5100 RX_CALL_REFCOUNT_DELAY);
5101 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5102 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5103 if (call->refCount == 0) {
5104 rxi_FreeCall(call, haveCTLock);
5108 #else /* RX_ENABLE_LOCKS */
5111 #endif /* RX_ENABLE_LOCKS */
5113 /* Non-active calls are destroyed if they are not responding
5114 * to pings; active calls are simply flagged in error, so the
5115 * attached process can die reasonably gracefully. */
5117 /* see if we have a non-activity timeout */
5118 tservice = conn->service;
5119 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5120 && tservice->idleDeadTime
5121 && ((call->startWait + tservice->idleDeadTime) < now)) {
5122 if (call->state == RX_STATE_ACTIVE) {
5123 rxi_CallError(call, RX_CALL_TIMEOUT);
5127 /* see if we have a hard timeout */
5128 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5129 if (call->state == RX_STATE_ACTIVE)
5130 rxi_CallError(call, RX_CALL_TIMEOUT);
5137 /* When a call is in progress, this routine is called occasionally to
5138 * make sure that some traffic has arrived from (or been sent to) the peer.
5139 * If nothing has arrived in a reasonable amount of time, the call is
5140 * declared dead; if nothing has been sent for a while, we send a
5141 * keep-alive packet (if we're actually trying to keep the call alive)
5143 void rxi_KeepAliveEvent(event, call, dummy)
5144 struct rxevent *event;
5145 register struct rx_call *call;
5147 struct rx_connection *conn;
5150 MUTEX_ENTER(&call->lock);
5151 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5152 if (event == call->keepAliveEvent)
5153 call->keepAliveEvent = (struct rxevent *) 0;
5156 #ifdef RX_ENABLE_LOCKS
5157 if (rxi_CheckCall(call, 0)) {
5158 MUTEX_EXIT(&call->lock);
5161 #else /* RX_ENABLE_LOCKS */
5162 if (rxi_CheckCall(call)) return;
5163 #endif /* RX_ENABLE_LOCKS */
5165 /* Don't try to keep alive dallying calls */
5166 if (call->state == RX_STATE_DALLY) {
5167 MUTEX_EXIT(&call->lock);
5172 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5173 /* Don't try to send keepalives if there is unacknowledged data */
5174 /* the rexmit code should be good enough, this little hack
5175 * doesn't quite work XXX */
5176 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5178 rxi_ScheduleKeepAliveEvent(call);
5179 MUTEX_EXIT(&call->lock);
5183 void rxi_ScheduleKeepAliveEvent(call)
5184 register struct rx_call *call;
5186 if (!call->keepAliveEvent) {
5188 clock_GetTime(&when);
5189 when.sec += call->conn->secondsUntilPing;
5190 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5191 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5195 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
5196 void rxi_KeepAliveOn(call)
5197 register struct rx_call *call;
5199 /* Pretend last packet received was received now--i.e. if another
5200 * packet isn't received within the keep alive time, then the call
5201 * will die; Initialize last send time to the current time--even
5202 * if a packet hasn't been sent yet. This will guarantee that a
5203 * keep-alive is sent within the ping time */
5204 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5205 rxi_ScheduleKeepAliveEvent(call);
5208 /* This routine is called to send connection abort messages
5209 * that have been delayed to throttle looping clients. */
5210 void rxi_SendDelayedConnAbort(event, conn, dummy)
5211 struct rxevent *event;
5212 register struct rx_connection *conn;
5216 struct rx_packet *packet;
5218 MUTEX_ENTER(&conn->conn_data_lock);
5219 conn->delayedAbortEvent = (struct rxevent *) 0;
5220 error = htonl(conn->error);
5222 MUTEX_EXIT(&conn->conn_data_lock);
5223 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5225 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5226 RX_PACKET_TYPE_ABORT, (char *)&error,
5228 rxi_FreePacket(packet);
5232 /* This routine is called to send call abort messages
5233 * that have been delayed to throttle looping clients. */
5234 void rxi_SendDelayedCallAbort(event, call, dummy)
5235 struct rxevent *event;
5236 register struct rx_call *call;
5240 struct rx_packet *packet;
5242 MUTEX_ENTER(&call->lock);
5243 call->delayedAbortEvent = (struct rxevent *) 0;
5244 error = htonl(call->error);
5246 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5248 packet = rxi_SendSpecial(call, call->conn, packet,
5249 RX_PACKET_TYPE_ABORT, (char *)&error,
5251 rxi_FreePacket(packet);
5253 MUTEX_EXIT(&call->lock);
5256 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5257 * seconds) to ask the client to authenticate itself. The routine
5258 * issues a challenge to the client, which is obtained from the
5259 * security object associated with the connection */
5260 void rxi_ChallengeEvent(event, conn, dummy)
5261 struct rxevent *event;
5262 register struct rx_connection *conn;
5265 conn->challengeEvent = (struct rxevent *) 0;
5266 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5267 register struct rx_packet *packet;
5269 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5271 /* If there's no packet available, do this later. */
5272 RXS_GetChallenge(conn->securityObject, conn, packet);
5273 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5274 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5275 rxi_FreePacket(packet);
5277 clock_GetTime(&when);
5278 when.sec += RX_CHALLENGE_TIMEOUT;
5279 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5283 /* Call this routine to start requesting the client to authenticate
5284 * itself. This will continue until authentication is established,
5285 * the call times out, or an invalid response is returned. The
5286 * security object associated with the connection is asked to create
5287 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5288 * defined earlier. */
5289 void rxi_ChallengeOn(conn)
5290 register struct rx_connection *conn;
5292 if (!conn->challengeEvent) {
5293 RXS_CreateChallenge(conn->securityObject, conn);
5294 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5299 /* Compute round trip time of the packet provided, in *rttp.
5302 /* rxi_ComputeRoundTripTime is called with peer locked. */
5303 void rxi_ComputeRoundTripTime(p, sentp, peer)
5304 register struct clock *sentp; /* may be null */
5305 register struct rx_peer *peer; /* may be null */
5306 register struct rx_packet *p;
5308 struct clock thisRtt, *rttp = &thisRtt;
5310 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5311 /* making year 2038 bugs to get this running now - stroucki */
5312 struct timeval temptime;
5314 register int rtt_timeout;
5316 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5317 /* yet again. This was the worst Heisenbug of the port - stroucki */
5318 clock_GetTime(&temptime);
5319 rttp->sec=(afs_int32)temptime.tv_sec;
5320 rttp->usec=(afs_int32)temptime.tv_usec;
5322 clock_GetTime(rttp);
5324 if (clock_Lt(rttp, sentp)) {
5326 return; /* somebody set the clock back, don't count this time. */
5328 clock_Sub(rttp, sentp);
5329 MUTEX_ENTER(&rx_stats_mutex);
5330 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5331 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5332 if (rttp->sec > 60) {
5333 MUTEX_EXIT(&rx_stats_mutex);
5334 return; /* somebody set the clock ahead */
5336 rx_stats.maxRtt = *rttp;
5338 clock_Add(&rx_stats.totalRtt, rttp);
5339 rx_stats.nRttSamples++;
5340 MUTEX_EXIT(&rx_stats_mutex);
5342 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5344 /* Apply VanJacobson round-trip estimations */
5349 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5350 * srtt is stored as fixed point with 3 bits after the binary
5351 * point (i.e., scaled by 8). The following magic is
5352 * equivalent to the smoothing algorithm in rfc793 with an
5353 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5354 * srtt*8 = srtt*8 + rtt - srtt
5355 * srtt = srtt + rtt/8 - srtt/8
5358 delta = MSEC(rttp) - (peer->rtt >> 3);
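/* Example: srtt stored as 64 (8 ms) with a 16 ms sample gives
 * delta = 16 - 8 = 8; adding delta back into peer->rtt moves srtt
 * one eighth of the way to the sample, to 9 ms */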
5362 * We accumulate a smoothed rtt variance (actually, a smoothed
5363 * mean difference), then set the retransmit timer to smoothed
5364 * rtt + 4 times the smoothed variance (was 2x in van's original
5365 * paper, but 4x works better for me, and apparently for him as
5367 * rttvar is stored as
5368 * fixed point with 2 bits after the binary point (scaled by
5369 * 4). The following is equivalent to rfc793 smoothing with
5370 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5371 * replaces rfc793's wired-in beta.
5372 * dev*4 = dev*4 + (|actual - expected| - dev)
5378 delta -= (peer->rtt_dev >> 2);
5379 peer->rtt_dev += delta;
5382 /* I don't have a stored RTT so I start with this value. Since I'm
5383 * probably just starting a call, and will be pushing more data down
5384 * this, I expect congestion to increase rapidly. So I fudge a
5385 * little, and I set deviance to half the rtt. In practice,
5386 * deviance tends to approach something a little less than
5387 * half the smoothed rtt. */
5388 peer->rtt = (MSEC(rttp) << 3) + 8;
5389 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5391 /* the timeout is RTT + 4*MDEV + 0.35 sec This is because one end or
5392 * the other of these connections is usually in a user process, and can
5393 * be switched and/or swapped out. So on fast, reliable networks, the
5394 * timeout would otherwise be too short.
5396 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
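/* peer->rtt_dev is scaled by 4, so adding it unscaled supplies the
 * 4*MDEV milliseconds promised above */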
5397 clock_Zero(&(peer->timeout));
5398 clock_Addmsec(&(peer->timeout), rtt_timeout);
5400 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5401 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5402 (peer->timeout.sec),(peer->timeout.usec)) );
5406 /* Find all server connections that have not been active for a long time, and
5408 void rxi_ReapConnections()
5411 clock_GetTime(&now);
5413 /* Find server connection structures that haven't been used for
5414 * greater than rx_idleConnectionTime */
5415 { struct rx_connection **conn_ptr, **conn_end;
5416 int i, havecalls = 0;
5417 MUTEX_ENTER(&rx_connHashTable_lock);
5418 for (conn_ptr = &rx_connHashTable[0],
5419 conn_end = &rx_connHashTable[rx_hashTableSize];
5420 conn_ptr < conn_end; conn_ptr++) {
5421 struct rx_connection *conn, *next;
5422 struct rx_call *call;
5426 for (conn = *conn_ptr; conn; conn = next) {
5427 /* XXX -- Shouldn't the connection be locked? */
5430 for (i=0; i<RX_MAXCALLS; i++) {
5431 call = conn->call[i];
5434 MUTEX_ENTER(&call->lock);
5435 #ifdef RX_ENABLE_LOCKS
5436 result = rxi_CheckCall(call, 1);
5437 #else /* RX_ENABLE_LOCKS */
5438 result = rxi_CheckCall(call);
5439 #endif /* RX_ENABLE_LOCKS */
5440 MUTEX_EXIT(&call->lock);
5442 /* If CheckCall freed the call, it might
5443 * have destroyed the connection as well,
5444 * which screws up the linked lists.
5450 if (conn->type == RX_SERVER_CONNECTION) {
5451 /* This only actually destroys the connection if
5452 * there are no outstanding calls */
5453 MUTEX_ENTER(&conn->conn_data_lock);
5454 if (!havecalls && !conn->refCount &&
5455 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5456 conn->refCount++; /* it will be decr in rx_DestroyConn */
5457 MUTEX_EXIT(&conn->conn_data_lock);
5458 #ifdef RX_ENABLE_LOCKS
5459 rxi_DestroyConnectionNoLock(conn);
5460 #else /* RX_ENABLE_LOCKS */
5461 rxi_DestroyConnection(conn);
5462 #endif /* RX_ENABLE_LOCKS */
5464 #ifdef RX_ENABLE_LOCKS
5466 MUTEX_EXIT(&conn->conn_data_lock);
5468 #endif /* RX_ENABLE_LOCKS */
5472 #ifdef RX_ENABLE_LOCKS
5473 while (rx_connCleanup_list) {
5474 struct rx_connection *conn;
5475 conn = rx_connCleanup_list;
5476 rx_connCleanup_list = rx_connCleanup_list->next;
5477 MUTEX_EXIT(&rx_connHashTable_lock);
5478 rxi_CleanupConnection(conn);
5479 MUTEX_ENTER(&rx_connHashTable_lock);
5481 MUTEX_EXIT(&rx_connHashTable_lock);
5482 #endif /* RX_ENABLE_LOCKS */
5485 /* Find any peer structures that haven't been used (haven't had an
5486 * associated connection) for greater than rx_idlePeerTime */
5487 { struct rx_peer **peer_ptr, **peer_end;
5489 MUTEX_ENTER(&rx_rpc_stats);
5490 MUTEX_ENTER(&rx_peerHashTable_lock);
5491 for (peer_ptr = &rx_peerHashTable[0],
5492 peer_end = &rx_peerHashTable[rx_hashTableSize];
5493 peer_ptr < peer_end; peer_ptr++) {
5494 struct rx_peer *peer, *next, *prev;
5495 for (prev = peer = *peer_ptr; peer; peer = next) {
5497 code = MUTEX_TRYENTER(&peer->peer_lock);
5498 if ((code) && (peer->refCount == 0)
5499 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5500 rx_interface_stat_p rpc_stat, nrpc_stat;
5502 MUTEX_EXIT(&peer->peer_lock);
5503 MUTEX_DESTROY(&peer->peer_lock);
5504 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5505 rx_interface_stat)) {
5506 unsigned int num_funcs;
5507 if (!rpc_stat) break;
5508 queue_Remove(&rpc_stat->queue_header);
5509 queue_Remove(&rpc_stat->all_peers);
5510 num_funcs = rpc_stat->stats[0].func_total;
5511 space = sizeof(rx_interface_stat_t) +
5512 rpc_stat->stats[0].func_total *
5513 sizeof(rx_function_entry_v1_t);
5515 rxi_Free(rpc_stat, space);
5516 rxi_rpc_peer_stat_cnt -= num_funcs;
5519 MUTEX_ENTER(&rx_stats_mutex);
5520 rx_stats.nPeerStructs--;
5521 MUTEX_EXIT(&rx_stats_mutex);
5522 if (prev == *peer_ptr) {
5531 MUTEX_EXIT(&peer->peer_lock);
5537 MUTEX_EXIT(&rx_peerHashTable_lock);
5538 MUTEX_EXIT(&rx_rpc_stats);
5541 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5542 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5543 GC, just below. Really, we shouldn't have to keep moving packets from
5544 one place to another, but instead ought to always know if we can
5545 afford to hold onto a packet in its particular use. */
5546 MUTEX_ENTER(&rx_freePktQ_lock);
5547 if (rx_waitingForPackets) {
5548 rx_waitingForPackets = 0;
5549 #ifdef RX_ENABLE_LOCKS
5550 CV_BROADCAST(&rx_waitingForPackets_cv);
5552 osi_rxWakeup(&rx_waitingForPackets);
5555 MUTEX_EXIT(&rx_freePktQ_lock);
5557 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5558 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5562 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5563 * rx.h is sort of strange, this is better. This is called with a security
5564 * object before it is discarded. Each connection using a security object has
5565 * its own refcount to the object so it won't actually be freed until the last
5566 * connection is destroyed.
5568 * This is the only rxs module call. A hold could also be written but no one
5571 int rxs_Release (aobj)
5572 struct rx_securityClass *aobj;
5574 return RXS_Close (aobj);
5578 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5579 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5580 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5581 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5583 /* Adjust our estimate of the transmission rate to this peer, given
5584 * that the packet p was just acked. We can adjust peer->timeout and
5585 * call->twind. Pragmatically, this is called
5586 * only with packets of maximal length.
5587 * Called with peer and call locked.
5590 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5591 register struct rx_peer *peer;
5592 register struct rx_call *call;
5593 struct rx_packet *p, *ackp;
5596 afs_int32 xferSize, xferMs;
5597 register afs_int32 minTime;
5600 /* Count down packets */
5601 if (peer->rateFlag > 0) peer->rateFlag--;
5602 /* Do nothing until we're enabled */
5603 if (peer->rateFlag != 0) return;
5604 if (!call->conn) return;
5606 /* Count only when the ack seems legitimate */
5607 switch (ackReason) {
5608 case RX_ACK_REQUESTED:
5609 xferSize = p->length + RX_HEADER_SIZE +
5610 call->conn->securityMaxTrailerSize;
5614 case RX_ACK_PING_RESPONSE:
5615 if (p) /* want the response to ping-request, not data send */
5617 clock_GetTime(&newTO);
5618 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5619 clock_Sub(&newTO, &call->pingRequestTime);
5620 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5624 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5631 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5632 ntohl(peer->host), ntohs(peer->port),
5633 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5634 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5637 /* Track only packets that are big enough. */
5638 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5642 /* absorb RTT data (in milliseconds) for these big packets */
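/* Weighted average: 15/16 old estimate plus 1/16 new sample; e.g.
 * smRtt 32 and a 16 ms sample give (480 + 16 + 4) >> 4 = 31 */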
5643 if (peer->smRtt == 0) {
5644 peer->smRtt = xferMs;
5646 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5647 if (!peer->smRtt) peer->smRtt = 1;
5650 if (peer->countDown) {
5654 peer->countDown = 10; /* recalculate only every so often */
5656 /* In practice, we can measure only the RTT for full packets,
5657 * because of the way Rx acks the data that it receives. (If it's
5658 * smaller than a full packet, it often gets implicitly acked
5659 * either by the call response (from a server) or by the next call
5660 * (from a client), and either case confuses transmission times
5661 * with processing times.) Therefore, replace the above
5662 * more-sophisticated processing with a simpler version, where the
5663 * smoothed RTT is kept for full-size packets, and the time to
5664 * transmit a windowful of full-size packets is simply RTT *
5665 * windowSize. Again, we take two steps:
5666 - ensure the timeout is large enough for a single packet's RTT;
5667 - ensure that the window is small enough to fit in the desired timeout.*/
5669 /* First, the timeout check. */
5670 minTime = peer->smRtt;
5671 /* Get a reasonable estimate for a timeout period */
5673 newTO.sec = minTime / 1000;
5674 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5676 /* Increase the timeout period so that we can always do at least
5677 * one packet exchange */
5678 if (clock_Gt(&newTO, &peer->timeout)) {
5680 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5681 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5682 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5685 peer->timeout = newTO;
5688 /* Now, get an estimate for the transmit window size. */
5689 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5690 /* Now, convert to the number of full packets that could fit in a
5691 * reasonable fraction of that interval */
5692 minTime /= (peer->smRtt << 1);
5693 xferSize = minTime; /* (make a copy) */
5695 /* Now clamp the size to reasonable bounds. */
5696 if (minTime <= 1) minTime = 1;
5697 else if (minTime > rx_Window) minTime = rx_Window;
5698 /* if (minTime != peer->maxWindow) {
5699 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5700 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5701 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5703 peer->maxWindow = minTime;
5704 elide... call->twind = minTime;
5708 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5709 * Discern this by calculating the timeout necessary for rx_Window
5711 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5712 /* calculate estimate for transmission interval in milliseconds */
5713 minTime = rx_Window * peer->smRtt;
5714 if (minTime < 1000) {
5715 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5716 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5717 peer->timeout.usec, peer->smRtt,
5720 newTO.sec = 0; /* cut back on timeout by half a second */
5721 newTO.usec = 500000;
5722 clock_Sub(&peer->timeout, &newTO);
5727 } /* end of rxi_ComputeRate */
5728 #endif /* ADAPT_WINDOW */
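
/*
 * Illustrative sketch, not part of the original file: the smoothing in
 * rxi_ComputeRate above is a fixed-point exponentially weighted moving
 * average, smRtt' = (15*smRtt + sample + 4) / 16, so each new sample
 * carries 1/16 of the weight and the "+ 4" offsets truncation in the
 * right shift.  Standalone, with a hypothetical name:
 */
#if 0
static afs_int32 smooth_rtt(afs_int32 smRtt, afs_int32 sampleMs)
{
    if (smRtt == 0)
        return sampleMs;                         /* first sample seeds the average */
    smRtt = ((smRtt * 15) + sampleMs + 4) >> 4;  /* 15/16 old, 1/16 new */
    return smRtt ? smRtt : 1;                    /* keep the estimate non-zero */
}
#endif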
#ifdef RXDEBUG

/* Don't call this debugging routine directly; use dpf */
void rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
                    a11, a12, a13, a14, a15)
    char *format;
{
    struct clock now;
    clock_GetTime(&now);
    fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec,
            (unsigned int) now.usec/1000);
    fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11,
            a12, a13, a14, a15);
    putc('\n', rx_Log);
}
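
/*
 * Note, not part of the original file: the dpf interface takes a doubly
 * parenthesized argument list so that a printf-style variable argument
 * list can pass through a single macro parameter, e.g.:
 *
 *      dpf(("packet %d from host %x", seq, host));
 *
 * The macro is presumably compiled away when RXDEBUG is not defined, so
 * the arguments cost nothing in production builds.
 */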
/*
 * This function is used to process the rx_stats structure that is local
 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug).  Therefore, it needs to do minimal version
 * checking.
 */
void rx_PrintTheseStats (file, s, size, freePackets, version)
    FILE *file;
    struct rx_stats *s;
    int size;                   /* some idea of version control */
    afs_int32 freePackets;
    char version;
{
    int i;

    if (size != sizeof(struct rx_stats)) {
        fprintf(file,
                "Unexpected size of stats structure: was %d, expected %d\n",
                size, sizeof(struct rx_stats));
    }

    fprintf(file,
            "rx stats: free packets %d, allocs %d, ",
            (int) freePackets, s->packetRequests);

    if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
        fprintf(file,
                "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
                s->receivePktAllocFailures,
                s->receiveCbufPktAllocFailures,
                s->sendPktAllocFailures,
                s->sendCbufPktAllocFailures,
                s->specialPktAllocFailures);
    } else {
        fprintf(file,
                "alloc-failures(rcv %d,send %d,ack %d)\n",
                s->receivePktAllocFailures,
                s->sendPktAllocFailures,
                s->specialPktAllocFailures);
    }

    fprintf(file,
            " greedy %d, "
            "bogusReads %d (last from host %x), "
            "noPackets %d, noBuffers %d, selects %d, sendSelects %d\n",
            s->socketGreedy, s->bogusPacketOnRead, s->bogusHost,
            s->noPacketOnRead, s->noPacketBuffersOnRead,
            s->selects, s->sendSelects);

    fprintf(file, " packets read: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
        fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsRead[i]);
    }
    fprintf(file, "\n");

    fprintf(file,
            " other read counters: data %d, "
            "ack %d, dup %d, spurious %d, dally %d\n",
            s->dataPacketsRead, s->ackPacketsRead, s->dupPacketsRead,
            s->spuriousPacketsRead,
            s->ignorePacketDally);

    fprintf(file, " packets sent: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
        fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsSent[i]);
    }
    fprintf(file, "\n");

    fprintf(file,
            " other send counters: ack %d, "
            "data %d (not resends), "
            "resends %d, pushed %d, "
            "acked&ignored %d\n",
            s->ackPacketsSent, s->dataPacketsSent,
            s->dataPacketsReSent,
            s->dataPacketsPushed,
            s->ignoreAckedPacket);

    fprintf(file,
            " \t(these should be small) sendFailed %d, "
            "fatalErrors %d\n",
            (int) s->netSendFailures,
            (int) s->fatalErrors);

    if (s->nRttSamples) {
        fprintf(file,
                " Average rtt is %0.3f, with %d samples\n",
                clock_Float(&s->totalRtt)/s->nRttSamples,
                s->nRttSamples);

        fprintf(file,
                " Minimum rtt is %0.3f, maximum is %0.3f\n",
                clock_Float(&s->minRtt),
                clock_Float(&s->maxRtt));
    }

    fprintf(file,
            " %d server connections, "
            "%d client connections, "
            "%d peer structs, %d call structs, "
            "%d free call structs\n",
            s->nServerConns, s->nClientConns, s->nPeerStructs,
            s->nCallStructs,
            s->nFreeCallStructs);

#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
    fprintf(file,
            " %d clock updates\n",
            clock_nUpdates);
#endif
}
/* for backward compatibility */
void rx_PrintStats(file)
    FILE *file;
{
    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats(file, &rx_stats, sizeof(rx_stats), rx_nFreePackets,
                       RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
}
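
/*
 * Illustrative sketch, not part of the original file: dumping the
 * process-wide counters from a user-mode program.
 */
#if 0
#include <stdio.h>

static void dump_rx_stats(void)
{
    /* rx_PrintStats snapshots the global rx_stats under rx_stats_mutex */
    rx_PrintStats(stdout);
}
#endif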
void rx_PrintPeerStats(file, peer)
    FILE *file;
    struct rx_peer *peer;
{
    fprintf(file,
            "Peer %x.%d.  Burst size %d, "
            "burst wait %u.%d.\n",
            ntohl(peer->host), (int) peer->port,
            (int) peer->burstSize,
            (int) peer->burstWait.sec,
            (int) peer->burstWait.usec);

    fprintf(file,
            "   Rtt %d, "
            "retry time %u.%06d, "
            "total sent %d, resent %d\n",
            peer->rtt,
            (int) peer->timeout.sec,
            (int) peer->timeout.usec,
            peer->nSent, peer->reSends);

    fprintf(file,
            "   Packet size %d, "
            "max in packet skew %d, "
            "max out packet skew %d\n",
            peer->ifMTU,
            (int) peer->inPacketSkew,
            (int) peer->outPacketSkew);
}
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 * counter
 */
#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  u_char type,
  void *inputData,
  size_t inputLength,
  void *outputData,
  size_t outputLength)
{
    afs_uint32 endTime;
    static afs_int32 counter = 100;
    struct rx_header theader;
    char tbuffer[1500];
    register afs_int32 code;
    struct timeval tv;
    struct sockaddr_in taddr, faddr;
    int faddrLen;
    fd_set imask;
    register char *tp;

    endTime = time(0) + 20; /* try for 20 seconds */
    LOCK_RX_DEBUG
    counter++;
    UNLOCK_RX_DEBUG
    tp = &tbuffer[sizeof(struct rx_header)];
    taddr.sin_family = AF_INET;
    taddr.sin_port = remotePort;
    taddr.sin_addr.s_addr = remoteAddr;
    while (1) {
        memset(&theader, 0, sizeof(theader));
        theader.epoch = htonl(999);
        theader.cid = 0;
        theader.callNumber = htonl(counter);
        theader.seq = 0;
        theader.serial = 0;
        theader.type = type;
        theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
        theader.serviceId = 0;

        bcopy(&theader, tbuffer, sizeof(theader));
        bcopy(inputData, tp, inputLength);
        code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
                      (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

        /* see if there's a packet available */
        FD_ZERO(&imask);
        FD_SET(socket, &imask);
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        code = select(socket+1, &imask, 0, 0, &tv);
        if (code > 0) {
            /* now receive a packet */
            faddrLen = sizeof(struct sockaddr_in);
            code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
                            (struct sockaddr *) &faddr, &faddrLen);

            bcopy(tbuffer, &theader, sizeof(struct rx_header));
            if (counter == ntohl(theader.callNumber)) break;
        }

        /* see if we've timed out */
        if (endTime < time(0)) return -1;
    }
    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    bcopy(tp, outputData, code);
    return code;
}
afs_int32 rx_GetServerDebug(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_debugStats *stat,
  afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);
    in.index = 0;

    rc = MakeDebugCall(socket, remoteAddr, remotePort,
                       RX_PACKET_TYPE_DEBUG, &in, sizeof(in),
                       stat, sizeof(*stat));

    /*
     * If the call was successful, fixup the version and indicate
     * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */

    if (rc >= 0) {
        if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
            *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
            *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
            *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
            *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
            *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
            *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
            *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
        }

        stat->nFreePackets = ntohl(stat->nFreePackets);
        stat->packetReclaims = ntohl(stat->packetReclaims);
        stat->callsExecuted = ntohl(stat->callsExecuted);
        stat->nWaiting = ntohl(stat->nWaiting);
        stat->idleThreads = ntohl(stat->idleThreads);
    }

    return rc;
}
afs_int32 rx_GetServerStats(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_stats *stat,
  afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 *lp = (afs_int32 *) stat;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_RXSTATS);
    in.index = 0;
    memset(stat, 0, sizeof(*stat));

    rc = MakeDebugCall(socket, remoteAddr, remotePort,
                       RX_PACKET_TYPE_DEBUG, &in, sizeof(in),
                       stat, sizeof(*stat));

    if (rc >= 0) {

        /*
         * Do net to host conversion here
         */

        for (i = 0; i < sizeof(*stat)/sizeof(afs_int32); i++, lp++) {
            *lp = ntohl(*lp);
        }
    }

    return rc;
}
afs_int32 rx_GetServerVersion(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  size_t version_length,
  char *version)
{
    char a[1] = {0};

    return MakeDebugCall(socket, remoteAddr, remotePort,
                         RX_PACKET_TYPE_VERSION, a, 1,
                         version, version_length);
}
afs_int32 rx_GetServerConnections(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextConnection,
  int allConnections,
  afs_uint32 debugSupportedValues,
  struct rx_debugConn *conn,
  afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    if (allConnections) {
        in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
        in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket, remoteAddr, remotePort,
                       RX_PACKET_TYPE_DEBUG, &in, sizeof(in),
                       conn, sizeof(*conn));

    if (rc >= 0) {
        *nextConnection += 1;

        /*
         * Convert old connection format to new structure.
         */

        if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
            struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
#define MOVEvL(a) (conn->a = vL->a)

            /* any old or unrecognized version... */
            for (i = 0; i < RX_MAXCALLS; i++) {
                MOVEvL(callState[i]);
                MOVEvL(callMode[i]);
                MOVEvL(callFlags[i]);
                MOVEvL(callOther[i]);
            }
            if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
                MOVEvL(secStats.type);
                MOVEvL(secStats.level);
                MOVEvL(secStats.flags);
                MOVEvL(secStats.expires);
                MOVEvL(secStats.packetsReceived);
                MOVEvL(secStats.packetsSent);
                MOVEvL(secStats.bytesReceived);
                MOVEvL(secStats.bytesSent);
            }
        }

        /*
         * Do net to host conversion here
         * NOTE:
         *    I don't convert host or port since we are most likely
         *    going to want these in NBO.
         */
        conn->cid = ntohl(conn->cid);
        conn->serial = ntohl(conn->serial);
        for (i = 0; i < RX_MAXCALLS; i++) {
            conn->callNumber[i] = ntohl(conn->callNumber[i]);
        }
        conn->error = ntohl(conn->error);
        conn->secStats.flags = ntohl(conn->secStats.flags);
        conn->secStats.expires = ntohl(conn->secStats.expires);
        conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
        conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
        conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
        conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
        conn->epoch = ntohl(conn->epoch);
        conn->natMTU = ntohl(conn->natMTU);
    }

    return rc;
}
afs_int32 rx_GetServerPeers(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextPeer,
  afs_uint32 debugSupportedValues,
  struct rx_debugPeer *peer,
  afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket, remoteAddr, remotePort,
                       RX_PACKET_TYPE_DEBUG, &in, sizeof(in),
                       peer, sizeof(*peer));

    if (rc >= 0) {
        *nextPeer += 1;

        /*
         * Do net to host conversion here
         * NOTE:
         *    I don't convert host or port since we are most likely
         *    going to want these in NBO.
         */
        peer->ifMTU = ntohs(peer->ifMTU);
        peer->idleWhen = ntohl(peer->idleWhen);
        peer->refCount = ntohs(peer->refCount);
        peer->burstWait.sec = ntohl(peer->burstWait.sec);
        peer->burstWait.usec = ntohl(peer->burstWait.usec);
        peer->rtt = ntohl(peer->rtt);
        peer->rtt_dev = ntohl(peer->rtt_dev);
        peer->timeout.sec = ntohl(peer->timeout.sec);
        peer->timeout.usec = ntohl(peer->timeout.usec);
        peer->nSent = ntohl(peer->nSent);
        peer->reSends = ntohl(peer->reSends);
        peer->inPacketSkew = ntohl(peer->inPacketSkew);
        peer->outPacketSkew = ntohl(peer->outPacketSkew);
        peer->rateFlag = ntohl(peer->rateFlag);
        peer->natMTU = ntohs(peer->natMTU);
        peer->maxMTU = ntohs(peer->maxMTU);
        peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
        peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
        peer->MTU = ntohs(peer->MTU);
        peer->cwind = ntohs(peer->cwind);
        peer->nDgramPackets = ntohs(peer->nDgramPackets);
        peer->congestSeq = ntohs(peer->congestSeq);
        peer->bytesSent.high = ntohl(peer->bytesSent.high);
        peer->bytesSent.low = ntohl(peer->bytesSent.low);
        peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
        peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
    }

    return rc;
}
#endif /* RXDEBUG */
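
/*
 * Illustrative sketch, not part of the original file: how a monitoring
 * client such as rxdebug might drive the query functions above.  The UDP
 * socket is assumed to be already bound; error handling is elided.
 */
#if 0
static void query_server(int sock, afs_uint32 host, afs_uint16 port)
{
    /* host and port are expected in network byte order */
    struct rx_debugStats tstats;
    afs_uint32 supported;

    if (rx_GetServerDebug(sock, host, port, &tstats, &supported) >= 0) {
        printf("free packets %d, calls executed %d\n",
               tstats.nFreePackets, tstats.callsExecuted);
        if (supported & RX_SERVER_DEBUG_ALL_PEER)
            printf("server also supports peer queries\n");
    }
}
#endif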
void shutdown_rx(void)
{
    register int i, j;
    struct rx_serverQueueEntry *np;
    register struct rx_call *call;
    register struct rx_serverQueueEntry *sq;

    if (rxinit_status == 1) {
        return; /* Already shutdown. */
    }

#ifndef KERNEL
#ifndef AFS_PTHREAD_ENV
    FD_ZERO(&rx_selectMask);
#endif /* AFS_PTHREAD_ENV */
    rxi_dataQuota = RX_MAX_QUOTA;
#ifndef AFS_PTHREAD_ENV
    rxi_StopListener();
#endif /* AFS_PTHREAD_ENV */
#ifndef AFS_PTHREAD_ENV
#ifndef AFS_USE_GETTIMEOFDAY
    clock_UnInit();
#endif /* AFS_USE_GETTIMEOFDAY */
#endif /* AFS_PTHREAD_ENV */

    while (!queue_IsEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
        queue_Remove(call);
        rxi_Free(call, sizeof(struct rx_call));
    }

    while (!queue_IsEmpty(&rx_idleServerQueue)) {
        sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
        queue_Remove(sq);
    }
#endif /* KERNEL */

    {
        struct rx_peer **peer_ptr, **peer_end;
        for (peer_ptr = &rx_peerHashTable[0],
             peer_end = &rx_peerHashTable[rx_hashTableSize];
             peer_ptr < peer_end; peer_ptr++) {
            struct rx_peer *peer, *next;
            for (peer = *peer_ptr; peer; peer = next) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                size_t space;
                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    MUTEX_ENTER(&rx_rpc_stats);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                    MUTEX_EXIT(&rx_rpc_stats);
                }
                next = peer->next;
                rxi_FreePeer(peer);
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.nPeerStructs--;
                MUTEX_EXIT(&rx_stats_mutex);
            }
        }
    }
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        if (rx_services[i])
            rxi_Free(rx_services[i], sizeof(*rx_services[i]));
    }
    for (i = 0; i < rx_hashTableSize; i++) {
        register struct rx_connection *tc, *ntc;
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (tc = rx_connHashTable[i]; tc; tc = ntc) {
            ntc = tc->next;
            for (j = 0; j < RX_MAXCALLS; j++) {
                if (tc->call[j]) {
                    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
                }
            }
            rxi_Free(tc, sizeof(*tc));
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
    }

    MUTEX_ENTER(&freeSQEList_lock);

    while ((np = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
        MUTEX_DESTROY(&np->lock);
        rxi_Free(np, sizeof(*np));
    }

    MUTEX_EXIT(&freeSQEList_lock);
    MUTEX_DESTROY(&freeSQEList_lock);
    MUTEX_DESTROY(&rx_freeCallQueue_lock);
    MUTEX_DESTROY(&rx_connHashTable_lock);
    MUTEX_DESTROY(&rx_peerHashTable_lock);
    MUTEX_DESTROY(&rx_serverPool_lock);

#ifndef KERNEL
    osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#else
    UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#endif /* KERNEL */

    rxi_FreeAllPackets();

    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota = RX_MAX_QUOTA;
    rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);

    rxinit_status = 1;
}
#ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
{
    if (!MUTEX_ISMINE(lockaddr))
        osi_Panic("Lock not held: %s", msg);
}
#endif /* RX_ENABLE_LOCKS */
#ifndef KERNEL

/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)
{
    int key;
    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
        realloc((void *)rxi_keyCreate_destructor,
                (key+1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);
    return key;
}

void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
{
    int i;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->specific) {
        conn->specific = (void **)malloc((key+1)*sizeof(void *));
        for (i = 0; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key+1;
        conn->specific[key] = ptr;
    } else if (key >= conn->nSpecific) {
        conn->specific = (void **)
            realloc(conn->specific, (key+1)*sizeof(void *));
        for (i = conn->nSpecific; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key+1;
        conn->specific[key] = ptr;
    } else {
        if (conn->specific[key] && rxi_keyCreate_destructor[key])
            (*rxi_keyCreate_destructor[key])(conn->specific[key]);
        conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
}

void *rx_GetSpecific(struct rx_connection *conn, int key)
{
    void *ptr;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
        ptr = NULL;
    else
        ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;
}

#endif /* !KERNEL */
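
/*
 * Illustrative sketch, not part of the original file: typical use of the
 * connection-specific data interface above.  The state structure and
 * destructor are hypothetical.
 */
#if 0
#include <stdlib.h>

struct my_conn_state { int nCalls; };

static void my_state_destructor(void *ptr)
{
    free(ptr);
}

static int my_key;

static void example(struct rx_connection *conn)
{
    struct my_conn_state *st = calloc(1, sizeof(*st));

    my_key = rx_KeyCreate(my_state_destructor);  /* once, at startup */
    rx_SetSpecific(conn, my_key, st);  /* replacing an existing value runs
                                        * the destructor on the old one */
    st = rx_GetSpecific(conn, my_key);
}
#endif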
/*
 * processStats is a queue used to store the statistics for the local
 * process.  Its contents are similar to the contents of the rpcStats
 * queue on a rx_peer structure, but the actual data stored within
 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset) - unlike the per peer structures
 * which can come and go based upon the peer lifetime.
 */

static struct rx_queue processStats = {&processStats,&processStats};

/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats,&peerStats};

/*
 * rxi_monitor_processStats is used to turn process wide stat collection
 * on and off
 */

static int rxi_monitor_processStats = 0;

/*
 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
 * call, create (if needed) and update the stat totals for the rpc.
 *
 * PARAMETERS
 *
 * IN stats - the queue of stats that will be updated with the new value
 *
 * IN rxInterface - a unique number that identifies the rpc interface
 *
 * IN currentFunc - the index of the function being invoked
 *
 * IN totalFunc - the total number of functions in this interface
 *
 * IN queueTime - the amount of time this function waited for a thread
 *
 * IN execTime - the amount of time this function invocation took to execute
 *
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
 *
 * IN isServer - if true, this invocation was made to a server
 *
 * IN remoteHost - the ip address of the remote host
 *
 * IN remotePort - the port of the remote host
 *
 * IN addToPeerList - if != 0, add newly created stat to the global peer list
 *
 * INOUT counter - if a new stats structure is allocated, the counter will
 * be updated with the new number of allocated stat structures
 *
 * RETURN CODES
 *
 * Returns 0 on success, non-zero if the stats structure could not be
 * allocated.
 */

static int rxi_AddRpcStat(
  struct rx_queue *stats,
  afs_uint32 rxInterface,
  afs_uint32 currentFunc,
  afs_uint32 totalFunc,
  struct clock *queueTime,
  struct clock *execTime,
  afs_hyper_t *bytesSent,
  afs_hyper_t *bytesRcvd,
  int isServer,
  afs_uint32 remoteHost,
  afs_uint32 remotePort,
  int addToPeerList,
  unsigned int *counter)
{
    int rc = 0;
    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */

    for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
            (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }

    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */

    if (queue_IsEnd(stats, rpc_stat) ||
        (rpc_stat == NULL) ||
        (rpc_stat->stats[0].interfaceId != rxInterface) ||
        (rpc_stat->stats[0].remote_is_server != isServer)) {
        int i;
        size_t space;

        space = sizeof(rx_interface_stat_t) + totalFunc *
                sizeof(rx_function_entry_v1_t);

        rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
        if (rpc_stat == NULL) {
            rc = 1;
            goto fail;
        }
        *counter += totalFunc;
        for (i = 0; i < totalFunc; i++) {
            rpc_stat->stats[i].remote_peer = remoteHost;
            rpc_stat->stats[i].remote_port = remotePort;
            rpc_stat->stats[i].remote_is_server = isServer;
            rpc_stat->stats[i].interfaceId = rxInterface;
            rpc_stat->stats[i].func_total = totalFunc;
            rpc_stat->stats[i].func_index = i;
            hzero(rpc_stat->stats[i].invocations);
            hzero(rpc_stat->stats[i].bytes_sent);
            hzero(rpc_stat->stats[i].bytes_rcvd);
            rpc_stat->stats[i].queue_time_sum.sec = 0;
            rpc_stat->stats[i].queue_time_sum.usec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].queue_time_min.sec = 9999999;
            rpc_stat->stats[i].queue_time_min.usec = 9999999;
            rpc_stat->stats[i].queue_time_max.sec = 0;
            rpc_stat->stats[i].queue_time_max.usec = 0;
            rpc_stat->stats[i].execution_time_sum.sec = 0;
            rpc_stat->stats[i].execution_time_sum.usec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].execution_time_min.sec = 9999999;
            rpc_stat->stats[i].execution_time_min.usec = 9999999;
            rpc_stat->stats[i].execution_time_max.sec = 0;
            rpc_stat->stats[i].execution_time_max.usec = 0;
        }
        queue_Prepend(stats, rpc_stat);
        if (addToPeerList) {
            queue_Prepend(&peerStats, &rpc_stat->all_peers);
        }
    }

    /*
     * Increment the stats for this function
     */

    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
    hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
    hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
    clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
    if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
        rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
    }
    if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
        rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
    }
    clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum, execTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr, execTime);
    if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
        rpc_stat->stats[currentFunc].execution_time_min = *execTime;
    }
    if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
        rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }

fail:
    return rc;
}
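
/*
 * Illustrative sketch, not part of the original file: the sum and
 * sum-of-squares accumulated above let a consumer recover the mean and
 * variance of queue time per function as var = E[x^2] - E[x]^2, with n
 * taken from the invocation counter.  Assumes clock_Float() converts a
 * struct clock to seconds, as used elsewhere in this file.
 */
#if 0
static void queue_time_moments(rx_function_entry_v1_t *e,
                               double *meanp, double *varp)
{
    double n = (double) hgetlo(e->invocations); /* low word suffices here */
    double sum = clock_Float(&e->queue_time_sum);
    double sumsq = clock_Float(&e->queue_time_sum_sqr);

    if (n < 1) {
        *meanp = *varp = 0.0;
        return;
    }
    *meanp = sum / n;
    *varp = sumsq / n - (*meanp) * (*meanp);
}
#endif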
/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
 * rpc function.
 *
 * PARAMETERS
 *
 * IN peer - the peer who invoked the rpc
 *
 * IN rxInterface - a unique number that identifies the rpc interface
 *
 * IN currentFunc - the index of the function being invoked
 *
 * IN totalFunc - the total number of functions in this interface
 *
 * IN queueTime - the amount of time this function waited for a thread
 *
 * IN execTime - the amount of time this function invocation took to execute
 *
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
 *
 * IN isServer - if true, this invocation was made to a server
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_IncrementTimeAndCount(
  struct rx_peer *peer,
  afs_uint32 rxInterface,
  afs_uint32 currentFunc,
  afs_uint32 totalFunc,
  struct clock *queueTime,
  struct clock *execTime,
  afs_hyper_t *bytesSent,
  afs_hyper_t *bytesRcvd,
  int isServer)
{
    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
        rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
                       queueTime, execTime, bytesSent, bytesRcvd, isServer,
                       peer->host, peer->port, 1,
                       &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
        rxi_AddRpcStat(&processStats, rxInterface, currentFunc, totalFunc,
                       queueTime, execTime, bytesSent, bytesRcvd, isServer,
                       0xffffffff, 0xffffffff, 0,
                       &rxi_rpc_process_stat_cnt);
    }

    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller.
 *
 * IN count - the number of entries to marshall.
 *
 * IN stats - pointer to stats to be marshalled.
 *
 * OUT ptr - Where to store the marshalled data.
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_MarshallProcessRPCStats(
  afs_uint32 callerVersion,
  int count,
  rx_function_entry_v1_t *stats,
  afs_uint32 **ptrP)
{
    int i;
    afs_uint32 *ptr;

    /*
     * We only support the first version
     */

    for (ptr = *ptrP, i = 0; i < count; i++, stats++) {
        *(ptr++) = stats->remote_peer;
        *(ptr++) = stats->remote_port;
        *(ptr++) = stats->remote_is_server;
        *(ptr++) = stats->interfaceId;
        *(ptr++) = stats->func_total;
        *(ptr++) = stats->func_index;
        *(ptr++) = hgethi(stats->invocations);
        *(ptr++) = hgetlo(stats->invocations);
        *(ptr++) = hgethi(stats->bytes_sent);
        *(ptr++) = hgetlo(stats->bytes_sent);
        *(ptr++) = hgethi(stats->bytes_rcvd);
        *(ptr++) = hgetlo(stats->bytes_rcvd);
        *(ptr++) = stats->queue_time_sum.sec;
        *(ptr++) = stats->queue_time_sum.usec;
        *(ptr++) = stats->queue_time_sum_sqr.sec;
        *(ptr++) = stats->queue_time_sum_sqr.usec;
        *(ptr++) = stats->queue_time_min.sec;
        *(ptr++) = stats->queue_time_min.usec;
        *(ptr++) = stats->queue_time_max.sec;
        *(ptr++) = stats->queue_time_max.usec;
        *(ptr++) = stats->execution_time_sum.sec;
        *(ptr++) = stats->execution_time_sum.usec;
        *(ptr++) = stats->execution_time_sum_sqr.sec;
        *(ptr++) = stats->execution_time_sum_sqr.usec;
        *(ptr++) = stats->execution_time_min.sec;
        *(ptr++) = stats->execution_time_min.usec;
        *(ptr++) = stats->execution_time_max.sec;
        *(ptr++) = stats->execution_time_max.usec;
    }
    *ptrP = ptr;
}
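
/*
 * Note, not part of the original file: in this v1 layout each function
 * entry flattens to 28 afs_uint32 words - 6 identity words, 3 64-bit
 * counters split hi/lo, and 8 struct clock values split sec/usec.  A
 * sketch of the size check a reader of the marshalled buffer might
 * apply (the macro name is hypothetical):
 */
#if 0
#define RX_STATS_V1_WORDS_PER_FUNC 28

static int stats_len_ok(size_t allocSize, afs_uint32 statCount)
{
    return allocSize >= statCount * RX_STATS_V1_WORDS_PER_FUNC *
                        sizeof(afs_uint32);
}
#endif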
/*
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * this process
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from this process.
 * OUT stats - the actual stats retrieved from this process.
 *
 * RETURN CODES
 *
 * Returns 0 on success.  If successful, stats will != NULL.
 */

int rx_RetrieveProcessRPCStats(
  afs_uint32 callerVersion,
  afs_uint32 *myVersion,
  afs_uint32 *clock_sec,
  afs_uint32 *clock_usec,
  size_t *allocSize,
  afs_uint32 *statCount,
  afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_processStats) {
        MUTEX_EXIT(&rx_rpc_stats);
        return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_process_stat_cnt;
    } else {
        /*
         * This can't happen yet, but in the future version changes
         * can be handled by adding additional code here
         */
    }

    if (space > (size_t) 0) {
        *allocSize = space;
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        if (ptr != NULL) {
            rx_interface_stat_p rpc_stat, nrpc_stat;

            for (queue_Scan(&processStats, rpc_stat, nrpc_stat,
                            rx_interface_stat)) {
                /*
                 * Copy the data based upon the caller version
                 */
                rx_MarshallProcessRPCStats(callerVersion,
                                           rpc_stat->stats[0].func_total,
                                           rpc_stat->stats, &ptr);
            }
        } else {
            rc = ENOMEM;
        }
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 * of this process
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from the individual
 * peer structures.
 * OUT stats - the actual stats retrieved from the individual peer structures.
 *
 * RETURN CODES
 *
 * Returns 0 on success.  If successful, stats will != NULL.
 */

int rx_RetrievePeerRPCStats(
  afs_uint32 callerVersion,
  afs_uint32 *myVersion,
  afs_uint32 *clock_sec,
  afs_uint32 *clock_usec,
  size_t *allocSize,
  afs_uint32 *statCount,
  afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_peerStats) {
        MUTEX_EXIT(&rx_rpc_stats);
        return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_peer_stat_cnt;
    } else {
        /*
         * This can't happen yet, but in the future version changes
         * can be handled by adding additional code here
         */
    }

    if (space > (size_t) 0) {
        *allocSize = space;
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        if (ptr != NULL) {
            rx_interface_stat_p rpc_stat, nrpc_stat;
            char *fix_offset;

            for (queue_Scan(&peerStats, rpc_stat, nrpc_stat,
                            rx_interface_stat)) {
                /*
                 * We have to fix the offset of rpc_stat since we are
                 * keeping this structure on two rx_queues.  The rx_queue
                 * package assumes that the rx_queue member is the first
                 * member of the structure.  That is, rx_queue assumes that
                 * any one item is only on one queue at a time.  We are
                 * breaking that assumption and so we have to do a little
                 * math to fix our pointers.
                 */
                fix_offset = (char *) rpc_stat;
                fix_offset -= offsetof(rx_interface_stat_t, all_peers);
                rpc_stat = (rx_interface_stat_p) fix_offset;

                /*
                 * Copy the data based upon the caller version
                 */
                rx_MarshallProcessRPCStats(callerVersion,
                                           rpc_stat->stats[0].func_total,
                                           rpc_stat->stats, &ptr);
            }
        } else {
            rc = ENOMEM;
        }
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
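
/*
 * Illustrative sketch, not part of the original file: the pointer fixup
 * above is the classic "container-of" idiom - all_peers is not the first
 * member of rx_interface_stat_t, so a queue pointer to it must be wound
 * back by offsetof() to recover the enclosing structure.  In isolation:
 */
#if 0
#include <stddef.h>

struct node { struct node *next, *prev; };
struct item {
    struct node main_q;   /* first member: a plain cast recovers item */
    struct node aux_q;    /* embedded member: needs the offsetof fixup */
    int payload;
};

static struct item *item_from_aux(struct node *q)
{
    return (struct item *)((char *)q - offsetof(struct item, aux_q));
}
#endif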
/*
 * rx_FreeRPCStats - free memory allocated by
 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
 *
 * PARAMETERS
 *
 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
 * rx_RetrievePeerRPCStats
 *
 * IN allocSize - the number of bytes in stats.
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_FreeRPCStats(
  afs_uint32 *stats,
  size_t allocSize)
{
    rxi_Free(stats, allocSize);
}
/*
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryProcessRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}

/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryPeerRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enableProcessRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enablePeerRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
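
/*
 * Illustrative sketch, not part of the original file: enabling
 * process-wide collection and later pulling a snapshot through the
 * retrieval interface defined above.
 */
#if 0
static void stats_snapshot(void)
{
    afs_uint32 myVersion, sec, usec, statCount, *stats;
    size_t allocSize;

    rx_enableProcessRPCStats();
    /* ... let some RPCs run ... */
    if (rx_queryProcessRPCStats() &&
        rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVersion,
                                   &sec, &usec, &allocSize, &statCount,
                                   &stats) == 0 && stats != NULL) {
        /* ... decode statCount marshalled entries from stats ... */
        rx_FreeRPCStats(stats, allocSize);
    }
}
#endif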
/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disableProcessRPCStats()
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and if peer stats is also off, turn
     * off everything
     */

    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
        rx_enable_stats = 0;
    }

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0;
        if (!rpc_stat) break;
        queue_Remove(rpc_stat);
        num_funcs = rpc_stat->stats[0].func_total;
        space = sizeof(rx_interface_stat_t) +
                rpc_stat->stats[0].func_total *
                sizeof(rx_function_entry_v1_t);

        rxi_Free(rpc_stat, space);
        rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disablePeerRPCStats()
{
    struct rx_peer **peer_ptr, **peer_end;
    int code;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and if process stats is also off, turn
     * off everything
     */

    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
        rx_enable_stats = 0;
    }

    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (peer_ptr = &rx_peerHashTable[0],
         peer_end = &rx_peerHashTable[rx_hashTableSize];
         peer_ptr < peer_end; peer_ptr++) {
        struct rx_peer *peer, *next, *prev;
        for (prev = peer = *peer_ptr; peer; peer = next) {
            next = peer->next;
            code = MUTEX_TRYENTER(&peer->peer_lock);
            if (code) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                size_t space;

                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs = 0;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                }
                MUTEX_EXIT(&peer->peer_lock);
                if (prev == *peer_ptr) {
                    prev = peer;
                } else {
                    prev = prev->next;
                }
            } else {
                prev = peer;
            }
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flag values.
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearProcessRPCStats(
  afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;
        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
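
/*
 * Note, not part of the original file: clearFlag is a bitmask, so callers
 * may combine the AFS_RX_STATS_CLEAR_* bits, e.g. to reset just the
 * queue-time extrema:
 */
#if 0
rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN |
                        AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX);
#endif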
/*
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flag values.
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearPeerRPCStats(
  afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;
        char *fix_offset;

        /*
         * We have to fix the offset of rpc_stat since we are
         * keeping this structure on two rx_queues.  The rx_queue
         * package assumes that the rx_queue member is the first
         * member of the structure.  That is, rx_queue assumes that
         * any one item is only on one queue at a time.  We are
         * breaking that assumption and so we have to do a little
         * math to fix our pointers.
         */

        fix_offset = (char *) rpc_stat;
        fix_offset -= offsetof(rx_interface_stat_t, all_peers);
        rpc_stat = (rx_interface_stat_p) fix_offset;

        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;

void rx_SetRxStatUserOk(
  int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}

int rx_RxStatUserOk(
  struct rx_call *call)
{
    if (!rxi_rxstat_userok)
        return 0;
    return rxi_rxstat_userok(call);
}
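
/*
 * Illustrative sketch, not part of the original file: a server can install
 * a policy hook so that only authorized callers may toggle or clear
 * statistics.  The policy shown is a hypothetical placeholder.
 */
#if 0
static int my_stat_userok(struct rx_call *call)
{
    /* e.g. inspect the call's connection and its security level */
    return rx_ConnectionOf(call) != NULL;   /* placeholder policy */
}

static void install_stat_hook(void)
{
    rx_SetRxStatUserOk(my_stat_userok);
}
#endif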