/*
 * Copyright 2000, International Business Machines Corporation and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */
/* RX: Extended Remote Procedure Call */

#include <afsconfig.h>
#include "../afs/param.h"
#include <afs/param.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define AFSOP_STOP_AFS		211	/* Stop AFS process */
#define AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include <afs/rxgen_consts.h>
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;
/* Local static routines */
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
#ifdef RX_ENABLE_LOCKS
static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
	assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to happen while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.  (An illustrative sketch follows the macros
 * below.)
 */
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of these
 * tiers:
 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * These are independent of each other:
 * rx_freeCallQueue_lock
 * serverQueueEntry->lock
 *
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer->lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 * field at the same time.
 *
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and so has no
 * lock protecting this field.  The multihomed client delta introduced
 * an RX code change: set the peer field in the connection structure
 * to the remote interface from which the last packet for this
 * connection was sent out.  This may become an issue if further changes
 * are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
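
/* Illustrative sketch (not from the original source): the call_queue_lock
 * discipline described above.  The helper name example_DequeueCall is an
 * assumption for illustration; the lock and queue calls are the ones used
 * throughout this file.  Kept under #if 0 so it is never compiled. */
#if 0
static void example_DequeueCall(struct rx_call *call)
{
    afs_kmutex_t *queue_lock;

    MUTEX_ENTER(&call->lock);
    queue_lock = call->call_queue_lock;	/* recorded by SET_CALL_QUEUE_LOCK */
    if (queue_lock) {
	MUTEX_ENTER(queue_lock);	/* no one else may touch the queue now */
	queue_Remove(call);
	CLEAR_CALL_QUEUE_LOCK(call);
	MUTEX_EXIT(queue_lock);
    }
    MUTEX_EXIT(&call->lock);
}
#endif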
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */

#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */
void rx_SetEpoch (afs_uint32 epoch)
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    char *htable, *ptable;
    struct timeval tv;
    int tmp_status;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	UNLOCK_RX_INIT
	return tmp_status;	/* Already started; return previous error code. */
    }

#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
	return -1;
#endif

    /* Initialize anything necessary to provide a non-preemptive threading
     * environment. */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	UNLOCK_RX_INIT
	return RX_ADDRINUSE;
    }

#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT, 0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */

    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));	/* XXXXX */
    memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));		/* XXXXX */
    memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);

#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    struct sockaddr_in addr;
    int addrlen = sizeof(addr);
    if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	rx_Finalize();
	return -1;
    }
    rx_port = addr.sin_port;
#endif

    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);		/* Start time of this package; rxkad
					 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
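
/* Illustrative usage sketch (not from the original source): minimal
 * user-space client initialization.  The helper name, the port and
 * service-id values, and the use of the rxnull security class are
 * assumptions for illustration.  Kept under #if 0 so it is never
 * compiled. */
#if 0
static struct rx_connection *example_ClientSetup(afs_uint32 host)
{
    struct rx_securityClass *nullSec;

    if (rx_Init(0) != 0)	/* 0: let the kernel choose our UDP port */
	return (struct rx_connection *) 0;
    nullSec = rxnull_NewClientSecurityObject();
    /* connect to service id 4 on port 7000 at `host`; placeholder values */
    return rx_NewConnection(host, htons(7000), 4, nullSec, 0);
}
#endif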
/* Called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(register struct rx_service *aservice)
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}

static void ReturnToServerPool(register struct rx_service *aservice)
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static int QuotaOK(register struct rx_service *aservice)
{
    int rc = 0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
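
/* Illustrative sketch (not from the original source): the reserve/return
 * pairing described above for the RX_ENABLE_LOCKS build, where QuotaOK
 * itself reserves the server-thread slot.  Kept under #if 0 so it is
 * never compiled. */
#if 0
static void example_ServeOneRequest(struct rx_service *aservice)
{
    int ok;

    MUTEX_ENTER(&rx_serverPool_lock);
    ok = QuotaOK(aservice);	/* reserves a slot on success */
    MUTEX_EXIT(&rx_serverPool_lock);
    if (!ok)
	return;
    /* ... execute one request here ... */
    MUTEX_ENTER(&rx_serverPool_lock);
    ReturnToServerPool(aservice);	/* give the reservation back */
    MUTEX_EXIT(&rx_serverPool_lock);
}
#endif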
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(int nExistingProcs)
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee. */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	int diff;
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool. */
void rx_StartServer(int donateMe)
{
    register struct rx_service *service;
    register int i, nProcs=0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs. */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) {
#ifndef AFS_NT40_ENV
	char name[32];
#ifdef AFS_PTHREAD_ENV
	pid_t pid;
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	PROCESS pid;
	LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
	sprintf(name, "srv_%d", ++nProcs);
	if (registerProgram)
	    (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
	rx_ServerProc();	/* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *rx_NewConnection(register afs_uint32 shost,
	u_short sport, u_short sservice,
	register struct rx_securityClass *securityObject, int serviceSecurityIndex)
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->specific = NULL;
    conn->challengeEvent = NULL;
    conn->delayedAbortEvent = NULL;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++;	/* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
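
/* Illustrative usage sketch (not from the original source): one remote
 * procedure call on an existing client connection.  The opcode value and
 * reply handling are placeholders for illustration.  Kept under #if 0 so
 * it is never compiled. */
#if 0
static afs_int32 example_OneCall(struct rx_connection *conn)
{
    struct rx_call *call;
    afs_int32 op = htonl(42);	/* placeholder opcode */
    char reply[128];

    rx_SetConnDeadTime(conn, 50);	/* tolerate several lost keepalives */
    call = rx_NewCall(conn);
    if (rx_Write(call, (char *) &op, sizeof(op)) != sizeof(op))
	return rx_EndCall(call, RX_PROTOCOL_ERROR);
    (void) rx_Read(call, reply, sizeof(reply));
    return rx_EndCall(call, 0);	/* returns the call's final error code */
}
#endif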
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;

/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(struct rx_connection *conn)
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now. rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	for (i = 0; i < conn->nSpecific; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(register struct rx_connection *conn)
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    struct rx_peer *peer = conn->peer;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    if (call->state == RX_STATE_PRECALL ||
			call->state == RX_STATE_ACTIVE) {
			rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
		    } else {
			rxi_AckAll(NULL, call, 0);
		    }
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }

#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	} else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If a delayed connection abort is pending, send it now */
    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
					   conn->epoch, conn->type)];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if (rxLastConn == conn)
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent)
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    if (conn->checkReachEvent)
	rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(register struct rx_connection *conn)
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(register struct rx_connection *conn)
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_MakeCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);

    /*
     * Check if there are others waiting for a new call.
     * If so, let them go first to avoid starving them.
     * This is a fairly simple scheme, and might not be
     * a complete solution for large numbers of waiters.
     */
    if (conn->makeCallWaiters) {
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }

    for (;;) {
	for (i=0; i<RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    } else {
		call = rxi_NewCall(conn, i);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);

	conn->makeCallWaiters++;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
	conn->makeCallWaiters--;
    }

    /*
     * Wake up anyone else who might be giving us a chance to
     * run (see code above that avoids resource starvation).
     */
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&conn->conn_call_cv);
#else
    osi_rxWakeup(conn);
#endif

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int rxi_HasActiveCalls(register struct rx_connection *aconn)
{
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i])) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		return 1;
	    }
	}
    }
    return 0;
}
int rxi_GetCallNumberVector(register struct rx_connection *aconn,
	register afs_int32 *aint32s)
{
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}
int rxi_SetCallNumberVector(register struct rx_connection *aconn,
	register afs_int32 *aint32s)
{
    register int i;
    register struct rx_call *tcall;

    for (i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on error.
 * Arguments:
 * char *serviceName;	Name for identification purposes (e.g. the
 *			service name might be used for probing for
 *			statistics) */
struct rx_service *rx_NewService(u_short port, u_short serviceId,
	char *serviceName,
	struct rx_securityClass **securityObjects,
	int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }

    tservice = rxi_AllocService();

    for (i = 0; i<RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    service->checkReach = 0;
	    rx_services[i] = service;	/* not visible until now */
	    return service;
	}
    }

    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
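
/* Illustrative usage sketch (not from the original source): registering a
 * service and donating the calling thread to the server pool.  The port,
 * ids, names, and handler are placeholders for illustration.  Kept under
 * #if 0 so it is never compiled. */
#if 0
static afs_int32 example_RequestProc(struct rx_call *call)
{
    /* placeholder request handler; rxi_ServerProc calls rx_EndCall for us */
    return 0;
}

static void example_ServerSetup(void)
{
    struct rx_securityClass *sec[1];
    struct rx_service *svc;

    if (rx_Init(htons(7000)) != 0)	/* placeholder port */
	return;
    sec[0] = rxnull_NewServerSecurityObject();
    svc = rx_NewService(0, 4, "example", sec, 1, example_RequestProc);
    if (!svc)
	return;
    svc->minProcs = 2;	/* threads guaranteed to this service */
    svc->maxProcs = 4;	/* cap on concurrent requests */
    rx_StartServer(1);	/* donateMe=1: this call never returns */
}
#endif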
/* Generic request processing loop. This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on. If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = (struct rx_call *) 0;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting (typically smooth shutdown) then do not
	 * allow any new calls.
	 */
	if (rx_tranquil && (call != NULL)) {
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	}

#ifdef KERNEL
	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}
#endif

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs(void)
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/*
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 *
 * 1. schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * What do we do if there is a full window, but the last packet isn't here?
 * 3. preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4. Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue cv", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}

	if (call) {
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    queue_Remove(call);
	    CLEAR_CALL_QUEUE_LOCK(call);
	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call*)0;
	    } else {
		call->flags &= (~RX_CALL_WAIT_PROC);
		MUTEX_ENTER(&rx_stats_mutex);
		rx_nWaiting--;
		MUTEX_EXIT(&rx_stats_mutex);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		MUTEX_EXIT(&rx_serverPool_lock);
		break;
	    }
	} else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef KERNEL
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
#endif
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue cv", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);
    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
		if (call) {
		    break;
		}
	    }
	}
    }

    if (call) {
	queue_Remove(call);
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	/* MUTEX_EXIT(&call->lock); */
    } else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
#ifdef KERNEL
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		MUTEX_EXIT(&sq->lock);
		return (struct rx_call *)0;
	    }
#endif
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void rx_SetArrivalProc(register struct rx_call *call,
	register VOID (*proc)(register struct rx_call *call,
	register struct multi_handle *mh, register int index),
	register VOID *handle, register VOID *arg)
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
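
/* Illustrative sketch (not from the original source): registering an
 * arrival procedure immediately after rx_NewCall, per the note above.
 * The callback body and helper names are placeholders for illustration.
 * Kept under #if 0 so it is never compiled. */
#if 0
static VOID example_Arrived(register struct rx_call *call,
			    register struct multi_handle *mh,
			    register int index)
{
    /* placeholder: multi rx would record that call `index` is readable */
}

static void example_WatchCall(struct rx_call *call, VOID *mh)
{
    rx_SetArrivalProc(call, example_Arrived, mh, (VOID *) 0);
}
#endif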
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller. */
afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;	/* Temporary packet pointer */
    register struct rx_packet *nxp;	/* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));
    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else {	/* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call. The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure. We don't want to signal until
	 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    }
    else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }

    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize(void)
{
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return;		/* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void rxi_PacketsUnWait(void)
{
    if (!rx_waitingForPackets) {
	return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;		/* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(register osi_socket socket,
	register u_short serviceId)
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. Returns the call with mutex locked. */
struct rx_call *rxi_NewCall(register struct rx_connection *conn,
	register int channel)
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    } else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);
	CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
	CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
	CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nCallStructs++;
	MUTEX_EXIT(&rx_stats_mutex);
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
	queue_Init(&call->iovq);
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 * haveCTLock set if called from rxi_ReapConnections
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(register struct rx_call *call)
#endif /* RX_ENABLE_LOCKS */
2025 register int channel = call->channel;
2026 register struct rx_connection *conn = call->conn;
2029 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2030 (*call->callNumber)++;
2031 rxi_ResetCall(call, 0);
2032 call->conn->call[channel] = (struct rx_call *) 0;
2034 MUTEX_ENTER(&rx_freeCallQueue_lock);
2035 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2036 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2037 /* A call may be free even though its transmit queue is still in use.
2038 * Since we search the call list from head to tail, put busy calls at
2039 * the head of the list, and idle calls at the tail.
2041 if (call->flags & RX_CALL_TQ_BUSY)
2042 queue_Prepend(&rx_freeCallQueue, call);
2044 queue_Append(&rx_freeCallQueue, call);
2045 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2046 queue_Append(&rx_freeCallQueue, call);
2047 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2048 MUTEX_ENTER(&rx_stats_mutex);
2049 rx_stats.nFreeCallStructs++;
2050 MUTEX_EXIT(&rx_stats_mutex);
2052 MUTEX_EXIT(&rx_freeCallQueue_lock);
2054 /* Destroy the connection if it was previously slated for
2055 * destruction, i.e. the Rx client code previously called
2056 * rx_DestroyConnection (client connections), or
2057 * rxi_ReapConnections called the same routine (server
2058 * connections). Only do this, however, if there are no
2059 * outstanding calls. Note that for fine grain locking, there appears
2060 * to be a deadlock in that rxi_FreeCall has a call locked and
2061 * DestroyConnectionNoLock locks each call in the conn. But note a
2062 * few lines up where we have removed this call from the conn.
2063 * If someone else destroys a connection, they either have no
2064 * call lock held or are going through this section of code.
2066 if (conn->flags & RX_CONN_DESTROY_ME) {
2067 MUTEX_ENTER(&conn->conn_data_lock);
2069 MUTEX_EXIT(&conn->conn_data_lock);
2070 #ifdef RX_ENABLE_LOCKS
2072 rxi_DestroyConnectionNoLock(conn);
2074 rxi_DestroyConnection(conn);
2075 #else /* RX_ENABLE_LOCKS */
2076 rxi_DestroyConnection(conn);
2077 #endif /* RX_ENABLE_LOCKS */
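/*
 * A minimal sketch of the free-list discipline used by rxi_FreeCall and
 * rxi_NewCall above: busy entries are prepended, idle entries appended,
 * and the allocator scans head to tail for the first entry that is not
 * busy.  ex_obj and ex_pool are hypothetical; the real code does all of
 * this under rx_freeCallQueue_lock.
 */
struct ex_obj { struct ex_obj *next; int busy; /* cf. RX_CALL_TQ_BUSY */ };
struct ex_pool { struct ex_obj *head; };

static void ex_PoolFree(struct ex_pool *p, struct ex_obj *o)
{
    if (o->busy) {                        /* queue still draining: head */
        o->next = p->head;
        p->head = o;
    } else {                              /* idle: tail (O(n) walk; fine for a sketch) */
        struct ex_obj **pp = &p->head;
        while (*pp) pp = &(*pp)->next;
        o->next = 0;
        *pp = o;
    }
}

static struct ex_obj *ex_PoolGet(struct ex_pool *p)
{
    struct ex_obj *o, **pp;
    for (pp = &p->head; (o = *pp) != 0; pp = &o->next)
        if (!o->busy) {                   /* first entry safe to recycle */
            *pp = o->next;
            return o;
        }
    return 0;                             /* caller falls back to a fresh allocation */
}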
2081 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2082 char *rxi_Alloc(register size_t size)
2086 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2087 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2090 int glockOwner = ISAFS_GLOCK();
2094 MUTEX_ENTER(&rx_stats_mutex);
2095 rxi_Alloccnt++; rxi_Allocsize += size;
2096 MUTEX_EXIT(&rx_stats_mutex);
2097 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2098 if (size > AFS_SMALLOCSIZ) {
2099 p = (char *) osi_AllocMediumSpace(size);
2101 p = (char *) osi_AllocSmall(size, 1);
2102 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2107 p = (char *) osi_Alloc(size);
2109 if (!p) osi_Panic("rxi_Alloc error");
2114 void rxi_Free(void *addr, register size_t size)
2116 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2117 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2120 int glockOwner = ISAFS_GLOCK();
2124 MUTEX_ENTER(&rx_stats_mutex);
2125 rxi_Alloccnt--; rxi_Allocsize -= size;
2126 MUTEX_EXIT(&rx_stats_mutex);
2127 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2128 if (size > AFS_SMALLOCSIZ)
2129 osi_FreeMediumSpace(addr);
2131 osi_FreeSmall(addr);
2132 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2137 osi_Free(addr, size);
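/*
 * A minimal sketch of the allocator bookkeeping above: a pair of global
 * counters kept under rx_stats_mutex around a panic-on-failure
 * allocation.  (A kernel build would also branch on AFS_SMALLOCSIZ to
 * pick the small- or medium-block allocator, as in the conditional code
 * above.)  The ex_ names are hypothetical.
 */
static afs_int32 ex_AllocCnt = 0, ex_AllocSize = 0;

static char *ex_CountedAlloc(register size_t size)
{
    register char *p;
    MUTEX_ENTER(&rx_stats_mutex);
    ex_AllocCnt++; ex_AllocSize += size;
    MUTEX_EXIT(&rx_stats_mutex);
    p = (char *) osi_Alloc(size);
    if (!p) osi_Panic("ex_CountedAlloc error");
    return p;
}

static void ex_CountedFree(void *addr, register size_t size)
{
    MUTEX_ENTER(&rx_stats_mutex);
    ex_AllocCnt--; ex_AllocSize -= size;
    MUTEX_EXIT(&rx_stats_mutex);
    osi_Free(addr, size);
}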
2141 /* Find the peer process represented by the supplied (host,port)
2142 * combination. If there is no appropriate active peer structure, a
2143 * new one will be allocated and initialized
2144 * The origPeer, if set, is a pointer to a peer structure on which the
2145 * refcount will be decremented. This is used to replace the peer
2146 * structure hanging off a connection structure */
2147 struct rx_peer *rxi_FindPeer(register afs_uint32 host,
2148 register u_short port, struct rx_peer *origPeer, int create)
2150 register struct rx_peer *pp;
2152 hashIndex = PEER_HASH(host, port);
2153 MUTEX_ENTER(&rx_peerHashTable_lock);
2154 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2155 if ((pp->host == host) && (pp->port == port)) break;
2159 pp = rxi_AllocPeer(); /* This bzero's *pp */
2160 pp->host = host; /* must be set here, else InitPeerParams sees zero */
2162 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2163 queue_Init(&pp->congestionQueue);
2164 queue_Init(&pp->rpcStats);
2165 pp->next = rx_peerHashTable[hashIndex];
2166 rx_peerHashTable[hashIndex] = pp;
2167 rxi_InitPeerParams(pp);
2168 MUTEX_ENTER(&rx_stats_mutex);
2169 rx_stats.nPeerStructs++;
2170 MUTEX_EXIT(&rx_stats_mutex);
2177 origPeer->refCount--;
2178 MUTEX_EXIT(&rx_peerHashTable_lock);
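/*
 * The function above is the classic find-or-create hash lookup with one
 * extra twist: the caller's old reference (origPeer) is released inside
 * the same critical section that hands out the new one, so the swap is
 * atomic with respect to the table.  A stripped-down sketch; the ex_
 * types and the trivial hash are hypothetical:
 */
#define EX_NBUCKETS 256

struct ex_peer { struct ex_peer *next; afs_uint32 host; u_short port; int refCount; };
static struct ex_peer *ex_buckets[EX_NBUCKETS];

static struct ex_peer *ex_FindPeer(afs_uint32 host, u_short port,
    struct ex_peer *origPeer, int create)
{
    register struct ex_peer *pp;
    int h = (host ^ port) & (EX_NBUCKETS - 1);    /* stand-in for PEER_HASH */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (pp = ex_buckets[h]; pp; pp = pp->next)
        if (pp->host == host && pp->port == port) break;
    if (!pp && create) {
        pp = (struct ex_peer *) rxi_Alloc(sizeof(struct ex_peer));
        pp->host = host; pp->port = port; pp->refCount = 0;
        pp->next = ex_buckets[h];
        ex_buckets[h] = pp;
    }
    if (pp) pp->refCount++;               /* reference for the caller */
    if (origPeer) origPeer->refCount--;   /* old reference retired under the same lock */
    MUTEX_EXIT(&rx_peerHashTable_lock);
    return pp;
}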
2183 /* Find the connection at (host, port) started at epoch, and with the
2184 * given connection id. Creates the server connection if necessary.
2185 * The type specifies whether a client connection or a server
2186 * connection is desired. In both cases, (host, port) specify the
2187 * peer's (host, port) pair. Client connections are not made
2188 * automatically by this routine. The parameter socket gives the
2189 * socket descriptor on which the packet was received. This is used,
2190 * in the case of server connections, to check that *new* connections
2191 * come via a valid (port, serviceId). Finally, the securityIndex
2192 * parameter must match the existing index for the connection. If a
2193 * server connection is created, it will be created using the supplied
2194 * index, if the index is valid for this service */
2195 struct rx_connection *rxi_FindConnection(osi_socket socket,
2196 register afs_int32 host, register u_short port, u_short serviceId,
2197 afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2199 int hashindex, flag;
2200 register struct rx_connection *conn;
2201 struct rx_peer *peer;
2202 hashindex = CONN_HASH(host, port, cid, epoch, type);
2203 MUTEX_ENTER(&rx_connHashTable_lock);
2204 rxLastConn ? (conn = rxLastConn, flag = 0) :
2205 (conn = rx_connHashTable[hashindex], flag = 1);
2207 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2208 && (epoch == conn->epoch)) {
2209 register struct rx_peer *pp = conn->peer;
2210 if (securityIndex != conn->securityIndex) {
2211 /* this isn't supposed to happen, but someone could forge a packet
2212 like this, and there seems to be some CM bug that makes this
2213 happen from time to time -- in which case, the fileserver asserts. */
2215 MUTEX_EXIT(&rx_connHashTable_lock);
2216 return (struct rx_connection *) 0;
2218 /* if the epoch's high-order bit is set, route only on the cid (for
2219 * security reasons), not on the host and port fields. */
2221 if (conn->epoch & 0x80000000) break;
2222 if (((type == RX_CLIENT_CONNECTION)
2223 || (pp->host == host)) && (pp->port == port))
2228 /* the connection rxLastConn that was used the last time is not the
2229 ** one we are looking for now. Hence, start searching in the hash */
2231 conn = rx_connHashTable[hashindex];
2237 struct rx_service *service;
2238 if (type == RX_CLIENT_CONNECTION) {
2239 MUTEX_EXIT(&rx_connHashTable_lock);
2240 return (struct rx_connection *) 0;
2242 service = rxi_FindService(socket, serviceId);
2243 if (!service || (securityIndex >= service->nSecurityObjects)
2244 || (service->securityObjects[securityIndex] == 0)) {
2245 MUTEX_EXIT(&rx_connHashTable_lock);
2246 return (struct rx_connection *) 0;
2248 conn = rxi_AllocConnection(); /* This bzero's the connection */
2249 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2251 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2253 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2254 conn->next = rx_connHashTable[hashindex];
2255 rx_connHashTable[hashindex] = conn;
2256 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2257 conn->type = RX_SERVER_CONNECTION;
2258 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2259 conn->epoch = epoch;
2260 conn->cid = cid & RX_CIDMASK;
2261 /* conn->serial = conn->lastSerial = 0; */
2262 /* conn->timeout = 0; */
2263 conn->ackRate = RX_FAST_ACK_RATE;
2264 conn->service = service;
2265 conn->serviceId = serviceId;
2266 conn->securityIndex = securityIndex;
2267 conn->securityObject = service->securityObjects[securityIndex];
2268 conn->nSpecific = 0;
2269 conn->specific = NULL;
2270 rx_SetConnDeadTime(conn, service->connDeadTime);
2271 /* Notify security object of the new connection */
2272 RXS_NewConnection(conn->securityObject, conn);
2273 /* XXXX Connection timeout? */
2274 if (service->newConnProc) (*service->newConnProc)(conn);
2275 MUTEX_ENTER(&rx_stats_mutex);
2276 rx_stats.nServerConns++;
2277 MUTEX_EXIT(&rx_stats_mutex);
2281 /* Ensure that the peer structure is set up in such a way that
2282 ** replies in this connection go back to that remote interface
2283 ** from which the last packet was sent out. If this packet's
2284 ** source IP address does not match the peer struct for this conn,
2285 ** then drop the refCount on conn->peer and get a new peer structure.
2286 ** We can check the host,port field in the peer structure without the
2287 ** rx_peerHashTable_lock because the peer structure has its refCount
2288 ** incremented and the only time the host,port in the peer struct gets
2289 ** updated is when the peer structure is created.
2291 if (conn->peer->host == host )
2292 peer = conn->peer; /* no change to the peer structure */
2294 peer = rxi_FindPeer(host, port, conn->peer, 1);
2297 MUTEX_ENTER(&conn->conn_data_lock);
2300 MUTEX_EXIT(&conn->conn_data_lock);
2302 rxLastConn = conn; /* store this connection as the last conn used */
2303 MUTEX_EXIT(&rx_connHashTable_lock);
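/*
 * The rxLastConn probe above is a one-entry most-recently-used cache in
 * front of the hash chain: check the connection handed out last time
 * before walking the bucket.  The bare idiom, with a hypothetical
 * ex_conn keyed only by cid (the real test also matches type, epoch,
 * host and port, and runs under rx_connHashTable_lock):
 */
struct ex_conn { struct ex_conn *next; afs_uint32 cid; };
static struct ex_conn *ex_lastConn = 0;

static struct ex_conn *ex_FindConn(struct ex_conn *bucket, afs_uint32 cid)
{
    register struct ex_conn *c = ex_lastConn;
    if (c && c->cid == cid)             /* hot path: same connection again */
        return c;
    for (c = bucket; c; c = c->next)    /* slow path: scan the chain */
        if (c->cid == cid) break;
    if (c) ex_lastConn = c;             /* remember for next time */
    return c;
}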
2307 /* There are two packet tracing routines available for testing and monitoring
2308 * Rx. One is called just after every packet is received and the other is
2309 * called just before every packet is sent. Received packets have had their
2310 * headers decoded, and packets to be sent have not yet had their headers
2311 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2312 * containing the network address. Both can be modified. The return value, if
2313 * non-zero, indicates that the packet should be dropped. */
2315 int (*rx_justReceived)() = 0;
2316 int (*rx_almostSent)() = 0;
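/*
 * A hypothetical tracer matching the contract described above: it may
 * inspect or modify the packet and address, and a non-zero return tells
 * rx to drop the packet.  Dropping all traffic from one UDP port is
 * purely illustrative.
 */
static int ex_DropFromPort(struct rx_packet *np, struct sockaddr_in *addr)
{
    return addr->sin_port == htons(7001);   /* non-zero => drop */
}
/* Installed once at startup, e.g.:  rx_justReceived = ex_DropFromPort;  */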
2318 /* A packet has been received off the interface. Np is the packet, socket is
2319 * the socket number it was received from (useful in determining which service
2320 * this packet corresponds to), and (host, port) reflect the host,port of the
2321 * sender. This call returns the packet to the caller if it is finished with
2322 * it, rather than de-allocating it, just as a small performance hack */
2324 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
2325 osi_socket socket, afs_uint32 host, u_short port,
2326 int *tnop, struct rx_call **newcallp)
2328 register struct rx_call *call;
2329 register struct rx_connection *conn;
2331 afs_uint32 currentCallNumber;
2337 struct rx_packet *tnp;
2340 /* We don't print out the packet until now because (1) the time may not be
2341 * accurate enough until now in the lwp implementation (rx_Listener only gets
2342 * the time after the packet is read) and (2) from a protocol point of view,
2343 * this is the first time the packet has been seen */
2344 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2345 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2346 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2347 np->header.serial, packetType, host, port, np->header.serviceId,
2348 np->header.epoch, np->header.cid, np->header.callNumber,
2349 np->header.seq, np->header.flags, np));
2352 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2353 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2356 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2357 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2360 /* If an input tracer function is defined, call it with the packet and
2361 * network address. Note this function may modify its arguments. */
2362 if (rx_justReceived) {
2363 struct sockaddr_in addr;
2365 addr.sin_family = AF_INET;
2366 addr.sin_port = port;
2367 addr.sin_addr.s_addr = host;
2368 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2369 addr.sin_len = sizeof(addr);
2370 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2371 drop = (*rx_justReceived) (np, &addr);
2372 /* drop packet if return value is non-zero */
2373 if (drop) return np;
2374 port = addr.sin_port; /* in case fcn changed addr */
2375 host = addr.sin_addr.s_addr;
2379 /* If packet was not sent by the client, then *we* must be the client */
2380 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2381 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2383 /* Find the connection (or fabricate one, if we're the server & if
2384 * necessary) associated with this packet */
2385 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2386 np->header.cid, np->header.epoch, type,
2387 np->header.securityIndex);
2390 /* If no connection found or fabricated, just ignore the packet.
2391 * (An argument could be made for sending an abort packet for
2396 MUTEX_ENTER(&conn->conn_data_lock);
2397 if (conn->maxSerial < np->header.serial)
2398 conn->maxSerial = np->header.serial;
2399 MUTEX_EXIT(&conn->conn_data_lock);
2401 /* If the connection is in an error state, send an abort packet and ignore
2402 * the incoming packet */
2404 /* Don't respond to an abort packet--we don't want loops! */
2405 MUTEX_ENTER(&conn->conn_data_lock);
2406 if (np->header.type != RX_PACKET_TYPE_ABORT)
2407 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2409 MUTEX_EXIT(&conn->conn_data_lock);
2413 /* Check for connection-only requests (i.e. not call specific). */
2414 if (np->header.callNumber == 0) {
2415 switch (np->header.type) {
2416 case RX_PACKET_TYPE_ABORT:
2417 /* What if the supplied error is zero? */
2418 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2419 MUTEX_ENTER(&conn->conn_data_lock);
2421 MUTEX_EXIT(&conn->conn_data_lock);
2423 case RX_PACKET_TYPE_CHALLENGE:
2424 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2425 MUTEX_ENTER(&conn->conn_data_lock);
2427 MUTEX_EXIT(&conn->conn_data_lock);
2429 case RX_PACKET_TYPE_RESPONSE:
2430 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2431 MUTEX_ENTER(&conn->conn_data_lock);
2433 MUTEX_EXIT(&conn->conn_data_lock);
2435 case RX_PACKET_TYPE_PARAMS:
2436 case RX_PACKET_TYPE_PARAMS+1:
2437 case RX_PACKET_TYPE_PARAMS+2:
2438 /* ignore these packet types for now */
2439 MUTEX_ENTER(&conn->conn_data_lock);
2441 MUTEX_EXIT(&conn->conn_data_lock);
2446 /* Should not reach here, unless the peer is broken: send an
2448 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2449 MUTEX_ENTER(&conn->conn_data_lock);
2450 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2452 MUTEX_EXIT(&conn->conn_data_lock);
2457 channel = np->header.cid & RX_CHANNELMASK;
2458 call = conn->call[channel];
2459 #ifdef RX_ENABLE_LOCKS
2461 MUTEX_ENTER(&call->lock);
2462 /* Test to see if call struct is still attached to conn. */
2463 if (call != conn->call[channel]) {
2465 MUTEX_EXIT(&call->lock);
2466 if (type == RX_SERVER_CONNECTION) {
2467 call = conn->call[channel];
2468 /* If we started with no call attached and there is one now,
2469 * another thread is also running this routine and has gotten
2470 * the connection channel. We should drop this packet in the tests
2471 * below. If there was a call on this connection and it's now
2472 * gone, then we'll be making a new call below.
2473 * If there was previously a call and it's now different then
2474 * the old call was freed and another thread running this routine
2475 * has created a call on this channel. One of these two threads
2476 * has a packet for the old call and the code below handles those cases. */
2480 MUTEX_ENTER(&call->lock);
2483 /* This packet can't be for this call. If the new call address is
2484 * 0 then no call is running on this channel. If there is a call
2485 * then, since this is a client connection we're getting data for
2486 * it must be for the previous call.
2488 MUTEX_ENTER(&rx_stats_mutex);
2489 rx_stats.spuriousPacketsRead++;
2490 MUTEX_EXIT(&rx_stats_mutex);
2491 MUTEX_ENTER(&conn->conn_data_lock);
2493 MUTEX_EXIT(&conn->conn_data_lock);
2498 currentCallNumber = conn->callNumber[channel];
2500 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2501 if (np->header.callNumber < currentCallNumber) {
2502 MUTEX_ENTER(&rx_stats_mutex);
2503 rx_stats.spuriousPacketsRead++;
2504 MUTEX_EXIT(&rx_stats_mutex);
2505 #ifdef RX_ENABLE_LOCKS
2507 MUTEX_EXIT(&call->lock);
2509 MUTEX_ENTER(&conn->conn_data_lock);
2511 MUTEX_EXIT(&conn->conn_data_lock);
2515 MUTEX_ENTER(&conn->conn_call_lock);
2516 call = rxi_NewCall(conn, channel);
2517 MUTEX_EXIT(&conn->conn_call_lock);
2518 *call->callNumber = np->header.callNumber;
2519 call->state = RX_STATE_PRECALL;
2520 clock_GetTime(&call->queueTime);
2521 hzero(call->bytesSent);
2522 hzero(call->bytesRcvd);
2523 rxi_KeepAliveOn(call);
2525 else if (np->header.callNumber != currentCallNumber) {
2526 /* Wait until the transmit queue is idle before deciding
2527 * whether to reset the current call. Chances are that the
2528 * call will be in either DALLY or HOLD state once the TQ_BUSY
2531 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2532 while ((call->state == RX_STATE_ACTIVE) &&
2533 (call->flags & RX_CALL_TQ_BUSY)) {
2534 call->flags |= RX_CALL_TQ_WAIT;
2535 #ifdef RX_ENABLE_LOCKS
2536 CV_WAIT(&call->cv_tq, &call->lock);
2537 #else /* RX_ENABLE_LOCKS */
2538 osi_rxSleep(&call->tq);
2539 #endif /* RX_ENABLE_LOCKS */
2541 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2542 /* If the new call cannot be taken right now send a busy and set
2543 * the error condition in this call, so that it terminates as
2544 * quickly as possible */
2545 if (call->state == RX_STATE_ACTIVE) {
2546 struct rx_packet *tp;
2548 rxi_CallError(call, RX_CALL_DEAD);
2549 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2550 MUTEX_EXIT(&call->lock);
2551 MUTEX_ENTER(&conn->conn_data_lock);
2553 MUTEX_EXIT(&conn->conn_data_lock);
2556 rxi_ResetCall(call, 0);
2557 *call->callNumber = np->header.callNumber;
2558 call->state = RX_STATE_PRECALL;
2559 clock_GetTime(&call->queueTime);
2560 hzero(call->bytesSent);
2561 hzero(call->bytesRcvd);
2563 * If the number of queued calls exceeds the overload
2564 * threshold then abort this call.
2566 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2567 struct rx_packet *tp;
2569 rxi_CallError(call, rx_BusyError);
2570 tp = rxi_SendCallAbort(call, np, 1, 0);
2571 MUTEX_EXIT(&call->lock);
2572 MUTEX_ENTER(&conn->conn_data_lock);
2574 MUTEX_EXIT(&conn->conn_data_lock);
2577 rxi_KeepAliveOn(call);
2580 /* Continuing call; do nothing here. */
2582 } else { /* we're the client */
2583 /* Ignore all incoming acknowledgements for calls in DALLY state */
2584 if ( call && (call->state == RX_STATE_DALLY)
2585 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2586 MUTEX_ENTER(&rx_stats_mutex);
2587 rx_stats.ignorePacketDally++;
2588 MUTEX_EXIT(&rx_stats_mutex);
2589 #ifdef RX_ENABLE_LOCKS
2591 MUTEX_EXIT(&call->lock);
2594 MUTEX_ENTER(&conn->conn_data_lock);
2596 MUTEX_EXIT(&conn->conn_data_lock);
2600 /* Ignore anything that's not relevant to the current call. If there
2601 * isn't a current call, then no packet is relevant. */
2602 if (!call || (np->header.callNumber != currentCallNumber)) {
2603 MUTEX_ENTER(&rx_stats_mutex);
2604 rx_stats.spuriousPacketsRead++;
2605 MUTEX_EXIT(&rx_stats_mutex);
2606 #ifdef RX_ENABLE_LOCKS
2608 MUTEX_EXIT(&call->lock);
2611 MUTEX_ENTER(&conn->conn_data_lock);
2613 MUTEX_EXIT(&conn->conn_data_lock);
2616 /* If the service security object index stamped in the packet does not
2617 * match the connection's security index, ignore the packet */
2618 if (np->header.securityIndex != conn->securityIndex) {
2619 #ifdef RX_ENABLE_LOCKS
2620 MUTEX_EXIT(&call->lock);
2622 MUTEX_ENTER(&conn->conn_data_lock);
2624 MUTEX_EXIT(&conn->conn_data_lock);
2628 /* If we're receiving the response, then all transmit packets are
2629 * implicitly acknowledged. Get rid of them. */
2630 if (np->header.type == RX_PACKET_TYPE_DATA) {
2631 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2632 /* XXX Hack. Because we must release the global rx lock when
2633 * sending packets (osi_NetSend) we drop all acks while we're
2634 * traversing the tq in rxi_Start sending packets out because
2635 * packets may move to the freePacketQueue as a result of being here!
2636 * So we drop these packets until we're safely out of the
2637 * traversal. Really ugly!
2638 * For fine grain RX locking, we set the acked field in the
2639 * packets and let rxi_Start remove them from the transmit queue.
2641 if (call->flags & RX_CALL_TQ_BUSY) {
2642 #ifdef RX_ENABLE_LOCKS
2643 rxi_SetAcksInTransmitQueue(call);
2646 return np; /* xmitting; drop packet */
2650 rxi_ClearTransmitQueue(call, 0);
2652 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2653 rxi_ClearTransmitQueue(call, 0);
2654 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2656 if (np->header.type == RX_PACKET_TYPE_ACK) {
2657 /* now check to see if this is an ack packet acknowledging that the
2658 * server actually *lost* some hard-acked data. If this happens we
2659 * ignore this packet, as it may indicate that the server restarted in
2660 * the middle of a call. It is also possible that this is an old ack
2661 * packet. We don't abort the connection in this case, because this
2662 * *might* just be an old ack packet. The right way to detect a server
2663 * restart in the midst of a call is to notice that the server epoch changed. */
2665 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2666 * XXX unacknowledged. I think that this is off-by-one, but
2667 * XXX I don't dare change it just yet, since it will
2668 * XXX interact badly with the server-restart detection
2669 * XXX code in receiveackpacket. */
2670 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2671 MUTEX_ENTER(&rx_stats_mutex);
2672 rx_stats.spuriousPacketsRead++;
2673 MUTEX_EXIT(&rx_stats_mutex);
2674 MUTEX_EXIT(&call->lock);
2675 MUTEX_ENTER(&conn->conn_data_lock);
2677 MUTEX_EXIT(&conn->conn_data_lock);
2681 } /* else not a data packet */
2684 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2685 /* Set remote user defined status from packet */
2686 call->remoteStatus = np->header.userStatus;
2688 /* Note the gap between the expected next packet and the actual
2689 * packet that arrived, when the new packet has a smaller serial number
2690 * than expected. Rioses frequently reorder packets all by themselves,
2691 * so this will be quite important with very large window sizes.
2692 * Skew is checked against 0 here to avoid any dependence on the type of
2693 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always true.
2695 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2696 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2697 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2699 MUTEX_ENTER(&conn->conn_data_lock);
2700 skew = conn->lastSerial - np->header.serial;
2701 conn->lastSerial = np->header.serial;
2702 MUTEX_EXIT(&conn->conn_data_lock);
2704 register struct rx_peer *peer;
2706 if (skew > peer->inPacketSkew) {
2707 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2708 peer->inPacketSkew = skew;
2712 /* Now do packet type-specific processing */
2713 switch (np->header.type) {
2714 case RX_PACKET_TYPE_DATA:
2715 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2718 case RX_PACKET_TYPE_ACK:
2719 /* Respond immediately to ack packets requesting acknowledgement
2721 if (np->header.flags & RX_REQUEST_ACK) {
2722 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2723 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2725 np = rxi_ReceiveAckPacket(call, np, 1);
2727 case RX_PACKET_TYPE_ABORT:
2728 /* An abort packet: reset the connection, passing the error up to
2730 /* What if error is zero? */
2731 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2733 case RX_PACKET_TYPE_BUSY:
2736 case RX_PACKET_TYPE_ACKALL:
2737 /* All packets acknowledged, so we can drop all packets previously
2738 * readied for sending */
2739 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2740 /* XXX Hack. Because we can't release the global rx lock when
2741 * sending packets (osi_NetSend) we drop all ack pkts while we're
2742 * traversing the tq in rxi_Start sending packets out because
2743 * packets may move to the freePacketQueue as a result of being
2744 * here! So we drop these packets until we're safely out of the
2745 * traversal. Really ugly!
2746 * For fine grain RX locking, we set the acked field in the packets
2747 * and let rxi_Start remove the packets from the transmit queue.
2749 if (call->flags & RX_CALL_TQ_BUSY) {
2750 #ifdef RX_ENABLE_LOCKS
2751 rxi_SetAcksInTransmitQueue(call);
2753 #else /* RX_ENABLE_LOCKS */
2755 return np; /* xmitting; drop packet */
2756 #endif /* RX_ENABLE_LOCKS */
2758 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2759 rxi_ClearTransmitQueue(call, 0);
2762 /* Should not reach here, unless the peer is broken: send an abort
2764 rxi_CallError(call, RX_PROTOCOL_ERROR);
2765 np = rxi_SendCallAbort(call, np, 1, 0);
2768 /* Note when this last legitimate packet was received, for keep-alive
2769 * processing. Note, we delay getting the time until now in the hope that
2770 * the packet will be delivered to the user before any get time is required
2771 * (if not, then the time won't actually be re-evaluated here). */
2772 call->lastReceiveTime = clock_Sec();
2773 MUTEX_EXIT(&call->lock);
2774 MUTEX_ENTER(&conn->conn_data_lock);
2776 MUTEX_EXIT(&conn->conn_data_lock);
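/*
 * The MTUXXX note above proposes keeping inPacketSkew as a smoothed
 * running value with a beta of 1/8, in the style of the round trip
 * time estimator, rather than as a raw maximum.  That suggestion
 * reduces to one line of integer arithmetic (a sketch of the proposal,
 * not what the code above currently does):
 */
static afs_int32 ex_SmoothSkew(afs_int32 smoothed, afs_int32 sample)
{
    return smoothed + (sample - smoothed) / 8;  /* new = old + beta*(sample - old) */
}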
2780 /* return true if this is an "interesting" connection from the point of view
2781 of someone trying to debug the system */
2782 int rxi_IsConnInteresting(struct rx_connection *aconn)
2785 register struct rx_call *tcall;
2787 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2789 for(i=0;i<RX_MAXCALLS;i++) {
2790 tcall = aconn->call[i];
2792 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2794 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2802 /* if this is one of the last few packets AND it wouldn't be used by the
2803 receiving call to immediately satisfy a read request, then drop it on
2804 the floor, since accepting it might prevent a lock-holding thread from
2805 making progress in its reading. If a call has been cleared while in
2806 the precall state then ignore all subsequent packets until the call
2807 is assigned to a thread. */
2809 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2812 MUTEX_ENTER(&rx_stats_mutex);
2813 if (((ap->header.seq != 1) &&
2814 (acall->flags & RX_CALL_CLEARED) &&
2815 (acall->state == RX_STATE_PRECALL)) ||
2816 ((rx_nFreePackets < rxi_dataQuota+2) &&
2817 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2818 && (acall->flags & RX_CALL_READER_WAIT)))) {
2821 MUTEX_EXIT(&rx_stats_mutex);
2826 static void rxi_CheckReachEvent(struct rxevent *event,
2827 struct rx_connection *conn, struct rx_call *acall)
2829 struct rx_call *call = acall;
2833 MUTEX_ENTER(&conn->conn_data_lock);
2834 conn->checkReachEvent = NULL;
2835 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2836 if (event) conn->refCount--;
2837 MUTEX_EXIT(&conn->conn_data_lock);
2841 MUTEX_ENTER(&conn->conn_call_lock);
2842 MUTEX_ENTER(&conn->conn_data_lock);
2843 for (i=0; i<RX_MAXCALLS; i++) {
2844 struct rx_call *tc = conn->call[i];
2845 if (tc && tc->state == RX_STATE_PRECALL) {
2851 /* Indicate that rxi_CheckReachEvent is no longer running by
2852 * clearing the flag. Must be atomic under conn_data_lock to
2853 * avoid a new call slipping by: rxi_CheckConnReach holds
2854 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2856 conn->flags &= ~RX_CONN_ATTACHWAIT;
2857 MUTEX_EXIT(&conn->conn_data_lock);
2858 MUTEX_EXIT(&conn->conn_call_lock);
2862 if (call != acall) MUTEX_ENTER(&call->lock);
2863 rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
2864 if (call != acall) MUTEX_EXIT(&call->lock);
2866 clock_GetTime(&when);
2867 when.sec += RX_CHECKREACH_TIMEOUT;
2868 MUTEX_ENTER(&conn->conn_data_lock);
2869 if (!conn->checkReachEvent) {
2871 conn->checkReachEvent =
2872 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2874 MUTEX_EXIT(&conn->conn_data_lock);
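/*
 * rxi_CheckReachEvent above keeps at most one probe outstanding per
 * connection by posting a new event only while checkReachEvent is NULL.
 * The re-arm step in isolation (ex_RearmProbe is hypothetical, and the
 * real code also takes a reference on conn when it posts):
 */
static void ex_RearmProbe(struct rx_connection *conn, void (*handler)(),
    int seconds)
{
    struct clock when;
    clock_GetTime(&when);
    when.sec += seconds;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->checkReachEvent)          /* only one probe in flight */
        conn->checkReachEvent = rxevent_Post(&when, handler, conn, NULL);
    MUTEX_EXIT(&conn->conn_data_lock);
}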
2879 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2881 struct rx_service *service = conn->service;
2882 struct rx_peer *peer = conn->peer;
2883 afs_uint32 now, lastReach;
2885 if (service->checkReach == 0)
2889 MUTEX_ENTER(&peer->peer_lock);
2890 lastReach = peer->lastReachTime;
2891 MUTEX_EXIT(&peer->peer_lock);
2892 if (now - lastReach < RX_CHECKREACH_TTL)
2895 MUTEX_ENTER(&conn->conn_data_lock);
2896 if (conn->flags & RX_CONN_ATTACHWAIT) {
2897 MUTEX_EXIT(&conn->conn_data_lock);
2900 conn->flags |= RX_CONN_ATTACHWAIT;
2901 MUTEX_EXIT(&conn->conn_data_lock);
2902 if (!conn->checkReachEvent)
2903 rxi_CheckReachEvent(NULL, conn, call);
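/*
 * The gate above amounts to a time-based cache: a peer stays "known
 * reachable" for RX_CHECKREACH_TTL seconds after the last proof, and
 * only a stale entry costs a ping.  The freshness test in isolation:
 */
static int ex_ReachIsFresh(afs_uint32 now, afs_uint32 lastReach)
{
    return (now - lastReach) < RX_CHECKREACH_TTL;   /* fresh: skip the ping */
}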
2908 /* try to attach call, if authentication is complete */
2909 static void TryAttach(register struct rx_call *acall,
2910 register osi_socket socket, register int *tnop,
2911 register struct rx_call **newcallp, int reachOverride)
2913 struct rx_connection *conn = acall->conn;
2915 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2916 /* Don't attach until we have any req'd. authentication. */
2917 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2918 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2919 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2920 /* Note: this does not necessarily succeed; there
2921 * may not be any proc available
2925 rxi_ChallengeOn(acall->conn);
2930 /* A data packet has been received off the interface. This packet is
2931 * appropriate to the call (the call is in the right state, etc.). This
2932 * routine can return a packet to the caller, for re-use */
2934 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
2935 register struct rx_packet *np, int istack, osi_socket socket,
2936 afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2942 afs_uint32 seq, serial, flags;
2944 struct rx_packet *tnp;
2946 MUTEX_ENTER(&rx_stats_mutex);
2947 rx_stats.dataPacketsRead++;
2948 MUTEX_EXIT(&rx_stats_mutex);
2951 /* If there are no packet buffers, drop this new packet, unless we can find
2952 * packet buffers from inactive calls */
2954 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2955 MUTEX_ENTER(&rx_freePktQ_lock);
2956 rxi_NeedMorePackets = TRUE;
2957 MUTEX_EXIT(&rx_freePktQ_lock);
2958 MUTEX_ENTER(&rx_stats_mutex);
2959 rx_stats.noPacketBuffersOnRead++;
2960 MUTEX_EXIT(&rx_stats_mutex);
2961 call->rprev = np->header.serial;
2962 rxi_calltrace(RX_TRACE_DROP, call);
2963 dpf (("packet %x dropped on receipt - quota problems", np));
2965 rxi_ClearReceiveQueue(call);
2966 clock_GetTime(&when);
2967 clock_Add(&when, &rx_softAckDelay);
2968 if (!call->delayedAckEvent ||
2969 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2970 rxevent_Cancel(call->delayedAckEvent, call,
2971 RX_CALL_REFCOUNT_DELAY);
2972 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2973 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2976 /* we've damaged this call already, might as well do it in. */
2982 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2983 * packet is one of several packets transmitted as a single
2984 * datagram. Do not send any soft or hard acks until all packets
2985 * in a jumbogram have been processed. Send negative acks right away.
2987 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2988 /* tnp is non-null when there are more packets in the
2989 * current jumbogram */
2996 seq = np->header.seq;
2997 serial = np->header.serial;
2998 flags = np->header.flags;
3000 /* If the call is in an error state, send an abort message */
3002 return rxi_SendCallAbort(call, np, istack, 0);
3004 /* The RX_JUMBO_PACKET is set in all but the last packet in each
3005 * AFS 3.5 jumbogram. */
3006 if (flags & RX_JUMBO_PACKET) {
3007 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3012 if (np->header.spare != 0) {
3013 MUTEX_ENTER(&call->conn->conn_data_lock);
3014 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3015 MUTEX_EXIT(&call->conn->conn_data_lock);
3018 /* The usual case is that this is the expected next packet */
3019 if (seq == call->rnext) {
3021 /* Check to make sure it is not a duplicate of one already queued */
3022 if (queue_IsNotEmpty(&call->rq)
3023 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3024 MUTEX_ENTER(&rx_stats_mutex);
3025 rx_stats.dupPacketsRead++;
3026 MUTEX_EXIT(&rx_stats_mutex);
3027 dpf (("packet %x dropped on receipt - duplicate", np));
3028 rxevent_Cancel(call->delayedAckEvent, call,
3029 RX_CALL_REFCOUNT_DELAY);
3030 np = rxi_SendAck(call, np, seq, serial,
3031 flags, RX_ACK_DUPLICATE, istack);
3037 /* It's the next packet. Stick it on the receive queue
3038 * for this call. Set newPackets to make sure we wake
3039 * the reader once all packets have been processed */
3040 queue_Prepend(&call->rq, np);
3042 np = NULL; /* We can't use this anymore */
3045 /* If an ack is requested then set a flag to make sure we
3046 * send an acknowledgement for this packet */
3047 if (flags & RX_REQUEST_ACK) {
3051 /* Keep track of whether we have received the last packet */
3052 if (flags & RX_LAST_PACKET) {
3053 call->flags |= RX_CALL_HAVE_LAST;
3057 /* Check whether we have all of the packets for this call */
3058 if (call->flags & RX_CALL_HAVE_LAST) {
3059 afs_uint32 tseq; /* temporary sequence number */
3060 struct rx_packet *tp; /* Temporary packet pointer */
3061 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3063 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3064 if (tseq != tp->header.seq)
3066 if (tp->header.flags & RX_LAST_PACKET) {
3067 call->flags |= RX_CALL_RECEIVE_DONE;
3074 /* Provide asynchronous notification for those who want it
3075 * (e.g. multi rx) */
3076 if (call->arrivalProc) {
3077 (*call->arrivalProc)(call, call->arrivalProcHandle,
3078 (int) call->arrivalProcArg);
3079 call->arrivalProc = (VOID (*)()) 0;
3082 /* Update last packet received */
3085 /* If there is no server process serving this call, grab
3086 * one, if available. We only need to do this once. If a
3087 * server thread is available, this thread becomes a server
3088 * thread and the server thread becomes a listener thread. */
3090 TryAttach(call, socket, tnop, newcallp, 0);
3093 /* This is not the expected next packet. */
3095 /* Determine whether this is a new or old packet, and if it's
3096 * a new one, whether it fits into the current receive window.
3097 * Also figure out whether the packet was delivered in sequence.
3098 * We use the prev variable to determine whether the new packet
3099 * is the successor of its immediate predecessor in the
3100 * receive queue, and the missing flag to determine whether
3101 * any of this packet's predecessors are missing. */
3103 afs_uint32 prev; /* "Previous packet" sequence number */
3104 struct rx_packet *tp; /* Temporary packet pointer */
3105 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3106 int missing; /* Are any predecessors missing? */
3108 /* If the new packet's sequence number has been sent to the
3109 * application already, then this is a duplicate */
3110 if (seq < call->rnext) {
3111 MUTEX_ENTER(&rx_stats_mutex);
3112 rx_stats.dupPacketsRead++;
3113 MUTEX_EXIT(&rx_stats_mutex);
3114 rxevent_Cancel(call->delayedAckEvent, call,
3115 RX_CALL_REFCOUNT_DELAY);
3116 np = rxi_SendAck(call, np, seq, serial,
3117 flags, RX_ACK_DUPLICATE, istack);
3123 /* If the sequence number is greater than what can be
3124 * accommodated by the current window, then send a negative
3125 * acknowledge and drop the packet */
3126 if ((call->rnext + call->rwind) <= seq) {
3127 rxevent_Cancel(call->delayedAckEvent, call,
3128 RX_CALL_REFCOUNT_DELAY);
3129 np = rxi_SendAck(call, np, seq, serial,
3130 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3136 /* Look for the packet in the queue of old received packets */
3137 for (prev = call->rnext - 1, missing = 0,
3138 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3139 /*Check for duplicate packet */
3140 if (seq == tp->header.seq) {
3141 MUTEX_ENTER(&rx_stats_mutex);
3142 rx_stats.dupPacketsRead++;
3143 MUTEX_EXIT(&rx_stats_mutex);
3144 rxevent_Cancel(call->delayedAckEvent, call,
3145 RX_CALL_REFCOUNT_DELAY);
3146 np = rxi_SendAck(call, np, seq, serial,
3147 flags, RX_ACK_DUPLICATE, istack);
3152 /* If we find a higher sequence packet, break out and
3153 * insert the new packet here. */
3154 if (seq < tp->header.seq) break;
3155 /* Check for missing packet */
3156 if (tp->header.seq != prev+1) {
3160 prev = tp->header.seq;
3163 /* Keep track of whether we have received the last packet. */
3164 if (flags & RX_LAST_PACKET) {
3165 call->flags |= RX_CALL_HAVE_LAST;
3168 /* It's within the window: add it to the receive queue.
3169 * tp is left by the previous loop either pointing at the
3170 * packet before which to insert the new packet, or at the
3171 * queue head if the queue is empty or the packet should be
3173 queue_InsertBefore(tp, np);
3177 /* Check whether we have all of the packets for this call */
3178 if ((call->flags & RX_CALL_HAVE_LAST)
3179 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3180 afs_uint32 tseq; /* temporary sequence number */
3182 for (tseq = call->rnext,
3183 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3184 if (tseq != tp->header.seq)
3186 if (tp->header.flags & RX_LAST_PACKET) {
3187 call->flags |= RX_CALL_RECEIVE_DONE;
3194 /* We need to send an ack if the packet is out of sequence,
3195 * or if an ack was requested by the peer. */
3196 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3200 /* Acknowledge the last packet for each call */
3201 if (flags & RX_LAST_PACKET) {
3212 * If the receiver is waiting for an iovec, fill the iovec
3213 * using the data from the receive queue */
3214 if (call->flags & RX_CALL_IOVEC_WAIT) {
3215 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3216 /* the call may have been aborted */
3225 /* Wakeup the reader if any */
3226 if ((call->flags & RX_CALL_READER_WAIT) &&
3227 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3228 (call->iovNext >= call->iovMax) ||
3229 (call->flags & RX_CALL_RECEIVE_DONE))) {
3230 call->flags &= ~RX_CALL_READER_WAIT;
3231 #ifdef RX_ENABLE_LOCKS
3232 CV_BROADCAST(&call->cv_rq);
3234 osi_rxWakeup(&call->rq);
3240 * Send an ack when requested by the peer, or once every
3241 * rxi_SoftAckRate packets until the last packet has been
3242 * received. Always send a soft ack for the last packet in
3243 * the server's reply. */
3245 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3246 np = rxi_SendAck(call, np, seq, serial, flags,
3247 RX_ACK_REQUESTED, istack);
3248 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3249 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3250 np = rxi_SendAck(call, np, seq, serial, flags,
3251 RX_ACK_IDLE, istack);
3252 } else if (call->nSoftAcks) {
3253 clock_GetTime(&when);
3254 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3255 clock_Add(&when, &rx_lastAckDelay);
3257 clock_Add(&when, &rx_softAckDelay);
3259 if (!call->delayedAckEvent ||
3260 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3261 rxevent_Cancel(call->delayedAckEvent, call,
3262 RX_CALL_REFCOUNT_DELAY);
3263 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3264 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3267 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3268 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
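/*
 * The jumbogram walk in the function above, reduced to its control
 * flow: each pass peels the next fixed-size packet off the datagram
 * while RX_JUMBO_PACKET remains set, and the loop ends after the first
 * packet that lacks the flag.  ex_Split and ex_Consume are hypothetical
 * stand-ins for rxi_SplitJumboPacket and the body of the loop.
 */
extern struct rx_packet *ex_Split(struct rx_packet *p, int isFirst);
extern void ex_Consume(struct rx_packet *p);

static void ex_WalkJumbogram(struct rx_packet *np)
{
    struct rx_packet *tnp;
    int isFirst;
    for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
        if (!isFirst)
            np = tnp;                               /* advance to the peeled packet */
        tnp = (np->header.flags & RX_JUMBO_PACKET)  /* another packet follows? */
            ? ex_Split(np, isFirst) : NULL;
        ex_Consume(np);
    }
}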
3275 static void rxi_ComputeRate();
3278 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3280 struct rx_peer *peer = conn->peer;
3282 MUTEX_ENTER(&peer->peer_lock);
3283 peer->lastReachTime = clock_Sec();
3284 MUTEX_EXIT(&peer->peer_lock);
3286 MUTEX_ENTER(&conn->conn_data_lock);
3287 if (conn->flags & RX_CONN_ATTACHWAIT) {
3290 conn->flags &= ~RX_CONN_ATTACHWAIT;
3291 MUTEX_EXIT(&conn->conn_data_lock);
3293 for (i=0; i<RX_MAXCALLS; i++) {
3294 struct rx_call *call = conn->call[i];
3296 if (call != acall) MUTEX_ENTER(&call->lock);
3297 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3298 if (call != acall) MUTEX_EXIT(&call->lock);
3302 MUTEX_EXIT(&conn->conn_data_lock);
3305 /* The real smarts of the whole thing. */
3306 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
3307 struct rx_packet *np, int istack)
3309 struct rx_ackPacket *ap;
3311 register struct rx_packet *tp;
3312 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3313 register struct rx_connection *conn = call->conn;
3314 struct rx_peer *peer = conn->peer;
3317 /* because there are CM's that are bogus, sending weird values for this. */
3318 afs_uint32 skew = 0;
3323 int newAckCount = 0;
3324 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3325 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3327 MUTEX_ENTER(&rx_stats_mutex);
3328 rx_stats.ackPacketsRead++;
3329 MUTEX_EXIT(&rx_stats_mutex);
3330 ap = (struct rx_ackPacket *) rx_DataOf(np);
3331 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3333 return np; /* truncated ack packet */
3335 /* depends on ack packet struct */
3336 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3337 first = ntohl(ap->firstPacket);
3338 serial = ntohl(ap->serial);
3339 /* temporarily disabled -- needs to degrade over time
3340 skew = ntohs(ap->maxSkew); */
3342 /* Ignore ack packets received out of order */
3343 if (first < call->tfirst) {
3347 if (np->header.flags & RX_SLOW_START_OK) {
3348 call->flags |= RX_CALL_SLOW_START_OK;
3351 if (ap->reason == RX_ACK_PING_RESPONSE)
3352 rxi_UpdatePeerReach(conn, call);
3357 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3358 ap->reason, ntohl(ap->previousPacket),
3359 (unsigned int) np->header.seq, (unsigned int) serial,
3360 (unsigned int) skew, ntohl(ap->firstPacket));
3363 for (offset = 0; offset < nAcks; offset++)
3364 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3370 /* if a server connection has been re-created, it doesn't remember what
3371 serial # it was up to. An ack will tell us, since the serial field
3372 contains the largest serial received by the other side */
3373 MUTEX_ENTER(&conn->conn_data_lock);
3374 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3375 conn->serial = serial+1;
3377 MUTEX_EXIT(&conn->conn_data_lock);
3379 /* Update the outgoing packet skew value to the latest value of
3380 * the peer's incoming packet skew value. The ack packet, of
3381 * course, could arrive out of order, but that won't affect things
3383 MUTEX_ENTER(&peer->peer_lock);
3384 peer->outPacketSkew = skew;
3386 /* Check for packets that no longer need to be transmitted, and
3387 * discard them. This only applies to packets positively
3388 * acknowledged as having been sent to the peer's upper level.
3389 * All other packets must be retained. So only packets with
3390 * sequence numbers < ap->firstPacket are candidates. */
3391 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3392 if (tp->header.seq >= first) break;
3393 call->tfirst = tp->header.seq + 1;
3394 if (tp->header.serial == serial) {
3395 /* Use RTT if not delayed by client. */
3396 if (ap->reason != RX_ACK_DELAY)
3397 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3399 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3402 else if (tp->firstSerial == serial) {
3403 /* Use RTT if not delayed by client. */
3404 if (ap->reason != RX_ACK_DELAY)
3405 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3407 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3410 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3411 /* XXX Hack. Because we have to release the global rx lock when sending
3412 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3413 * in rxi_Start sending packets out because packets may move to the
3414 * freePacketQueue as a result of being here! So we drop these packets until
3415 * we're safely out of the traversal. Really ugly!
3416 * To make it even uglier, if we're using fine grain locking, we can
3417 * set the ack bits in the packets and have rxi_Start remove the packets
3418 * when it's done transmitting.
3420 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3423 if (call->flags & RX_CALL_TQ_BUSY) {
3424 #ifdef RX_ENABLE_LOCKS
3425 tp->flags |= RX_PKTFLAG_ACKED;
3426 call->flags |= RX_CALL_TQ_SOME_ACKED;
3427 #else /* RX_ENABLE_LOCKS */
3429 #endif /* RX_ENABLE_LOCKS */
3431 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3434 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3439 /* Give rate detector a chance to respond to ping requests */
3440 if (ap->reason == RX_ACK_PING_RESPONSE) {
3441 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3445 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3447 /* Now go through explicit acks/nacks and record the results in
3448 * the waiting packets. These are packets that can't be released
3449 * yet, even with a positive acknowledge. This positive
3450 * acknowledge only means the packet has been received by the
3451 * peer, not that it will be retained long enough to be sent to
3452 * the peer's upper level. In addition, reset the transmit timers
3453 * of any missing packets (those packets that must be missing
3454 * because this packet was out of sequence) */
3456 call->nSoftAcked = 0;
3457 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3458 /* Update round trip time if the ack was stimulated on receipt
3460 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3461 #ifdef RX_ENABLE_LOCKS
3462 if (tp->header.seq >= first) {
3463 #endif /* RX_ENABLE_LOCKS */
3464 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3465 if (tp->header.serial == serial) {
3466 /* Use RTT if not delayed by client. */
3467 if (ap->reason != RX_ACK_DELAY)
3468 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3470 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3473 else if ((tp->firstSerial == serial)) {
3474 /* Use RTT if not delayed by client. */
3475 if (ap->reason != RX_ACK_DELAY)
3476 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3478 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3481 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3482 #ifdef RX_ENABLE_LOCKS
3484 #endif /* RX_ENABLE_LOCKS */
3485 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3487 /* Set the acknowledge flag per packet based on the
3488 * information in the ack packet. An acknowledged packet can
3489 * be downgraded when the server has discarded a packet it
3490 * soacked previously, or when an ack packet is received
3491 * out of sequence. */
3492 if (tp->header.seq < first) {
3493 /* Implicit ack information */
3494 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3497 tp->flags |= RX_PKTFLAG_ACKED;
3499 else if (tp->header.seq < first + nAcks) {
3500 /* Explicit ack information: set it in the packet appropriately */
3501 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3502 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3504 tp->flags |= RX_PKTFLAG_ACKED;
3512 tp->flags &= ~RX_PKTFLAG_ACKED;
3517 tp->flags &= ~RX_PKTFLAG_ACKED;
3521 /* If packet isn't yet acked, and it has been transmitted at least
3522 * once, reset retransmit time using latest timeout
3523 * ie, this should readjust the retransmit timer for all outstanding
3524 * packets... So we don't just retransmit when we should know better*/
3526 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3527 tp->retryTime = tp->timeSent;
3528 clock_Add(&tp->retryTime, &peer->timeout);
3529 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3530 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3534 /* If the window has been extended by this acknowledge packet,
3535 * then wakeup a sender waiting in alloc for window space, or try
3536 * sending packets now, if he's been sitting on packets due to
3537 * lack of window space */
3538 if (call->tnext < (call->tfirst + call->twind)) {
3539 #ifdef RX_ENABLE_LOCKS
3540 CV_SIGNAL(&call->cv_twind);
3542 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3543 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3544 osi_rxWakeup(&call->twind);
3547 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3548 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3552 /* if the ack packet has a receivelen field hanging off it,
3553 * update our state */
3554 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3557 /* If the ack packet has a "recommended" size that is less than
3558 * what I am using now, reduce my size to match */
3559 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3560 sizeof(afs_int32), &tSize);
3561 tSize = (afs_uint32) ntohl(tSize);
3562 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3564 /* Get the maximum packet size to send to this peer */
3565 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3567 tSize = (afs_uint32)ntohl(tSize);
3568 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3569 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3571 /* sanity check - peer might have restarted with different params.
3572 * If peer says "send less", dammit, send less... Peer should never
3573 * be unable to accept packets of the size that prior AFS versions would
3574 * send without asking. */
3575 if (peer->maxMTU != tSize) {
3576 peer->maxMTU = tSize;
3577 peer->MTU = MIN(tSize, peer->MTU);
3578 call->MTU = MIN(call->MTU, tSize);
3582 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3584 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3585 sizeof(afs_int32), &tSize);
3586 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3587 if (tSize < call->twind) { /* smaller than our send */
3588 call->twind = tSize; /* window, we must send less... */
3589 call->ssthresh = MIN(call->twind, call->ssthresh);
3592 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3593 * network MTU confused with the loopback MTU. Calculate the
3594 * maximum MTU here for use in the slow start code below.
3596 maxMTU = peer->maxMTU;
3597 /* Did peer restart with older RX version? */
3598 if (peer->maxDgramPackets > 1) {
3599 peer->maxDgramPackets = 1;
3601 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3603 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3604 sizeof(afs_int32), &tSize);
3605 tSize = (afs_uint32) ntohl(tSize);
3607 * As of AFS 3.5 we set the send window to match the receive window.
3609 if (tSize < call->twind) {
3610 call->twind = tSize;
3611 call->ssthresh = MIN(call->twind, call->ssthresh);
3612 } else if (tSize > call->twind) {
3613 call->twind = tSize;
3617 * As of AFS 3.5, a jumbogram is more than one fixed size
3618 * packet transmitted in a single UDP datagram. If the remote
3619 * MTU is smaller than our local MTU then never send a datagram
3620 * larger than the natural MTU.
3622 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3623 sizeof(afs_int32), &tSize);
3624 maxDgramPackets = (afs_uint32) ntohl(tSize);
3625 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3626 maxDgramPackets = MIN(maxDgramPackets,
3627 (int)(peer->ifDgramPackets));
3628 maxDgramPackets = MIN(maxDgramPackets, tSize);
3629 if (maxDgramPackets > 1) {
3630 peer->maxDgramPackets = maxDgramPackets;
3631 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3633 peer->maxDgramPackets = 1;
3634 call->MTU = peer->natMTU;
3636 } else if (peer->maxDgramPackets > 1) {
3637 /* Restarted with lower version of RX */
3638 peer->maxDgramPackets = 1;
3640 } else if (peer->maxDgramPackets > 1 ||
3641 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3642 /* Restarted with lower version of RX */
3643 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3644 peer->natMTU = OLD_MAX_PACKET_SIZE;
3645 peer->MTU = OLD_MAX_PACKET_SIZE;
3646 peer->maxDgramPackets = 1;
3647 peer->nDgramPackets = 1;
3649 call->MTU = OLD_MAX_PACKET_SIZE;
3654 * Calculate how many datagrams were successfully received after
3655 * the first missing packet and adjust the negative ack counter
3660 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3661 if (call->nNacks < nNacked) {
3662 call->nNacks = nNacked;
3671 if (call->flags & RX_CALL_FAST_RECOVER) {
3673 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3675 call->flags &= ~RX_CALL_FAST_RECOVER;
3676 call->cwind = call->nextCwind;
3677 call->nextCwind = 0;
3680 call->nCwindAcks = 0;
3682 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3683 /* Three negative acks in a row trigger congestion recovery */
3684 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3685 MUTEX_EXIT(&peer->peer_lock);
3686 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3687 /* someone else is waiting to start recovery */
3690 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3691 while (call->flags & RX_CALL_TQ_BUSY) {
3692 call->flags |= RX_CALL_TQ_WAIT;
3693 #ifdef RX_ENABLE_LOCKS
3694 CV_WAIT(&call->cv_tq, &call->lock);
3695 #else /* RX_ENABLE_LOCKS */
3696 osi_rxSleep(&call->tq);
3697 #endif /* RX_ENABLE_LOCKS */
3699 MUTEX_ENTER(&peer->peer_lock);
3700 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3701 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3702 call->flags |= RX_CALL_FAST_RECOVER;
3703 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3704 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3706 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3707 call->nextCwind = call->ssthresh;
3710 peer->MTU = call->MTU;
3711 peer->cwind = call->nextCwind;
3712 peer->nDgramPackets = call->nDgramPackets;
3714 call->congestSeq = peer->congestSeq;
3715 /* Reset the resend times on the packets that were nacked
3716 * so we will retransmit as soon as the window permits*/
3717 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3719 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3720 clock_Zero(&tp->retryTime);
3722 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3727 /* If cwind is smaller than ssthresh, then increase
3728 * the window one packet for each ack we receive (exponential growth).
3730 * If cwind is greater than or equal to ssthresh then increase
3731 * the congestion window by one packet for each cwind acks we
3732 * receive (linear growth). */
3733 if (call->cwind < call->ssthresh) {
3734 call->cwind = MIN((int)call->ssthresh,
3735 (int)(call->cwind + newAckCount));
3736 call->nCwindAcks = 0;
3738 call->nCwindAcks += newAckCount;
3739 if (call->nCwindAcks >= call->cwind) {
3740 call->nCwindAcks = 0;
3741 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3745 * If we have received several acknowledgements in a row then
3746 * it is time to increase the size of our datagrams
3748 if ((int)call->nAcks > rx_nDgramThreshold) {
3749 if (peer->maxDgramPackets > 1) {
3750 if (call->nDgramPackets < peer->maxDgramPackets) {
3751 call->nDgramPackets++;
3753 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3754 } else if (call->MTU < peer->maxMTU) {
3755 call->MTU += peer->natMTU;
3756 call->MTU = MIN(call->MTU, peer->maxMTU);
3762 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3764 /* Servers need to hold the call until all response packets have
3765 * been acknowledged. Soft acks are good enough since clients
3766 * are not allowed to clear their receive queues. */
3767 if (call->state == RX_STATE_HOLD &&
3768 call->tfirst + call->nSoftAcked >= call->tnext) {
3769 call->state = RX_STATE_DALLY;
3770 rxi_ClearTransmitQueue(call, 0);
3771 } else if (!queue_IsEmpty(&call->tq)) {
3772 rxi_Start(0, call, istack);
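/*
 * The window growth rules spelled out in the comments above are
 * standard slow start and congestion avoidance: below ssthresh the
 * window grows by one packet per new ack, at or above it by one packet
 * per cwind acks.  Just that arithmetic, on a hypothetical ex_cc state
 * (maxWindow plays the role of rx_maxSendWindow):
 */
struct ex_cc { int cwind, ssthresh, nCwindAcks; };

static void ex_OnNewAcks(struct ex_cc *c, int newAckCount, int maxWindow)
{
    if (c->cwind < c->ssthresh) {       /* slow start: exponential per RTT */
        c->cwind = MIN(c->ssthresh, c->cwind + newAckCount);
        c->nCwindAcks = 0;
    } else {                            /* congestion avoidance: linear */
        c->nCwindAcks += newAckCount;
        if (c->nCwindAcks >= c->cwind) {
            c->nCwindAcks = 0;
            c->cwind = MIN(c->cwind + 1, maxWindow);
        }
    }
}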
3777 /* Received a response to a challenge packet */
3778 struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn,
3779 register struct rx_packet *np, int istack)
3783 /* Ignore the packet if we're the client */
3784 if (conn->type == RX_CLIENT_CONNECTION) return np;
3786 /* If already authenticated, ignore the packet (it's probably a retry) */
3787 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3790 /* Otherwise, have the security object evaluate the response packet */
3791 error = RXS_CheckResponse(conn->securityObject, conn, np);
3793 /* If the response is invalid, reset the connection, sending
3794 * an abort to the peer */
3798 rxi_ConnectionError(conn, error);
3799 MUTEX_ENTER(&conn->conn_data_lock);
3800 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3801 MUTEX_EXIT(&conn->conn_data_lock);
3805 /* If the response is valid, any calls waiting to attach
3806 * servers can now do so */
3809 for (i=0; i<RX_MAXCALLS; i++) {
3810 struct rx_call *call = conn->call[i];
3812 MUTEX_ENTER(&call->lock);
3813 if (call->state == RX_STATE_PRECALL)
3814 rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
3815 MUTEX_EXIT(&call->lock);
3819 /* Update the peer reachability information, just in case
3820 * some calls went into attach-wait while we were waiting
3821 * for authentication. */
3823 rxi_UpdatePeerReach(conn, NULL);
3828 /* A client has received an authentication challenge: the security
3829 * object is asked to cough up a respectable response packet to send
3830 * back to the server. The server is responsible for retrying the
3831 * challenge if it fails to get a response. */
3833 struct rx_packet *rxi_ReceiveChallengePacket(register struct rx_connection *conn,
3834 register struct rx_packet *np, int istack)
3838 /* Ignore the challenge if we're the server */
3839 if (conn->type == RX_SERVER_CONNECTION) return np;
3841 /* Ignore the challenge if the connection is otherwise idle; someone's
3842 * trying to use us as an oracle. */
3843 if (!rxi_HasActiveCalls(conn)) return np;
3845 /* Send the security object the challenge packet. It is expected to fill
3846 * in the response. */
3847 error = RXS_GetResponse(conn->securityObject, conn, np);
3849 /* If the security object is unable to return a valid response, reset the
3850 * connection and send an abort to the peer. Otherwise send the response
3851 * packet to the peer connection. */
3853 rxi_ConnectionError(conn, error);
3854 MUTEX_ENTER(&conn->conn_data_lock);
3855 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3856 MUTEX_EXIT(&conn->conn_data_lock);
3859 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3860 RX_PACKET_TYPE_RESPONSE, NULL, -1, istack);
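/* For orientation, the challenge/response exchange implemented by the two
 * routines above (a sketch of the flow, not new protocol):
 *
 *   server                                      client
 *   RXS_GetChallenge ----- CHALLENGE ---------> rxi_ReceiveChallengePacket
 *                                               RXS_GetResponse
 *   rxi_ReceiveResponsePacket <--- RESPONSE --- rxi_SendSpecial
 *   RXS_CheckResponse; on success, calls in
 *   RX_STATE_PRECALL may attach server threads.
 */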
3866 /* Find an available server process to service the current request in
3867 * the given call structure. If one isn't available, queue up this
3868 * call so it eventually gets one */
3869 void rxi_AttachServerProc(register struct rx_call *call,
3870 register osi_socket socket, register int *tnop, register struct rx_call **newcallp)
3872 register struct rx_serverQueueEntry *sq;
3873 register struct rx_service *service = call->conn->service;
3874 #ifdef RX_ENABLE_LOCKS
3875 register int haveQuota = 0;
3876 #endif /* RX_ENABLE_LOCKS */
3877 /* May already be attached */
3878 if (call->state == RX_STATE_ACTIVE) return;
3880 MUTEX_ENTER(&rx_serverPool_lock);
3881 #ifdef RX_ENABLE_LOCKS
3882 while(rxi_ServerThreadSelectingCall) {
3883 MUTEX_EXIT(&call->lock);
3884 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3885 MUTEX_EXIT(&rx_serverPool_lock);
3886 MUTEX_ENTER(&call->lock);
3887 MUTEX_ENTER(&rx_serverPool_lock);
3888 /* Call may have been attached */
3889 if (call->state == RX_STATE_ACTIVE) return;
3892 haveQuota = QuotaOK(service);
3893 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3894 /* If there are no processes available to service this call,
3895 * put the call on the incoming call queue (unless it's
3896 * already on the queue).
3899 ReturnToServerPool(service);
3900 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3901 call->flags |= RX_CALL_WAIT_PROC;
3902 MUTEX_ENTER(&rx_stats_mutex);
3904 MUTEX_EXIT(&rx_stats_mutex);
3905 rxi_calltrace(RX_CALL_ARRIVAL, call);
3906 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3907 queue_Append(&rx_incomingCallQueue, call);
3910 #else /* RX_ENABLE_LOCKS */
3911 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3912 /* If there are no processes available to service this call,
3913 * put the call on the incoming call queue (unless it's
3914 * already on the queue).
3916 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3917 call->flags |= RX_CALL_WAIT_PROC;
3919 rxi_calltrace(RX_CALL_ARRIVAL, call);
3920 queue_Append(&rx_incomingCallQueue, call);
3923 #endif /* RX_ENABLE_LOCKS */
3925 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3927 /* If hot threads are enabled, and both newcallp and sq->socketp
3928 * are non-null, then this thread will process the call, and the
3929 * idle server thread will start listening on this thread's socket.
3932 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3935 *sq->socketp = socket;
3936 clock_GetTime(&call->startTime);
3937 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3941 if (call->flags & RX_CALL_WAIT_PROC) {
3942 /* Conservative: I don't think this should happen */
3943 call->flags &= ~RX_CALL_WAIT_PROC;
3944 MUTEX_ENTER(&rx_stats_mutex);
3946 MUTEX_EXIT(&rx_stats_mutex);
3949 call->state = RX_STATE_ACTIVE;
3950 call->mode = RX_MODE_RECEIVING;
3951 if (call->flags & RX_CALL_CLEARED) {
3952 /* send an ack now to start the packet flow up again */
3953 call->flags &= ~RX_CALL_CLEARED;
3954 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3956 #ifdef RX_ENABLE_LOCKS
3959 service->nRequestsRunning++;
3960 if (service->nRequestsRunning <= service->minProcs)
3966 MUTEX_EXIT(&rx_serverPool_lock);
3969 /* Delay the sending of an acknowledge event for a short while, while
3970 * a new call is being prepared (in the case of a client) or a reply
3971 * is being prepared (in the case of a server). Rather than sending
3972 * an ack packet, an ACKALL packet is sent. */
3973 void rxi_AckAll(struct rxevent *event, register struct rx_call *call, char *dummy)
3975 #ifdef RX_ENABLE_LOCKS
3977 MUTEX_ENTER(&call->lock);
3978 call->delayedAckEvent = NULL;
3979 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3981 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3982 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3984 MUTEX_EXIT(&call->lock);
3985 #else /* RX_ENABLE_LOCKS */
3986 if (event) call->delayedAckEvent = NULL;
3987 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3988 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3989 #endif /* RX_ENABLE_LOCKS */
3992 void rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call, char *dummy)
3994 #ifdef RX_ENABLE_LOCKS
3996 MUTEX_ENTER(&call->lock);
3997 if (event == call->delayedAckEvent)
3998 call->delayedAckEvent = NULL;
3999 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
4001 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4003 MUTEX_EXIT(&call->lock);
4004 #else /* RX_ENABLE_LOCKS */
4005 if (event) call->delayedAckEvent = NULL;
4006 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4007 #endif /* RX_ENABLE_LOCKS */
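#if 0
/* Illustrative sketch (not compiled): how a delayed-ack event such as the
 * handler above is typically armed, following the rxevent pattern used
 * throughout this file. The 100ms delay is a made-up value, not rx's.
 */
{
    struct clock when;
    clock_GetTime(&when);
    clock_Addmsec(&when, 100);	/* hypothetical delay */
    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
    call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
}
#endif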
4011 #ifdef RX_ENABLE_LOCKS
4012 /* Set the acked flag on all packets in the transmit queue. rxi_Start will deal with
4013 * clearing them out.
4015 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call)
4017 register struct rx_packet *p, *tp;
4020 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4023 p->flags |= RX_PKTFLAG_ACKED;
4027 call->flags |= RX_CALL_TQ_CLEARME;
4028 call->flags |= RX_CALL_TQ_SOME_ACKED;
4031 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4032 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4033 call->tfirst = call->tnext;
4034 call->nSoftAcked = 0;
4036 if (call->flags & RX_CALL_FAST_RECOVER) {
4037 call->flags &= ~RX_CALL_FAST_RECOVER;
4038 call->cwind = call->nextCwind;
4039 call->nextCwind = 0;
4042 CV_SIGNAL(&call->cv_twind);
4044 #endif /* RX_ENABLE_LOCKS */
4046 /* Clear out the transmit queue for the current call (all packets have
4047 * been received by peer) */
4048 void rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
4050 register struct rx_packet *p, *tp;
4052 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4053 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4055 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4058 p->flags |= RX_PKTFLAG_ACKED;
4062 call->flags |= RX_CALL_TQ_CLEARME;
4063 call->flags |= RX_CALL_TQ_SOME_ACKED;
4066 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4067 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4073 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4074 call->flags &= ~RX_CALL_TQ_CLEARME;
4076 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4078 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4079 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4080 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4081 call->nSoftAcked = 0;
4083 if (call->flags & RX_CALL_FAST_RECOVER) {
4084 call->flags &= ~RX_CALL_FAST_RECOVER;
4085 call->cwind = call->nextCwind;
4088 #ifdef RX_ENABLE_LOCKS
4089 CV_SIGNAL(&call->cv_twind);
4091 osi_rxWakeup(&call->twind);
4095 void rxi_ClearReceiveQueue(register struct rx_call *call)
4097 register struct rx_packet *p, *tp;
4098 if (queue_IsNotEmpty(&call->rq)) {
4099 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4104 rx_packetReclaims++;
4106 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4108 if (call->state == RX_STATE_PRECALL) {
4109 call->flags |= RX_CALL_CLEARED;
4113 /* Send an abort packet for the specified call */
4114 struct rx_packet *rxi_SendCallAbort(register struct rx_call *call,
4115 struct rx_packet *packet, int istack, int force)
4123 /* Clients should never delay abort messages */
4124 if (rx_IsClientConn(call->conn))
4127 if (call->abortCode != call->error) {
4128 call->abortCode = call->error;
4129 call->abortCount = 0;
4132 if (force || rxi_callAbortThreshhold == 0 ||
4133 call->abortCount < rxi_callAbortThreshhold) {
4134 if (call->delayedAbortEvent) {
4135 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4137 error = htonl(call->error);
4139 packet = rxi_SendSpecial(call, call->conn, packet,
4140 RX_PACKET_TYPE_ABORT, (char *)&error,
4141 sizeof(error), istack);
4142 } else if (!call->delayedAbortEvent) {
4143 clock_GetTime(&when);
4144 clock_Addmsec(&when, rxi_callAbortDelay);
4145 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4146 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4152 /* Send an abort packet for the specified connection. Packet is an
4153 * optional pointer to a packet that can be used to send the abort.
4154 * Once the number of abort messages reaches the threshold, an
4155 * event is scheduled to send the abort. Setting the force flag
4156 * overrides sending delayed abort messages.
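 * For example (illustrative): with rxi_connAbortThreshhold == 3, the first
 * three aborts on a connection go out immediately; from the fourth on, a
 * single event delayed by rxi_connAbortDelay ms answers the peer, so a
 * looping client costs one abort packet per delay period rather than one
 * per request.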
4158 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4159 * to send the abort packet.
4161 struct rx_packet *rxi_SendConnectionAbort(register struct rx_connection *conn,
4162 struct rx_packet *packet, int istack, int force)
4170 /* Clients should never delay abort messages */
4171 if (rx_IsClientConn(conn))
4174 if (force || rxi_connAbortThreshhold == 0 ||
4175 conn->abortCount < rxi_connAbortThreshhold) {
4176 if (conn->delayedAbortEvent) {
4177 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4179 error = htonl(conn->error);
4181 MUTEX_EXIT(&conn->conn_data_lock);
4182 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4183 RX_PACKET_TYPE_ABORT, (char *)&error,
4184 sizeof(error), istack);
4185 MUTEX_ENTER(&conn->conn_data_lock);
4186 } else if (!conn->delayedAbortEvent) {
4187 clock_GetTime(&when);
4188 clock_Addmsec(&when, rxi_connAbortDelay);
4189 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4195 /* Associate an error with all of the calls owned by a connection. Called
4196 * with error non-zero. This is only for really fatal things, like
4197 * bad authentication responses. The connection itself is set in
4198 * error at this point, so that future packets received will be rejected. */
4200 void rxi_ConnectionError(register struct rx_connection *conn,
4201 register afs_int32 error)
4205 MUTEX_ENTER(&conn->conn_data_lock);
4206 if (conn->challengeEvent)
4207 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4208 if (conn->checkReachEvent) {
4209 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4210 conn->checkReachEvent = 0;
4211 conn->flags &= ~RX_CONN_ATTACHWAIT;
4214 MUTEX_EXIT(&conn->conn_data_lock);
4215 for (i=0; i<RX_MAXCALLS; i++) {
4216 struct rx_call *call = conn->call[i];
4218 MUTEX_ENTER(&call->lock);
4219 rxi_CallError(call, error);
4220 MUTEX_EXIT(&call->lock);
4223 conn->error = error;
4224 MUTEX_ENTER(&rx_stats_mutex);
4225 rx_stats.fatalErrors++;
4226 MUTEX_EXIT(&rx_stats_mutex);
4230 void rxi_CallError(register struct rx_call *call, afs_int32 error)
4232 if (call->error) error = call->error;
4233 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4234 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4235 rxi_ResetCall(call, 0);
4238 rxi_ResetCall(call, 0);
4240 call->error = error;
4241 call->mode = RX_MODE_ERROR;
4244 /* Reset various fields in a call structure, and wakeup waiting
4245 * processes. Some fields aren't changed: state & mode are not
4246 * touched (these must be set by the caller), and bufptr, nLeft, and
4247 * nFree are not reset, since these fields are manipulated by
4248 * unprotected macros, and may only be reset by non-interrupting code.
4251 /* this code requires that call->conn be set properly as a pre-condition. */
4252 #endif /* ADAPT_WINDOW */
4254 void rxi_ResetCall(register struct rx_call *call, register int newcall)
4257 register struct rx_peer *peer;
4258 struct rx_packet *packet;
4260 /* Notify anyone who is waiting for asynchronous packet arrival */
4261 if (call->arrivalProc) {
4262 (*call->arrivalProc)(call, call->arrivalProcHandle, (int) call->arrivalProcArg);
4263 call->arrivalProc = (VOID (*)()) 0;
4266 if (call->delayedAbortEvent) {
4267 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4268 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4270 rxi_SendCallAbort(call, packet, 0, 1);
4271 rxi_FreePacket(packet);
4276 * Update the peer with the congestion information in this call
4277 * so other calls on this connection can pick up where this call
4278 * left off. If the congestion sequence numbers don't match then
4279 * another call experienced a retransmission.
4281 peer = call->conn->peer;
4282 MUTEX_ENTER(&peer->peer_lock);
4284 if (call->congestSeq == peer->congestSeq) {
4285 peer->cwind = MAX(peer->cwind, call->cwind);
4286 peer->MTU = MAX(peer->MTU, call->MTU);
4287 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4290 call->abortCode = 0;
4291 call->abortCount = 0;
4293 if (peer->maxDgramPackets > 1) {
4294 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4296 call->MTU = peer->MTU;
4298 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4299 call->ssthresh = rx_maxSendWindow;
4300 call->nDgramPackets = peer->nDgramPackets;
4301 call->congestSeq = peer->congestSeq;
4302 MUTEX_EXIT(&peer->peer_lock);
4304 flags = call->flags;
4305 rxi_ClearReceiveQueue(call);
4306 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4307 if (call->flags & RX_CALL_TQ_BUSY) {
4308 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4309 call->flags |= (flags & RX_CALL_TQ_WAIT);
4311 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4313 rxi_ClearTransmitQueue(call, 0);
4314 queue_Init(&call->tq);
4317 queue_Init(&call->rq);
4319 call->rwind = rx_initReceiveWindow;
4320 call->twind = rx_initSendWindow;
4321 call->nSoftAcked = 0;
4322 call->nextCwind = 0;
4325 call->nCwindAcks = 0;
4326 call->nSoftAcks = 0;
4327 call->nHardAcks = 0;
4329 call->tfirst = call->rnext = call->tnext = 1;
4331 call->lastAcked = 0;
4332 call->localStatus = call->remoteStatus = 0;
4334 if (flags & RX_CALL_READER_WAIT) {
4335 #ifdef RX_ENABLE_LOCKS
4336 CV_BROADCAST(&call->cv_rq);
4338 osi_rxWakeup(&call->rq);
4341 if (flags & RX_CALL_WAIT_PACKETS) {
4342 MUTEX_ENTER(&rx_freePktQ_lock);
4343 rxi_PacketsUnWait(); /* XXX */
4344 MUTEX_EXIT(&rx_freePktQ_lock);
4347 #ifdef RX_ENABLE_LOCKS
4348 CV_SIGNAL(&call->cv_twind);
4350 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4351 osi_rxWakeup(&call->twind);
4354 #ifdef RX_ENABLE_LOCKS
4355 /* The following ensures that we don't mess with any queue while some
4356 * other thread might also be doing so. The call_queue_lock field
4357 * is only modified under the call lock. If the call is in the process
4358 * of being removed from a queue, the call is not locked until
4359 * the queue lock is dropped and only then is the call_queue_lock field
4360 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4361 * Note that any other routine which removes a call from a queue has to
4362 * obtain the queue lock before examining the queue and removing the call.
4364 if (call->call_queue_lock) {
4365 MUTEX_ENTER(call->call_queue_lock);
4366 if (queue_IsOnQueue(call)) {
4368 if (flags & RX_CALL_WAIT_PROC) {
4369 MUTEX_ENTER(&rx_stats_mutex);
4371 MUTEX_EXIT(&rx_stats_mutex);
4374 MUTEX_EXIT(call->call_queue_lock);
4375 CLEAR_CALL_QUEUE_LOCK(call);
4377 #else /* RX_ENABLE_LOCKS */
4378 if (queue_IsOnQueue(call)) {
4380 if (flags & RX_CALL_WAIT_PROC)
4383 #endif /* RX_ENABLE_LOCKS */
4385 rxi_KeepAliveOff(call);
4386 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4389 /* Send an acknowledge for the indicated packet (seq,serial) of the
4390 * indicated call, for the indicated reason (reason). This
4391 * acknowledge will specifically acknowledge receiving the packet, and
4392 * will also specify which other packets for this call have been
4393 * received. This routine returns to the caller the packet that was
4394 * used. The caller is responsible for freeing it or re-using it.
4395 * This acknowledgement also returns the highest sequence number
4396 * actually read out by the higher level to the sender; the sender
4397 * promises to keep around packets that have not been read by the
4398 * higher level yet (unless, of course, the sender decides to abort
4399 * the call altogether). Any of p, seq, serial, pflags, or reason may
4400 * be set to zero without ill effect. That is, if they are zero, they
4401 * will not convey any information.
4402 * NOW there is a trailer field, after the ack where it will safely be
4403 * ignored by mundanes, which indicates the maximum size packet this
4404 * host can swallow. */
4406 /* register struct rx_packet *optionalPacket;  use to send ack (or null)
4407  * int seq;     Sequence number of the packet we are acking
4408  * int serial;  Serial number of the packet
4409  * int pflags;  Flags field from packet header
4410  * int reason;  Reason an acknowledge was prompted */
4413 struct rx_packet *rxi_SendAck(register struct rx_call *call,
4414 register struct rx_packet *optionalPacket, int seq, int serial,
4415 int pflags, int reason, int istack)
4417 struct rx_ackPacket *ap;
4418 register struct rx_packet *rqp;
4419 register struct rx_packet *nxp; /* For queue_Scan */
4420 register struct rx_packet *p;
4425 * Open the receive window once a thread starts reading packets
4427 if (call->rnext > 1) {
4428 call->rwind = rx_maxReceiveWindow;
4431 call->nHardAcks = 0;
4432 call->nSoftAcks = 0;
4433 if (call->rnext > call->lastAcked)
4434 call->lastAcked = call->rnext;
4438 rx_computelen(p, p->length); /* reset length, you never know */
4439 } /* where that's been... */
4441 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4442 /* We won't send the ack, but don't panic. */
4443 return optionalPacket;
4446 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4448 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4449 if (!optionalPacket) rxi_FreePacket(p);
4450 return optionalPacket;
4452 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4453 if (rx_Contiguous(p)<templ) {
4454 if (!optionalPacket) rxi_FreePacket(p);
4455 return optionalPacket;
4457 } /* MTUXXX failing to send an ack is very serious. We should */
4458 /* try as hard as possible to send even a partial ack; it's */
4459 /* better than nothing. */
4461 ap = (struct rx_ackPacket *) rx_DataOf(p);
4462 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4463 ap->reason = reason;
4465 /* The skew computation used to be bogus, I think it's better now. */
4466 /* We should start paying attention to skew. XXX */
4467 ap->serial = htonl(call->conn->maxSerial);
4468 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4470 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4471 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4473 /* No fear of running out of room in the ack packet here because there can only be at most
4474 * one window full of unacknowledged packets. The window size must be constrained
4475 * to be less than the maximum ack size, of course. Also, an ack should always
4476 * fit into a single packet -- it should not ever be fragmented. */
4477 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4478 if (!rqp || !call->rq.next
4479 || (rqp->header.seq > (call->rnext + call->rwind))) {
4480 if (!optionalPacket) rxi_FreePacket(p);
4481 rxi_CallError(call, RX_CALL_DEAD);
4482 return optionalPacket;
4485 while (rqp->header.seq > call->rnext + offset)
4486 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4487 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4489 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4490 if (!optionalPacket) rxi_FreePacket(p);
4491 rxi_CallError(call, RX_CALL_DEAD);
4492 return optionalPacket;
4497 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4499 /* these are new for AFS 3.3 */
4500 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4501 templ = htonl(templ);
4502 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4503 templ = htonl(call->conn->peer->ifMTU);
4504 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4506 /* new for AFS 3.4 */
4507 templ = htonl(call->rwind);
4508 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4510 /* new for AFS 3.5 */
4511 templ = htonl(call->conn->peer->ifDgramPackets);
4512 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
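/* Trailer layout, as written just above (offsets from rx_AckDataSize(offset)):
 *   word 0: adjusted max MTU        (new for AFS 3.3)
 *   word 1: interface MTU           (new for AFS 3.3)
 *   word 2: receive window          (new for AFS 3.4)
 *   word 3: interface dgram packets (new for AFS 3.5)
 * all in network byte order; p->length above already counts all four words. */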
4514 p->header.serviceId = call->conn->serviceId;
4515 p->header.cid = (call->conn->cid | call->channel);
4516 p->header.callNumber = *call->callNumber;
4517 p->header.seq = seq;
4518 p->header.securityIndex = call->conn->securityIndex;
4519 p->header.epoch = call->conn->epoch;
4520 p->header.type = RX_PACKET_TYPE_ACK;
4521 p->header.flags = RX_SLOW_START_OK;
4522 if (reason == RX_ACK_PING) {
4523 p->header.flags |= RX_REQUEST_ACK;
4525 clock_GetTime(&call->pingRequestTime);
4528 if (call->conn->type == RX_CLIENT_CONNECTION)
4529 p->header.flags |= RX_CLIENT_INITIATED;
4533 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4534 ap->reason, ntohl(ap->previousPacket),
4535 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4537 for (offset = 0; offset < ap->nAcks; offset++)
4538 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4545 register int i, nbytes = p->length;
4547 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4548 if (nbytes <= p->wirevec[i].iov_len) {
4549 register int savelen, saven;
4551 savelen = p->wirevec[i].iov_len;
4553 p->wirevec[i].iov_len = nbytes;
4555 rxi_Send(call, p, istack);
4556 p->wirevec[i].iov_len = savelen;
4560 else nbytes -= p->wirevec[i].iov_len;
4563 MUTEX_ENTER(&rx_stats_mutex);
4564 rx_stats.ackPacketsSent++;
4565 MUTEX_EXIT(&rx_stats_mutex);
4566 if (!optionalPacket) rxi_FreePacket(p);
4567 return optionalPacket; /* Return packet for re-use by caller */
4570 /* Send all of the packets in the list in a single datagram */
4571 static void rxi_SendList(struct rx_call *call, struct rx_packet **list,
4572 int len, int istack, int moreFlag, struct clock *now,
4573 struct clock *retryTime, int resending)
4578 struct rx_connection *conn = call->conn;
4579 struct rx_peer *peer = conn->peer;
4581 MUTEX_ENTER(&peer->peer_lock);
4583 if (resending) peer->reSends += len;
4584 MUTEX_ENTER(&rx_stats_mutex);
4585 rx_stats.dataPacketsSent += len;
4586 MUTEX_EXIT(&rx_stats_mutex);
4587 MUTEX_EXIT(&peer->peer_lock);
4589 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4593 /* Set the packet flags and schedule the resend events */
4594 /* Only request an ack for the last packet in the list */
4595 for (i = 0 ; i < len ; i++) {
4596 list[i]->retryTime = *retryTime;
4597 if (list[i]->header.serial) {
4598 /* Exponentially back off retry times */
4599 if (list[i]->backoff < MAXBACKOFF) {
4600 /* so it can't stay == 0 */
4601 list[i]->backoff = (list[i]->backoff << 1) +1;
4603 else list[i]->backoff++;
4604 clock_Addmsec(&(list[i]->retryTime),
4605 ((afs_uint32) list[i]->backoff) << 8);
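/* Illustrative sequence: backoff runs 0 -> 1 -> 3 -> 7 -> 15 across retries
 * (2b+1 roughly doubles it and can never stick at zero), so the added retry
 * delay of backoff << 8 ms grows 256ms, 768ms, ~1.8s, ~3.8s, ... */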
4608 /* Wait a little extra for the ack on the last packet */
4609 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4610 clock_Addmsec(&(list[i]->retryTime), 400);
4613 /* Record the time sent */
4614 list[i]->timeSent = *now;
4616 /* Ask for an ack on retransmitted packets, on every other packet
4617 * if the peer doesn't support slow start. Ask for an ack on every
4618 * packet until the congestion window reaches the ack rate. */
4619 if (list[i]->header.serial) {
4621 MUTEX_ENTER(&rx_stats_mutex);
4622 rx_stats.dataPacketsReSent++;
4623 MUTEX_EXIT(&rx_stats_mutex);
4625 /* improved RTO calculation- not Karn */
4626 list[i]->firstSent = *now;
4628 && (call->cwind <= (u_short)(conn->ackRate+1)
4629 || (!(call->flags & RX_CALL_SLOW_START_OK)
4630 && (list[i]->header.seq & 1)))) {
4635 MUTEX_ENTER(&peer->peer_lock);
4637 if (resending) peer->reSends++;
4638 MUTEX_ENTER(&rx_stats_mutex);
4639 rx_stats.dataPacketsSent++;
4640 MUTEX_EXIT(&rx_stats_mutex);
4641 MUTEX_EXIT(&peer->peer_lock);
4643 /* Tag this packet as not being the last in this group,
4644 * for the receiver's benefit */
4645 if (i < len-1 || moreFlag) {
4646 list[i]->header.flags |= RX_MORE_PACKETS;
4649 /* Install the new retransmit time for the packet, and
4650 * record the time sent */
4651 list[i]->timeSent = *now;
4655 list[len-1]->header.flags |= RX_REQUEST_ACK;
4658 /* Since we're about to send a data packet to the peer, it's
4659 * safe to nuke any scheduled end-of-packets ack */
4660 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4662 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4663 MUTEX_EXIT(&call->lock);
4665 rxi_SendPacketList(conn, list, len, istack);
4667 rxi_SendPacket(conn, list[0], istack);
4669 MUTEX_ENTER(&call->lock);
4670 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4672 /* Update last send time for this call (for keep-alive
4673 * processing), and for the connection (so that we can discover
4674 * idle connections) */
4675 conn->lastSendTime = call->lastSendTime = clock_Sec();
4678 /* When sending packets we need to follow these rules:
4679 * 1. Never send more than maxDgramPackets in a jumbogram.
4680 * 2. Never send a packet with more than two iovecs in a jumbogram.
4681 * 3. Never send a retransmitted packet in a jumbogram.
4682 * 4. Never send more than cwind/4 packets in a jumbogram
4683 * We always keep the last list we should have sent so we
4684 * can set the RX_MORE_PACKETS flags correctly.
4686 static void rxi_SendXmitList(struct rx_call *call, struct rx_packet **list,
4687 int len, int istack, struct clock *now, struct clock *retryTime,
4690 int i, cnt, lastCnt = 0;
4691 struct rx_packet **listP, **lastP = 0;
4692 struct rx_peer *peer = call->conn->peer;
4693 int morePackets = 0;
4695 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4696 /* Does the current packet force us to flush the current list? */
4698 && (list[i]->header.serial
4699 || (list[i]->flags & RX_PKTFLAG_ACKED)
4700 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4702 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4703 /* If the call entered an error state, or if we entered congestion
4704 * recovery mode, stop sending */
4705 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4713 /* Add the current packet to the list if it hasn't been acked.
4714 * Otherwise adjust the list pointer to skip the current packet. */
4715 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4717 /* Do we need to flush the list? */
4718 if (cnt >= (int)peer->maxDgramPackets
4719 || cnt >= (int)call->nDgramPackets
4720 || cnt >= (int)call->cwind
4721 || list[i]->header.serial
4722 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4724 rxi_SendList(call, lastP, lastCnt, istack, 1,
4725 now, retryTime, resending);
4726 /* If the call entered an error state, or if we entered congestion
4727 * recovery mode, stop sending */
4728 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4738 osi_Panic("rxi_SendList error");
4744 /* Send the whole list when the call is in receive mode, when
4745 * the call is in eof mode, when we are in fast recovery mode,
4746 * or when we have the last packet */
4747 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4748 || call->mode == RX_MODE_RECEIVING
4749 || call->mode == RX_MODE_EOF
4750 || (call->flags & RX_CALL_FAST_RECOVER)) {
4751 /* Check for the case where the current list contains
4752 * an acked packet. Since we always send retransmissions
4753 * in a separate packet, we only need to check the first
4754 * packet in the list */
4755 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4759 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4760 now, retryTime, resending);
4761 /* If the call entered an error state, or if we entered congestion
4762 * recovery mode, stop sending */
4763 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4767 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4769 } else if (lastCnt > 0) {
4770 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
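#if 0
/* Illustrative helper (not compiled): the per-packet "flush the current
 * jumbogram" test applied in the loop above once a packet has been added,
 * pulled out for readability. The function name is an editorial invention,
 * not part of rx.
 */
static int rxi_ExampleMustFlush(struct rx_call *call, struct rx_peer *peer,
	struct rx_packet *p, int cnt)
{
    return cnt >= (int)peer->maxDgramPackets
	|| cnt >= (int)call->nDgramPackets
	|| cnt >= (int)call->cwind
	|| p->header.serial		/* never put a resend in a jumbogram */
	|| p->length != RX_JUMBOBUFFERSIZE;	/* only full-size buffers */
}
#endif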
4774 #ifdef RX_ENABLE_LOCKS
4775 /* Call rxi_Start, below, but with the call lock held. */
4776 void rxi_StartUnlocked(struct rxevent *event, register struct rx_call *call,
4779 MUTEX_ENTER(&call->lock);
4780 rxi_Start(event, call, istack);
4781 MUTEX_EXIT(&call->lock);
4783 #endif /* RX_ENABLE_LOCKS */
4785 /* This routine is called when new packets are readied for
4786 * transmission and when retransmission may be necessary, or when the
4787 * transmission window or burst count are favourable. This should be
4788 * better optimized for new packets, the usual case, now that we've
4789 * got rid of queues of send packets. XXXXXXXXXXX */
4790 void rxi_Start(struct rxevent *event, register struct rx_call *call,
4793 struct rx_packet *p;
4794 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4795 struct rx_peer *peer = call->conn->peer;
4796 struct clock now, retryTime;
4800 struct rx_packet **xmitList;
4803 /* If rxi_Start is being called as a result of a resend event,
4804 * then make sure that the event pointer is removed from the call
4805 * structure, since there is no longer a per-call retransmission event. */
4807 if (event && event == call->resendEvent) {
4808 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4809 call->resendEvent = NULL;
4811 if (queue_IsEmpty(&call->tq)) {
4815 /* Timeouts trigger congestion recovery */
4816 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4817 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4818 /* someone else is waiting to start recovery */
4821 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4822 while (call->flags & RX_CALL_TQ_BUSY) {
4823 call->flags |= RX_CALL_TQ_WAIT;
4824 #ifdef RX_ENABLE_LOCKS
4825 CV_WAIT(&call->cv_tq, &call->lock);
4826 #else /* RX_ENABLE_LOCKS */
4827 osi_rxSleep(&call->tq);
4828 #endif /* RX_ENABLE_LOCKS */
4830 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4831 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4832 call->flags |= RX_CALL_FAST_RECOVER;
4833 if (peer->maxDgramPackets > 1) {
4834 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4836 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4838 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4839 call->nDgramPackets = 1;
4841 call->nextCwind = 1;
4844 MUTEX_ENTER(&peer->peer_lock);
4845 peer->MTU = call->MTU;
4846 peer->cwind = call->cwind;
4847 peer->nDgramPackets = 1;
4849 call->congestSeq = peer->congestSeq;
4850 MUTEX_EXIT(&peer->peer_lock);
4851 /* Clear retry times on packets. Otherwise, it's possible for
4852 * some packets in the queue to force resends at rates faster
4853 * than recovery rates.
4855 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4856 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4857 clock_Zero(&p->retryTime);
4862 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4863 MUTEX_ENTER(&rx_stats_mutex);
4864 rx_tq_debug.rxi_start_in_error ++;
4865 MUTEX_EXIT(&rx_stats_mutex);
4870 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4871 /* Get clock to compute the re-transmit time for any packets
4872 * in this burst. Note, if we back off, it's reasonable to
4873 * back off all of the packets in the same manner, even if
4874 * some of them have been retransmitted more times than more
4875 * recent additions */
4876 clock_GetTime(&now);
4877 retryTime = now; /* initialize before use */
4878 MUTEX_ENTER(&peer->peer_lock);
4879 clock_Add(&retryTime, &peer->timeout);
4880 MUTEX_EXIT(&peer->peer_lock);
4882 /* Send (or resend) any packets that need it, subject to
4883 * window restrictions and congestion burst control
4884 * restrictions. Ask for an ack on the last packet sent in
4885 * this burst. For now, we're relying upon the window being
4886 * considerably bigger than the largest number of packets that
4887 * are typically sent at once by one initial call to
4888 * rxi_Start. This is probably bogus (perhaps we should ask
4889 * for an ack when we're half way through the current
4890 * window?). Also, for non file transfer applications, this
4891 * may end up asking for an ack for every packet. Bogus. XXXX
4894 * But check whether we're here recursively, and let the other guy do the work. */
4897 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4898 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4899 call->flags |= RX_CALL_TQ_BUSY;
4901 call->flags &= ~RX_CALL_NEED_START;
4902 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4904 maxXmitPackets = MIN(call->twind, call->cwind);
4905 xmitList = (struct rx_packet **)
4906 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4907 if (xmitList == NULL)
4908 osi_Panic("rxi_Start, failed to allocate xmit list");
4909 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4910 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4911 /* We shouldn't be sending packets if a thread is waiting
4912 * to initiate congestion recovery */
4915 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4916 /* Only send one packet during fast recovery */
4919 if ((p->flags & RX_PKTFLAG_FREE) ||
4920 (!queue_IsEnd(&call->tq, nxp)
4921 && (nxp->flags & RX_PKTFLAG_FREE)) ||
4922 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4923 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4924 osi_Panic("rxi_Start: xmit queue clobbered");
4926 if (p->flags & RX_PKTFLAG_ACKED) {
4927 MUTEX_ENTER(&rx_stats_mutex);
4928 rx_stats.ignoreAckedPacket++;
4929 MUTEX_EXIT(&rx_stats_mutex);
4930 continue; /* Ignore this packet if it has been acknowledged */
4933 /* Turn off all flags except these ones, which are the same
4934 * on each transmission */
4935 p->header.flags &= RX_PRESET_FLAGS;
4937 if (p->header.seq >= call->tfirst +
4938 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4939 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4940 /* Note: if we're waiting for more window space, we can
4941 * still send retransmits; hence we don't return here, but
4942 * break out to schedule a retransmit event */
4943 dpf(("call %d waiting for window", *(call->callNumber)));
4947 /* Transmit the packet if it needs to be sent. */
4948 if (!clock_Lt(&now, &p->retryTime)) {
4949 if (nXmitPackets == maxXmitPackets) {
4950 osi_Panic("rxi_Start: xmit list overflowed");
4952 xmitList[nXmitPackets++] = p;
4956 /* xmitList now holds pointers to all of the packets that are
4957 * ready to send. Now we loop to send the packets */
4958 if (nXmitPackets > 0) {
4959 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4960 &now, &retryTime, resending);
4962 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4964 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4966 * TQ references no longer protected by this flag; they must remain
4967 * protected by the global lock.
4969 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4970 call->flags &= ~RX_CALL_TQ_BUSY;
4971 if (call->flags & RX_CALL_TQ_WAIT) {
4972 call->flags &= ~RX_CALL_TQ_WAIT;
4973 #ifdef RX_ENABLE_LOCKS
4974 CV_BROADCAST(&call->cv_tq);
4975 #else /* RX_ENABLE_LOCKS */
4976 osi_rxWakeup(&call->tq);
4977 #endif /* RX_ENABLE_LOCKS */
4982 /* We went into the error state while sending packets. Now is
4983 * the time to reset the call. This will also inform the using
4984 * process that the call is in an error state.
4986 MUTEX_ENTER(&rx_stats_mutex);
4987 rx_tq_debug.rxi_start_aborted ++;
4988 MUTEX_EXIT(&rx_stats_mutex);
4989 call->flags &= ~RX_CALL_TQ_BUSY;
4990 if (call->flags & RX_CALL_TQ_WAIT) {
4991 call->flags &= ~RX_CALL_TQ_WAIT;
4992 #ifdef RX_ENABLE_LOCKS
4993 CV_BROADCAST(&call->cv_tq);
4994 #else /* RX_ENABLE_LOCKS */
4995 osi_rxWakeup(&call->tq);
4996 #endif /* RX_ENABLE_LOCKS */
4998 rxi_CallError(call, call->error);
5001 #ifdef RX_ENABLE_LOCKS
5002 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
5003 register int missing;
5004 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
5005 /* Some packets have received acks. If they all have, we can clear
5006 * the transmit queue.
5008 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5009 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
5017 call->flags |= RX_CALL_TQ_CLEARME;
5019 #endif /* RX_ENABLE_LOCKS */
5020 /* Don't bother doing retransmits if the TQ is cleared. */
5021 if (call->flags & RX_CALL_TQ_CLEARME) {
5022 rxi_ClearTransmitQueue(call, 1);
5024 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5027 /* Always post a resend event, if there is anything in the
5028 * queue, and resend is possible. There should be at least
5029 * one unacknowledged packet in the queue ... otherwise none
5030 * of these packets should be on the queue in the first place.
5032 if (call->resendEvent) {
5033 /* Cancel the existing event and post a new one */
5034 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5037 /* The retry time is the retry time on the first unacknowledged
5038 * packet inside the current window */
5039 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5040 /* Don't set timers for packets outside the window */
5041 if (p->header.seq >= call->tfirst + call->twind) {
5045 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
5047 retryTime = p->retryTime;
5052 /* Post a new event to re-run rxi_Start when retries may be needed */
5053 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
5054 #ifdef RX_ENABLE_LOCKS
5055 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
5056 call->resendEvent = rxevent_Post(&retryTime,
5058 (void *)call, (void *)istack);
5059 #else /* RX_ENABLE_LOCKS */
5060 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5061 (void *)call, (void *)istack);
5062 #endif /* RX_ENABLE_LOCKS */
5065 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5066 } while (call->flags & RX_CALL_NEED_START);
5068 * TQ references no longer protected by this flag; they must remain
5069 * protected by the global lock.
5071 call->flags &= ~RX_CALL_TQ_BUSY;
5072 if (call->flags & RX_CALL_TQ_WAIT) {
5073 call->flags &= ~RX_CALL_TQ_WAIT;
5074 #ifdef RX_ENABLE_LOCKS
5075 CV_BROADCAST(&call->cv_tq);
5076 #else /* RX_ENABLE_LOCKS */
5077 osi_rxWakeup(&call->tq);
5078 #endif /* RX_ENABLE_LOCKS */
5081 call->flags |= RX_CALL_NEED_START;
5083 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5085 if (call->resendEvent) {
5086 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5091 /* Also adjusts the keep alive parameters for the call, to reflect
5092 * that we have just sent a packet (so keep alives aren't sent immediately). */
5094 void rxi_Send(register struct rx_call *call, register struct rx_packet *p,
5097 register struct rx_connection *conn = call->conn;
5099 /* Stamp each packet with the user supplied status */
5100 p->header.userStatus = call->localStatus;
5102 /* Allow the security object controlling this call's security to
5103 * make any last-minute changes to the packet */
5104 RXS_SendPacket(conn->securityObject, call, p);
5106 /* Since we're about to send SOME sort of packet to the peer, it's
5107 * safe to nuke any scheduled end-of-packets ack */
5108 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5110 /* Actually send the packet, filling in more connection-specific fields */
5111 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5112 MUTEX_EXIT(&call->lock);
5113 rxi_SendPacket(conn, p, istack);
5114 MUTEX_ENTER(&call->lock);
5115 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5117 /* Update last send time for this call (for keep-alive
5118 * processing), and for the connection (so that we can discover
5119 * idle connections) */
5120 conn->lastSendTime = call->lastSendTime = clock_Sec();
5124 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5125 * that things are fine. Also called periodically to guarantee that nothing
5126 * falls through the cracks (e.g. (error + dally) connections have keepalive
5127 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5129 * haveCTLock Set if calling from rxi_ReapConnections
5131 #ifdef RX_ENABLE_LOCKS
5132 int rxi_CheckCall(register struct rx_call *call, int haveCTLock)
5133 #else /* RX_ENABLE_LOCKS */
5134 int rxi_CheckCall(register struct rx_call *call)
5135 #endif /* RX_ENABLE_LOCKS */
5137 register struct rx_connection *conn = call->conn;
5138 register struct rx_service *tservice;
5140 afs_uint32 deadTime;
5142 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5143 if (call->flags & RX_CALL_TQ_BUSY) {
5144 /* Call is active and will be reset by rxi_Start if it's
5145 * in an error state.
5150 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5151 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5152 ((afs_uint32)conn->peer->rtt >> 3) +
5153 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
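/* e.g. (illustrative) secondsUntilDead == 12, rtt == 808 (srtt ~101ms) and
 * rtt_dev == 202 (~50ms, scaled by 4): (12288 + 101 + 404 + 1023) >> 10
 * == 13 seconds. The << 1 on rtt_dev supplies the 8*MDEV term because
 * rtt_dev already carries a factor of 4. */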
5155 /* These are computed to the second (+- 1 second). But that's
5156 * good enough for these values, which should be a significant
5157 * number of seconds. */
5158 if (now > (call->lastReceiveTime + deadTime)) {
5159 if (call->state == RX_STATE_ACTIVE) {
5160 rxi_CallError(call, RX_CALL_DEAD);
5164 #ifdef RX_ENABLE_LOCKS
5165 /* Cancel pending events */
5166 rxevent_Cancel(call->delayedAckEvent, call,
5167 RX_CALL_REFCOUNT_DELAY);
5168 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5169 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5170 if (call->refCount == 0) {
5171 rxi_FreeCall(call, haveCTLock);
5175 #else /* RX_ENABLE_LOCKS */
5178 #endif /* RX_ENABLE_LOCKS */
5180 /* Non-active calls are destroyed if they are not responding
5181 * to pings; active calls are simply flagged in error, so the
5182 * attached process can die reasonably gracefully. */
5184 /* see if we have a non-activity timeout */
5185 tservice = conn->service;
5186 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5187 && tservice->idleDeadTime
5188 && ((call->startWait + tservice->idleDeadTime) < now)) {
5189 if (call->state == RX_STATE_ACTIVE) {
5190 rxi_CallError(call, RX_CALL_TIMEOUT);
5194 /* see if we have a hard timeout */
5195 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5196 if (call->state == RX_STATE_ACTIVE)
5197 rxi_CallError(call, RX_CALL_TIMEOUT);
5204 /* When a call is in progress, this routine is called occasionally to
5205 * make sure that some traffic has arrived from (or been sent to) the peer.
5206 * If nothing has arrived in a reasonable amount of time, the call is
5207 * declared dead; if nothing has been sent for a while, we send a
5208 * keep-alive packet (if we're actually trying to keep the call alive) */
5210 void rxi_KeepAliveEvent(struct rxevent *event, register struct rx_call *call,
5213 struct rx_connection *conn;
5216 MUTEX_ENTER(&call->lock);
5217 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5218 if (event == call->keepAliveEvent)
5219 call->keepAliveEvent = NULL;
5222 #ifdef RX_ENABLE_LOCKS
5223 if(rxi_CheckCall(call, 0)) {
5224 MUTEX_EXIT(&call->lock);
5227 #else /* RX_ENABLE_LOCKS */
5228 if (rxi_CheckCall(call)) return;
5229 #endif /* RX_ENABLE_LOCKS */
5231 /* Don't try to keep alive dallying calls */
5232 if (call->state == RX_STATE_DALLY) {
5233 MUTEX_EXIT(&call->lock);
5238 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5239 /* Don't try to send keepalives if there is unacknowledged data */
5240 /* the rexmit code should be good enough, this little hack
5241 * doesn't quite work XXX */
5242 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5244 rxi_ScheduleKeepAliveEvent(call);
5245 MUTEX_EXIT(&call->lock);
5249 void rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
5251 if (!call->keepAliveEvent) {
5253 clock_GetTime(&when);
5254 when.sec += call->conn->secondsUntilPing;
5255 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5256 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5260 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5261 void rxi_KeepAliveOn(register struct rx_call *call)
5263 /* Pretend last packet received was received now--i.e. if another
5264 * packet isn't received within the keep alive time, then the call
5265 * will die; Initialize last send time to the current time--even
5266 * if a packet hasn't been sent yet. This will guarantee that a
5267 * keep-alive is sent within the ping time */
5268 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5269 rxi_ScheduleKeepAliveEvent(call);
5272 /* This routine is called to send connection abort messages
5273 * that have been delayed to throttle looping clients. */
5274 void rxi_SendDelayedConnAbort(struct rxevent *event, register struct rx_connection *conn,
5278 struct rx_packet *packet;
5280 MUTEX_ENTER(&conn->conn_data_lock);
5281 conn->delayedAbortEvent = NULL;
5282 error = htonl(conn->error);
5284 MUTEX_EXIT(&conn->conn_data_lock);
5285 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5287 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5288 RX_PACKET_TYPE_ABORT, (char *)&error,
5290 rxi_FreePacket(packet);
5294 /* This routine is called to send call abort messages
5295 * that have been delayed to throttle looping clients. */
5296 void rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
5300 struct rx_packet *packet;
5302 MUTEX_ENTER(&call->lock);
5303 call->delayedAbortEvent = NULL;
5304 error = htonl(call->error);
5306 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5308 packet = rxi_SendSpecial(call, call->conn, packet,
5309 RX_PACKET_TYPE_ABORT, (char *)&error,
5311 rxi_FreePacket(packet);
5313 MUTEX_EXIT(&call->lock);
5316 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5317 * seconds) to ask the client to authenticate itself. The routine
5318 * issues a challenge to the client, which is obtained from the
5319 * security object associated with the connection */
5320 void rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
5323 int tries = (int) atries;
5324 conn->challengeEvent = NULL;
5325 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5326 register struct rx_packet *packet;
5330 /* We've failed to authenticate for too long.
5331 * Reset any calls waiting for authentication;
5332 * they are all in RX_STATE_PRECALL.
5336 MUTEX_ENTER(&conn->conn_call_lock);
5337 for (i=0; i<RX_MAXCALLS; i++) {
5338 struct rx_call *call = conn->call[i];
5340 MUTEX_ENTER(&call->lock);
5341 if (call->state == RX_STATE_PRECALL) {
5342 rxi_CallError(call, RX_CALL_DEAD);
5343 rxi_SendCallAbort(call, NULL, 0, 0);
5345 MUTEX_EXIT(&call->lock);
5348 MUTEX_EXIT(&conn->conn_call_lock);
5352 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5354 /* If there's no packet available, do this later. */
5355 RXS_GetChallenge(conn->securityObject, conn, packet);
5356 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5357 RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
5358 rxi_FreePacket(packet);
5360 clock_GetTime(&when);
5361 when.sec += RX_CHALLENGE_TIMEOUT;
5362 conn->challengeEvent =
5363 rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
5367 /* Call this routine to start requesting the client to authenticate
5368 * itself. This will continue until authentication is established,
5369 * the call times out, or an invalid response is returned. The
5370 * security object associated with the connection is asked to create
5371 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5372 * defined earlier. */
5373 void rxi_ChallengeOn(register struct rx_connection *conn)
5375 if (!conn->challengeEvent) {
5376 RXS_CreateChallenge(conn->securityObject, conn);
5377 rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
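/* Illustrative timeline: the first challenge goes out immediately; each
 * rxi_ChallengeEvent re-posts itself RX_CHALLENGE_TIMEOUT seconds later with
 * tries-1, so an unresponsive or unauthenticated client is cut off after at
 * most RX_CHALLENGE_MAXTRIES challenges. */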
5382 /* Compute round trip time of the packet provided, in *rttp. */
5385 /* rxi_ComputeRoundTripTime is called with peer locked. */
5386 /* sentp and/or peer may be null */
5387 void rxi_ComputeRoundTripTime(register struct rx_packet *p,
5388 register struct clock *sentp, register struct rx_peer *peer)
5390 struct clock thisRtt, *rttp = &thisRtt;
5392 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5393 /* making year 2038 bugs to get this running now - stroucki */
5394 struct timeval temptime;
5396 register int rtt_timeout;
5398 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5399 /* yet again. This was the worst Heisenbug of the port - stroucki */
5400 clock_GetTime(&temptime);
5401 rttp->sec=(afs_int32)temptime.tv_sec;
5402 rttp->usec=(afs_int32)temptime.tv_usec;
5404 clock_GetTime(rttp);
5406 if (clock_Lt(rttp, sentp)) {
5408 return; /* somebody set the clock back, don't count this time. */
5410 clock_Sub(rttp, sentp);
5411 MUTEX_ENTER(&rx_stats_mutex);
5412 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5413 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5414 if (rttp->sec > 60) {
5415 MUTEX_EXIT(&rx_stats_mutex);
5416 return; /* somebody set the clock ahead */
5418 rx_stats.maxRtt = *rttp;
5420 clock_Add(&rx_stats.totalRtt, rttp);
5421 rx_stats.nRttSamples++;
5422 MUTEX_EXIT(&rx_stats_mutex);
5424 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5426 /* Apply VanJacobson round-trip estimations */
5431 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5432 * srtt is stored as fixed point with 3 bits after the binary
5433 * point (i.e., scaled by 8). The following magic is
5434 * equivalent to the smoothing algorithm in rfc793 with an
5435 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5436 * srtt*8 = srtt*8 + rtt - srtt
5437 * srtt = srtt + rtt/8 - srtt/8
5440 delta = MSEC(rttp) - (peer->rtt >> 3);
5444 * We accumulate a smoothed rtt variance (actually, a smoothed
5445 * mean difference), then set the retransmit timer to smoothed
5446 * rtt + 4 times the smoothed variance (was 2x in van's original
5447 * paper, but 4x works better for me, and apparently for him as
5449 * rttvar is stored as
5450 * fixed point with 2 bits after the binary point (scaled by
5451 * 4). The following is equivalent to rfc793 smoothing with
5452 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5453 * replaces rfc793's wired-in beta.
5454 * dev*4 = dev*4 + (|actual - expected| - dev)
5460 delta -= (peer->rtt_dev >> 2);
5461 peer->rtt_dev += delta;
5464 /* I don't have a stored RTT so I start with this value. Since I'm
5465 * probably just starting a call, and will be pushing more data down
5466 * this call, I expect congestion to increase rapidly. So I fudge a
5467 * little, and I set deviance to half the rtt. In practice,
5468 * deviance tends to approach something a little less than
5469 * half the smoothed rtt. */
5470 peer->rtt = (MSEC(rttp) << 3) + 8;
5471 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5473 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5474 * the other of these connections is usually in a user process, and can
5475 * be switched and/or swapped out. So on fast, reliable networks, the
5476 * timeout would otherwise be too short.
5478 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5479 clock_Zero(&(peer->timeout));
5480 clock_Addmsec(&(peer->timeout), rtt_timeout);
5482 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5483 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5484 (peer->timeout.sec),(peer->timeout.usec)) );
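#if 0
/* Standalone sketch (not compiled) of the smoothing above, using the same
 * fixed-point scaling: rtt holds srtt << 3, rtt_dev holds mdev << 2, and the
 * sample is in milliseconds. Assumes an estimate already exists (rtt != 0);
 * the function name is an editorial invention.
 */
static void example_vj_update(int *rtt, int *rtt_dev, int sample_ms)
{
    int delta = sample_ms - (*rtt >> 3);
    *rtt += delta;		/* srtt += delta/8, kept scaled by 8 */
    if (delta < 0)
	delta = -delta;
    delta -= (*rtt_dev >> 2);
    *rtt_dev += delta;		/* mdev += (|delta| - mdev)/4, kept scaled by 4 */
    /* the timeout then becomes (*rtt >> 3) + *rtt_dev + 350 ms,
     * i.e. srtt + 4*mdev + 0.35 sec as described above */
}
#endif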
5488 /* Find all server connections that have not been active for a long time, and destroy them. */
5490 void rxi_ReapConnections(void)
5493 clock_GetTime(&now);
5495 /* Find server connection structures that haven't been used for
5496 * greater than rx_idleConnectionTime */
5497 { struct rx_connection **conn_ptr, **conn_end;
5498 int i, havecalls = 0;
5499 MUTEX_ENTER(&rx_connHashTable_lock);
5500 for (conn_ptr = &rx_connHashTable[0],
5501 conn_end = &rx_connHashTable[rx_hashTableSize];
5502 conn_ptr < conn_end; conn_ptr++) {
5503 struct rx_connection *conn, *next;
5504 struct rx_call *call;
5508 for (conn = *conn_ptr; conn; conn = next) {
5509 /* XXX -- Shouldn't the connection be locked? */
5512 for(i=0;i<RX_MAXCALLS;i++) {
5513 call = conn->call[i];
5516 MUTEX_ENTER(&call->lock);
5517 #ifdef RX_ENABLE_LOCKS
5518 result = rxi_CheckCall(call, 1);
5519 #else /* RX_ENABLE_LOCKS */
5520 result = rxi_CheckCall(call);
5521 #endif /* RX_ENABLE_LOCKS */
5522 MUTEX_EXIT(&call->lock);
5524 /* If CheckCall freed the call, it might
5525 * have destroyed the connection as well,
5526 * which screws up the linked lists.
5532 if (conn->type == RX_SERVER_CONNECTION) {
5533 /* This only actually destroys the connection if
5534 * there are no outstanding calls */
5535 MUTEX_ENTER(&conn->conn_data_lock);
5536 if (!havecalls && !conn->refCount &&
5537 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5538 conn->refCount++; /* it will be decr in rx_DestroyConn */
5539 MUTEX_EXIT(&conn->conn_data_lock);
5540 #ifdef RX_ENABLE_LOCKS
5541 rxi_DestroyConnectionNoLock(conn);
5542 #else /* RX_ENABLE_LOCKS */
5543 rxi_DestroyConnection(conn);
5544 #endif /* RX_ENABLE_LOCKS */
5546 #ifdef RX_ENABLE_LOCKS
5548 MUTEX_EXIT(&conn->conn_data_lock);
5550 #endif /* RX_ENABLE_LOCKS */
5554 #ifdef RX_ENABLE_LOCKS
5555 while (rx_connCleanup_list) {
5556 struct rx_connection *conn;
5557 conn = rx_connCleanup_list;
5558 rx_connCleanup_list = rx_connCleanup_list->next;
5559 MUTEX_EXIT(&rx_connHashTable_lock);
5560 rxi_CleanupConnection(conn);
5561 MUTEX_ENTER(&rx_connHashTable_lock);
5563 MUTEX_EXIT(&rx_connHashTable_lock);
5564 #endif /* RX_ENABLE_LOCKS */
5567 /* Find any peer structures that haven't been used (haven't had an
5568 * associated connection) for greater than rx_idlePeerTime */
5569 { struct rx_peer **peer_ptr, **peer_end;
5571 MUTEX_ENTER(&rx_rpc_stats);
5572 MUTEX_ENTER(&rx_peerHashTable_lock);
5573 for (peer_ptr = &rx_peerHashTable[0],
5574 peer_end = &rx_peerHashTable[rx_hashTableSize];
5575 peer_ptr < peer_end; peer_ptr++) {
5576 struct rx_peer *peer, *next, *prev;
5577 for (prev = peer = *peer_ptr; peer; peer = next) {
5579 code = MUTEX_TRYENTER(&peer->peer_lock);
5580 if ((code) && (peer->refCount == 0)
5581 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5582 rx_interface_stat_p rpc_stat, nrpc_stat;
5584 MUTEX_EXIT(&peer->peer_lock);
5585 MUTEX_DESTROY(&peer->peer_lock);
5586 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5587 rx_interface_stat)) {
5588 unsigned int num_funcs;
5589 if (!rpc_stat) break;
5590 queue_Remove(&rpc_stat->queue_header);
5591 queue_Remove(&rpc_stat->all_peers);
5592 num_funcs = rpc_stat->stats[0].func_total;
5593 space = sizeof(rx_interface_stat_t) +
5594 rpc_stat->stats[0].func_total *
5595 sizeof(rx_function_entry_v1_t);
5597 rxi_Free(rpc_stat, space);
5598 rxi_rpc_peer_stat_cnt -= num_funcs;
5601 MUTEX_ENTER(&rx_stats_mutex);
5602 rx_stats.nPeerStructs--;
5603 MUTEX_EXIT(&rx_stats_mutex);
5604 if (prev == *peer_ptr) {
5613 MUTEX_EXIT(&peer->peer_lock);
5619 MUTEX_EXIT(&rx_peerHashTable_lock);
5620 MUTEX_EXIT(&rx_rpc_stats);
5623 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5624 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5625 GC, just below. Really, we shouldn't have to keep moving packets from
5626 one place to another, but instead ought to always know if we can
5627 afford to hold onto a packet in its particular use. */
5628 MUTEX_ENTER(&rx_freePktQ_lock);
5629 if (rx_waitingForPackets) {
5630 rx_waitingForPackets = 0;
5631 #ifdef RX_ENABLE_LOCKS
5632 CV_BROADCAST(&rx_waitingForPackets_cv);
5634 osi_rxWakeup(&rx_waitingForPackets);
5637 MUTEX_EXIT(&rx_freePktQ_lock);
5639 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5640 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
/* rxs_Release - This isn't strictly necessary but, since the macro name from
 * rx.h is sort of strange, this is better. This is called with a security
 * object before it is discarded. Each connection using a security object has
 * its own refcount to the object, so it won't actually be freed until the
 * last connection is destroyed.
 *
 * This is the only rxs module call. A hold could also be written, but no one
 * needs it. */

int rxs_Release (struct rx_securityClass *aobj)
{
    return RXS_Close (aobj);
}
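/*
 * Example (editor's sketch, not part of rx): a security object is
 * refcounted per connection, so releasing the creator's reference with
 * rxs_Release does not free it while connections still use it. The names
 * host/port/serviceId are placeholders.
 *
 *    struct rx_securityClass *so = rxnull_NewClientSecurityObject();
 *    struct rx_connection *conn =
 *        rx_NewConnection(host, port, serviceId, so, 0);
 *    rxs_Release(so);            -- conn still holds its own reference
 *    rx_DestroyConnection(conn); -- last reference; object is freed
 */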
#ifdef ADAPT_WINDOW
#define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
#define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
#define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
#define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
/* Adjust our estimate of the transmission rate to this peer, given
 * that the packet p was just acked. We can adjust peer->timeout and
 * call->twind. Pragmatically, this is called
 * only with packets of maximal length.
 * Called with peer and call locked.
 */

static void rxi_ComputeRate(register struct rx_peer *peer,
    register struct rx_call *call, struct rx_packet *p,
    struct rx_packet *ackp, u_char ackReason)
{
    afs_int32 xferSize, xferMs;
    register afs_int32 minTime;
    struct clock newTO;

    /* Count down packets */
    if (peer->rateFlag > 0) peer->rateFlag--;
    /* Do nothing until we're enabled */
    if (peer->rateFlag != 0) return;
    if (!call->conn) return;
    /* Count only when the ack seems legitimate */
    switch (ackReason) {
    case RX_ACK_REQUESTED:
        xferSize = p->length + RX_HEADER_SIZE +
            call->conn->securityMaxTrailerSize;
        xferMs = peer->rtt;
        break;

    case RX_ACK_PING_RESPONSE:
        if (p) /* want the response to ping-request, not data send */
            return;
        clock_GetTime(&newTO);
        if (clock_Gt(&newTO, &call->pingRequestTime)) {
            clock_Sub(&newTO, &call->pingRequestTime);
            xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
        } else {
            return;
        }
        xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
        break;

    default:
        return;
    }
    dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
        ntohl(peer->host), ntohs(peer->port),
        (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
        xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
        peer->packetSize));

    /* Track only packets that are big enough. */
    if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
        peer->ifMTU)
        return;
    /* absorb RTT data (in milliseconds) for these big packets */
    if (peer->smRtt == 0) {
        peer->smRtt = xferMs;
    } else {
        peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
        if (!peer->smRtt) peer->smRtt = 1;
    }

    if (peer->countDown) {
        peer->countDown--;
        return;
    }
    peer->countDown = 10; /* recalculate only every so often */
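/*
 * Worked example (editor's note): the smoothing above is an exponentially
 * weighted moving average with weight 1/16, done in integer arithmetic
 * with +4 for rounding:
 *
 *    smRtt' = (15 * smRtt + xferMs + 4) >> 4
 *
 * e.g. smRtt = 100 ms and a new sample xferMs = 20 ms gives
 * (1500 + 20 + 4) >> 4 = 95 ms: one sample moves the estimate only a
 * sixteenth of the way toward the new measurement.
 */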
    /* In practice, we can measure only the RTT for full packets,
     * because of the way Rx acks the data that it receives. (If it's
     * smaller than a full packet, it often gets implicitly acked
     * either by the call response (from a server) or by the next call
     * (from a client), and either case confuses transmission times
     * with processing times.) Therefore, replace the above
     * more-sophisticated processing with a simpler version, where the
     * smoothed RTT is kept for full-size packets, and the time to
     * transmit a windowful of full-size packets is simply RTT *
     * windowSize. Again, we take two steps:
     * - ensure the timeout is large enough for a single packet's RTT;
     * - ensure that the window is small enough to fit in the desired timeout.
     */
    /* First, the timeout check. */
    minTime = peer->smRtt;
    /* Get a reasonable estimate for a timeout period */
    minTime += minTime;
    newTO.sec = minTime / 1000;
    newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;

    /* Increase the timeout period so that we can always do at least
     * one packet exchange */
    if (clock_Gt(&newTO, &peer->timeout)) {

        dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
            ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
            peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
            peer->packetSize));

        peer->timeout = newTO;
    }
    /* Now, get an estimate for the transmit window size. */
    minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
    /* Now, convert to the number of full packets that could fit in a
     * reasonable fraction of that interval */
    minTime /= (peer->smRtt << 1);
    xferSize = minTime; /* (make a copy) */

    /* Now clamp the size to reasonable bounds. */
    if (minTime <= 1) minTime = 1;
    else if (minTime > rx_Window) minTime = rx_Window;
/*  if (minTime != peer->maxWindow) {
      dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
          ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
          peer->timeout.sec, peer->timeout.usec, peer->smRtt,
          peer->packetSize));
      peer->maxWindow = minTime;
      elide... call->twind = minTime;
    }
*/
    /* Cut back on the peer timeout if it had earlier grown unreasonably.
     * Discern this by calculating the timeout necessary for rx_Window
     * full-size packets. */
    if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
        /* calculate estimate for transmission interval in milliseconds */
        minTime = rx_Window * peer->smRtt;
        if (minTime < 1000) {
            dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
                ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
                peer->timeout.usec, peer->smRtt,
                peer->packetSize));

            newTO.sec = 0; /* cut back on timeout by half a second */
            newTO.usec = 500000;
            clock_Sub(&peer->timeout, &newTO);
        }
    }

    return;
} /* end of rxi_ComputeRate */
#endif /* ADAPT_WINDOW */
#ifdef RXDEBUG
/* Don't call this debugging routine directly; use dpf */
void rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int a9, int a10,
    int a11, int a12, int a13, int a14, int a15)
{
    struct clock now;
    clock_GetTime(&now);
    fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
    fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
    putc('\n', rx_Log);
}
/*
 * This function is used to process the rx_stats structure that is local
 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug). Therefore, it needs to do minimal version
 * checking.
 */
void rx_PrintTheseStats (FILE *file, struct rx_stats *s, int size,
    afs_int32 freePackets, char version)
{
    int i;

    if (size != sizeof(struct rx_stats)) {
        fprintf(file,
                "Unexpected size of stats structure: was %d, expected %d\n",
                size, sizeof(struct rx_stats));
    }

    fprintf(file, "rx stats: free packets %d, allocs %d, ",
            (int) freePackets, (int) s->packetRequests);

    if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
        fprintf(file, "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
                s->receivePktAllocFailures, s->receiveCbufPktAllocFailures,
                s->sendPktAllocFailures, s->sendCbufPktAllocFailures,
                s->specialPktAllocFailures);
    } else {
        fprintf(file, "alloc-failures(rcv %d,send %d,ack %d)\n",
                s->receivePktAllocFailures, s->sendPktAllocFailures,
                s->specialPktAllocFailures);
    }

    fprintf(file,
            "   greedy %d, bogusReads %d (last from host %x), "
            "noPackets %d, noBuffers %d, selects %d, sendSelects %d\n",
            (int) s->socketGreedy, (int) s->bogusPacketOnRead,
            (int) s->bogusHost, (int) s->noPacketOnRead,
            (int) s->noPacketBuffersOnRead, (int) s->selects,
            (int) s->sendSelects);

    fprintf(file, "   packets read: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
        fprintf(file, "%s %d ", rx_packetTypes[i], (int) s->packetsRead[i]);
    }
    fprintf(file, "\n");

    fprintf(file,
            "   other read counters: data %d, ack %d, dup %d, "
            "spurious %d, dally %d\n",
            (int) s->dataPacketsRead, (int) s->ackPacketsRead,
            (int) s->dupPacketsRead, (int) s->spuriousPacketsRead,
            (int) s->ignorePacketDally);

    fprintf(file, "   packets sent: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
        fprintf(file, "%s %d ", rx_packetTypes[i], (int) s->packetsSent[i]);
    }
    fprintf(file, "\n");

    fprintf(file,
            "   other send counters: ack %d, "
            "data %d (not resends), "
            "resends %d, pushed %d, "
            "acked&ignored %d\n",
            (int) s->ackPacketsSent, (int) s->dataPacketsSent,
            (int) s->dataPacketsReSent, (int) s->dataPacketsPushed,
            (int) s->ignoreAckedPacket);

    fprintf(file,
            "   \t(these should be small) sendFailed %d, fatalErrors %d\n",
            (int) s->netSendFailures, (int) s->fatalErrors);

    if (s->nRttSamples) {
        fprintf(file, "   Average rtt is %0.3f, with %d samples\n",
                clock_Float(&s->totalRtt)/s->nRttSamples,
                (int) s->nRttSamples);

        fprintf(file, "   Minimum rtt is %0.3f, maximum is %0.3f\n",
                clock_Float(&s->minRtt), clock_Float(&s->maxRtt));
    }

    fprintf(file,
            "   %d server connections, "
            "%d client connections, "
            "%d peer structs, %d call structs, "
            "%d free call structs\n",
            (int) s->nServerConns, (int) s->nClientConns,
            (int) s->nPeerStructs, (int) s->nCallStructs,
            (int) s->nFreeCallStructs);

#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
    fprintf(file, "   %d clock updates\n", clock_nUpdates);
#endif
}
/* for backward compatibility */
void rx_PrintStats(FILE *file)
{
    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
}
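/*
 * Example usage (editor's sketch): dump the process-wide rx counters to
 * any stdio stream; the locking is handled internally.
 *
 *    rx_PrintStats(stderr);
 */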
void rx_PrintPeerStats(FILE *file, struct rx_peer *peer)
{
    fprintf(file,
            "Peer %x.%d.  Burst size %d, burst wait %u.%d.\n",
            ntohl(peer->host), (int) peer->port,
            (int) peer->burstSize,
            (int) peer->burstWait.sec,
            (int) peer->burstWait.usec);

    fprintf(file,
            "   Rtt %d, retry time %u.%06d, total sent %d, resent %d\n",
            peer->rtt,
            (int) peer->timeout.sec,
            (int) peer->timeout.usec,
            peer->nSent, peer->reSends);

    fprintf(file,
            "   Packet size %d, "
            "max in packet skew %d, "
            "max out packet skew %d\n",
            peer->ifMTU,
            (int) peer->inPacketSkew,
            (int) peer->outPacketSkew);
}
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 * counter
 */
#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else /* AFS_PTHREAD_ENV */
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr,
    afs_uint16 remotePort, u_char type, void *inputData, size_t inputLength,
    void *outputData, size_t outputLength)
{
    static afs_int32 counter = 100;
    afs_int32 endTime;
    struct rx_header theader;
    char tbuffer[1500];
    register afs_int32 code;
    struct timeval tv;
    struct sockaddr_in taddr, faddr;
    int faddrLen;
    fd_set imask;
    register char *tp;

    endTime = time(0) + 20; /* try for 20 seconds */
    LOCK_RX_DEBUG
    counter++;
    UNLOCK_RX_DEBUG
    tp = &tbuffer[sizeof(struct rx_header)];
    taddr.sin_family = AF_INET;
    taddr.sin_port = remotePort;
    taddr.sin_addr.s_addr = remoteAddr;
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
    taddr.sin_len = sizeof(struct sockaddr_in);
#endif
    while (1) {
        memset(&theader, 0, sizeof(theader));
        theader.epoch = htonl(999);
        theader.cid = 0;
        theader.callNumber = htonl(counter);
        theader.seq = 0;
        theader.serial = 0;
        theader.type = type;
        theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
        theader.serviceId = 0;

        memcpy(tbuffer, &theader, sizeof(theader));
        memcpy(tp, inputData, inputLength);
        code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
                      (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

        /* see if there's a packet available */
        FD_ZERO(&imask);
        FD_SET(socket, &imask);
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        code = select(socket+1, &imask, 0, 0, &tv);
        if (code > 0) {
            /* now receive a packet */
            faddrLen = sizeof(struct sockaddr_in);
            code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
                            (struct sockaddr *) &faddr, &faddrLen);
            if (code > 0) {
                memcpy(&theader, tbuffer, sizeof(struct rx_header));
                if (counter == ntohl(theader.callNumber)) break;
            }
        }

        /* see if we've timed out */
        if (endTime < time(0)) return -1;
    }
    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    memcpy(outputData, tp, code);
    return code;
}
afs_int32 rx_GetServerDebug(osi_socket socket, afs_uint32 remoteAddr,
    afs_uint16 remotePort, struct rx_debugStats *stat, afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);
    in.index = 0;

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
                       &in, sizeof(in), stat, sizeof(*stat));

    /*
     * If the call was successful, fixup the version and indicate
     * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */

    if (rc >= 0) {
        if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
            *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
            *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
            *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
            *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
            *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
            *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
        }
        if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
            *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
        }

        stat->nFreePackets = ntohl(stat->nFreePackets);
        stat->packetReclaims = ntohl(stat->packetReclaims);
        stat->callsExecuted = ntohl(stat->callsExecuted);
        stat->nWaiting = ntohl(stat->nWaiting);
        stat->idleThreads = ntohl(stat->idleThreads);
    }

    return rc;
}
6141 afs_uint16 remotePort, struct rx_stats *stat, afs_uint32 *supportedValues)
6143 struct rx_debugIn in;
6144 afs_int32 *lp = (afs_int32 *) stat;
6149 * supportedValues is currently unused, but added to allow future
6150 * versioning of this function.
6153 *supportedValues = 0;
6154 in.type = htonl(RX_DEBUGI_RXSTATS);
6156 memset(stat, 0, sizeof(*stat));
6158 rc = MakeDebugCall(socket,
6161 RX_PACKET_TYPE_DEBUG,
6170 * Do net to host conversion here
6173 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
afs_int32 rx_GetServerVersion(osi_socket socket, afs_uint32 remoteAddr,
    afs_uint16 remotePort, size_t version_length, char *version)
{
    char a[1] = {0};
    return MakeDebugCall(socket, remoteAddr, remotePort,
                         RX_PACKET_TYPE_VERSION, a, 1,
                         version, version_length);
}
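/*
 * Example (editor's sketch): fetch a remote server's version string, with
 * the same placeholder socket/address names as above.
 *
 *    char version[64];
 *    if (rx_GetServerVersion(sock, host, port,
 *                            sizeof(version), version) >= 0)
 *        printf("remote rx version: %s\n", version);
 */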
afs_int32 rx_GetServerConnections(osi_socket socket, afs_uint32 remoteAddr,
    afs_uint16 remotePort, afs_int32 *nextConnection, int allConnections,
    afs_uint32 debugSupportedValues, struct rx_debugConn *conn, afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    if (allConnections) {
        in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
        in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
                       &in, sizeof(in), conn, sizeof(*conn));

    if (rc >= 0) {
        *nextConnection += 1;

        /*
         * Convert old connection format to new structure.
         */

        if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
            struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
#define MOVEvL(a) (conn->a = vL->a)

            /* any old or unrecognized version... */
            for (i = 0; i < RX_MAXCALLS; i++) {
                MOVEvL(callState[i]);
                MOVEvL(callMode[i]);
                MOVEvL(callFlags[i]);
                MOVEvL(callOther[i]);
            }
            if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
                MOVEvL(secStats.type);
                MOVEvL(secStats.level);
                MOVEvL(secStats.flags);
                MOVEvL(secStats.expires);
                MOVEvL(secStats.packetsReceived);
                MOVEvL(secStats.packetsSent);
                MOVEvL(secStats.bytesReceived);
                MOVEvL(secStats.bytesSent);
            }
        }

        /*
         * Do net to host conversion here.
         * I don't convert host or port since we are most likely
         * going to want these in NBO.
         */
        conn->cid = ntohl(conn->cid);
        conn->serial = ntohl(conn->serial);
        for (i = 0; i < RX_MAXCALLS; i++) {
            conn->callNumber[i] = ntohl(conn->callNumber[i]);
        }
        conn->error = ntohl(conn->error);
        conn->secStats.flags = ntohl(conn->secStats.flags);
        conn->secStats.expires = ntohl(conn->secStats.expires);
        conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
        conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
        conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
        conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
        conn->epoch = ntohl(conn->epoch);
        conn->natMTU = ntohl(conn->natMTU);
    }

    return rc;
}
afs_int32 rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
    afs_int32 *nextPeer, afs_uint32 debugSupportedValues, struct rx_debugPeer *peer,
    afs_uint32 *supportedValues)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */
    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket, remoteAddr, remotePort, RX_PACKET_TYPE_DEBUG,
                       &in, sizeof(in), peer, sizeof(*peer));

    if (rc >= 0) {
        *nextPeer += 1;

        /*
         * Do net to host conversion here.
         * I don't convert host or port since we are most likely
         * going to want these in NBO.
         */
        peer->ifMTU = ntohs(peer->ifMTU);
        peer->idleWhen = ntohl(peer->idleWhen);
        peer->refCount = ntohs(peer->refCount);
        peer->burstWait.sec = ntohl(peer->burstWait.sec);
        peer->burstWait.usec = ntohl(peer->burstWait.usec);
        peer->rtt = ntohl(peer->rtt);
        peer->rtt_dev = ntohl(peer->rtt_dev);
        peer->timeout.sec = ntohl(peer->timeout.sec);
        peer->timeout.usec = ntohl(peer->timeout.usec);
        peer->nSent = ntohl(peer->nSent);
        peer->reSends = ntohl(peer->reSends);
        peer->inPacketSkew = ntohl(peer->inPacketSkew);
        peer->outPacketSkew = ntohl(peer->outPacketSkew);
        peer->rateFlag = ntohl(peer->rateFlag);
        peer->natMTU = ntohs(peer->natMTU);
        peer->maxMTU = ntohs(peer->maxMTU);
        peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
        peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
        peer->MTU = ntohs(peer->MTU);
        peer->cwind = ntohs(peer->cwind);
        peer->nDgramPackets = ntohs(peer->nDgramPackets);
        peer->congestSeq = ntohs(peer->congestSeq);
        peer->bytesSent.high = ntohl(peer->bytesSent.high);
        peer->bytesSent.low = ntohl(peer->bytesSent.low);
        peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
        peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
    }

    return rc;
}

#endif /* RXDEBUG */
void shutdown_rx(void)
{
    struct rx_serverQueueEntry *np;
    register int i, j;
    register struct rx_call *call;
    register struct rx_serverQueueEntry *sq;

    if (rxinit_status == 1) {
        return; /* Already shutdown. */
    }
#ifndef AFS_PTHREAD_ENV
    FD_ZERO(&rx_selectMask);
#endif /* AFS_PTHREAD_ENV */
    rxi_dataQuota = RX_MAX_QUOTA;
#ifndef AFS_PTHREAD_ENV
#endif /* AFS_PTHREAD_ENV */

#ifndef AFS_PTHREAD_ENV
#ifndef AFS_USE_GETTIMEOFDAY
    clock_UnInit();
#endif /* AFS_USE_GETTIMEOFDAY */
#endif /* AFS_PTHREAD_ENV */
    while (!queue_IsEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
        queue_Remove(call);
        rxi_Free(call, sizeof(struct rx_call));
    }
    while (!queue_IsEmpty(&rx_idleServerQueue)) {
        sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
        queue_Remove(sq);
    }
    {
        struct rx_peer **peer_ptr, **peer_end;
        for (peer_ptr = &rx_peerHashTable[0],
             peer_end = &rx_peerHashTable[rx_hashTableSize];
             peer_ptr < peer_end; peer_ptr++) {
            struct rx_peer *peer, *next;
            for (peer = *peer_ptr; peer; peer = next) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                size_t space;
                next = peer->next;
                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    MUTEX_ENTER(&rx_rpc_stats);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                    MUTEX_EXIT(&rx_rpc_stats);
                }
                rxi_FreePeer(peer);
                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.nPeerStructs--;
                MUTEX_EXIT(&rx_stats_mutex);
            }
        }
    }
    for (i = 0; i < RX_MAX_SERVICES; i++) {
        if (rx_services[i])
            rxi_Free(rx_services[i], sizeof(*rx_services[i]));
    }
    for (i = 0; i < rx_hashTableSize; i++) {
        register struct rx_connection *tc, *ntc;
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (tc = rx_connHashTable[i]; tc; tc = ntc) {
            ntc = tc->next;
            for (j = 0; j < RX_MAXCALLS; j++) {
                if (tc->call[j]) {
                    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
                }
            }
            rxi_Free(tc, sizeof(*tc));
        }
        MUTEX_EXIT(&rx_connHashTable_lock);
    }
    MUTEX_ENTER(&freeSQEList_lock);

    while ((np = rx_FreeSQEList)) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
        MUTEX_DESTROY(&np->lock);
        rxi_Free(np, sizeof(*np));
    }

    MUTEX_EXIT(&freeSQEList_lock);
    MUTEX_DESTROY(&freeSQEList_lock);
    MUTEX_DESTROY(&rx_freeCallQueue_lock);
    MUTEX_DESTROY(&rx_connHashTable_lock);
    MUTEX_DESTROY(&rx_peerHashTable_lock);
    MUTEX_DESTROY(&rx_serverPool_lock);

    osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));

    UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));

    rxi_FreeAllPackets();

    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota = RX_MAX_QUOTA;
    rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);

    rxinit_status = 1;
}
#ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
{
    if (!MUTEX_ISMINE(lockaddr))
        osi_Panic("Lock not held: %s", msg);
}
#endif /* RX_ENABLE_LOCKS */
/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)
{
    int key;
    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
        realloc((void *)rxi_keyCreate_destructor,
                (key+1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);
    return key;
}
void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
{
    int i;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->specific) {
        conn->specific = (void **)malloc((key+1)*sizeof(void *));
        for (i = 0; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key+1;
        conn->specific[key] = ptr;
    } else if (key >= conn->nSpecific) {
        conn->specific = (void **)
            realloc(conn->specific, (key+1)*sizeof(void *));
        for (i = conn->nSpecific; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key+1;
        conn->specific[key] = ptr;
    } else {
        if (conn->specific[key] && rxi_keyCreate_destructor[key])
            (*rxi_keyCreate_destructor[key])(conn->specific[key]);
        conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
}
void *rx_GetSpecific(struct rx_connection *conn, int key)
{
    void *ptr;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
        ptr = NULL;
    else
        ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;
}

#endif /* !KERNEL */
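/*
 * Example (editor's sketch): hanging per-connection context off an
 * rx_connection with the hooks above. The destructor runs when a stored
 * value is overwritten; free() here stands in for whatever cleanup the
 * stored object needs.
 *
 *    static int myKey;
 *    myKey = rx_KeyCreate((rx_destructor_t) free);
 *    rx_SetSpecific(conn, myKey, strdup("per-connection state"));
 *    char *state = (char *) rx_GetSpecific(conn, myKey);
 */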
/*
 * processStats is a queue used to store the statistics for the local
 * process. Its contents are similar to the contents of the rpcStats
 * queue on a rx_peer structure, but the actual data stored within
 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset) - unlike the per-peer structures,
 * which can come and go based upon the peer lifetime.
 */

static struct rx_queue processStats = {&processStats,&processStats};
/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats,&peerStats};
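/*
 * (Editor's note) The self-referential initializers above are the empty
 * state of an rx circular queue: a head whose prev and next point at
 * itself holds no elements, so queue_Scan over it terminates immediately
 * until stats are added.
 */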
/*
 * rxi_monitor_processStats is used to turn process-wide stat collection
 * on and off.
 */

static int rxi_monitor_processStats = 0;
/*
 * rxi_monitor_peerStats is used to turn per-peer stat collection on and off.
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
 * call, create (if needed) and update the stat totals for the rpc.
 *
 * PARAMETERS
 * IN stats - the queue of stats that will be updated with the new value
 * IN rxInterface - a unique number that identifies the rpc interface
 * IN currentFunc - the index of the function being invoked
 * IN totalFunc - the total number of functions in this interface
 * IN queueTime - the amount of time this function waited for a thread
 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 * IN bytesRcvd - the number of bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 * IN remoteHost - the ip address of the remote host
 * IN remotePort - the port of the remote host
 * IN addToPeerList - if != 0, add newly created stat to the global peer list
 * INOUT counter - if a new stats structure is allocated, the counter will
 * be updated with the new number of allocated stat structures
 */
static int rxi_AddRpcStat(
    struct rx_queue *stats,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,
    int isServer,
    afs_uint32 remoteHost,
    afs_uint32 remotePort,
    int addToPeerList,
    unsigned int *counter)
{
    int rc = 0;
    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */

    for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
            (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }
    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */

    if (queue_IsEnd(stats, rpc_stat) ||
        (rpc_stat == NULL) ||
        (rpc_stat->stats[0].interfaceId != rxInterface) ||
        (rpc_stat->stats[0].remote_is_server != isServer)) {
        int i;
        size_t space;

        space = sizeof(rx_interface_stat_t) + totalFunc *
                sizeof(rx_function_entry_v1_t);

        rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
        if (rpc_stat == NULL) {
            rc = 1;
            goto fail;
        }
        *counter += totalFunc;
        for (i = 0; i < totalFunc; i++) {
            rpc_stat->stats[i].remote_peer = remoteHost;
            rpc_stat->stats[i].remote_port = remotePort;
            rpc_stat->stats[i].remote_is_server = isServer;
            rpc_stat->stats[i].interfaceId = rxInterface;
            rpc_stat->stats[i].func_total = totalFunc;
            rpc_stat->stats[i].func_index = i;
            hzero(rpc_stat->stats[i].invocations);
            hzero(rpc_stat->stats[i].bytes_sent);
            hzero(rpc_stat->stats[i].bytes_rcvd);
            rpc_stat->stats[i].queue_time_sum.sec = 0;
            rpc_stat->stats[i].queue_time_sum.usec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].queue_time_min.sec = 9999999;
            rpc_stat->stats[i].queue_time_min.usec = 9999999;
            rpc_stat->stats[i].queue_time_max.sec = 0;
            rpc_stat->stats[i].queue_time_max.usec = 0;
            rpc_stat->stats[i].execution_time_sum.sec = 0;
            rpc_stat->stats[i].execution_time_sum.usec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].execution_time_min.sec = 9999999;
            rpc_stat->stats[i].execution_time_min.usec = 9999999;
            rpc_stat->stats[i].execution_time_max.sec = 0;
            rpc_stat->stats[i].execution_time_max.usec = 0;
        }
        queue_Prepend(stats, rpc_stat);
        if (addToPeerList) {
            queue_Prepend(&peerStats, &rpc_stat->all_peers);
        }
    }

    /*
     * Increment the stats for this function
     */
    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
    hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
    hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
    clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
    if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
        rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
    }
    if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
        rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
    }
    clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum, execTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr, execTime);
    if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
        rpc_stat->stats[currentFunc].execution_time_min = *execTime;
    }
    if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
        rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }

fail:
    return rc;
}
/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
 * rpc function.
 *
 * PARAMETERS
 * IN peer - the peer who invoked the rpc
 * IN rxInterface - a unique number that identifies the rpc interface
 * IN currentFunc - the index of the function being invoked
 * IN totalFunc - the total number of functions in this interface
 * IN queueTime - the amount of time this function waited for a thread
 * IN execTime - the amount of time this function invocation took to execute
 * IN bytesSent - the number of bytes sent by this invocation
 * IN bytesRcvd - the number of bytes received by this invocation
 * IN isServer - if true, this invocation was made to a server
 *
 * Returns void.
 */
void rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
    afs_uint32 currentFunc, afs_uint32 totalFunc, struct clock *queueTime,
    struct clock *execTime, afs_hyper_t *bytesSent, afs_hyper_t *bytesRcvd, int isServer)
{
    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
        rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
                       queueTime, execTime, bytesSent, bytesRcvd, isServer,
                       peer->host, peer->port, 1,
                       &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
        rxi_AddRpcStat(&processStats, rxInterface, currentFunc, totalFunc,
                       queueTime, execTime, bytesSent, bytesRcvd, isServer,
                       0xffffffff, 0xffffffff, 0,
                       &rxi_rpc_process_stat_cnt);
    }

    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
 *
 * PARAMETERS
 * IN callerVersion - the rpc stat version of the caller.
 * IN count - the number of entries to marshall.
 * IN stats - pointer to stats to be marshalled.
 * OUT ptr - Where to store the marshalled data.
 *
 * Returns void.
 */
void rx_MarshallProcessRPCStats(afs_uint32 callerVersion,
    int count, rx_function_entry_v1_t *stats, afs_uint32 **ptrP)
{
    int i;
    afs_uint32 *ptr;

    /*
     * We only support the first version
     */
    for (ptr = *ptrP, i = 0; i < count; i++, stats++) {
        *(ptr++) = stats->remote_peer;
        *(ptr++) = stats->remote_port;
        *(ptr++) = stats->remote_is_server;
        *(ptr++) = stats->interfaceId;
        *(ptr++) = stats->func_total;
        *(ptr++) = stats->func_index;
        *(ptr++) = hgethi(stats->invocations);
        *(ptr++) = hgetlo(stats->invocations);
        *(ptr++) = hgethi(stats->bytes_sent);
        *(ptr++) = hgetlo(stats->bytes_sent);
        *(ptr++) = hgethi(stats->bytes_rcvd);
        *(ptr++) = hgetlo(stats->bytes_rcvd);
        *(ptr++) = stats->queue_time_sum.sec;
        *(ptr++) = stats->queue_time_sum.usec;
        *(ptr++) = stats->queue_time_sum_sqr.sec;
        *(ptr++) = stats->queue_time_sum_sqr.usec;
        *(ptr++) = stats->queue_time_min.sec;
        *(ptr++) = stats->queue_time_min.usec;
        *(ptr++) = stats->queue_time_max.sec;
        *(ptr++) = stats->queue_time_max.usec;
        *(ptr++) = stats->execution_time_sum.sec;
        *(ptr++) = stats->execution_time_sum.usec;
        *(ptr++) = stats->execution_time_sum_sqr.sec;
        *(ptr++) = stats->execution_time_sum_sqr.usec;
        *(ptr++) = stats->execution_time_min.sec;
        *(ptr++) = stats->execution_time_min.usec;
        *(ptr++) = stats->execution_time_max.sec;
        *(ptr++) = stats->execution_time_max.usec;
    }
    *ptrP = ptr;
}
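/*
 * (Editor's note) Each rx_function_entry_v1_t is marshalled as 28
 * afs_uint32 words: 6 identity words, 3 hyper counters split into
 * high/low pairs (6 words), and 8 clock values split into sec/usec
 * pairs (16 words). A caller sizing a buffer by hand would therefore
 * need count * 28 * sizeof(afs_uint32) bytes.
 */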
/*
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * this process
 *
 * PARAMETERS
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from this process.
 * OUT stats - the actual stats retrieved from this process.
 *
 * RETURN CODES
 * Returns void. If successful, stats will != NULL.
 */
int rx_RetrieveProcessRPCStats(afs_uint32 callerVersion,
    afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
    size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /* Check to see if stats are enabled */
    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_processStats) {
        MUTEX_EXIT(&rx_rpc_stats);
        return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;
    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_process_stat_cnt;
    } else {
        /*
         * This can't happen yet, but in the future version changes
         * can be handled by adding additional code here
         */
    }

    if (space > (size_t) 0) {
        *allocSize = space;
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        if (ptr != NULL) {
            rx_interface_stat_p rpc_stat, nrpc_stat;
            for (queue_Scan(&processStats, rpc_stat, nrpc_stat,
                            rx_interface_stat)) {
                /*
                 * Copy the data based upon the caller version
                 */
                rx_MarshallProcessRPCStats(callerVersion,
                                           rpc_stat->stats[0].func_total,
                                           rpc_stat->stats, &ptr);
            }
        } else {
            rc = ENOMEM;
        }
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
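/*
 * Example (editor's sketch): retrieve, inspect, and free the marshalled
 * process stats; the record layout is the 28-word format produced by
 * rx_MarshallProcessRPCStats above.
 *
 *    afs_uint32 ver, sec, usec, count, *data;
 *    size_t bytes;
 *    if (rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &ver,
 *                                   &sec, &usec, &bytes, &count,
 *                                   &data) == 0 && data != NULL) {
 *        ... walk "count" marshalled entries at "data" ...
 *        rx_FreeRPCStats(data, bytes);
 *    }
 */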
/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 * connected to this process
 *
 * PARAMETERS
 * IN callerVersion - the rpc stat version of the caller
 * OUT myVersion - the rpc stat version of this function
 * OUT clock_sec - local time seconds
 * OUT clock_usec - local time microseconds
 * OUT allocSize - the number of bytes allocated to contain stats
 * OUT statCount - the number of stats retrieved from the individual
 * peer structures.
 * OUT stats - the actual stats retrieved from the individual peer structures.
 *
 * RETURN CODES
 * Returns void. If successful, stats will != NULL.
 */
int rx_RetrievePeerRPCStats(afs_uint32 callerVersion,
    afs_uint32 *myVersion, afs_uint32 *clock_sec, afs_uint32 *clock_usec,
    size_t *allocSize, afs_uint32 *statCount, afs_uint32 **stats)
{
    size_t space = 0;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /* Check to see if stats are enabled */
    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_peerStats) {
        MUTEX_EXIT(&rx_rpc_stats);
        return rc;
    }

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;
    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_peer_stat_cnt;
    } else {
        /*
         * This can't happen yet, but in the future version changes
         * can be handled by adding additional code here
         */
    }

    if (space > (size_t) 0) {
        *allocSize = space;
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        if (ptr != NULL) {
            rx_interface_stat_p rpc_stat, nrpc_stat;
            char *fix_offset;
            for (queue_Scan(&peerStats, rpc_stat, nrpc_stat,
                            rx_interface_stat)) {
                /*
                 * We have to fix the offset of rpc_stat since we are
                 * keeping this structure on two rx_queues. The rx_queue
                 * package assumes that the rx_queue member is the first
                 * member of the structure. That is, rx_queue assumes that
                 * any one item is only on one queue at a time. We are
                 * breaking that assumption and so we have to do a little
                 * math to fix our pointers.
                 */

                fix_offset = (char *) rpc_stat;
                fix_offset -= offsetof(rx_interface_stat_t, all_peers);
                rpc_stat = (rx_interface_stat_p) fix_offset;

                /*
                 * Copy the data based upon the caller version
                 */
                rx_MarshallProcessRPCStats(callerVersion,
                                           rpc_stat->stats[0].func_total,
                                           rpc_stat->stats, &ptr);
            }
        } else {
            rc = ENOMEM;
        }
    }
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
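/*
 * (Editor's note) The fix_offset arithmetic above is the classic
 * "container_of" idiom: given a pointer to a member embedded in a larger
 * struct, subtract the member's offset to recover the enclosing struct.
 * A generic sketch of the same technique:
 *
 *    struct wrapper { int a; struct rx_queue link; };
 *    struct rx_queue *q = ...;   -- points at some wrapper's link member
 *    struct wrapper *w = (struct wrapper *)
 *        ((char *) q - offsetof(struct wrapper, link));
 */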
/*
 * rx_FreeRPCStats - free memory allocated by
 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
 *
 * PARAMETERS
 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
 * rx_RetrievePeerRPCStats
 * IN allocSize - the number of bytes in stats.
 *
 * Returns void.
 */

void rx_FreeRPCStats(afs_uint32 *stats, size_t allocSize)
{
    rxi_Free(stats, allocSize);
}
/*
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.
 *
 * Returns 0 if stats are not enabled, != 0 otherwise.
 */

int rx_queryProcessRPCStats(void)
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 *
 * Returns 0 if stats are not enabled, != 0 otherwise.
 */

int rx_queryPeerRPCStats(void)
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}
/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 *
 * Returns void.
 */

void rx_enableProcessRPCStats(void)
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 *
 * Returns void.
 */

void rx_enablePeerRPCStats(void)
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}
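/*
 * Example (editor's sketch): typical setup enabling both collection
 * modes; afterwards rx_IncrementTimeAndCount records into processStats
 * and into each peer's rpcStats queue.
 *
 *    rx_enableProcessRPCStats();
 *    rx_enablePeerRPCStats();
 */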
/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 *
 * Returns void.
 */

void rx_disableProcessRPCStats(void)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and if peer stats is also off, turn
     * off everything.
     */

    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
        rx_enable_stats = 0;
    }

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0;
        if (!rpc_stat) break;
        queue_Remove(rpc_stat);
        num_funcs = rpc_stat->stats[0].func_total;
        space = sizeof(rx_interface_stat_t) +
                rpc_stat->stats[0].func_total *
                sizeof(rx_function_entry_v1_t);

        rxi_Free(rpc_stat, space);
        rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 *
 * Returns void.
 */

void rx_disablePeerRPCStats(void)
{
    struct rx_peer **peer_ptr, **peer_end;
    int code;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and if process stats is also off, turn
     * off everything.
     */

    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
        rx_enable_stats = 0;
    }

    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (peer_ptr = &rx_peerHashTable[0],
         peer_end = &rx_peerHashTable[rx_hashTableSize];
         peer_ptr < peer_end; peer_ptr++) {
        struct rx_peer *peer, *next, *prev;
        for (prev = peer = *peer_ptr; peer; peer = next) {
            next = peer->next;
            code = MUTEX_TRYENTER(&peer->peer_lock);
            if (code) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                size_t space;

                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs = 0;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                }
                MUTEX_EXIT(&peer->peer_lock);
                if (prev == *peer_ptr) {
                    *peer_ptr = next;
                    prev = next;
                } else {
                    prev->next = next;
                }
            } else {
                prev = peer;
            }
        }
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flag settings
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * Returns void.
 */

void rx_clearProcessRPCStats(afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;
        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flag settings
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * Returns void.
 */

void rx_clearPeerRPCStats(afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;
        char *fix_offset;
        /*
         * We have to fix the offset of rpc_stat since we are
         * keeping this structure on two rx_queues. The rx_queue
         * package assumes that the rx_queue member is the first
         * member of the structure. That is, rx_queue assumes that
         * any one item is only on one queue at a time. We are
         * breaking that assumption and so we have to do a little
         * math to fix our pointers.
         */

        fix_offset = (char *) rpc_stat;
        fix_offset -= offsetof(rx_interface_stat_t, all_peers);
        rpc_stat = (rx_interface_stat_p) fix_offset;

        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;

void rx_SetRxStatUserOk(int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}

int rx_RxStatUserOk(struct rx_call *call)
{
    if (!rxi_rxstat_userok)
        return 0;
    return rxi_rxstat_userok(call);
}
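/*
 * Example (editor's sketch): installing an authorization hook so only
 * privileged callers may enable/disable/clear statistics. "myStatAuth"
 * is a hypothetical predicate supplied by the application.
 *
 *    static int myStatAuth(struct rx_call *call)
 *    {
 *        return 1;  -- accept everyone; a real hook would check identity
 *    }
 *
 *    rx_SetRxStatUserOk(myStatAuth);
 */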