/*
 * Copyright 2000, International Business Machines Corporation and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

/* RX: Extended Remote Procedure Call */
#include <afsconfig.h>
#include "../afs/param.h"
#include <afs/param.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define	AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define	AFSOP_STOP_AFS		211	/* Stop AFS process */
#define	AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting those
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread must happen while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 */
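
/*
 * Illustrative sketch (not part of the original source): with fine grain
 * locking, the quota check and the thread reservation are a single step.
 * QuotaOK() below both checks and reserves while rx_serverPool_lock is
 * held, and ReturnToServerPool() undoes the reservation:
 */
#if 0
MUTEX_ENTER(&rx_serverPool_lock);
if (QuotaOK(service)) {		/* checks quota AND reserves the thread */
    /* ... dispatch the incoming call ... */
}
MUTEX_EXIT(&rx_serverPool_lock);
/* ... later, when the request completes: */
ReturnToServerPool(service);	/* give the reservation back */
#endif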
/* A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
 */
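
/*
 * Illustrative sketch (not part of the original source) of the
 * call_queue_lock protocol described above; some_queue and
 * some_queue_lock are hypothetical names:
 */
#if 0
/* Enqueue: record which lock guards the queue, under the call lock. */
MUTEX_ENTER(&call->lock);
SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
queue_Append(&some_queue, call);
MUTEX_EXIT(&call->lock);

/* rxi_ResetCall (call lock held) can then synchronize with the queue: */
if (call->call_queue_lock) {
    afs_kmutex_t *qlock = call->call_queue_lock;
    MUTEX_ENTER(qlock);
    queue_Remove(call);
    CLEAR_CALL_QUEUE_LOCK(call);
    MUTEX_EXIT(qlock);
}
#endif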
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif

/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * tiers:
 *
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *	rx_freeCallQueue_lock
 *	rx_connHashTable_lock
 *	rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *	peer_lock - locks peer data fields.
 *	conn_data_lock - ensures that no more than one thread is updating
 *	a conn data field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and so has no
 * lock protecting this field.  The multihomed client delta introduced
 * an RX code change: change the peer field in the connection structure
 * to the remote interface from which the last packet for this
 * connection was sent out.  This may become an issue if further changes
 * are made.
 */
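
/*
 * Illustrative sketch (not part of the original source): the ordering
 * this hierarchy implies, e.g. in rx_EndCall, is conn_call_lock before
 * call->lock:
 */
#if 0
MUTEX_ENTER(&conn->conn_call_lock);	/* higher tier first */
MUTEX_ENTER(&call->lock);
/* ... examine or update call data fields ... */
MUTEX_EXIT(&call->lock);
MUTEX_EXIT(&conn->conn_call_lock);
#endif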
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
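
/*
 * Illustrative sketch (not part of the original source): a security
 * layer such as rxkad would seed the epoch once with a suitably random
 * value before connections are created; get_random_epoch() is a
 * hypothetical stand-in for its randomness source:
 */
#if 0
afs_uint32 epoch = get_random_epoch();	/* hypothetical */
rx_SetEpoch(epoch);	/* subsequent rx_NewConnection calls use it */
#endif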
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
  afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error. */
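
/*
 * Illustrative usage (not part of the original source):
 */
#if 0
if (rx_Init(0) != 0) {	/* 0: let the kernel choose a UDP port */
    /* Rx could not be initialized; report the error and bail out. */
}
#endif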
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;
    struct timeval tv;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	UNLOCK_RX_INIT
	return tmp_status; /* Already started; return previous error code. */
    }

#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
	return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	UNLOCK_RX_INIT
	return RX_ADDRINUSE;
    }
#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT,0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));	/* XXXXX */
    memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif

#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    struct sockaddr_in addr;
    int addrlen = sizeof(addr);
    if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	return -1;
    }
    rx_port = addr.sin_port;
#endif

    rx_stats.minRtt.sec = 9999999;
#ifdef	KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);		/* Start time of this package; rxkad
					 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra packets caller asked to reserve */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this one starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);

    return 0;
}

static void ReturnToServerPool(aservice)
register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static int QuotaOK(aservice)
register struct rx_service *aservice; {
    int rc=0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this one starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
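    /* For example (illustrative): with service A (minProcs 2, maxProcs 4)
       and service B (minProcs 1, maxProcs 6), the loop below computes
       nProcs = 2 + 1 = 3 and maxdiff = max(4-2, 6-1) = 5, so 8 procs are
       wanted before nExistingProcs is subtracted. */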
    for (i=0; i<RX_MAX_SERVICES; i++) {
	int diff;
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i, nProcs=0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) {
#ifndef AFS_NT40_ENV
	char name[32];
#ifdef AFS_PTHREAD_ENV
	pid_t pid;
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	PROCESS pid;
	LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
	sprintf(name,"srv_%d", ++nProcs);
	if (registerProgram)
	    (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
	rx_ServerProc(); /* Never returns */
    }
    return;
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	    /* Server host */
    u_short sport;		    /* Server port */
    u_short sservice;		    /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    return conn;
}
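
/*
 * Illustrative client usage (not part of the original source); the rxnull
 * security class and the host/port/service values are examples only:
 */
#if 0
struct rx_securityClass *secobj = rxnull_NewClientSecurityObject();
struct rx_connection *tconn;
tconn = rx_NewConnection(htonl(0x7f000001),	/* server host (example) */
			 htons(7000),		/* server port (example) */
			 1,			/* service id (example) */
			 secobj, 0);		/* null security, index 0 */
#endif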
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
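
/* For example (illustrative): rx_SetConnDeadTime(conn, 12) gives
 * secondsUntilDead = 12 and secondsUntilPing = 2, so roughly six
 * keepalives can be lost before the connection is declared dead. */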
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now. rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount ++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	for (i = 0 ; i < conn->nSpecific ; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    struct rx_peer *peer = conn->peer;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last reference is gone before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    if (call->state == RX_STATE_PRECALL ||
			call->state == RX_STATE_ACTIVE) {
			rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
		    } else {
			rxi_AckAll((struct rxevent *)0, call, 0);
		    }
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }

#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	} else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	return;
    }

    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
					     conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent)
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    if (conn->checkReachEvent)
	rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    rxi_DestroyConnection (conn);
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we have found a call in an
 * active state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf (("rx_NewCall(conn %x)\n", conn));

    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);

    /*
     * Check if there are others waiting for a new call.
     * If so, let them go first to avoid starving them.
     * This is a fairly simple scheme, and might not be
     * a complete solution for large numbers of waiters.
     */
    if (conn->makeCallWaiters) {
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }

    for (;;) {
	for (i=0; i<RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    } else {
		call = rxi_NewCall(conn, i);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	/* No call channel is free; wait for one to become available */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);

	conn->makeCallWaiters++;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
	conn->makeCallWaiters--;
    }

    /*
     * Wake up anyone else who might be giving us a chance to
     * run (see code above that avoids resource starvation).
     */
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&conn->conn_call_cv);
#else
    osi_rxWakeup(conn);
#endif

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);

#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
/* Check whether any call on the given connection is currently active */
int
rxi_HasActiveCalls(aconn)
register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i])) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		return 1;
	    }
	}
    }
    return 0;
}

int
rxi_GetCallNumberVector(aconn, aint32s)
register struct rx_connection *aconn;
register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    return 0;
}

int
rxi_SetCallNumberVector(aconn, aint32s)
register struct rx_connection *aconn;
register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * if the service could not be created. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
	      nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;	/* Name for identification purposes (e.g. the
			 * service name might be used for probing for
			 * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }

    tservice = rxi_AllocService();

    for (i = 0; i<RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    rx_services[i] = service;	/* not visible until now */
	    return service;
	}
    }
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
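
/*
 * Illustrative server setup (not part of the original source); the
 * service id, name, security array sc, and ExampleRequestProc dispatcher
 * are hypothetical:
 */
#if 0
struct rx_service *ts;
ts = rx_NewService(0,			/* 0: use the port given to rx_Init */
		   4,			/* service id (example) */
		   "example",		/* name, e.g. for probing/statistics */
		   sc, 1,		/* one security class (assumed array) */
		   ExampleRequestProc);	/* hypothetical dispatcher */
if (ts) {
    ts->minProcs = 2;			/* guaranteed parallel requests */
    ts->maxProcs = 8;			/* upper bound for this service */
    rx_StartServer(1);			/* donate this thread; never returns */
}
#endif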
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
int threadID;
struct rx_call *newcall;
osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = NULL;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting (typically a smooth shutdown) then do not
	 * allow any new calls.
	 */
	if ( rx_tranquil && (call != NULL) ) {
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	}

#ifdef	KERNEL
	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}
#endif

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
}
/* One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 *
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other
 * second-class calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
int tno;
struct rx_service *cur_service;
osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }

    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next ) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}
	if (call) {
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);

	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call*)0;
		continue;
	    }

	    call->flags &= (~RX_CALL_WAIT_PROC);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting--;
	    MUTEX_EXIT(&rx_stats_mutex);
	    rxi_ServerThreadSelectingCall = 0;
	    CV_SIGNAL(&rx_serverPool_cv);
	    MUTEX_EXIT(&rx_serverPool_lock);
	    break;
	}
	else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */

	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef	KERNEL
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
#endif
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
int tno;
struct rx_service *cur_service;
osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next ) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
		if (call) break;
	    }
	}
    }
    if (call) {
	queue_Remove(call);
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	/* MUTEX_EXIT(&call->lock); */
    }
    else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
#ifdef	KERNEL
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		return (struct rx_call *)0;
	    }
#endif
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the
 * call or the call is complete.  Used by multi rx to build a
 * selection function which determines which of several calls is
 * likely to be a good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty.
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
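
/*
 * Illustrative sketch (not part of the original source): a multi rx
 * style selector could register an arrival procedure right after
 * rx_NewCall; ExampleArrival, mhandle, and index are hypothetical names:
 */
#if 0
static VOID ExampleArrival(call, handle, arg)
    register struct rx_call *call;
    register VOID *handle;
    register VOID *arg;
{
    /* note that this call has data ready, then wake the selector */
}

rx_SetArrivalProc(call, ExampleArrival, (VOID *)mhandle, (VOID *)&index);
#endif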
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));

    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else { /* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call.  The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure.  We don't want to signal until
	 * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef	RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    } else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }

    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
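
/*
 * Illustrative client call lifecycle (not part of the original source):
 */
#if 0
struct rx_call *tcall = rx_NewCall(tconn);
/* ... rx_Write the request arguments, rx_Read the reply ... */
code = rx_EndCall(tcall, 0);	/* 0: no local error to report */
/* code is the conversation's final error, mapped by ntoh_syserr_conv */
#endif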
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
    rxinit_status = 1;
    UNLOCK_RX_INIT
}
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
	return;
    }
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;	/* still over quota */
    }
    rx_waitingForPackets = 0;
#ifdef	RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
1951 /* Allocate a call structure, for the indicated channel of the
1952 * supplied connection. The mode and state of the call must be set by
1953 * the caller. Returns the call with mutex locked. */
1954 struct rx_call *rxi_NewCall(conn, channel)
1955 register struct rx_connection *conn;
1956 register int channel;
1958 register struct rx_call *call;
1959 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1960 register struct rx_call *cp; /* Call pointer temp */
1961 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1962 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1964 /* Grab an existing call structure, or allocate a new one.
1965 * Existing call structures are assumed to have been left reset by
1967 MUTEX_ENTER(&rx_freeCallQueue_lock);
1969 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1971 * EXCEPT that the TQ might not yet be cleared out.
1972 * Skip over those with in-use TQs.
1975 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1976 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1982 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1983 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1984 call = queue_First(&rx_freeCallQueue, rx_call);
1985 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1987 MUTEX_ENTER(&rx_stats_mutex);
1988 rx_stats.nFreeCallStructs--;
1989 MUTEX_EXIT(&rx_stats_mutex);
1990 MUTEX_EXIT(&rx_freeCallQueue_lock);
1991 MUTEX_ENTER(&call->lock);
1992 CLEAR_CALL_QUEUE_LOCK(call);
1993 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1994 /* Now, if TQ wasn't cleared earlier, do it now. */
1995 if (call->flags & RX_CALL_TQ_CLEARME) {
1996 rxi_ClearTransmitQueue(call, 0);
1997 queue_Init(&call->tq);
1999 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2000 /* Bind the call to its connection structure */
2002 rxi_ResetCall(call, 1);
2005 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
2007 MUTEX_EXIT(&rx_freeCallQueue_lock);
2008 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
2009 MUTEX_ENTER(&call->lock);
2010 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
2011 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
2012 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
2014 MUTEX_ENTER(&rx_stats_mutex);
2015 rx_stats.nCallStructs++;
2016 MUTEX_EXIT(&rx_stats_mutex);
2017 /* Initialize once-only items */
2018 queue_Init(&call->tq);
2019 queue_Init(&call->rq);
2020 queue_Init(&call->iovq);
2021 /* Bind the call to its connection structure (prereq for reset) */
2023 rxi_ResetCall(call, 1);
2025 call->channel = channel;
2026 call->callNumber = &conn->callNumber[channel];
2027 /* Note that the next expected call number is retained (in
2028 * conn->callNumber[i]), even if we reallocate the call structure
2030 conn->call[channel] = call;
2031 /* if the channel's never been used (== 0), we should start at 1, otherwise
2032 the call number is valid from the last time this channel was used */
2033 if (*call->callNumber == 0) *call->callNumber = 1;
2034 return call;
2035 }
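/* Illustrative sketch, not part of the original source: how a packet's cid
 * and callNumber select a call slot, using the RX_CHANNELMASK and
 * conn->callNumber[] conventions established above. The function name is
 * hypothetical. */
static int rxi_ExamplePacketMatchesCall(conn, hdr)
    struct rx_connection *conn;
    struct rx_header *hdr;
{
    int channel = hdr->cid & RX_CHANNELMASK; /* low bits of the cid pick the channel */
    /* the packet belongs to the channel's current call only if a call is
     * attached and the call numbers agree */
    return conn->call[channel] && (hdr->callNumber == conn->callNumber[channel]);
}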
2038 /* A call has been inactive long enough that we can throw away
2039 * state, including the call structure, which is placed on the call
2040 * free list.
2041 * Call is locked upon entry. */
2043 #ifdef RX_ENABLE_LOCKS
2044 void rxi_FreeCall(call, haveCTLock)
2045 int haveCTLock; /* Set if called from rxi_ReapConnections */
2046 #else /* RX_ENABLE_LOCKS */
2047 void rxi_FreeCall(call)
2048 #endif /* RX_ENABLE_LOCKS */
2049 register struct rx_call *call;
2051 register int channel = call->channel;
2052 register struct rx_connection *conn = call->conn;
2055 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2056 (*call->callNumber)++;
2057 rxi_ResetCall(call, 0);
2058 call->conn->call[channel] = (struct rx_call *) 0;
2060 MUTEX_ENTER(&rx_freeCallQueue_lock);
2061 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2062 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2063 /* A call may be free even though its transmit queue is still in use.
2064 * Since we search the call list from head to tail, put busy calls at
2065 * the head of the list, and idle calls at the tail.
2066 */
2067 if (call->flags & RX_CALL_TQ_BUSY)
2068 queue_Prepend(&rx_freeCallQueue, call);
2070 queue_Append(&rx_freeCallQueue, call);
2071 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2072 queue_Append(&rx_freeCallQueue, call);
2073 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2074 MUTEX_ENTER(&rx_stats_mutex);
2075 rx_stats.nFreeCallStructs++;
2076 MUTEX_EXIT(&rx_stats_mutex);
2078 MUTEX_EXIT(&rx_freeCallQueue_lock);
2080 /* Destroy the connection if it was previously slated for
2081 * destruction, i.e. the Rx client code previously called
2082 * rx_DestroyConnection (client connections), or
2083 * rxi_ReapConnections called the same routine (server
2084 * connections). Only do this, however, if there are no
2085 * outstanding calls. Note that for fine grain locking, there appears
2086 * to be a deadlock in that rxi_FreeCall has a call locked and
2087 * DestroyConnectionNoLock locks each call in the conn. But note a
2088 * few lines up where we have removed this call from the conn.
2089 * If someone else destroys a connection, they either have no
2090 * call lock held or are going through this section of code.
2091 */
2092 if (conn->flags & RX_CONN_DESTROY_ME) {
2093 MUTEX_ENTER(&conn->conn_data_lock);
2094 conn->refCount--;
2095 MUTEX_EXIT(&conn->conn_data_lock);
2096 #ifdef RX_ENABLE_LOCKS
2097 if (haveCTLock)
2098 rxi_DestroyConnectionNoLock(conn);
2099 else
2100 rxi_DestroyConnection(conn);
2101 #else /* RX_ENABLE_LOCKS */
2102 rxi_DestroyConnection(conn);
2103 #endif /* RX_ENABLE_LOCKS */
2107 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2108 char *rxi_Alloc(size)
2109 register size_t size;
2113 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2114 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2117 int glockOwner = ISAFS_GLOCK();
2121 MUTEX_ENTER(&rx_stats_mutex);
2122 rxi_Alloccnt++; rxi_Allocsize += size;
2123 MUTEX_EXIT(&rx_stats_mutex);
2124 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2125 if (size > AFS_SMALLOCSIZ) {
2126 p = (char *) osi_AllocMediumSpace(size);
2127 } else
2128 p = (char *) osi_AllocSmall(size, 1);
2129 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2134 p = (char *) osi_Alloc(size);
2136 if (!p) osi_Panic("rxi_Alloc error");
2141 void rxi_Free(addr, size)
2143 register size_t size;
2145 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2146 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2149 int glockOwner = ISAFS_GLOCK();
2153 MUTEX_ENTER(&rx_stats_mutex);
2154 rxi_Alloccnt--; rxi_Allocsize -= size;
2155 MUTEX_EXIT(&rx_stats_mutex);
2156 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2157 if (size > AFS_SMALLOCSIZ)
2158 osi_FreeMediumSpace(addr);
2159 else
2160 osi_FreeSmall(addr);
2161 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2166 osi_Free(addr, size);
2170 /* Find the peer process represented by the supplied (host,port)
2171 * combination. If there is no appropriate active peer structure, a
2172 * new one will be allocated and initialized
2173 * The origPeer, if set, is a pointer to a peer structure on which the
2174 * refcount will be decremented. This is used to replace the peer
2175 * structure hanging off a connection structure */
2176 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2177 register afs_uint32 host;
2178 register u_short port;
2179 struct rx_peer *origPeer;
2180 int create;
2181 {
2182 register struct rx_peer *pp;
2183 int hashIndex;
2184 hashIndex = PEER_HASH(host, port);
2185 MUTEX_ENTER(&rx_peerHashTable_lock);
2186 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2187 if ((pp->host == host) && (pp->port == port)) break;
2188 }
2189 if (!pp) {
2190 if (create) {
2191 pp = rxi_AllocPeer(); /* This bzero's *pp */
2192 pp->host = host; /* set here or in InitPeerParams is zero */
2193 pp->port = port;
2194 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2195 queue_Init(&pp->congestionQueue);
2196 queue_Init(&pp->rpcStats);
2197 pp->next = rx_peerHashTable[hashIndex];
2198 rx_peerHashTable[hashIndex] = pp;
2199 rxi_InitPeerParams(pp);
2200 MUTEX_ENTER(&rx_stats_mutex);
2201 rx_stats.nPeerStructs++;
2202 MUTEX_EXIT(&rx_stats_mutex);
2203 }
2204 }
2205 else {
2206 pp->refCount++;
2207 }
2208 if (origPeer)
2209 origPeer->refCount--;
2210 MUTEX_EXIT(&rx_peerHashTable_lock);
2211 return pp;
2212 }
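/* Usage sketch, not part of the original source: find-or-create a peer and
 * drop the reference afterwards. This assumes, as the origPeer comment and
 * the refCount handling above imply, that rxi_FindPeer returns with the
 * peer's refCount held. The function name is hypothetical. */
static void example_TouchPeer(host, port)
    afs_uint32 host;
    u_short port;
{
    struct rx_peer *pp = rxi_FindPeer(host, port, (struct rx_peer *)0, 1);
    if (pp) {
        MUTEX_ENTER(&rx_peerHashTable_lock);
        pp->refCount--; /* release the reference rxi_FindPeer took for us */
        MUTEX_EXIT(&rx_peerHashTable_lock);
    }
}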
2215 /* Find the connection at (host, port) started at epoch, and with the
2216 * given connection id. Creates the server connection if necessary.
2217 * The type specifies whether a client connection or a server
2218 * connection is desired. In both cases, (host, port) specify the
2219 * peer's (host, port) pair. Client connections are not made
2220 * automatically by this routine. The parameter socket gives the
2221 * socket descriptor on which the packet was received. This is used,
2222 * in the case of server connections, to check that *new* connections
2223 * come via a valid (port, serviceId). Finally, the securityIndex
2224 * parameter must match the existing index for the connection. If a
2225 * server connection is created, it will be created using the supplied
2226 * index, if the index is valid for this service */
2227 struct rx_connection *
2228 rxi_FindConnection(socket, host, port, serviceId, cid,
2229 epoch, type, securityIndex)
2231 register afs_int32 host;
2232 register u_short port;
2237 u_int securityIndex;
2239 int hashindex, flag;
2240 register struct rx_connection *conn;
2241 struct rx_peer *peer;
2242 hashindex = CONN_HASH(host, port, cid, epoch, type);
2243 MUTEX_ENTER(&rx_connHashTable_lock);
2244 rxLastConn ? (conn = rxLastConn, flag = 0) :
2245 (conn = rx_connHashTable[hashindex], flag = 1);
2246 for ( ; conn; ) {
2247 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2248 && (epoch == conn->epoch)) {
2249 register struct rx_peer *pp = conn->peer;
2250 if (securityIndex != conn->securityIndex) {
2251 /* this isn't supposed to happen, but someone could forge a packet
2252 like this, and there seems to be some CM bug that makes this
2253 happen from time to time -- in which case, the fileserver
2255 MUTEX_EXIT(&rx_connHashTable_lock);
2256 return (struct rx_connection *) 0;
2258 /* if the epoch's high-order bit is set, route only on the cid,
2259 * not on the host and port fields, for security reasons.
2260 */
2261 if (conn->epoch & 0x80000000) break;
2262 if (((type == RX_CLIENT_CONNECTION)
2263 || (pp->host == host)) && (pp->port == port))
2268 /* the connection rxLastConn that was used the last time is not the
2269 ** one we are looking for now. Hence, start searching in the hash table */
2271 conn = rx_connHashTable[hashindex];
2277 struct rx_service *service;
2278 if (type == RX_CLIENT_CONNECTION) {
2279 MUTEX_EXIT(&rx_connHashTable_lock);
2280 return (struct rx_connection *) 0;
2282 service = rxi_FindService(socket, serviceId);
2283 if (!service || (securityIndex >= service->nSecurityObjects)
2284 || (service->securityObjects[securityIndex] == 0)) {
2285 MUTEX_EXIT(&rx_connHashTable_lock);
2286 return (struct rx_connection *) 0;
2288 conn = rxi_AllocConnection(); /* This bzero's the connection */
2289 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2290 MUTEX_DEFAULT, 0);
2291 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2292 MUTEX_DEFAULT, 0);
2293 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2294 conn->next = rx_connHashTable[hashindex];
2295 rx_connHashTable[hashindex] = conn;
2296 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2297 conn->type = RX_SERVER_CONNECTION;
2298 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2299 conn->epoch = epoch;
2300 conn->cid = cid & RX_CIDMASK;
2301 /* conn->serial = conn->lastSerial = 0; */
2302 /* conn->timeout = 0; */
2303 conn->ackRate = RX_FAST_ACK_RATE;
2304 conn->service = service;
2305 conn->serviceId = serviceId;
2306 conn->securityIndex = securityIndex;
2307 conn->securityObject = service->securityObjects[securityIndex];
2308 conn->nSpecific = 0;
2309 conn->specific = NULL;
2310 rx_SetConnDeadTime(conn, service->connDeadTime);
2311 /* Notify security object of the new connection */
2312 RXS_NewConnection(conn->securityObject, conn);
2313 /* XXXX Connection timeout? */
2314 if (service->newConnProc) (*service->newConnProc)(conn);
2315 MUTEX_ENTER(&rx_stats_mutex);
2316 rx_stats.nServerConns++;
2317 MUTEX_EXIT(&rx_stats_mutex);
2321 /* Ensure that the peer structure is set up in such a way that
2322 ** replies in this connection go back to that remote interface
2323 ** from which the last packet was sent out. If this packet's
2324 ** source IP address does not match the peer struct for this conn,
2325 ** then drop the refCount on conn->peer and get a new peer structure.
2326 ** We can check the host,port field in the peer structure without the
2327 ** rx_peerHashTable_lock because the peer structure has its refCount
2328 ** incremented and the only time the host,port in the peer struct gets
2329 ** updated is when the peer structure is created.
2330 */
2331 if (conn->peer->host == host)
2332 peer = conn->peer; /* no change to the peer structure */
2333 else
2334 peer = rxi_FindPeer(host, port, conn->peer, 1);
2337 MUTEX_ENTER(&conn->conn_data_lock);
2340 MUTEX_EXIT(&conn->conn_data_lock);
2342 rxLastConn = conn; /* store this connection as the last conn used */
2343 MUTEX_EXIT(&rx_connHashTable_lock);
2347 /* There are two packet tracing routines available for testing and monitoring
2348 * Rx. One is called just after every packet is received and the other is
2349 * called just before every packet is sent. Received packets have had their
2350 * headers decoded, and packets to be sent have not yet had their headers
2351 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2352 * containing the network address. Both can be modified. The return value, if
2353 * non-zero, indicates that the packet should be dropped. */
2355 int (*rx_justReceived)() = 0;
2356 int (*rx_almostSent)() = 0;
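/* Illustrative sketch, not part of the original source: a tracer that could
 * be installed via rx_justReceived. Per the comment above, the header has
 * already been decoded by the time this runs, and a non-zero return asks Rx
 * to drop the packet. The function name and the drop policy are made up. */
static int example_JustReceived(np, addr)
    struct rx_packet *np;
    struct sockaddr_in *addr;
{
    if (np->header.type == RX_PACKET_TYPE_DEBUG)
        return 1; /* drop debug packets, say */
    return 0;     /* keep everything else */
}
/* installed once at startup, e.g.:  rx_justReceived = example_JustReceived; */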
2358 /* A packet has been received off the interface. Np is the packet, socket is
2359 * the socket number it was received from (useful in determining which service
2360 * this packet corresponds to), and (host, port) reflect the host,port of the
2361 * sender. This call returns the packet to the caller if it is finished with
2362 * it, rather than de-allocating it, just as a small performance hack */
2364 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2365 register struct rx_packet *np;
2370 struct rx_call **newcallp;
2372 register struct rx_call *call;
2373 register struct rx_connection *conn;
2375 afs_uint32 currentCallNumber;
2381 struct rx_packet *tnp;
2384 /* We don't print out the packet until now because (1) the time may not be
2385 * accurate enough until now in the lwp implementation (rx_Listener only gets
2386 * the time after the packet is read) and (2) from a protocol point of view,
2387 * this is the first time the packet has been seen */
2388 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2389 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2390 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2391 np->header.serial, packetType, host, port, np->header.serviceId,
2392 np->header.epoch, np->header.cid, np->header.callNumber,
2393 np->header.seq, np->header.flags, np));
2396 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2397 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2400 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2401 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2404 /* If an input tracer function is defined, call it with the packet and
2405 * network address. Note this function may modify its arguments. */
2406 if (rx_justReceived) {
2407 struct sockaddr_in addr;
2409 addr.sin_family = AF_INET;
2410 addr.sin_port = port;
2411 addr.sin_addr.s_addr = host;
2412 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2413 addr.sin_len = sizeof(addr);
2414 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2415 drop = (*rx_justReceived) (np, &addr);
2416 /* drop packet if return value is non-zero */
2417 if (drop) return np;
2418 port = addr.sin_port; /* in case fcn changed addr */
2419 host = addr.sin_addr.s_addr;
2423 /* If packet was not sent by the client, then *we* must be the client */
2424 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2425 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2427 /* Find the connection (or fabricate one, if we're the server & if
2428 * necessary) associated with this packet */
2429 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2430 np->header.cid, np->header.epoch, type,
2431 np->header.securityIndex);
2434 /* If no connection found or fabricated, just ignore the packet.
2435 * (An argument could be made for sending an abort packet for
2436 * the connection.) */
2437 if (!conn) {
2438 return np;
2439 }
2440 MUTEX_ENTER(&conn->conn_data_lock);
2441 if (conn->maxSerial < np->header.serial)
2442 conn->maxSerial = np->header.serial;
2443 MUTEX_EXIT(&conn->conn_data_lock);
2445 /* If the connection is in an error state, send an abort packet and ignore
2446 * the incoming packet */
2447 if (conn->error) {
2448 /* Don't respond to an abort packet--we don't want loops! */
2449 MUTEX_ENTER(&conn->conn_data_lock);
2450 if (np->header.type != RX_PACKET_TYPE_ABORT)
2451 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2452 conn->refCount--;
2453 MUTEX_EXIT(&conn->conn_data_lock);
2454 return np;
2457 /* Check for connection-only requests (i.e. not call specific). */
2458 if (np->header.callNumber == 0) {
2459 switch (np->header.type) {
2460 case RX_PACKET_TYPE_ABORT:
2461 /* What if the supplied error is zero? */
2462 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2463 MUTEX_ENTER(&conn->conn_data_lock);
2464 conn->refCount--;
2465 MUTEX_EXIT(&conn->conn_data_lock);
2466 return np;
2467 case RX_PACKET_TYPE_CHALLENGE:
2468 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2469 MUTEX_ENTER(&conn->conn_data_lock);
2470 conn->refCount--;
2471 MUTEX_EXIT(&conn->conn_data_lock);
2472 return tnp;
2473 case RX_PACKET_TYPE_RESPONSE:
2474 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2475 MUTEX_ENTER(&conn->conn_data_lock);
2476 conn->refCount--;
2477 MUTEX_EXIT(&conn->conn_data_lock);
2478 return tnp;
2479 case RX_PACKET_TYPE_PARAMS:
2480 case RX_PACKET_TYPE_PARAMS+1:
2481 case RX_PACKET_TYPE_PARAMS+2:
2482 /* ignore these packet types for now */
2483 MUTEX_ENTER(&conn->conn_data_lock);
2484 conn->refCount--;
2485 MUTEX_EXIT(&conn->conn_data_lock);
2486 return np;
2490 /* Should not reach here, unless the peer is broken: send an
2491 * abort packet */
2492 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2493 MUTEX_ENTER(&conn->conn_data_lock);
2494 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2495 conn->refCount--;
2496 MUTEX_EXIT(&conn->conn_data_lock);
2497 return tnp;
2501 channel = np->header.cid & RX_CHANNELMASK;
2502 call = conn->call[channel];
2503 #ifdef RX_ENABLE_LOCKS
2504 if (call) {
2505 MUTEX_ENTER(&call->lock);
2506 /* Test to see if call struct is still attached to conn. */
2507 if (call != conn->call[channel]) {
2509 MUTEX_EXIT(&call->lock);
2510 if (type == RX_SERVER_CONNECTION) {
2511 call = conn->call[channel];
2512 /* If we started with no call attached and there is one now,
2513 * another thread is also running this routine and has gotten
2514 * the connection channel. We should drop this packet in the tests
2515 * below. If there was a call on this connection and it's now
2516 * gone, then we'll be making a new call below.
2517 * If there was previously a call and it's now different then
2518 * the old call was freed and another thread running this routine
2519 * has created a call on this channel. One of these two threads
2520 * has a packet for the old call and the code below handles those
2521 * cases.
2522 */
2523 if (call)
2524 MUTEX_ENTER(&call->lock);
2527 /* This packet can't be for this call. If the new call address is
2528 * 0 then no call is running on this channel. If there is a call
2529 * then, since this is a client connection we're getting data for,
2530 * it must be for the previous call.
2531 */
2532 MUTEX_ENTER(&rx_stats_mutex);
2533 rx_stats.spuriousPacketsRead++;
2534 MUTEX_EXIT(&rx_stats_mutex);
2535 MUTEX_ENTER(&conn->conn_data_lock);
2536 conn->refCount--;
2537 MUTEX_EXIT(&conn->conn_data_lock);
2538 return np;
2542 currentCallNumber = conn->callNumber[channel];
2544 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2545 if (np->header.callNumber < currentCallNumber) {
2546 MUTEX_ENTER(&rx_stats_mutex);
2547 rx_stats.spuriousPacketsRead++;
2548 MUTEX_EXIT(&rx_stats_mutex);
2549 #ifdef RX_ENABLE_LOCKS
2550 if (call)
2551 MUTEX_EXIT(&call->lock);
2552 #endif
2553 MUTEX_ENTER(&conn->conn_data_lock);
2554 conn->refCount--;
2555 MUTEX_EXIT(&conn->conn_data_lock);
2556 return np;
2557 }
2558 if (!call) {
2559 MUTEX_ENTER(&conn->conn_call_lock);
2560 call = rxi_NewCall(conn, channel);
2561 MUTEX_EXIT(&conn->conn_call_lock);
2562 *call->callNumber = np->header.callNumber;
2563 call->state = RX_STATE_PRECALL;
2564 clock_GetTime(&call->queueTime);
2565 hzero(call->bytesSent);
2566 hzero(call->bytesRcvd);
2567 rxi_KeepAliveOn(call);
2569 else if (np->header.callNumber != currentCallNumber) {
2570 /* Wait until the transmit queue is idle before deciding
2571 * whether to reset the current call. Chances are that the
2572 * call will be in either DALLY or HOLD state once the TQ_BUSY
2573 * flag is cleared.
2574 */
2575 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2576 while ((call->state == RX_STATE_ACTIVE) &&
2577 (call->flags & RX_CALL_TQ_BUSY)) {
2578 call->flags |= RX_CALL_TQ_WAIT;
2579 #ifdef RX_ENABLE_LOCKS
2580 CV_WAIT(&call->cv_tq, &call->lock);
2581 #else /* RX_ENABLE_LOCKS */
2582 osi_rxSleep(&call->tq);
2583 #endif /* RX_ENABLE_LOCKS */
2585 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2586 /* If the new call cannot be taken right now send a busy and set
2587 * the error condition in this call, so that it terminates as
2588 * quickly as possible */
2589 if (call->state == RX_STATE_ACTIVE) {
2590 struct rx_packet *tp;
2592 rxi_CallError(call, RX_CALL_DEAD);
2593 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2594 MUTEX_EXIT(&call->lock);
2595 MUTEX_ENTER(&conn->conn_data_lock);
2596 conn->refCount--;
2597 MUTEX_EXIT(&conn->conn_data_lock);
2598 return tp;
2600 rxi_ResetCall(call, 0);
2601 *call->callNumber = np->header.callNumber;
2602 call->state = RX_STATE_PRECALL;
2603 clock_GetTime(&call->queueTime);
2604 hzero(call->bytesSent);
2605 hzero(call->bytesRcvd);
2606 /*
2607 * If the number of queued calls exceeds the overload
2608 * threshold then abort this call.
2610 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2611 struct rx_packet *tp;
2613 rxi_CallError(call, rx_BusyError);
2614 tp = rxi_SendCallAbort(call, np, 1, 0);
2615 MUTEX_EXIT(&call->lock);
2616 MUTEX_ENTER(&conn->conn_data_lock);
2617 conn->refCount--;
2618 MUTEX_EXIT(&conn->conn_data_lock);
2619 return tp;
2621 rxi_KeepAliveOn(call);
2624 /* Continuing call; do nothing here. */
2626 } else { /* we're the client */
2627 /* Ignore all incoming acknowledgements for calls in DALLY state */
2628 if ( call && (call->state == RX_STATE_DALLY)
2629 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2630 MUTEX_ENTER(&rx_stats_mutex);
2631 rx_stats.ignorePacketDally++;
2632 MUTEX_EXIT(&rx_stats_mutex);
2633 #ifdef RX_ENABLE_LOCKS
2635 MUTEX_EXIT(&call->lock);
2638 MUTEX_ENTER(&conn->conn_data_lock);
2639 conn->refCount--;
2640 MUTEX_EXIT(&conn->conn_data_lock);
2641 return np;
2644 /* Ignore anything that's not relevant to the current call. If there
2645 * isn't a current call, then no packet is relevant. */
2646 if (!call || (np->header.callNumber != currentCallNumber)) {
2647 MUTEX_ENTER(&rx_stats_mutex);
2648 rx_stats.spuriousPacketsRead++;
2649 MUTEX_EXIT(&rx_stats_mutex);
2650 #ifdef RX_ENABLE_LOCKS
2652 MUTEX_EXIT(&call->lock);
2655 MUTEX_ENTER(&conn->conn_data_lock);
2656 conn->refCount--;
2657 MUTEX_EXIT(&conn->conn_data_lock);
2658 return np;
2660 /* If the service security object index stamped in the packet does not
2661 * match the connection's security index, ignore the packet */
2662 if (np->header.securityIndex != conn->securityIndex) {
2663 #ifdef RX_ENABLE_LOCKS
2664 MUTEX_EXIT(&call->lock);
2666 MUTEX_ENTER(&conn->conn_data_lock);
2667 conn->refCount--;
2668 MUTEX_EXIT(&conn->conn_data_lock);
2669 return np;
2672 /* If we're receiving the response, then all transmit packets are
2673 * implicitly acknowledged. Get rid of them. */
2674 if (np->header.type == RX_PACKET_TYPE_DATA) {
2675 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2676 /* XXX Hack. Because we must release the global rx lock when
2677 * sending packets (osi_NetSend) we drop all acks while we're
2678 * traversing the tq in rxi_Start sending packets out, because
2679 * packets may move to the freePacketQueue as a result of being here!
2680 * So we drop these packets until we're safely out of the
2681 * traversal. Really ugly!
2682 * For fine grain RX locking, we set the acked field in the
2683 * packets and let rxi_Start remove them from the transmit queue.
2685 if (call->flags & RX_CALL_TQ_BUSY) {
2686 #ifdef RX_ENABLE_LOCKS
2687 rxi_SetAcksInTransmitQueue(call);
2688 #else
2689 conn->refCount--;
2690 return np; /* xmitting; drop packet */
2691 #endif
2692 }
2693 else {
2694 rxi_ClearTransmitQueue(call, 0);
2695 }
2696 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2697 rxi_ClearTransmitQueue(call, 0);
2698 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2700 if (np->header.type == RX_PACKET_TYPE_ACK) {
2701 /* now check to see if this is an ack packet acknowledging that the
2702 * server actually *lost* some hard-acked data. If this happens we
2703 * ignore this packet, as it may indicate that the server restarted in
2704 * the middle of a call. It is also possible that this is an old ack
2705 * packet. We don't abort the connection in this case, because this
2706 * *might* just be an old ack packet. The right way to detect a server
2707 * restart in the midst of a call is to notice that the server epoch
2708 * changed. */
2709 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2710 * XXX unacknowledged. I think that this is off-by-one, but
2711 * XXX I don't dare change it just yet, since it will
2712 * XXX interact badly with the server-restart detection
2713 * XXX code in receiveackpacket. */
2714 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2715 MUTEX_ENTER(&rx_stats_mutex);
2716 rx_stats.spuriousPacketsRead++;
2717 MUTEX_EXIT(&rx_stats_mutex);
2718 MUTEX_EXIT(&call->lock);
2719 MUTEX_ENTER(&conn->conn_data_lock);
2720 conn->refCount--;
2721 MUTEX_EXIT(&conn->conn_data_lock);
2722 return np;
2725 } /* else not a data packet */
2728 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2729 /* Set remote user defined status from packet */
2730 call->remoteStatus = np->header.userStatus;
2732 /* Note the gap between the expected next packet and the actual
2733 * packet that arrived, when the new packet has a smaller serial number
2734 * than expected. Rioses frequently reorder packets all by themselves,
2735 * so this will be quite important with very large window sizes.
2736 * Skew is checked against 0 here to avoid any dependence on the type of
2737 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2738 * true.
2739 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2740 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2741 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2742 */
2743 MUTEX_ENTER(&conn->conn_data_lock);
2744 skew = conn->lastSerial - np->header.serial;
2745 conn->lastSerial = np->header.serial;
2746 MUTEX_EXIT(&conn->conn_data_lock);
2747 if (skew > 0) {
2748 register struct rx_peer *peer;
2749 peer = conn->peer;
2750 if (skew > peer->inPacketSkew) {
2751 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2752 peer->inPacketSkew = skew;
2753 }
2754 }
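/* Sketch, not part of the original source: the smoothed running skew the
 * MTUXXX note above asks for, with the suggested beta of 1/8, would look
 * something like
 *
 *     peer->inPacketSkew += (skew - peer->inPacketSkew) >> 3;
 *
 * in place of the maximum-only tracking actually performed here. */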
2756 /* Now do packet type-specific processing */
2757 switch (np->header.type) {
2758 case RX_PACKET_TYPE_DATA:
2759 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2760 tnop, newcallp);
2761 break;
2762 case RX_PACKET_TYPE_ACK:
2763 /* Respond immediately to ack packets requesting acknowledgement
2764 * (ping packets) */
2765 if (np->header.flags & RX_REQUEST_ACK) {
2766 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2767 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2769 np = rxi_ReceiveAckPacket(call, np, 1);
2770 break;
2771 case RX_PACKET_TYPE_ABORT:
2772 /* An abort packet: reset the connection, passing the error up to
2773 * the user. */
2774 /* What if error is zero? */
2775 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2776 break;
2777 case RX_PACKET_TYPE_BUSY:
2780 case RX_PACKET_TYPE_ACKALL:
2781 /* All packets acknowledged, so we can drop all packets previously
2782 * readied for sending */
2783 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2784 /* XXX Hack. Because we must release the global rx lock when
2785 * sending packets (osi_NetSend) we drop all ack pkts while we're
2786 * traversing the tq in rxi_Start sending packets out, because
2787 * packets may move to the freePacketQueue as a result of being
2788 * here! So we drop these packets until we're safely out of the
2789 * traversal. Really ugly!
2790 * For fine grain RX locking, we set the acked field in the packets
2791 * and let rxi_Start remove the packets from the transmit queue.
2793 if (call->flags & RX_CALL_TQ_BUSY) {
2794 #ifdef RX_ENABLE_LOCKS
2795 rxi_SetAcksInTransmitQueue(call);
2796 break;
2797 #else /* RX_ENABLE_LOCKS */
2798 conn->refCount--;
2799 return np; /* xmitting; drop packet */
2800 #endif /* RX_ENABLE_LOCKS */
2802 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2803 rxi_ClearTransmitQueue(call, 0);
2804 break;
2805 default:
2806 /* Should not reach here, unless the peer is broken: send an abort
2807 * packet */
2808 rxi_CallError(call, RX_PROTOCOL_ERROR);
2809 np = rxi_SendCallAbort(call, np, 1, 0);
2812 /* Note when this last legitimate packet was received, for keep-alive
2813 * processing. Note, we delay getting the time until now in the hope that
2814 * the packet will be delivered to the user before any get time is required
2815 * (if not, then the time won't actually be re-evaluated here). */
2816 call->lastReceiveTime = clock_Sec();
2817 MUTEX_EXIT(&call->lock);
2818 MUTEX_ENTER(&conn->conn_data_lock);
2819 conn->refCount--;
2820 MUTEX_EXIT(&conn->conn_data_lock);
2821 return np;
2824 /* return true if this is an "interesting" connection from the point of view
2825 of someone trying to debug the system */
2826 int rxi_IsConnInteresting(struct rx_connection *aconn)
2829 register struct rx_call *tcall;
2831 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2832 return 1;
2833 for(i=0;i<RX_MAXCALLS;i++) {
2834 tcall = aconn->call[i];
2835 if (tcall) {
2836 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2837 return 1;
2838 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2839 return 1;
2846 /* if this is one of the last few packets AND it wouldn't be used by the
2847 receiving call to immediately satisfy a read request, then drop it on
2848 the floor, since accepting it might prevent a lock-holding thread from
2849 making progress in its reading. If a call has been cleared while in
2850 the precall state then ignore all subsequent packets until the call
2851 is assigned to a thread. */
2853 static int TooLow(ap, acall)
2854 struct rx_call *acall;
2855 struct rx_packet *ap; {
2856 int rc=0;
2857 MUTEX_ENTER(&rx_stats_mutex);
2858 if (((ap->header.seq != 1) &&
2859 (acall->flags & RX_CALL_CLEARED) &&
2860 (acall->state == RX_STATE_PRECALL)) ||
2861 ((rx_nFreePackets < rxi_dataQuota+2) &&
2862 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2863 && (acall->flags & RX_CALL_READER_WAIT)))) {
2864 rc = 1;
2865 }
2866 MUTEX_EXIT(&rx_stats_mutex);
2867 return rc;
2868 }
2871 static void rxi_CheckReachEvent(event, conn, acall)
2872 struct rxevent *event;
2873 struct rx_connection *conn;
2874 struct rx_call *acall;
2876 struct rx_call *call = acall;
2880 MUTEX_ENTER(&conn->conn_call_lock);
2881 MUTEX_ENTER(&conn->conn_data_lock);
2882 conn->checkReachEvent = (struct rxevent *) 0;
2883 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2884 if (event) conn->refCount--;
2885 MUTEX_EXIT(&conn->conn_data_lock);
2889 for (i=0; i<RX_MAXCALLS; i++) {
2890 struct rx_call *tc = conn->call[i];
2891 if (tc && tc->state == RX_STATE_PRECALL) {
2898 if (call != acall) MUTEX_ENTER(&call->lock);
2899 rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
2900 if (call != acall) MUTEX_EXIT(&call->lock);
2902 MUTEX_ENTER(&conn->conn_data_lock);
2903 conn->refCount++;
2904 MUTEX_EXIT(&conn->conn_data_lock);
2905 clock_GetTime(&when);
2906 when.sec += RX_CHECKREACH_TIMEOUT;
2907 conn->checkReachEvent =
2908 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2911 MUTEX_EXIT(&conn->conn_call_lock);
2914 static int rxi_CheckConnReach(conn, call)
2915 struct rx_connection *conn;
2916 struct rx_call *call;
2918 struct rx_service *service = conn->service;
2919 struct rx_peer *peer = conn->peer;
2920 afs_uint32 now, lastReach;
2922 MUTEX_ENTER(&rx_serverPool_lock);
2923 if (service->nRequestsRunning <= service->maxProcs/2) {
2924 MUTEX_EXIT(&rx_serverPool_lock);
2925 return 0;
2926 }
2927 MUTEX_EXIT(&rx_serverPool_lock);
2929 now = clock_Sec();
2930 MUTEX_ENTER(&peer->peer_lock);
2931 lastReach = peer->lastReachTime;
2932 MUTEX_EXIT(&peer->peer_lock);
2933 if (now - lastReach < RX_CHECKREACH_TTL)
2934 return 0;
2936 MUTEX_ENTER(&conn->conn_data_lock);
2937 if (conn->flags & RX_CONN_ATTACHWAIT) {
2938 MUTEX_EXIT(&conn->conn_data_lock);
2939 return 1;
2940 }
2941 conn->flags |= RX_CONN_ATTACHWAIT;
2942 MUTEX_EXIT(&conn->conn_data_lock);
2943 if (!conn->checkReachEvent)
2944 rxi_CheckReachEvent((struct rxevent *)0, conn, call);
2945 return 1;
2946 }
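/* Illustrative sketch, not part of the original source: the freshness test
 * rxi_CheckConnReach applies, pulled out for clarity. A peer counts as
 * proven reachable for RX_CHECKREACH_TTL seconds after its last proof; the
 * function name is hypothetical and it is not called anywhere. */
static int example_PeerRecentlyReachable(peer)
    struct rx_peer *peer;
{
    afs_uint32 now = clock_Sec();
    afs_uint32 lastReach;
    MUTEX_ENTER(&peer->peer_lock);
    lastReach = peer->lastReachTime;   /* stamped by rxi_UpdatePeerReach */
    MUTEX_EXIT(&peer->peer_lock);
    return (now - lastReach) < RX_CHECKREACH_TTL;
}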
2949 /* try to attach call, if authentication is complete */
2950 static void TryAttach(acall, socket, tnop, newcallp, reachOverride)
2951 register struct rx_call *acall;
2952 register osi_socket socket;
2954 register struct rx_call **newcallp;
2957 struct rx_connection *conn = acall->conn;
2959 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2960 /* Don't attach until we have any req'd. authentication. */
2961 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2962 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2963 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2964 /* Note: this does not necessarily succeed; there
2965 * may not be any proc available
2966 */
2967 }
2968 else {
2969 rxi_ChallengeOn(acall->conn);
2970 }
2974 /* A data packet has been received off the interface. This packet is
2975 * appropriate to the call (the call is in the right state, etc.). This
2976 * routine can return a packet to the caller, for re-use */
2978 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2979 port, tnop, newcallp)
2980 register struct rx_call *call;
2981 register struct rx_packet *np;
2987 struct rx_call **newcallp;
2993 afs_uint32 seq, serial, flags;
2995 struct rx_packet *tnp;
2997 MUTEX_ENTER(&rx_stats_mutex);
2998 rx_stats.dataPacketsRead++;
2999 MUTEX_EXIT(&rx_stats_mutex);
3002 /* If there are no packet buffers, drop this new packet, unless we can find
3003 * packet buffers from inactive calls */
3004 if (!call->error &&
3005 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
3006 MUTEX_ENTER(&rx_freePktQ_lock);
3007 rxi_NeedMorePackets = TRUE;
3008 MUTEX_EXIT(&rx_freePktQ_lock);
3009 MUTEX_ENTER(&rx_stats_mutex);
3010 rx_stats.noPacketBuffersOnRead++;
3011 MUTEX_EXIT(&rx_stats_mutex);
3012 call->rprev = np->header.serial;
3013 rxi_calltrace(RX_TRACE_DROP, call);
3014 dpf (("packet %x dropped on receipt - quota problems", np));
3016 rxi_ClearReceiveQueue(call);
3017 clock_GetTime(&when);
3018 clock_Add(&when, &rx_softAckDelay);
3019 if (!call->delayedAckEvent ||
3020 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3021 rxevent_Cancel(call->delayedAckEvent, call,
3022 RX_CALL_REFCOUNT_DELAY);
3023 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3024 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3025 call, 0);
3026 }
3027 /* we've damaged this call already, might as well do it in. */
3028 return np;
3029 }
3032 /*
3033 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
3034 * packet is one of several packets transmitted as a single
3035 * datagram. Do not send any soft or hard acks until all packets
3036 * in a jumbogram have been processed. Send negative acks right away.
3037 */
3038 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
3039 /* tnp is non-null when there are more packets in the
3040 * current jumbogram */
3047 seq = np->header.seq;
3048 serial = np->header.serial;
3049 flags = np->header.flags;
3051 /* If the call is in an error state, send an abort message */
3052 if (call->error)
3053 return rxi_SendCallAbort(call, np, istack, 0);
3055 /* The RX_JUMBO_PACKET is set in all but the last packet in each
3056 * AFS 3.5 jumbogram. */
3057 if (flags & RX_JUMBO_PACKET) {
3058 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3063 if (np->header.spare != 0) {
3064 MUTEX_ENTER(&call->conn->conn_data_lock);
3065 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3066 MUTEX_EXIT(&call->conn->conn_data_lock);
3069 /* The usual case is that this is the expected next packet */
3070 if (seq == call->rnext) {
3072 /* Check to make sure it is not a duplicate of one already queued */
3073 if (queue_IsNotEmpty(&call->rq)
3074 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3075 MUTEX_ENTER(&rx_stats_mutex);
3076 rx_stats.dupPacketsRead++;
3077 MUTEX_EXIT(&rx_stats_mutex);
3078 dpf (("packet %x dropped on receipt - duplicate", np));
3079 rxevent_Cancel(call->delayedAckEvent, call,
3080 RX_CALL_REFCOUNT_DELAY);
3081 np = rxi_SendAck(call, np, seq, serial,
3082 flags, RX_ACK_DUPLICATE, istack);
3088 /* It's the next packet. Stick it on the receive queue
3089 * for this call. Set newPackets to make sure we wake
3090 * the reader once all packets have been processed */
3091 queue_Prepend(&call->rq, np);
3092 call->nSoftAcks++;
3093 np = NULL; /* We can't use this anymore */
3094 newPackets = 1;
3096 /* If an ack is requested then set a flag to make sure we
3097 * send an acknowledgement for this packet */
3098 if (flags & RX_REQUEST_ACK) {
3102 /* Keep track of whether we have received the last packet */
3103 if (flags & RX_LAST_PACKET) {
3104 call->flags |= RX_CALL_HAVE_LAST;
3105 haveLast = 1;
3106 }
3108 /* Check whether we have all of the packets for this call */
3109 if (call->flags & RX_CALL_HAVE_LAST) {
3110 afs_uint32 tseq; /* temporary sequence number */
3111 struct rx_packet *tp; /* Temporary packet pointer */
3112 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3114 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3115 if (tseq != tp->header.seq)
3116 break;
3117 if (tp->header.flags & RX_LAST_PACKET) {
3118 call->flags |= RX_CALL_RECEIVE_DONE;
3125 /* Provide asynchronous notification for those who want it
3126 * (e.g. multi rx) */
3127 if (call->arrivalProc) {
3128 (*call->arrivalProc)(call, call->arrivalProcHandle,
3129 call->arrivalProcArg);
3130 call->arrivalProc = (VOID (*)()) 0;
3133 /* Update last packet received */
3136 /* If there is no server process serving this call, grab
3137 * one, if available. We only need to do this once. If a
3138 * server thread is available, this thread becomes a server
3139 * thread and the server thread becomes a listener thread. */
3141 TryAttach(call, socket, tnop, newcallp, 0);
3144 /* This is not the expected next packet. */
3146 /* Determine whether this is a new or old packet, and if it's
3147 * a new one, whether it fits into the current receive window.
3148 * Also figure out whether the packet was delivered in sequence.
3149 * We use the prev variable to determine whether the new packet
3150 * is the successor of its immediate predecessor in the
3151 * receive queue, and the missing flag to determine whether
3152 * any of this packet's predecessors are missing. */
3154 afs_uint32 prev; /* "Previous packet" sequence number */
3155 struct rx_packet *tp; /* Temporary packet pointer */
3156 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3157 int missing; /* Are any predecessors missing? */
3159 /* If the new packet's sequence number has been sent to the
3160 * application already, then this is a duplicate */
3161 if (seq < call->rnext) {
3162 MUTEX_ENTER(&rx_stats_mutex);
3163 rx_stats.dupPacketsRead++;
3164 MUTEX_EXIT(&rx_stats_mutex);
3165 rxevent_Cancel(call->delayedAckEvent, call,
3166 RX_CALL_REFCOUNT_DELAY);
3167 np = rxi_SendAck(call, np, seq, serial,
3168 flags, RX_ACK_DUPLICATE, istack);
3174 /* If the sequence number is greater than what can be
3175 * accommodated by the current window, then send a negative
3176 * acknowledge and drop the packet */
3177 if ((call->rnext + call->rwind) <= seq) {
3178 rxevent_Cancel(call->delayedAckEvent, call,
3179 RX_CALL_REFCOUNT_DELAY);
3180 np = rxi_SendAck(call, np, seq, serial,
3181 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3187 /* Look for the packet in the queue of old received packets */
3188 for (prev = call->rnext - 1, missing = 0,
3189 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3190 /*Check for duplicate packet */
3191 if (seq == tp->header.seq) {
3192 MUTEX_ENTER(&rx_stats_mutex);
3193 rx_stats.dupPacketsRead++;
3194 MUTEX_EXIT(&rx_stats_mutex);
3195 rxevent_Cancel(call->delayedAckEvent, call,
3196 RX_CALL_REFCOUNT_DELAY);
3197 np = rxi_SendAck(call, np, seq, serial,
3198 flags, RX_ACK_DUPLICATE, istack);
3203 /* If we find a higher sequence packet, break out and
3204 * insert the new packet here. */
3205 if (seq < tp->header.seq) break;
3206 /* Check for missing packet */
3207 if (tp->header.seq != prev+1) {
3208 missing = 1;
3209 }
3211 prev = tp->header.seq;
3214 /* Keep track of whether we have received the last packet. */
3215 if (flags & RX_LAST_PACKET) {
3216 call->flags |= RX_CALL_HAVE_LAST;
3219 /* It's within the window: add it to the receive queue.
3220 * tp is left by the previous loop either pointing at the
3221 * packet before which to insert the new packet, or at the
3222 * queue head if the queue is empty or the packet should be
3223 * appended. */
3224 queue_InsertBefore(tp, np);
3228 /* Check whether we have all of the packets for this call */
3229 if ((call->flags & RX_CALL_HAVE_LAST)
3230 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3231 afs_uint32 tseq; /* temporary sequence number */
3233 for (tseq = call->rnext,
3234 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3235 if (tseq != tp->header.seq)
3237 if (tp->header.flags & RX_LAST_PACKET) {
3238 call->flags |= RX_CALL_RECEIVE_DONE;
3245 /* We need to send an ack if the packet is out of sequence,
3246 * or if an ack was requested by the peer. */
3247 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3251 /* Acknowledge the last packet for each call */
3252 if (flags & RX_LAST_PACKET) {
3262 /*
3263 * If the receiver is waiting for an iovec, fill the iovec
3264 * using the data from the receive queue */
3265 if (call->flags & RX_CALL_IOVEC_WAIT) {
3266 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3267 /* the call may have been aborted */
3276 /* Wakeup the reader if any */
3277 if ((call->flags & RX_CALL_READER_WAIT) &&
3278 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3279 (call->iovNext >= call->iovMax) ||
3280 (call->flags & RX_CALL_RECEIVE_DONE))) {
3281 call->flags &= ~RX_CALL_READER_WAIT;
3282 #ifdef RX_ENABLE_LOCKS
3283 CV_BROADCAST(&call->cv_rq);
3285 osi_rxWakeup(&call->rq);
3290 /*
3291 * Send an ack when requested by the peer, or once every
3292 * rxi_SoftAckRate packets until the last packet has been
3293 * received. Always send a soft ack for the last packet in
3294 * the server's reply. */
3295 if (ackNeeded) {
3296 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3297 np = rxi_SendAck(call, np, seq, serial, flags,
3298 RX_ACK_REQUESTED, istack);
3299 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3300 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3301 np = rxi_SendAck(call, np, seq, serial, flags,
3302 RX_ACK_IDLE, istack);
3303 } else if (call->nSoftAcks) {
3304 clock_GetTime(&when);
3305 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3306 clock_Add(&when, &rx_lastAckDelay);
3308 clock_Add(&when, &rx_softAckDelay);
3310 if (!call->delayedAckEvent ||
3311 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3312 rxevent_Cancel(call->delayedAckEvent, call,
3313 RX_CALL_REFCOUNT_DELAY);
3314 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3315 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3316 call, 0);
3317 }
3318 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3319 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3326 static void rxi_ComputeRate();
3329 static void rxi_UpdatePeerReach(conn, acall)
3330 struct rx_connection *conn;
3331 struct rx_call *acall;
3333 struct rx_peer *peer = conn->peer;
3335 MUTEX_ENTER(&peer->peer_lock);
3336 peer->lastReachTime = clock_Sec();
3337 MUTEX_EXIT(&peer->peer_lock);
3339 MUTEX_ENTER(&conn->conn_call_lock);
3340 MUTEX_ENTER(&conn->conn_data_lock);
3341 if (conn->flags & RX_CONN_ATTACHWAIT) {
3342 int i;
3344 conn->flags &= ~RX_CONN_ATTACHWAIT;
3345 MUTEX_EXIT(&conn->conn_data_lock);
3347 for (i=0; i<RX_MAXCALLS; i++) {
3348 struct rx_call *call = conn->call[i];
3349 if (call) {
3350 if (call != acall) MUTEX_ENTER(&call->lock);
3351 TryAttach(call, -1, NULL, NULL, 1);
3352 if (call != acall) MUTEX_EXIT(&call->lock);
3353 }
3354 }
3355 } else
3356 MUTEX_EXIT(&conn->conn_data_lock);
3357 MUTEX_EXIT(&conn->conn_call_lock);
3360 /* The real smarts of the whole thing. */
3361 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3362 register struct rx_call *call;
3363 struct rx_packet *np;
3366 struct rx_ackPacket *ap;
3368 register struct rx_packet *tp;
3369 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3370 register struct rx_connection *conn = call->conn;
3371 struct rx_peer *peer = conn->peer;
3374 /* because there are CM's that are bogus, sending weird values for this. */
3375 afs_uint32 skew = 0;
3380 int newAckCount = 0;
3381 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3382 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3384 MUTEX_ENTER(&rx_stats_mutex);
3385 rx_stats.ackPacketsRead++;
3386 MUTEX_EXIT(&rx_stats_mutex);
3387 ap = (struct rx_ackPacket *) rx_DataOf(np);
3388 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3389 if (nbytes < 0)
3390 return np; /* truncated ack packet */
3392 /* depends on ack packet struct */
3393 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3394 first = ntohl(ap->firstPacket);
3395 serial = ntohl(ap->serial);
3396 /* temporarily disabled -- needs to degrade over time
3397 skew = ntohs(ap->maxSkew); */
3399 /* Ignore ack packets received out of order */
3400 if (first < call->tfirst) {
3401 return np;
3402 }
3404 if (np->header.flags & RX_SLOW_START_OK) {
3405 call->flags |= RX_CALL_SLOW_START_OK;
3408 if (ap->reason == RX_ACK_PING_RESPONSE)
3409 rxi_UpdatePeerReach(conn, call);
3414 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3415 ap->reason, ntohl(ap->previousPacket),
3416 (unsigned int) np->header.seq, (unsigned int) serial,
3417 (unsigned int) skew, ntohl(ap->firstPacket));
3420 for (offset = 0; offset < nAcks; offset++)
3421 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3427 /* if a server connection has been re-created, it doesn't remember what
3428 serial # it was up to. An ack will tell us, since the serial field
3429 contains the largest serial received by the other side */
3430 MUTEX_ENTER(&conn->conn_data_lock);
3431 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3432 conn->serial = serial+1;
3434 MUTEX_EXIT(&conn->conn_data_lock);
3436 /* Update the outgoing packet skew value to the latest value of
3437 * the peer's incoming packet skew value. The ack packet, of
3438 * course, could arrive out of order, but that won't affect things
3439 * much. */
3440 MUTEX_ENTER(&peer->peer_lock);
3441 peer->outPacketSkew = skew;
3443 /* Check for packets that no longer need to be transmitted, and
3444 * discard them. This only applies to packets positively
3445 * acknowledged as having been sent to the peer's upper level.
3446 * All other packets must be retained. So only packets with
3447 * sequence numbers < ap->firstPacket are candidates. */
3448 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3449 if (tp->header.seq >= first) break;
3450 call->tfirst = tp->header.seq + 1;
3451 if (tp->header.serial == serial) {
3452 /* Use RTT if not delayed by client. */
3453 if (ap->reason != RX_ACK_DELAY)
3454 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3455 #ifdef ADAPT_WINDOW
3456 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3457 #endif
3458 }
3459 else if (tp->firstSerial == serial) {
3460 /* Use RTT if not delayed by client. */
3461 if (ap->reason != RX_ACK_DELAY)
3462 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3463 #ifdef ADAPT_WINDOW
3464 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3465 #endif
3466 }
3467 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3468 /* XXX Hack. Because we have to release the global rx lock when sending
3469 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3470 * in rxi_Start sending packets out because packets may move to the
3471 * freePacketQueue as a result of being here! So we drop these packets until
3472 * we're safely out of the traversal. Really ugly!
3473 * To make it even uglier, if we're using fine grain locking, we can
3474 * set the ack bits in the packets and have rxi_Start remove the packets
3475 * when it's done transmitting.
3476 */
3477 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3478 newAckCount++;
3479 }
3480 if (call->flags & RX_CALL_TQ_BUSY) {
3481 #ifdef RX_ENABLE_LOCKS
3482 tp->flags |= RX_PKTFLAG_ACKED;
3483 call->flags |= RX_CALL_TQ_SOME_ACKED;
3484 #else /* RX_ENABLE_LOCKS */
3485 break;
3486 #endif /* RX_ENABLE_LOCKS */
3487 } else
3488 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3489 {
3490 queue_Remove(tp);
3491 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3496 /* Give rate detector a chance to respond to ping requests */
3497 if (ap->reason == RX_ACK_PING_RESPONSE) {
3498 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3502 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3504 /* Now go through explicit acks/nacks and record the results in
3505 * the waiting packets. These are packets that can't be released
3506 * yet, even with a positive acknowledge. This positive
3507 * acknowledge only means the packet has been received by the
3508 * peer, not that it will be retained long enough to be sent to
3509 * the peer's upper level. In addition, reset the transmit timers
3510 * of any missing packets (those packets that must be missing
3511 * because this packet was out of sequence) */
3513 call->nSoftAcked = 0;
3514 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3515 /* Update round trip time if the ack was stimulated on receipt
3516 * of this packet */
3517 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3518 #ifdef RX_ENABLE_LOCKS
3519 if (tp->header.seq >= first) {
3520 #endif /* RX_ENABLE_LOCKS */
3521 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3522 if (tp->header.serial == serial) {
3523 /* Use RTT if not delayed by client. */
3524 if (ap->reason != RX_ACK_DELAY)
3525 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3526 #ifdef ADAPT_WINDOW
3527 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3528 #endif
3529 }
3530 else if ((tp->firstSerial == serial)) {
3531 /* Use RTT if not delayed by client. */
3532 if (ap->reason != RX_ACK_DELAY)
3533 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3534 #ifdef ADAPT_WINDOW
3535 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3536 #endif
3537 }
3538 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3539 #ifdef RX_ENABLE_LOCKS
3541 #endif /* RX_ENABLE_LOCKS */
3542 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3544 /* Set the acknowledge flag per packet based on the
3545 * information in the ack packet. An acknowledged packet can
3546 * be downgraded when the server has discarded a packet it
3547 * soft-acked previously, or when an ack packet is received
3548 * out of sequence. */
3549 if (tp->header.seq < first) {
3550 /* Implicit ack information */
3551 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3552 newAckCount++;
3553 }
3554 tp->flags |= RX_PKTFLAG_ACKED;
3556 else if (tp->header.seq < first + nAcks) {
3557 /* Explicit ack information: set it in the packet appropriately */
3558 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3559 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3560 newAckCount++;
3561 tp->flags |= RX_PKTFLAG_ACKED;
3569 tp->flags &= ~RX_PKTFLAG_ACKED;
3574 tp->flags &= ~RX_PKTFLAG_ACKED;
3578 /* If packet isn't yet acked, and it has been transmitted at least
3579 * once, reset retransmit time using latest timeout
3580 * i.e., this should readjust the retransmit timer for all outstanding
3581 * packets... So we don't just retransmit when we should know better*/
3583 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3584 tp->retryTime = tp->timeSent;
3585 clock_Add(&tp->retryTime, &peer->timeout);
3586 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3587 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3591 /* If the window has been extended by this acknowledge packet,
3592 * then wakeup a sender waiting in alloc for window space, or try
3593 * sending packets now, if he's been sitting on packets due to
3594 * lack of window space */
3595 if (call->tnext < (call->tfirst + call->twind)) {
3596 #ifdef RX_ENABLE_LOCKS
3597 CV_SIGNAL(&call->cv_twind);
3598 #else
3599 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3600 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3601 osi_rxWakeup(&call->twind);
3602 }
3603 #endif
3604 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3605 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3609 /* if the ack packet has a receivelen field hanging off it,
3610 * update our state */
3611 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3612 afs_uint32 tSize;
3614 /* If the ack packet has a "recommended" size that is less than
3615 * what I am using now, reduce my size to match */
3616 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3617 sizeof(afs_int32), &tSize);
3618 tSize = (afs_uint32) ntohl(tSize);
3619 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3621 /* Get the maximum packet size to send to this peer */
3622 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3624 tSize = (afs_uint32)ntohl(tSize);
3625 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3626 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3628 /* sanity check - peer might have restarted with different params.
3629 * If peer says "send less", dammit, send less... Peer should never
3630 * be unable to accept packets of the size that prior AFS versions would
3631 * send without asking. */
3632 if (peer->maxMTU != tSize) {
3633 peer->maxMTU = tSize;
3634 peer->MTU = MIN(tSize, peer->MTU);
3635 call->MTU = MIN(call->MTU, tSize);
3639 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3641 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3642 sizeof(afs_int32), &tSize);
3643 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3644 if (tSize < call->twind) { /* smaller than our send */
3645 call->twind = tSize; /* window, we must send less... */
3646 call->ssthresh = MIN(call->twind, call->ssthresh);
3649 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3650 * network MTU confused with the loopback MTU. Calculate the
3651 * maximum MTU here for use in the slow start code below.
3652 */
3653 maxMTU = peer->maxMTU;
3654 /* Did peer restart with older RX version? */
3655 if (peer->maxDgramPackets > 1) {
3656 peer->maxDgramPackets = 1;
3658 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3660 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3661 sizeof(afs_int32), &tSize);
3662 tSize = (afs_uint32) ntohl(tSize);
3663 /*
3664 * As of AFS 3.5 we set the send window to match the receive window.
3665 */
3666 if (tSize < call->twind) {
3667 call->twind = tSize;
3668 call->ssthresh = MIN(call->twind, call->ssthresh);
3669 } else if (tSize > call->twind) {
3670 call->twind = tSize;
3671 }
3673 /*
3674 * As of AFS 3.5, a jumbogram is more than one fixed size
3675 * packet transmitted in a single UDP datagram. If the remote
3676 * MTU is smaller than our local MTU then never send a datagram
3677 * larger than the natural MTU.
3678 */
3679 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3680 sizeof(afs_int32), &tSize);
3681 maxDgramPackets = (afs_uint32) ntohl(tSize);
3682 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3683 maxDgramPackets = MIN(maxDgramPackets,
3684 (int)(peer->ifDgramPackets));
3685 maxDgramPackets = MIN(maxDgramPackets, tSize);
3686 if (maxDgramPackets > 1) {
3687 peer->maxDgramPackets = maxDgramPackets;
3688 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3690 peer->maxDgramPackets = 1;
3691 call->MTU = peer->natMTU;
3693 } else if (peer->maxDgramPackets > 1) {
3694 /* Restarted with lower version of RX */
3695 peer->maxDgramPackets = 1;
3697 } else if (peer->maxDgramPackets > 1 ||
3698 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3699 /* Restarted with lower version of RX */
3700 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3701 peer->natMTU = OLD_MAX_PACKET_SIZE;
3702 peer->MTU = OLD_MAX_PACKET_SIZE;
3703 peer->maxDgramPackets = 1;
3704 peer->nDgramPackets = 1;
3706 call->MTU = OLD_MAX_PACKET_SIZE;
3707 }
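/* Sketch, not part of the original source: the optional trailer that may
 * follow the ack array, as implied by the rx_packetread offsets used above.
 * All values are afs_int32 in network byte order:
 *   rx_AckDataSize(nAcks) + 0*sizeof(afs_int32)  largest packet the peer accepts -> maxMTU
 *   rx_AckDataSize(nAcks) + 1*sizeof(afs_int32)  peer's recommended size         -> natMTU
 *   rx_AckDataSize(nAcks) + 2*sizeof(afs_int32)  peer's receive window           -> twind (AFS 3.4a+)
 *   rx_AckDataSize(nAcks) + 3*sizeof(afs_int32)  max datagram packet count       -> jumbograms (AFS 3.5+)
 */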
3710 /*
3711 * Calculate how many datagrams were successfully received after
3712 * the first missing packet and adjust the negative ack counter
3713 * accordingly.
3714 */
3717 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3718 if (call->nNacks < nNacked) {
3719 call->nNacks = nNacked;
3728 if (call->flags & RX_CALL_FAST_RECOVER) {
3729 if (nNacked) {
3730 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3731 } else {
3732 call->flags &= ~RX_CALL_FAST_RECOVER;
3733 call->cwind = call->nextCwind;
3734 call->nextCwind = 0;
3735 call->nAcks = 0;
3736 }
3737 call->nCwindAcks = 0;
3739 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3740 /* Three negative acks in a row trigger congestion recovery */
3741 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3742 MUTEX_EXIT(&peer->peer_lock);
3743 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3744 /* someone else is waiting to start recovery */
3745 return np;
3746 }
3747 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3748 while (call->flags & RX_CALL_TQ_BUSY) {
3749 call->flags |= RX_CALL_TQ_WAIT;
3750 #ifdef RX_ENABLE_LOCKS
3751 CV_WAIT(&call->cv_tq, &call->lock);
3752 #else /* RX_ENABLE_LOCKS */
3753 osi_rxSleep(&call->tq);
3754 #endif /* RX_ENABLE_LOCKS */
3756 MUTEX_ENTER(&peer->peer_lock);
3757 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3758 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3759 call->flags |= RX_CALL_FAST_RECOVER;
3760 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3761 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3762 rx_maxSendWindow);
3763 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3764 call->nextCwind = call->ssthresh;
3767 peer->MTU = call->MTU;
3768 peer->cwind = call->nextCwind;
3769 peer->nDgramPackets = call->nDgramPackets;
3771 call->congestSeq = peer->congestSeq;
3772 /* Reset the resend times on the packets that were nacked
3773 * so we will retransmit as soon as the window permits */
3774 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3775 if (acked) {
3776 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3777 clock_Zero(&tp->retryTime);
3778 }
3779 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3780 acked = 1;
3781 }
3782 }
3784 /* If cwind is smaller than ssthresh, then increase
3785 * the window one packet for each ack we receive (exponential
3786 * growth).
3787 * If cwind is greater than or equal to ssthresh then increase
3788 * the congestion window by one packet for each cwind acks we
3789 * receive (linear growth). */
3790 if (call->cwind < call->ssthresh) {
3791 call->cwind = MIN((int)call->ssthresh,
3792 (int)(call->cwind + newAckCount));
3793 call->nCwindAcks = 0;
3794 } else {
3795 call->nCwindAcks += newAckCount;
3796 if (call->nCwindAcks >= call->cwind) {
3797 call->nCwindAcks = 0;
3798 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3801 /*
3802 * If we have received several acknowledgements in a row then
3803 * it is time to increase the size of our datagrams
3804 */
3805 if ((int)call->nAcks > rx_nDgramThreshold) {
3806 if (peer->maxDgramPackets > 1) {
3807 if (call->nDgramPackets < peer->maxDgramPackets) {
3808 call->nDgramPackets++;
3809 }
3810 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3811 } else if (call->MTU < peer->maxMTU) {
3812 call->MTU += peer->natMTU;
3813 call->MTU = MIN(call->MTU, peer->maxMTU);
3819 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3821 /* Servers need to hold the call until all response packets have
3822 * been acknowledged. Soft acks are good enough since clients
3823 * are not allowed to clear their receive queues. */
3824 if (call->state == RX_STATE_HOLD &&
3825 call->tfirst + call->nSoftAcked >= call->tnext) {
3826 call->state = RX_STATE_DALLY;
3827 rxi_ClearTransmitQueue(call, 0);
3828 } else if (!queue_IsEmpty(&call->tq)) {
3829 rxi_Start(0, call, istack);
3834 /* Received a response to a challenge packet */
3835 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3836 register struct rx_connection *conn;
3837 register struct rx_packet *np;
3842 /* Ignore the packet if we're the client */
3843 if (conn->type == RX_CLIENT_CONNECTION) return np;
3845 /* If already authenticated, ignore the packet (it's probably a retry) */
3846 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3847 return np;
3849 /* Otherwise, have the security object evaluate the response packet */
3850 error = RXS_CheckResponse(conn->securityObject, conn, np);
3852 /* If the response is invalid, reset the connection, sending
3853 * an abort to the peer */
3857 rxi_ConnectionError(conn, error);
3858 MUTEX_ENTER(&conn->conn_data_lock);
3859 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3860 MUTEX_EXIT(&conn->conn_data_lock);
3864 /* If the response is valid, any calls waiting to attach
3865 * servers can now do so */
3868 for (i=0; i<RX_MAXCALLS; i++) {
3869 struct rx_call *call = conn->call[i];
3871 MUTEX_ENTER(&call->lock);
3872 if (call->state == RX_STATE_PRECALL)
3873 rxi_AttachServerProc(call, -1, NULL, NULL);
3874 MUTEX_EXIT(&call->lock);
3878 /* Update the peer reachability information, just in case
3879 * some calls went into attach-wait while we were waiting
3880 * for authentication. */
3882 rxi_UpdatePeerReach(conn, NULL);
3887 /* A client has received an authentication challenge: the security
3888 * object is asked to cough up a respectable response packet to send
3889 * back to the server. The server is responsible for retrying the
3890 * challenge if it fails to get a response. */
3893 rxi_ReceiveChallengePacket(conn, np, istack)
3894 register struct rx_connection *conn;
3895 register struct rx_packet *np;
3900 /* Ignore the challenge if we're the server */
3901 if (conn->type == RX_SERVER_CONNECTION) return np;
3903 /* Ignore the challenge if the connection is otherwise idle; someone's
3904 * trying to use us as an oracle. */
3905 if (!rxi_HasActiveCalls(conn)) return np;
3907 /* Send the security object the challenge packet. It is expected to fill
3908 * in the response. */
3909 error = RXS_GetResponse(conn->securityObject, conn, np);
3911 /* If the security object is unable to return a valid response, reset the
3912 * connection and send an abort to the peer. Otherwise send the response
3913 * packet to the peer connection. */
3915 rxi_ConnectionError(conn, error);
3916 MUTEX_ENTER(&conn->conn_data_lock);
3917 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3918 MUTEX_EXIT(&conn->conn_data_lock);
3921 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3922 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3928 /* Find an available server process to service the current request in
3929 * the given call structure. If one isn't available, queue up this
3930 * call so it eventually gets one */
3932 rxi_AttachServerProc(call, socket, tnop, newcallp)
3933 register struct rx_call *call;
3934 register osi_socket socket;
3936 register struct rx_call **newcallp;
3938 register struct rx_serverQueueEntry *sq;
3939 register struct rx_service *service = call->conn->service;
3940 #ifdef RX_ENABLE_LOCKS
3941 register int haveQuota = 0;
3942 #endif /* RX_ENABLE_LOCKS */
3943 /* May already be attached */
3944 if (call->state == RX_STATE_ACTIVE) return;
3946 MUTEX_ENTER(&rx_serverPool_lock);
3947 #ifdef RX_ENABLE_LOCKS
3948 while(rxi_ServerThreadSelectingCall) {
3949 MUTEX_EXIT(&call->lock);
3950 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3951 MUTEX_EXIT(&rx_serverPool_lock);
3952 MUTEX_ENTER(&call->lock);
3953 MUTEX_ENTER(&rx_serverPool_lock);
3954 /* Call may have been attached */
3955 if (call->state == RX_STATE_ACTIVE) return;
3958 haveQuota = QuotaOK(service);
3959 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3960 /* If there are no processes available to service this call,
3961 * put the call on the incoming call queue (unless it's
3962 * already on the queue).
3963 */
3965 ReturnToServerPool(service);
3966 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3967 call->flags |= RX_CALL_WAIT_PROC;
3968 MUTEX_ENTER(&rx_stats_mutex);
3970 MUTEX_EXIT(&rx_stats_mutex);
3971 rxi_calltrace(RX_CALL_ARRIVAL, call);
3972 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3973 queue_Append(&rx_incomingCallQueue, call);
3976 #else /* RX_ENABLE_LOCKS */
3977 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3978 /* If there are no processes available to service this call,
3979 * put the call on the incoming call queue (unless it's
3980 * already on the queue).
3981 */
3982 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3983 call->flags |= RX_CALL_WAIT_PROC;
3985 rxi_calltrace(RX_CALL_ARRIVAL, call);
3986 queue_Append(&rx_incomingCallQueue, call);
3989 #endif /* RX_ENABLE_LOCKS */
3991 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3993 /* If hot threads are enabled, and both newcallp and sq->socketp
3994 * are non-null, then this thread will process the call, and the
3995 * idle server thread will start listening on this thread's socket.
3996 */
3998 if (rx_enable_hot_thread && newcallp && sq->socketp) {
4001 *sq->socketp = socket;
4002 clock_GetTime(&call->startTime);
4003 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
4007 if (call->flags & RX_CALL_WAIT_PROC) {
4008 /* Conservative: I don't think this should happen */
4009 call->flags &= ~RX_CALL_WAIT_PROC;
4010 MUTEX_ENTER(&rx_stats_mutex);
4012 MUTEX_EXIT(&rx_stats_mutex);
4015 call->state = RX_STATE_ACTIVE;
4016 call->mode = RX_MODE_RECEIVING;
4017 if (call->flags & RX_CALL_CLEARED) {
4018 /* send an ack now to start the packet flow up again */
4019 call->flags &= ~RX_CALL_CLEARED;
4020 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
4022 #ifdef RX_ENABLE_LOCKS
4025 service->nRequestsRunning++;
4026 if (service->nRequestsRunning <= service->minProcs)
4032 MUTEX_EXIT(&rx_serverPool_lock);
4035 /* Delay the sending of an acknowledge event for a short while, while
4036 * a new call is being prepared (in the case of a client) or a reply
4037 * is being prepared (in the case of a server). Rather than sending
4038 * an ack packet, an ACKALL packet is sent. */
4039 void rxi_AckAll(event, call, dummy)
4040 struct rxevent *event;
4041 register struct rx_call *call;
4044 #ifdef RX_ENABLE_LOCKS
4046 MUTEX_ENTER(&call->lock);
4047 call->delayedAckEvent = (struct rxevent *) 0;
4048 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
4050 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
4051 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
4053 MUTEX_EXIT(&call->lock);
4054 #else /* RX_ENABLE_LOCKS */
4055 if (event) call->delayedAckEvent = (struct rxevent *) 0;
4056 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
4057 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
4058 #endif /* RX_ENABLE_LOCKS */
4061 void rxi_SendDelayedAck(event, call, dummy)
4062 struct rxevent *event;
4063 register struct rx_call *call;
4066 #ifdef RX_ENABLE_LOCKS
4068 MUTEX_ENTER(&call->lock);
4069 if (event == call->delayedAckEvent)
4070 call->delayedAckEvent = (struct rxevent *) 0;
4071 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
4073 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4075 MUTEX_EXIT(&call->lock);
4076 #else /* RX_ENABLE_LOCKS */
4077 if (event) call->delayedAckEvent = (struct rxevent *) 0;
4078 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4079 #endif /* RX_ENABLE_LOCKS */
4083 #ifdef RX_ENABLE_LOCKS
4084 /* Set ack in all packets in transmit queue. rxi_Start will deal with
4085 * clearing them out.
4086 */
4087 static void rxi_SetAcksInTransmitQueue(call)
4088 register struct rx_call *call;
4090 register struct rx_packet *p, *tp;
4093 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4096 p->flags |= RX_PKTFLAG_ACKED;
4100 call->flags |= RX_CALL_TQ_CLEARME;
4101 call->flags |= RX_CALL_TQ_SOME_ACKED;
4104 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4105 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4106 call->tfirst = call->tnext;
4107 call->nSoftAcked = 0;
4109 if (call->flags & RX_CALL_FAST_RECOVER) {
4110 call->flags &= ~RX_CALL_FAST_RECOVER;
4111 call->cwind = call->nextCwind;
4112 call->nextCwind = 0;
4115 CV_SIGNAL(&call->cv_twind);
4117 #endif /* RX_ENABLE_LOCKS */
4119 /* Clear out the transmit queue for the current call (all packets have
4120 * been received by peer) */
4121 void rxi_ClearTransmitQueue(call, force)
4122 register struct rx_call *call;
4125 register struct rx_packet *p, *tp;
4127 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4128 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4130 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4133 p->flags |= RX_PKTFLAG_ACKED;
4137 call->flags |= RX_CALL_TQ_CLEARME;
4138 call->flags |= RX_CALL_TQ_SOME_ACKED;
4141 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4142 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4148 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4149 call->flags &= ~RX_CALL_TQ_CLEARME;
4151 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4153 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4154 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4155 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4156 call->nSoftAcked = 0;
4158 if (call->flags & RX_CALL_FAST_RECOVER) {
4159 call->flags &= ~RX_CALL_FAST_RECOVER;
4160 call->cwind = call->nextCwind;
4163 #ifdef RX_ENABLE_LOCKS
4164 CV_SIGNAL(&call->cv_twind);
4166 osi_rxWakeup(&call->twind);
4170 void rxi_ClearReceiveQueue(call)
4171 register struct rx_call *call;
4173 register struct rx_packet *p, *tp;
4174 if (queue_IsNotEmpty(&call->rq)) {
4175 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4180 rx_packetReclaims++;
4182 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4184 if (call->state == RX_STATE_PRECALL) {
4185 call->flags |= RX_CALL_CLEARED;
4189 /* Send an abort packet for the specified call */
4190 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4191 register struct rx_call *call;
4192 struct rx_packet *packet;
4202 /* Clients should never delay abort messages */
4203 if (rx_IsClientConn(call->conn))
4206 if (call->abortCode != call->error) {
4207 call->abortCode = call->error;
4208 call->abortCount = 0;
4211 if (force || rxi_callAbortThreshhold == 0 ||
4212 call->abortCount < rxi_callAbortThreshhold) {
4213 if (call->delayedAbortEvent) {
4214 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4216 error = htonl(call->error);
4218 packet = rxi_SendSpecial(call, call->conn, packet,
4219 RX_PACKET_TYPE_ABORT, (char *)&error,
4220 sizeof(error), istack);
4221 } else if (!call->delayedAbortEvent) {
4222 clock_GetTime(&when);
4223 clock_Addmsec(&when, rxi_callAbortDelay);
4224 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4225 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4231 /* Send an abort packet for the specified connection. Packet is an
4232 * optional pointer to a packet that can be used to send the abort.
4233 * Once the number of abort messages reaches the threshold, an
4234 * event is scheduled to send the abort. Setting the force flag
4235 * overrides sending delayed abort messages.
4237 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4238 * to send the abort packet.
4239 */
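/* Throttling sketch (rxi_connAbortThreshhold and rxi_connAbortDelay
 * are tunables; the values here are assumed for illustration): with a
 * threshold of 10 and a delay of 3000 ms, the first 10 aborts on a
 * connection are sent immediately; after that, at most one abort is
 * sent per scheduled delayed event, so a client looping on a bad
 * request sees roughly one abort every 3 seconds rather than one per
 * request. */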
4240 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4241 register struct rx_connection *conn;
4242 struct rx_packet *packet;
4252 /* Clients should never delay abort messages */
4253 if (rx_IsClientConn(conn))
4256 if (force || rxi_connAbortThreshhold == 0 ||
4257 conn->abortCount < rxi_connAbortThreshhold) {
4258 if (conn->delayedAbortEvent) {
4259 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4261 error = htonl(conn->error);
4263 MUTEX_EXIT(&conn->conn_data_lock);
4264 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4265 RX_PACKET_TYPE_ABORT, (char *)&error,
4266 sizeof(error), istack);
4267 MUTEX_ENTER(&conn->conn_data_lock);
4268 } else if (!conn->delayedAbortEvent) {
4269 clock_GetTime(&when);
4270 clock_Addmsec(&when, rxi_connAbortDelay);
4271 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4277 /* Associate an error with all of the calls owned by a connection. Called
4278 * with error non-zero. This is only for really fatal things, like
4279 * bad authentication responses. The connection itself is set in
4280 * error at this point, so that future packets received will be
4281 * rejected. */
4282 void rxi_ConnectionError(conn, error)
4283 register struct rx_connection *conn;
4284 register afs_int32 error;
4288 MUTEX_ENTER(&conn->conn_data_lock);
4289 if (conn->challengeEvent)
4290 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4291 if (conn->checkReachEvent) {
4292 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4293 conn->checkReachEvent = 0;
4296 MUTEX_EXIT(&conn->conn_data_lock);
4297 for (i=0; i<RX_MAXCALLS; i++) {
4298 struct rx_call *call = conn->call[i];
4300 MUTEX_ENTER(&call->lock);
4301 rxi_CallError(call, error);
4302 MUTEX_EXIT(&call->lock);
4305 conn->error = error;
4306 MUTEX_ENTER(&rx_stats_mutex);
4307 rx_stats.fatalErrors++;
4308 MUTEX_EXIT(&rx_stats_mutex);
4312 void rxi_CallError(call, error)
4313 register struct rx_call *call;
4316 if (call->error) error = call->error;
4317 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4318 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4319 rxi_ResetCall(call, 0);
4322 rxi_ResetCall(call, 0);
4324 call->error = error;
4325 call->mode = RX_MODE_ERROR;
4328 /* Reset various fields in a call structure, and wakeup waiting
4329 * processes. Some fields aren't changed: state & mode are not
4330 * touched (these must be set by the caller), and bufptr, nLeft, and
4331 * nFree are not reset, since these fields are manipulated by
4332 * unprotected macros, and may only be reset by non-interrupting code.
4333 */
4335 /* this code requires that call->conn be set properly as a pre-condition. */
4336 #endif /* ADAPT_WINDOW */
4338 void rxi_ResetCall(call, newcall)
4339 register struct rx_call *call;
4340 register int newcall;
4343 register struct rx_peer *peer;
4344 struct rx_packet *packet;
4346 /* Notify anyone who is waiting for asynchronous packet arrival */
4347 if (call->arrivalProc) {
4348 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4349 call->arrivalProc = (VOID (*)()) 0;
4352 if (call->delayedAbortEvent) {
4353 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4354 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4356 rxi_SendCallAbort(call, packet, 0, 1);
4357 rxi_FreePacket(packet);
4362 * Update the peer with the congestion information in this call
4363 * so other calls on this connection can pick up where this call
4364 * left off. If the congestion sequence numbers don't match then
4365 * another call experienced a retransmission.
4366 */
4367 peer = call->conn->peer;
4368 MUTEX_ENTER(&peer->peer_lock);
4370 if (call->congestSeq == peer->congestSeq) {
4371 peer->cwind = MAX(peer->cwind, call->cwind);
4372 peer->MTU = MAX(peer->MTU, call->MTU);
4373 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4376 call->abortCode = 0;
4377 call->abortCount = 0;
4379 if (peer->maxDgramPackets > 1) {
4380 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4382 call->MTU = peer->MTU;
4384 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4385 call->ssthresh = rx_maxSendWindow;
4386 call->nDgramPackets = peer->nDgramPackets;
4387 call->congestSeq = peer->congestSeq;
4388 MUTEX_EXIT(&peer->peer_lock);
4390 flags = call->flags;
4391 rxi_ClearReceiveQueue(call);
4392 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4393 if (call->flags & RX_CALL_TQ_BUSY) {
4394 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4395 call->flags |= (flags & RX_CALL_TQ_WAIT);
4397 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4399 rxi_ClearTransmitQueue(call, 0);
4400 queue_Init(&call->tq);
4403 queue_Init(&call->rq);
4405 call->rwind = rx_initReceiveWindow;
4406 call->twind = rx_initSendWindow;
4407 call->nSoftAcked = 0;
4408 call->nextCwind = 0;
4411 call->nCwindAcks = 0;
4412 call->nSoftAcks = 0;
4413 call->nHardAcks = 0;
4415 call->tfirst = call->rnext = call->tnext = 1;
4417 call->lastAcked = 0;
4418 call->localStatus = call->remoteStatus = 0;
4420 if (flags & RX_CALL_READER_WAIT) {
4421 #ifdef RX_ENABLE_LOCKS
4422 CV_BROADCAST(&call->cv_rq);
4424 osi_rxWakeup(&call->rq);
4427 if (flags & RX_CALL_WAIT_PACKETS) {
4428 MUTEX_ENTER(&rx_freePktQ_lock);
4429 rxi_PacketsUnWait(); /* XXX */
4430 MUTEX_EXIT(&rx_freePktQ_lock);
4433 #ifdef RX_ENABLE_LOCKS
4434 CV_SIGNAL(&call->cv_twind);
4436 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4437 osi_rxWakeup(&call->twind);
4440 #ifdef RX_ENABLE_LOCKS
4441 /* The following ensures that we don't mess with any queue while some
4442 * other thread might also be doing so. The call_queue_lock field is
4443 * only modified under the call lock. If the call is in the process
4444 * of being removed from a queue, the call is not locked until the
4445 * queue lock is dropped and only then is the call_queue_lock field
4446 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4447 * Note that any other routine which removes a call from a queue has to
4448 * obtain the queue lock before examining the queue and removing the call.
4449 */
4450 if (call->call_queue_lock) {
4451 MUTEX_ENTER(call->call_queue_lock);
4452 if (queue_IsOnQueue(call)) {
4454 if (flags & RX_CALL_WAIT_PROC) {
4455 MUTEX_ENTER(&rx_stats_mutex);
4457 MUTEX_EXIT(&rx_stats_mutex);
4460 MUTEX_EXIT(call->call_queue_lock);
4461 CLEAR_CALL_QUEUE_LOCK(call);
4463 #else /* RX_ENABLE_LOCKS */
4464 if (queue_IsOnQueue(call)) {
4466 if (flags & RX_CALL_WAIT_PROC)
4469 #endif /* RX_ENABLE_LOCKS */
4471 rxi_KeepAliveOff(call);
4472 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4475 /* Send an acknowledge for the indicated packet (seq,serial) of the
4476 * indicated call, for the indicated reason (reason). This
4477 * acknowledge will specifically acknowledge receiving the packet, and
4478 * will also specify which other packets for this call have been
4479 * received. This routine returns the packet that was used to the
4480 * caller. The caller is responsible for freeing it or re-using it.
4481 * This acknowledgement also returns the highest sequence number
4482 * actually read out by the higher level to the sender; the sender
4483 * promises to keep around packets that have not been read by the
4484 * higher level yet (unless, of course, the sender decides to abort
4485 * the call altogether). Any of p, seq, serial, pflags, or reason may
4486 * be set to zero without ill effect. That is, if they are zero, they
4487 * will not convey any information.
4488 * NOW there is a trailer field, after the ack where it will safely be
4489 * ignored by mundanes, which indicates the maximum size packet this
4490 * host can swallow. */
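/* A minimal calling sketch (hedged: the reason code and packet reuse
 * below are typical of ack-sending paths, not a quote of any specific
 * caller):
 *
 *    np = rxi_SendAck(call, np, 0, np->header.serial, 0,
 *                     RX_ACK_DUPLICATE, istack);
 *
 * The return value is the optionalPacket that was passed in (or NULL
 * if none was supplied), and it remains the caller's to free or
 * reuse. */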
4491 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4492 register struct rx_call *call;
4493 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4494 int seq; /* Sequence number of the packet we are acking */
4495 int serial; /* Serial number of the packet */
4496 int pflags; /* Flags field from packet header */
4497 int reason; /* Reason an acknowledge was prompted */
4500 struct rx_ackPacket *ap;
4501 register struct rx_packet *rqp;
4502 register struct rx_packet *nxp; /* For queue_Scan */
4503 register struct rx_packet *p;
4508 * Open the receive window once a thread starts reading packets
4510 if (call->rnext > 1) {
4511 call->rwind = rx_maxReceiveWindow;
4514 call->nHardAcks = 0;
4515 call->nSoftAcks = 0;
4516 if (call->rnext > call->lastAcked)
4517 call->lastAcked = call->rnext;
4521 rx_computelen(p, p->length); /* reset length, you never know */
4522 } /* where that's been... */
4524 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4525 /* We won't send the ack, but don't panic. */
4526 return optionalPacket;
4529 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4531 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4532 if (!optionalPacket) rxi_FreePacket(p);
4533 return optionalPacket;
4535 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4536 if (rx_Contiguous(p)<templ) {
4537 if (!optionalPacket) rxi_FreePacket(p);
4538 return optionalPacket;
4540 } /* MTUXXX failing to send an ack is very serious. We should */
4541 /* try as hard as possible to send even a partial ack; it's */
4542 /* better than nothing. */
4544 ap = (struct rx_ackPacket *) rx_DataOf(p);
4545 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4546 ap->reason = reason;
4548 /* The skew computation used to be bogus, I think it's better now. */
4549 /* We should start paying attention to skew. XXX */
4550 ap->serial = htonl(call->conn->maxSerial);
4551 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4553 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4554 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4556 /* No fear of running out of ack packet here because there can only be at most
4557 * one window full of unacknowledged packets. The window size must be constrained
4558 * to be less than the maximum ack size, of course. Also, an ack should always
4559 * fit into a single packet -- it should not ever be fragmented. */
4560 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4561 if (!rqp || !call->rq.next
4562 || (rqp->header.seq > (call->rnext + call->rwind))) {
4563 if (!optionalPacket) rxi_FreePacket(p);
4564 rxi_CallError(call, RX_CALL_DEAD);
4565 return optionalPacket;
4568 while (rqp->header.seq > call->rnext + offset)
4569 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4570 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4572 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4573 if (!optionalPacket) rxi_FreePacket(p);
4574 rxi_CallError(call, RX_CALL_DEAD);
4575 return optionalPacket;
4580 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4582 /* these are new for AFS 3.3 */
4583 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4584 templ = htonl(templ);
4585 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4586 templ = htonl(call->conn->peer->ifMTU);
4587 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4589 /* new for AFS 3.4 */
4590 templ = htonl(call->rwind);
4591 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4593 /* new for AFS 3.5 */
4594 templ = htonl(call->conn->peer->ifDgramPackets);
4595 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4597 p->header.serviceId = call->conn->serviceId;
4598 p->header.cid = (call->conn->cid | call->channel);
4599 p->header.callNumber = *call->callNumber;
4600 p->header.seq = seq;
4601 p->header.securityIndex = call->conn->securityIndex;
4602 p->header.epoch = call->conn->epoch;
4603 p->header.type = RX_PACKET_TYPE_ACK;
4604 p->header.flags = RX_SLOW_START_OK;
4605 if (reason == RX_ACK_PING) {
4606 p->header.flags |= RX_REQUEST_ACK;
4608 clock_GetTime(&call->pingRequestTime);
4611 if (call->conn->type == RX_CLIENT_CONNECTION)
4612 p->header.flags |= RX_CLIENT_INITIATED;
4616 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4617 ap->reason, ntohl(ap->previousPacket),
4618 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4620 for (offset = 0; offset < ap->nAcks; offset++)
4621 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4628 register int i, nbytes = p->length;
4630 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4631 if (nbytes <= p->wirevec[i].iov_len) {
4632 register int savelen, saven;
4634 savelen = p->wirevec[i].iov_len;
4636 p->wirevec[i].iov_len = nbytes;
4638 rxi_Send(call, p, istack);
4639 p->wirevec[i].iov_len = savelen;
4643 else nbytes -= p->wirevec[i].iov_len;
4646 MUTEX_ENTER(&rx_stats_mutex);
4647 rx_stats.ackPacketsSent++;
4648 MUTEX_EXIT(&rx_stats_mutex);
4649 if (!optionalPacket) rxi_FreePacket(p);
4650 return optionalPacket; /* Return packet for re-use by caller */
4653 /* Send all of the packets in the list in a single datagram */
4654 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4655 struct rx_call *call;
4656 struct rx_packet **list;
4661 struct clock *retryTime;
4667 struct rx_connection *conn = call->conn;
4668 struct rx_peer *peer = conn->peer;
4670 MUTEX_ENTER(&peer->peer_lock);
4672 if (resending) peer->reSends += len;
4673 MUTEX_ENTER(&rx_stats_mutex);
4674 rx_stats.dataPacketsSent += len;
4675 MUTEX_EXIT(&rx_stats_mutex);
4676 MUTEX_EXIT(&peer->peer_lock);
4678 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4682 /* Set the packet flags and schedule the resend events */
4683 /* Only request an ack for the last packet in the list */
4684 for (i = 0 ; i < len ; i++) {
4685 list[i]->retryTime = *retryTime;
4686 if (list[i]->header.serial) {
4687 /* Exponentially backoff retry times */
4688 if (list[i]->backoff < MAXBACKOFF) {
4689 /* so it can't stay == 0 */
4690 list[i]->backoff = (list[i]->backoff << 1) +1;
4692 else list[i]->backoff++;
4693 clock_Addmsec(&(list[i]->retryTime),
4694 ((afs_uint32) list[i]->backoff) << 8);
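/* Worked backoff sequence (illustrative): backoff starts at 0, and
 * successive retransmissions step it 0 -> 1 -> 3 -> 7 -> 15 via
 * (b << 1) + 1; the extra delay added to retryTime is backoff << 8
 * milliseconds, i.e. 256, 768, 1792, then 3840 ms, with growth
 * switching to +1 per resend once backoff reaches MAXBACKOFF. */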
4697 /* Wait a little extra for the ack on the last packet */
4698 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4699 clock_Addmsec(&(list[i]->retryTime), 400);
4702 /* Record the time sent */
4703 list[i]->timeSent = *now;
4705 /* Ask for an ack on retransmitted packets, on every other packet
4706 * if the peer doesn't support slow start. Ask for an ack on every
4707 * packet until the congestion window reaches the ack rate. */
4708 if (list[i]->header.serial) {
4710 MUTEX_ENTER(&rx_stats_mutex);
4711 rx_stats.dataPacketsReSent++;
4712 MUTEX_EXIT(&rx_stats_mutex);
4714 /* improved RTO calculation - not Karn */
4715 list[i]->firstSent = *now;
4717 && (call->cwind <= (u_short)(conn->ackRate+1)
4718 || (!(call->flags & RX_CALL_SLOW_START_OK)
4719 && (list[i]->header.seq & 1)))) {
4724 MUTEX_ENTER(&peer->peer_lock);
4726 if (resending) peer->reSends++;
4727 MUTEX_ENTER(&rx_stats_mutex);
4728 rx_stats.dataPacketsSent++;
4729 MUTEX_EXIT(&rx_stats_mutex);
4730 MUTEX_EXIT(&peer->peer_lock);
4732 /* Tag this packet as not being the last in this group,
4733 * for the receiver's benefit */
4734 if (i < len-1 || moreFlag) {
4735 list[i]->header.flags |= RX_MORE_PACKETS;
4738 /* Install the new retransmit time for the packet, and
4739 * record the time sent */
4740 list[i]->timeSent = *now;
4744 list[len-1]->header.flags |= RX_REQUEST_ACK;
4747 /* Since we're about to send a data packet to the peer, it's
4748 * safe to nuke any scheduled end-of-packets ack */
4749 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4751 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4752 MUTEX_EXIT(&call->lock);
4754 rxi_SendPacketList(conn, list, len, istack);
4756 rxi_SendPacket(conn, list[0], istack);
4758 MUTEX_ENTER(&call->lock);
4759 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4761 /* Update last send time for this call (for keep-alive
4762 * processing), and for the connection (so that we can discover
4763 * idle connections) */
4764 conn->lastSendTime = call->lastSendTime = clock_Sec();
4767 /* When sending packets we need to follow these rules:
4768 * 1. Never send more than maxDgramPackets in a jumbogram.
4769 * 2. Never send a packet with more than two iovecs in a jumbogram.
4770 * 3. Never send a retransmitted packet in a jumbogram.
4771 * 4. Never send more than cwind/4 packets in a jumbogram
4772 * We always keep the last list we should have sent so we
4773 * can set the RX_MORE_PACKETS flags correctly.
4774 */
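/* Sketch of how these rules partition a hypothetical transmit queue
 * (packet names invented for illustration): with maxDgramPackets = 4
 * and queue [P1 P2 P3 P4 P5r P6], where P5r has already been sent
 * once (header.serial != 0), the lists sent are [P1 P2 P3 P4] as one
 * jumbogram, [P5r] alone per rule 3, and finally [P6]. */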
4775 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4776 struct rx_call *call;
4777 struct rx_packet **list;
4781 struct clock *retryTime;
4784 int i, cnt, lastCnt = 0;
4785 struct rx_packet **listP, **lastP = 0;
4786 struct rx_peer *peer = call->conn->peer;
4787 int morePackets = 0;
4789 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4790 /* Does the current packet force us to flush the current list? */
4792 && (list[i]->header.serial
4793 || (list[i]->flags & RX_PKTFLAG_ACKED)
4794 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4796 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4797 /* If the call enters an error state stop sending, or if
4798 * we entered congestion recovery mode, stop sending */
4799 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4807 /* Add the current packet to the list if it hasn't been acked.
4808 * Otherwise adjust the list pointer to skip the current packet. */
4809 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4811 /* Do we need to flush the list? */
4812 if (cnt >= (int)peer->maxDgramPackets
4813 || cnt >= (int)call->nDgramPackets
4814 || cnt >= (int)call->cwind
4815 || list[i]->header.serial
4816 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4818 rxi_SendList(call, lastP, lastCnt, istack, 1,
4819 now, retryTime, resending);
4820 /* If the call enters an error state stop sending, or if
4821 * we entered congestion recovery mode, stop sending */
4822 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4832 osi_Panic("rxi_SendList error");
4838 /* Send the whole list when the call is in receive mode, when
4839 * the call is in eof mode, when we are in fast recovery mode,
4840 * and when we have the last packet */
4841 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4842 || call->mode == RX_MODE_RECEIVING
4843 || call->mode == RX_MODE_EOF
4844 || (call->flags & RX_CALL_FAST_RECOVER)) {
4845 /* Check for the case where the current list contains
4846 * an acked packet. Since we always send retransmissions
4847 * in a separate packet, we only need to check the first
4848 * packet in the list */
4849 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4853 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4854 now, retryTime, resending);
4855 /* If the call enters an error state stop sending, or if
4856 * we entered congestion recovery mode, stop sending */
4857 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4861 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4863 } else if (lastCnt > 0) {
4864 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4868 #ifdef RX_ENABLE_LOCKS
4869 /* Call rxi_Start, below, but with the call lock held. */
4870 void rxi_StartUnlocked(event, call, istack)
4871 struct rxevent *event;
4872 register struct rx_call *call;
4875 MUTEX_ENTER(&call->lock);
4876 rxi_Start(event, call, istack);
4877 MUTEX_EXIT(&call->lock);
4879 #endif /* RX_ENABLE_LOCKS */
4881 /* This routine is called when new packets are readied for
4882 * transmission and when retransmission may be necessary, or when the
4883 * transmission window or burst count are favourable. This should be
4884 * better optimized for new packets, the usual case, now that we've
4885 * got rid of queues of send packets. XXXXXXXXXXX */
4886 void rxi_Start(event, call, istack)
4887 struct rxevent *event;
4888 register struct rx_call *call;
4891 struct rx_packet *p;
4892 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4893 struct rx_peer *peer = call->conn->peer;
4894 struct clock now, retryTime;
4898 struct rx_packet **xmitList;
4901 /* If rxi_Start is being called as a result of a resend event,
4902 * then make sure that the event pointer is removed from the call
4903 * structure, since there is no longer a per-call retransmission
4904 * event pending. */
4905 if (event && event == call->resendEvent) {
4906 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4907 call->resendEvent = NULL;
4909 if (queue_IsEmpty(&call->tq)) {
4913 /* Timeouts trigger congestion recovery */
4914 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4915 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4916 /* someone else is waiting to start recovery */
4919 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4920 while (call->flags & RX_CALL_TQ_BUSY) {
4921 call->flags |= RX_CALL_TQ_WAIT;
4922 #ifdef RX_ENABLE_LOCKS
4923 CV_WAIT(&call->cv_tq, &call->lock);
4924 #else /* RX_ENABLE_LOCKS */
4925 osi_rxSleep(&call->tq);
4926 #endif /* RX_ENABLE_LOCKS */
4928 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4929 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4930 call->flags |= RX_CALL_FAST_RECOVER;
4931 if (peer->maxDgramPackets > 1) {
4932 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4934 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4936 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4937 call->nDgramPackets = 1;
4939 call->nextCwind = 1;
4942 MUTEX_ENTER(&peer->peer_lock);
4943 peer->MTU = call->MTU;
4944 peer->cwind = call->cwind;
4945 peer->nDgramPackets = 1;
4947 call->congestSeq = peer->congestSeq;
4948 MUTEX_EXIT(&peer->peer_lock);
4949 /* Clear retry times on packets. Otherwise, it's possible for
4950 * some packets in the queue to force resends at rates faster
4951 * than recovery rates.
4952 */
4953 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4954 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4955 clock_Zero(&p->retryTime);
4960 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4961 MUTEX_ENTER(&rx_stats_mutex);
4962 rx_tq_debug.rxi_start_in_error ++;
4963 MUTEX_EXIT(&rx_stats_mutex);
4968 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4969 /* Get clock to compute the re-transmit time for any packets
4970 * in this burst. Note, if we back off, it's reasonable to
4971 * back off all of the packets in the same manner, even if
4972 * some of them have been retransmitted more times than more
4973 * recent additions */
4974 clock_GetTime(&now);
4975 retryTime = now; /* initialize before use */
4976 MUTEX_ENTER(&peer->peer_lock);
4977 clock_Add(&retryTime, &peer->timeout);
4978 MUTEX_EXIT(&peer->peer_lock);
4980 /* Send (or resend) any packets that need it, subject to
4981 * window restrictions and congestion burst control
4982 * restrictions. Ask for an ack on the last packet sent in
4983 * this burst. For now, we're relying upon the window being
4984 * considerably bigger than the largest number of packets that
4985 * are typically sent at once by one initial call to
4986 * rxi_Start. This is probably bogus (perhaps we should ask
4987 * for an ack when we're half way through the current
4988 * window?). Also, for non file transfer applications, this
4989 * may end up asking for an ack for every packet. Bogus. XXXX
4992 * But check whether we're here recursively, and let the other guy
4993 * do the work. */
4995 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4996 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4997 call->flags |= RX_CALL_TQ_BUSY;
4999 call->flags &= ~RX_CALL_NEED_START;
5000 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5002 maxXmitPackets = MIN(call->twind, call->cwind);
5003 xmitList = (struct rx_packet **)
5004 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
5005 if (xmitList == NULL)
5006 osi_Panic("rxi_Start, failed to allocate xmit list");
5007 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
5008 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
5009 /* We shouldn't be sending packets if a thread is waiting
5010 * to initiate congestion recovery */
5013 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
5014 /* Only send one packet during fast recovery */
5017 if ((p->flags & RX_PKTFLAG_FREE) ||
5018 (!queue_IsEnd(&call->tq, nxp)
5019 && (nxp->flags & RX_PKTFLAG_FREE)) ||
5020 (p == (struct rx_packet *)&rx_freePacketQueue) ||
5021 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
5022 osi_Panic("rxi_Start: xmit queue clobbered");
5024 if (p->flags & RX_PKTFLAG_ACKED) {
5025 MUTEX_ENTER(&rx_stats_mutex);
5026 rx_stats.ignoreAckedPacket++;
5027 MUTEX_EXIT(&rx_stats_mutex);
5028 continue; /* Ignore this packet if it has been acknowledged */
5031 /* Turn off all flags except these ones, which are the same
5032 * on each transmission */
5033 p->header.flags &= RX_PRESET_FLAGS;
5035 if (p->header.seq >= call->tfirst +
5036 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
5037 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
5038 /* Note: if we're waiting for more window space, we can
5039 * still send retransmits; hence we don't return here, but
5040 * break out to schedule a retransmit event */
5041 dpf(("call %d waiting for window", *(call->callNumber)));
5045 /* Transmit the packet if it needs to be sent. */
5046 if (!clock_Lt(&now, &p->retryTime)) {
5047 if (nXmitPackets == maxXmitPackets) {
5048 osi_Panic("rxi_Start: xmit list overflowed");
5050 xmitList[nXmitPackets++] = p;
5054 /* xmitList now holds pointers to all of the packets that are
5055 * ready to send. Now we loop to send the packets */
5056 if (nXmitPackets > 0) {
5057 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
5058 &now, &retryTime, resending);
5060 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
5062 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5064 * TQ references no longer protected by this flag; they must remain
5065 * protected by the global lock.
5066 */
5067 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
5068 call->flags &= ~RX_CALL_TQ_BUSY;
5069 if (call->flags & RX_CALL_TQ_WAIT) {
5070 call->flags &= ~RX_CALL_TQ_WAIT;
5071 #ifdef RX_ENABLE_LOCKS
5072 CV_BROADCAST(&call->cv_tq);
5073 #else /* RX_ENABLE_LOCKS */
5074 osi_rxWakeup(&call->tq);
5075 #endif /* RX_ENABLE_LOCKS */
5080 /* We went into the error state while sending packets. Now is
5081 * the time to reset the call. This will also inform the using
5082 * process that the call is in an error state.
5083 */
5084 MUTEX_ENTER(&rx_stats_mutex);
5085 rx_tq_debug.rxi_start_aborted ++;
5086 MUTEX_EXIT(&rx_stats_mutex);
5087 call->flags &= ~RX_CALL_TQ_BUSY;
5088 if (call->flags & RX_CALL_TQ_WAIT) {
5089 call->flags &= ~RX_CALL_TQ_WAIT;
5090 #ifdef RX_ENABLE_LOCKS
5091 CV_BROADCAST(&call->cv_tq);
5092 #else /* RX_ENABLE_LOCKS */
5093 osi_rxWakeup(&call->tq);
5094 #endif /* RX_ENABLE_LOCKS */
5096 rxi_CallError(call, call->error);
5099 #ifdef RX_ENABLE_LOCKS
5100 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
5101 register int missing;
5102 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
5103 /* Some packets have received acks. If they all have, we can clear
5104 * the transmit queue.
5105 */
5106 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5107 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
5115 call->flags |= RX_CALL_TQ_CLEARME;
5117 #endif /* RX_ENABLE_LOCKS */
5118 /* Don't bother doing retransmits if the TQ is cleared. */
5119 if (call->flags & RX_CALL_TQ_CLEARME) {
5120 rxi_ClearTransmitQueue(call, 1);
5122 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5125 /* Always post a resend event, if there is anything in the
5126 * queue, and resend is possible. There should be at least
5127 * one unacknowledged packet in the queue ... otherwise none
5128 * of these packets should be on the queue in the first place.
5129 */
5130 if (call->resendEvent) {
5131 /* Cancel the existing event and post a new one */
5132 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5135 /* The retry time is the retry time on the first unacknowledged
5136 * packet inside the current window */
5137 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5138 /* Don't set timers for packets outside the window */
5139 if (p->header.seq >= call->tfirst + call->twind) {
5143 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
5145 retryTime = p->retryTime;
5150 /* Post a new event to re-run rxi_Start when retries may be needed */
5151 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
5152 #ifdef RX_ENABLE_LOCKS
5153 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
5154 call->resendEvent = rxevent_Post(&retryTime,
5156 (char *)call, istack);
5157 #else /* RX_ENABLE_LOCKS */
5158 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5159 (char *)call, (void*)(long)istack);
5160 #endif /* RX_ENABLE_LOCKS */
5163 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5164 } while (call->flags & RX_CALL_NEED_START);
5166 * TQ references no longer protected by this flag; they must remain
5167 * protected by the global lock.
5168 */
5169 call->flags &= ~RX_CALL_TQ_BUSY;
5170 if (call->flags & RX_CALL_TQ_WAIT) {
5171 call->flags &= ~RX_CALL_TQ_WAIT;
5172 #ifdef RX_ENABLE_LOCKS
5173 CV_BROADCAST(&call->cv_tq);
5174 #else /* RX_ENABLE_LOCKS */
5175 osi_rxWakeup(&call->tq);
5176 #endif /* RX_ENABLE_LOCKS */
5179 call->flags |= RX_CALL_NEED_START;
5181 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5183 if (call->resendEvent) {
5184 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5189 /* Also adjusts the keep alive parameters for the call, to reflect
5190 * that we have just sent a packet (so keep alives aren't sent
5191 * immediately) */
5192 void rxi_Send(call, p, istack)
5193 register struct rx_call *call;
5194 register struct rx_packet *p;
5197 register struct rx_connection *conn = call->conn;
5199 /* Stamp each packet with the user supplied status */
5200 p->header.userStatus = call->localStatus;
5202 /* Allow the security object controlling this call's security to
5203 * make any last-minute changes to the packet */
5204 RXS_SendPacket(conn->securityObject, call, p);
5206 /* Since we're about to send SOME sort of packet to the peer, it's
5207 * safe to nuke any scheduled end-of-packets ack */
5208 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5210 /* Actually send the packet, filling in more connection-specific fields */
5211 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5212 MUTEX_EXIT(&call->lock);
5213 rxi_SendPacket(conn, p, istack);
5214 MUTEX_ENTER(&call->lock);
5215 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5217 /* Update last send time for this call (for keep-alive
5218 * processing), and for the connection (so that we can discover
5219 * idle connections) */
5220 conn->lastSendTime = call->lastSendTime = clock_Sec();
5224 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5225 * that things are fine. Also called periodically to guarantee that nothing
5226 * falls through the cracks (e.g. (error + dally) connections have keepalive
5227 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5228 * may be freed! */
5230 #ifdef RX_ENABLE_LOCKS
5231 int rxi_CheckCall(call, haveCTLock)
5232 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5233 #else /* RX_ENABLE_LOCKS */
5234 int rxi_CheckCall(call)
5235 #endif /* RX_ENABLE_LOCKS */
5236 register struct rx_call *call;
5238 register struct rx_connection *conn = call->conn;
5239 register struct rx_service *tservice;
5241 afs_uint32 deadTime;
5243 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5244 if (call->flags & RX_CALL_TQ_BUSY) {
5245 /* Call is active and will be reset by rxi_Start if it's
5246 * in an error state.
5251 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5252 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5253 ((afs_uint32)conn->peer->rtt >> 3) +
5254 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
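/* Worked example of the fixed-point math above (illustrative values):
 * secondsUntilDead = 12, rtt = 640 (80 ms scaled by 8), rtt_dev = 80
 * (20 ms scaled by 4) gives
 *   ((12 << 10) + (640 >> 3) + (80 << 1) + 1023) >> 10
 *   = (12288 + 80 + 160 + 1023) >> 10 = 13 seconds,
 * i.e. the dead time plus RTT plus 8*MDEV, rounded up to a second. */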
5256 /* These are computed to the second (+- 1 second). But that's
5257 * good enough for these values, which should be a significant
5258 * number of seconds. */
5259 if (now > (call->lastReceiveTime + deadTime)) {
5260 if (call->state == RX_STATE_ACTIVE) {
5261 rxi_CallError(call, RX_CALL_DEAD);
5265 #ifdef RX_ENABLE_LOCKS
5266 /* Cancel pending events */
5267 rxevent_Cancel(call->delayedAckEvent, call,
5268 RX_CALL_REFCOUNT_DELAY);
5269 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5270 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5271 if (call->refCount == 0) {
5272 rxi_FreeCall(call, haveCTLock);
5276 #else /* RX_ENABLE_LOCKS */
5279 #endif /* RX_ENABLE_LOCKS */
5281 /* Non-active calls are destroyed if they are not responding
5282 * to pings; active calls are simply flagged in error, so the
5283 * attached process can die reasonably gracefully. */
5285 /* see if we have a non-activity timeout */
5286 tservice = conn->service;
5287 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5288 && tservice->idleDeadTime
5289 && ((call->startWait + tservice->idleDeadTime) < now)) {
5290 if (call->state == RX_STATE_ACTIVE) {
5291 rxi_CallError(call, RX_CALL_TIMEOUT);
5295 /* see if we have a hard timeout */
5296 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5297 if (call->state == RX_STATE_ACTIVE)
5298 rxi_CallError(call, RX_CALL_TIMEOUT);
5305 /* When a call is in progress, this routine is called occasionally to
5306 * make sure that some traffic has arrived from (or been sent to) the peer.
5307 * If nothing has arrived in a reasonable amount of time, the call is
5308 * declared dead; if nothing has been sent for a while, we send a
5309 * keep-alive packet (if we're actually trying to keep the call alive).
5310 */
5311 void rxi_KeepAliveEvent(event, call, dummy)
5312 struct rxevent *event;
5313 register struct rx_call *call;
5315 struct rx_connection *conn;
5318 MUTEX_ENTER(&call->lock);
5319 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5320 if (event == call->keepAliveEvent)
5321 call->keepAliveEvent = (struct rxevent *) 0;
5324 #ifdef RX_ENABLE_LOCKS
5325 if(rxi_CheckCall(call, 0)) {
5326 MUTEX_EXIT(&call->lock);
5329 #else /* RX_ENABLE_LOCKS */
5330 if (rxi_CheckCall(call)) return;
5331 #endif /* RX_ENABLE_LOCKS */
5333 /* Don't try to keep alive dallying calls */
5334 if (call->state == RX_STATE_DALLY) {
5335 MUTEX_EXIT(&call->lock);
5340 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5341 /* Don't try to send keepalives if there is unacknowledged data */
5342 /* the rexmit code should be good enough, this little hack
5343 * doesn't quite work XXX */
5344 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5346 rxi_ScheduleKeepAliveEvent(call);
5347 MUTEX_EXIT(&call->lock);
5351 void rxi_ScheduleKeepAliveEvent(call)
5352 register struct rx_call *call;
5354 if (!call->keepAliveEvent) {
5356 clock_GetTime(&when);
5357 when.sec += call->conn->secondsUntilPing;
5358 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5359 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5363 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5364 void rxi_KeepAliveOn(call)
5365 register struct rx_call *call;
5367 /* Pretend last packet received was received now--i.e. if another
5368 * packet isn't received within the keep alive time, then the call
5369 * will die; Initialize last send time to the current time--even
5370 * if a packet hasn't been sent yet. This will guarantee that a
5371 * keep-alive is sent within the ping time */
5372 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5373 rxi_ScheduleKeepAliveEvent(call);
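/* Illustrative timeline (assuming conn->secondsUntilPing = 6):
 * turning keep-alives on at t=0 stamps lastSendTime and
 * lastReceiveTime with t=0 and schedules the event for t=6; if
 * nothing else is sent in the interim, the event fires, transmits an
 * RX_ACK_PING, and reschedules itself for t=12, while rxi_CheckCall
 * watches lastReceiveTime against the connection's dead time. */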
5376 /* This routine is called to send connection abort messages
5377 * that have been delayed to throttle looping clients. */
5378 void rxi_SendDelayedConnAbort(event, conn, dummy)
5379 struct rxevent *event;
5380 register struct rx_connection *conn;
5384 struct rx_packet *packet;
5386 MUTEX_ENTER(&conn->conn_data_lock);
5387 conn->delayedAbortEvent = (struct rxevent *) 0;
5388 error = htonl(conn->error);
5390 MUTEX_EXIT(&conn->conn_data_lock);
5391 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5393 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5394 RX_PACKET_TYPE_ABORT, (char *)&error,
5396 rxi_FreePacket(packet);
5400 /* This routine is called to send call abort messages
5401 * that have been delayed to throttle looping clients. */
5402 void rxi_SendDelayedCallAbort(event, call, dummy)
5403 struct rxevent *event;
5404 register struct rx_call *call;
5408 struct rx_packet *packet;
5410 MUTEX_ENTER(&call->lock);
5411 call->delayedAbortEvent = (struct rxevent *) 0;
5412 error = htonl(call->error);
5414 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5416 packet = rxi_SendSpecial(call, call->conn, packet,
5417 RX_PACKET_TYPE_ABORT, (char *)&error,
5419 rxi_FreePacket(packet);
5421 MUTEX_EXIT(&call->lock);
5424 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5425 * seconds) to ask the client to authenticate itself. The routine
5426 * issues a challenge to the client, which is obtained from the
5427 * security object associated with the connection */
5428 void rxi_ChallengeEvent(event, conn, atries)
5429 struct rxevent *event;
5430 register struct rx_connection *conn;
5433 int tries = (int) atries;
5434 conn->challengeEvent = (struct rxevent *) 0;
5435 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5436 register struct rx_packet *packet;
5440 /* We've failed to authenticate for too long.
5441 * Reset any calls waiting for authentication;
5442 * they are all in RX_STATE_PRECALL.
5443 */
5446 MUTEX_ENTER(&conn->conn_call_lock);
5447 for (i=0; i<RX_MAXCALLS; i++) {
5448 struct rx_call *call = conn->call[i];
5450 MUTEX_ENTER(&call->lock);
5451 if (call->state == RX_STATE_PRECALL) {
5452 rxi_CallError(call, RX_CALL_DEAD);
5453 rxi_SendCallAbort(call, NULL, 0, 0);
5455 MUTEX_EXIT(&call->lock);
5458 MUTEX_EXIT(&conn->conn_call_lock);
5462 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5464 /* If there's no packet available, do this later. */
5465 RXS_GetChallenge(conn->securityObject, conn, packet);
5466 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5467 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5468 rxi_FreePacket(packet);
5470 clock_GetTime(&when);
5471 when.sec += RX_CHALLENGE_TIMEOUT;
5472 conn->challengeEvent =
5473 rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
5477 /* Call this routine to start requesting the client to authenticate
5478 * itself. This will continue until authentication is established,
5479 * the call times out, or an invalid response is returned. The
5480 * security object associated with the connection is asked to create
5481 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5482 * defined earlier. */
5483 void rxi_ChallengeOn(conn)
5484 register struct rx_connection *conn;
5486 if (!conn->challengeEvent) {
5487 RXS_CreateChallenge(conn->securityObject, conn);
5488 rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
5493 /* Compute round trip time of the packet provided, in *rttp. */
5496 /* rxi_ComputeRoundTripTime is called with peer locked. */
5497 void rxi_ComputeRoundTripTime(p, sentp, peer)
5498 register struct clock *sentp; /* may be null */
5499 register struct rx_peer *peer; /* may be null */
5500 register struct rx_packet *p;
5502 struct clock thisRtt, *rttp = &thisRtt;
5504 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5505 /* making year 2038 bugs to get this running now - stroucki */
5506 struct timeval temptime;
5508 register int rtt_timeout;
5510 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5511 /* yet again. This was the worst Heisenbug of the port - stroucki */
5512 clock_GetTime(&temptime);
5513 rttp->sec=(afs_int32)temptime.tv_sec;
5514 rttp->usec=(afs_int32)temptime.tv_usec;
5516 clock_GetTime(rttp);
5518 if (clock_Lt(rttp, sentp)) {
5520 return; /* somebody set the clock back, don't count this time. */
5522 clock_Sub(rttp, sentp);
5523 MUTEX_ENTER(&rx_stats_mutex);
5524 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5525 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5526 if (rttp->sec > 60) {
5527 MUTEX_EXIT(&rx_stats_mutex);
5528 return; /* somebody set the clock ahead */
5530 rx_stats.maxRtt = *rttp;
5532 clock_Add(&rx_stats.totalRtt, rttp);
5533 rx_stats.nRttSamples++;
5534 MUTEX_EXIT(&rx_stats_mutex);
5536 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5538 /* Apply VanJacobson round-trip estimations */
5543 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5544 * srtt is stored as fixed point with 3 bits after the binary
5545 * point (i.e., scaled by 8). The following magic is
5546 * equivalent to the smoothing algorithm in rfc793 with an
5547 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5548 * srtt*8 = srtt*8 + rtt - srtt
5549 * srtt = srtt + rtt/8 - srtt/8
5552 delta = MSEC(rttp) - (peer->rtt >> 3);
5556 * We accumulate a smoothed rtt variance (actually, a smoothed
5557 * mean difference), then set the retransmit timer to smoothed
5558 * rtt + 4 times the smoothed variance (was 2x in van's original
5559 * paper, but 4x works better for me, and apparently for him as
5561 * rttvar is stored as
5562 * fixed point with 2 bits after the binary point (scaled by
5563 * 4). The following is equivalent to rfc793 smoothing with
5564 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5565 * replaces rfc793's wired-in beta.
5566 * dev*4 = dev*4 + (|actual - expected| - dev)
5572 delta -= (peer->rtt_dev >> 2);
5573 peer->rtt_dev += delta;
5576 /* I don't have a stored RTT so I start with this value. Since I'm
5577 * probably just starting a call, and will be pushing more data down
5578 * this, I expect congestion to increase rapidly. So I fudge a
5579 * little, and I set deviance to half the rtt. In practice,
5580 * deviance tends to approach something a little less than
5581 * half the smoothed rtt. */
5582 peer->rtt = (MSEC(rttp) << 3) + 8;
5583 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5585 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5586 * the other of these connections is usually in a user process, and can
5587 * be switched and/or swapped out. So on fast, reliable networks, the
5588 * timeout would otherwise be too short.
5589 */
5590 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5591 clock_Zero(&(peer->timeout));
5592 clock_Addmsec(&(peer->timeout), rtt_timeout);
5594 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5595 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5596 (peer->timeout.sec),(peer->timeout.usec)) );
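/* Worked numbers for the estimator above (illustrative only, using
 * the formulas in the comments): stored rtt = 640 (80 ms scaled by 8)
 * and rtt_dev = 80 (20 ms scaled by 4). A 96 ms sample yields
 * delta = 96 - 80 = 16, so rtt becomes 656 (82 ms); the deviation
 * update computes |16| - (80 >> 2) = -4, so rtt_dev becomes 76; the
 * timeout is then (656 >> 3) + 76 + 350 = 508 ms. */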
5600 /* Find all server connections that have not been active for a long time, and
5601 * toss them */
5602 void rxi_ReapConnections()
5605 clock_GetTime(&now);
5607 /* Find server connection structures that haven't been used for
5608 * greater than rx_idleConnectionTime */
5609 { struct rx_connection **conn_ptr, **conn_end;
5610 int i, havecalls = 0;
5611 MUTEX_ENTER(&rx_connHashTable_lock);
5612 for (conn_ptr = &rx_connHashTable[0],
5613 conn_end = &rx_connHashTable[rx_hashTableSize];
5614 conn_ptr < conn_end; conn_ptr++) {
5615 struct rx_connection *conn, *next;
5616 struct rx_call *call;
5620 for (conn = *conn_ptr; conn; conn = next) {
5621 /* XXX -- Shouldn't the connection be locked? */
5624 for(i=0;i<RX_MAXCALLS;i++) {
5625 call = conn->call[i];
5628 MUTEX_ENTER(&call->lock);
5629 #ifdef RX_ENABLE_LOCKS
5630 result = rxi_CheckCall(call, 1);
5631 #else /* RX_ENABLE_LOCKS */
5632 result = rxi_CheckCall(call);
5633 #endif /* RX_ENABLE_LOCKS */
5634 MUTEX_EXIT(&call->lock);
5636 /* If CheckCall freed the call, it might
5637 * have destroyed the connection as well,
5638 * which screws up the linked lists.
5644 if (conn->type == RX_SERVER_CONNECTION) {
5645 /* This only actually destroys the connection if
5646 * there are no outstanding calls */
5647 MUTEX_ENTER(&conn->conn_data_lock);
5648 if (!havecalls && !conn->refCount &&
5649 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5650 conn->refCount++; /* it will be decr in rx_DestroyConn */
5651 MUTEX_EXIT(&conn->conn_data_lock);
5652 #ifdef RX_ENABLE_LOCKS
5653 rxi_DestroyConnectionNoLock(conn);
5654 #else /* RX_ENABLE_LOCKS */
5655 rxi_DestroyConnection(conn);
5656 #endif /* RX_ENABLE_LOCKS */
5658 #ifdef RX_ENABLE_LOCKS
5660 MUTEX_EXIT(&conn->conn_data_lock);
5662 #endif /* RX_ENABLE_LOCKS */
5666 #ifdef RX_ENABLE_LOCKS
5667 while (rx_connCleanup_list) {
5668 struct rx_connection *conn;
5669 conn = rx_connCleanup_list;
5670 rx_connCleanup_list = rx_connCleanup_list->next;
5671 MUTEX_EXIT(&rx_connHashTable_lock);
5672 rxi_CleanupConnection(conn);
5673 MUTEX_ENTER(&rx_connHashTable_lock);
5675 MUTEX_EXIT(&rx_connHashTable_lock);
5676 #endif /* RX_ENABLE_LOCKS */
5679 /* Find any peer structures that haven't been used (haven't had an
5680 * associated connection) for greater than rx_idlePeerTime */
5681 { struct rx_peer **peer_ptr, **peer_end;
5683 MUTEX_ENTER(&rx_rpc_stats);
5684 MUTEX_ENTER(&rx_peerHashTable_lock);
5685 for (peer_ptr = &rx_peerHashTable[0],
5686 peer_end = &rx_peerHashTable[rx_hashTableSize];
5687 peer_ptr < peer_end; peer_ptr++) {
5688 struct rx_peer *peer, *next, *prev;
5689 for (prev = peer = *peer_ptr; peer; peer = next) {
5691 code = MUTEX_TRYENTER(&peer->peer_lock);
5692 if ((code) && (peer->refCount == 0)
5693 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5694 rx_interface_stat_p rpc_stat, nrpc_stat;
5696 MUTEX_EXIT(&peer->peer_lock);
5697 MUTEX_DESTROY(&peer->peer_lock);
5698 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5699 rx_interface_stat)) {
5700 unsigned int num_funcs;
5701 if (!rpc_stat) break;
5702 queue_Remove(&rpc_stat->queue_header);
5703 queue_Remove(&rpc_stat->all_peers);
5704 num_funcs = rpc_stat->stats[0].func_total;
5705 space = sizeof(rx_interface_stat_t) +
5706 rpc_stat->stats[0].func_total *
5707 sizeof(rx_function_entry_v1_t);
5709 rxi_Free(rpc_stat, space);
5710 rxi_rpc_peer_stat_cnt -= num_funcs;
5713 MUTEX_ENTER(&rx_stats_mutex);
5714 rx_stats.nPeerStructs--;
5715 MUTEX_EXIT(&rx_stats_mutex);
5716 if (prev == *peer_ptr) {
5725 MUTEX_EXIT(&peer->peer_lock);
5731 MUTEX_EXIT(&rx_peerHashTable_lock);
5732 MUTEX_EXIT(&rx_rpc_stats);
5735 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5736 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5737 GC, just below. Really, we shouldn't have to keep moving packets from
5738 one place to another, but instead ought to always know if we can
5739 afford to hold onto a packet in its particular use. */
5740 MUTEX_ENTER(&rx_freePktQ_lock);
5741 if (rx_waitingForPackets) {
5742 rx_waitingForPackets = 0;
5743 #ifdef RX_ENABLE_LOCKS
5744 	CV_BROADCAST(&rx_waitingForPackets_cv);
5745 #else
5746 	osi_rxWakeup(&rx_waitingForPackets);
5747 #endif
5749 MUTEX_EXIT(&rx_freePktQ_lock);
5751 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5752     rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5753 }
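/*
 * Illustrative sketch only (not part of rx): the waiter side of the
 * rx_waitingForPackets handshake that the reaper wakes up above.  A thread
 * that cannot get a packet marks itself waiting and sleeps on the condition
 * variable (or osi_rxSleep in non-locking builds) until the free queue is
 * replenished.  rx_freePacketQueue is the global free list from
 * rx_globals.h; the helper name is hypothetical and error handling is
 * omitted.
 */
#if 0
static void example_WaitForPacket(void)
{
    MUTEX_ENTER(&rx_freePktQ_lock);
    while (queue_IsEmpty(&rx_freePacketQueue)) {
	rx_waitingForPackets = 1;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&rx_waitingForPackets_cv, &rx_freePktQ_lock);
#else
	osi_rxSleep(&rx_waitingForPackets);
#endif
    }
    MUTEX_EXIT(&rx_freePktQ_lock);
}
#endif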
5756 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5757 * rx.h is sort of strange this is better. This is called with a security
5758 * object before it is discarded. Each connection using a security object has
5759 * its own refcount to the object so it won't actually be freed until the last
5760 * connection is destroyed.
5762  * This is the only rxs module call. A hold could also be written but no one
5763  * is using it.
5764  */
5765 int rxs_Release (aobj)
5766   struct rx_securityClass *aobj;
5767 {
5768     return RXS_Close (aobj);
5769 }
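/*
 * Usage sketch for the refcount contract described above (host, port and
 * service number are placeholder values): every connection takes its own
 * reference on the security object, so the creator can release its
 * reference as soon as the connection exists.
 */
#if 0
static struct rx_connection *example_NewNullConn(afs_uint32 host, u_short port)
{
    struct rx_securityClass *so = rxnull_NewClientSecurityObject();
    struct rx_connection *conn =
	rx_NewConnection(host, port, 1 /* service */, so, 0);
    rxs_Release(so);	/* conn still holds the object until it is destroyed */
    return conn;
}
#endif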
5772 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5773 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5774 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5775 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5777 /* Adjust our estimate of the transmission rate to this peer, given
5778 * that the packet p was just acked. We can adjust peer->timeout and
5779 * call->twind. Pragmatically, this is called
5780 * only with packets of maximal length.
5781  * Called with peer and call locked.
5782  */
5784 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5785 register struct rx_peer *peer;
5786 register struct rx_call *call;
5787     struct rx_packet *p, *ackp;
5788     int ackReason;
5789 {
5790     afs_int32 xferSize, xferMs;
5791     register afs_int32 minTime;
5792     struct clock newTO;
5793 
5794 /* Count down packets */
5795 if (peer->rateFlag > 0) peer->rateFlag--;
5796 /* Do nothing until we're enabled */
5797 if (peer->rateFlag != 0) return;
5798 if (!call->conn) return;
5800 /* Count only when the ack seems legitimate */
5801 switch (ackReason) {
5802 case RX_ACK_REQUESTED:
5803 xferSize = p->length + RX_HEADER_SIZE +
5804 		   call->conn->securityMaxTrailerSize;
5805 	xferMs = peer->rtt;
5806 	break;
5807 
5808     case RX_ACK_PING_RESPONSE:
5809 	if (p)	/* want the response to ping-request, not data send */
5810 	    return;
5811 	clock_GetTime(&newTO);
5812 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5813 clock_Sub(&newTO, &call->pingRequestTime);
5814 	    xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5815 	} else {
5816 	    return;
5817 	}
5818 	xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5819 	break;
5820 
5821     default:
5822 	return;
5823     }
5824 
5825 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5826 ntohl(peer->host), ntohs(peer->port),
5827 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5828 	 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5829 	 peer->packetSize));
5830 
5831 /* Track only packets that are big enough. */
5832 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5836 /* absorb RTT data (in milliseconds) for these big packets */
5837 	if (peer->smRtt == 0) {
5838 	    peer->smRtt = xferMs;
5839 	} else {
5840 	    peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5841 	    if (!peer->smRtt) peer->smRtt = 1;
5842 	}
5843 
5844 	if (peer->countDown) {
5845 	    peer->countDown--;
5846 	    return;
5847 	}
5848 	peer->countDown = 10;	/* recalculate only every so often */
5850 /* In practice, we can measure only the RTT for full packets,
5851 * because of the way Rx acks the data that it receives. (If it's
5852 * smaller than a full packet, it often gets implicitly acked
5853 * either by the call response (from a server) or by the next call
5854 * (from a client), and either case confuses transmission times
5855 * with processing times.) Therefore, replace the above
5856 * more-sophisticated processing with a simpler version, where the
5857 * smoothed RTT is kept for full-size packets, and the time to
5858 * transmit a windowful of full-size packets is simply RTT *
5859 * windowSize. Again, we take two steps:
5860 - ensure the timeout is large enough for a single packet's RTT;
5861 - ensure that the window is small enough to fit in the desired timeout.*/
5863 /* First, the timeout check. */
5864 minTime = peer->smRtt;
5865 	/* Get a reasonable estimate for a timeout period */
5866 	minTime += minTime;
5867 	newTO.sec = minTime / 1000;
5868 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5870 /* Increase the timeout period so that we can always do at least
5871 * one packet exchange */
5872 if (clock_Gt(&newTO, &peer->timeout)) {
5874 	dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5875 	     ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5876 	     peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5877 	     peer->packetSize));
5878 
5879 	peer->timeout = newTO;
5880     }
5881 
5882 /* Now, get an estimate for the transmit window size. */
5883 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5884 /* Now, convert to the number of full packets that could fit in a
5885 * reasonable fraction of that interval */
5886 minTime /= (peer->smRtt << 1);
5887 xferSize = minTime; /* (make a copy) */
5889 /* Now clamp the size to reasonable bounds. */
5890 if (minTime <= 1) minTime = 1;
5891 else if (minTime > rx_Window) minTime = rx_Window;
5892 /*  if (minTime != peer->maxWindow) {
5893 	dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5894 	     ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5895 	     peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5896 	     peer->packetSize));
5897 	peer->maxWindow = minTime;
5898 	elide... call->twind = minTime;
5899     } */
5900 
5902 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5903      * Discern this by calculating the timeout necessary for rx_Window
5904      * packets. */
5905     if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5906 /* calculate estimate for transmission interval in milliseconds */
5907 minTime = rx_Window * peer->smRtt;
5908 if (minTime < 1000) {
5909 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5910 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5911 		 peer->timeout.usec, peer->smRtt,
5912 		 peer->packetSize));
5913 
5914 	    newTO.sec = 0;	/* cut back on timeout by half a second */
5915 	    newTO.usec = 500000;
5916 	    clock_Sub(&peer->timeout, &newTO);
5917 	}
5918     }
5919 
5921 } /* end of rxi_ComputeRate */
5922 #endif /* ADAPT_WINDOW */
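/*
 * Worked example of the RTT smoothing used in rxi_ComputeRate above:
 * smRtt is a fixed-point exponential average with gain 1/16,
 * smRtt' = (15*smRtt + sample + 4) >> 4, where the +4 adds a small
 * rounding bias.  Starting from 40 ms and feeding 120 ms samples, the
 * estimate closes 1/16 of the remaining gap per full-size ack.
 * Standalone sketch with hypothetical values:
 */
#if 0
static afs_uint32 example_SmoothRtt(afs_uint32 smRtt, afs_uint32 sampleMs)
{
    if (smRtt == 0)
	return sampleMs;		/* first sample primes the filter */
    smRtt = ((smRtt * 15) + sampleMs + 4) >> 4;
    return smRtt ? smRtt : 1;		/* never let it decay to zero */
}
#endif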
5930 /* Don't call this debugging routine directly; use dpf */
5931 void
5932 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5933 	       a11, a12, a13, a14, a15)
5934     char *format;
5935 {
5936     struct clock now;
5937     clock_GetTime(&now);
5938     fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5939     fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
5940     putc('\n', rx_Log);
5941 }
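/*
 * Typical use is via the dpf macro rather than calling this directly; the
 * double parentheses keep the variadic argument list intact for K&R
 * preprocessors, e.g. (values illustrative):
 *
 *	dpf(("reaping connection %lx after %u idle seconds\n", conn, idle));
 */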
5944 
5945 /*
5946  * This function is used to process the rx_stats structure that is local
5947 * to a process as well as an rx_stats structure received from a remote
5948  * process (via rxdebug). Therefore, it needs to do minimal version
5949  * checking.
5950  */
5951 void rx_PrintTheseStats (file, s, size, freePackets, version)
5952   FILE *file;
5953   struct rx_stats *s;
5954   int size;				/* some idea of version control */
5955   afs_int32 freePackets;
5956   char version;
5957 {
5958     int i;
5959 
5960     if (size != sizeof(struct rx_stats)) {
5961 	fprintf(file,
5962 		"Unexpected size of stats structure: was %d, expected %d\n",
5963 		size, (int) sizeof(struct rx_stats));
5964     }
5967 "rx stats: free packets %d, allocs %d, ",
5971 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5973 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5974 s->receivePktAllocFailures,
5975 s->receiveCbufPktAllocFailures,
5976 s->sendPktAllocFailures,
5977 s->sendCbufPktAllocFailures,
5978 	    s->specialPktAllocFailures);
5979     } else {
5980 	fprintf(file,
5981 	    "alloc-failures(rcv %d,send %d,ack %d)\n",
5982 s->receivePktAllocFailures,
5983 s->sendPktAllocFailures,
5984 s->specialPktAllocFailures);
5989 "bogusReads %d (last from host %x), "
5995 s->bogusPacketOnRead,
5998 s->noPacketBuffersOnRead,
6002 fprintf(file, " packets read: ");
6003     for (i = 0; i<RX_N_PACKET_TYPES; i++) {
6004 	fprintf(file,
6005 		"%s %d ",
6006 		rx_packetTypes[i],
6007 		s->packetsRead[i]);
6008     }
6009     fprintf(file, "\n");
6012 " other read counters: data %d, "
6020 s->spuriousPacketsRead,
6021 s->ignorePacketDally);
6023 fprintf(file, " packets sent: ");
6024     for (i = 0; i<RX_N_PACKET_TYPES; i++) {
6025 	fprintf(file,
6026 		"%s %d ",
6027 		rx_packetTypes[i],
6028 		s->packetsSent[i]);
6029     }
6030     fprintf(file, "\n");
6033 " other send counters: ack %d, "
6034 "data %d (not resends), "
6037 "acked&ignored %d\n",
6040 s->dataPacketsReSent,
6041 s->dataPacketsPushed,
6042 s->ignoreAckedPacket);
6045 " \t(these should be small) sendFailed %d, "
6048 (int) s->fatalErrors);
6050 if (s->nRttSamples) {
6052 " Average rtt is %0.3f, with %d samples\n",
6053 clock_Float(&s->totalRtt)/s->nRttSamples,
6057 " Minimum rtt is %0.3f, maximum is %0.3f\n",
6058 clock_Float(&s->minRtt),
6059 clock_Float(&s->maxRtt));
6063 " %d server connections, "
6064 "%d client connections, "
6067 "%d free call structs\n",
6072 s->nFreeCallStructs);
6074 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
6076 " %d clock updates\n",
6082 /* for backward compatibility */
6083 void rx_PrintStats(file)
6086 MUTEX_ENTER(&rx_stats_mutex);
6087 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
6088 MUTEX_EXIT(&rx_stats_mutex);
6091 void rx_PrintPeerStats(file, peer)
6093 struct rx_peer *peer;
6098 "burst wait %u.%d.\n",
6101 (int) peer->burstSize,
6102 (int) peer->burstWait.sec,
6103 (int) peer->burstWait.usec);
6107 "retry time %u.%06d, "
6111 (int) peer->timeout.sec,
6112 (int) peer->timeout.usec,
6118 "max in packet skew %d, "
6119 "max out packet skew %d\n",
6121 (int) peer->inPacketSkew,
6122 (int) peer->outPacketSkew);
6125 #ifdef AFS_PTHREAD_ENV
6126 /*
6127  * This mutex protects the following static variables:
6128  * counter
6129  */
6130 
6131 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
6132 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
6133 #else
6134 #define LOCK_RX_DEBUG
6135 #define UNLOCK_RX_DEBUG
6136 #endif /* AFS_PTHREAD_ENV */
6138 static int MakeDebugCall(
6139   int socket,
6140   afs_uint32 remoteAddr,
6141   afs_uint16 remotePort,
6142   u_char type,
6143   void *inputData,
6144   size_t inputLength,
6145   void *outputData,
6146   size_t outputLength
6147 )
6148 {
6149     static afs_int32 counter = 100;
6150     time_t endTime;
6151     struct rx_header theader;
6152     char tbuffer[1500];
6153     register afs_int32 code;
6154     struct timeval tv;
6155     struct sockaddr_in taddr, faddr;
6156     int faddrLen;
6157     fd_set imask;
6158     register char *tp;
6159 
6160     endTime = time(0) + 20; /* try for 20 seconds */
6164 tp = &tbuffer[sizeof(struct rx_header)];
6165 taddr.sin_family = AF_INET;
6166 taddr.sin_port = remotePort;
6167 taddr.sin_addr.s_addr = remoteAddr;
6168 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
6169 taddr.sin_len = sizeof(struct sockaddr_in);
6172 memset(&theader, 0, sizeof(theader));
6173     theader.epoch = htonl(999);
6174     theader.cid = 0;
6175     theader.callNumber = htonl(counter);
6176     theader.seq = 0;
6177     theader.serial = 0;
6178     theader.type = type;
6179     theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
6180     theader.serviceId = 0;
6182 memcpy(tbuffer, &theader, sizeof(theader));
6183 memcpy(tp, inputData, inputLength);
6184 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
6185 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
6186     for(;;) {
6187 	/* see if there's a packet available */
6188 	FD_ZERO(&imask);
6189 	FD_SET(socket, &imask);
6190 	tv.tv_sec = 1;
6191 	tv.tv_usec = 0;
6192 	code = select(socket+1, &imask, 0, 0, &tv);
6193 	if (code > 0) {
6194 /* now receive a packet */
6195 faddrLen = sizeof(struct sockaddr_in);
6196 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6197 (struct sockaddr *) &faddr, &faddrLen);
6199 	    memcpy(&theader, tbuffer, sizeof(struct rx_header));
6200 	    if (counter == ntohl(theader.callNumber)) break;
6201 	}
6202 
6203 	/* see if we've timed out */
6204 	if (endTime < time(0)) return -1;
6205     }
6206 code -= sizeof(struct rx_header);
6207 if (code > outputLength) code = outputLength;
6208     memcpy(outputData, tp, code);
6209 
6210     return code;
6211 }
6212 afs_int32 rx_GetServerDebug(
6213   int socket,
6214   afs_uint32 remoteAddr,
6215   afs_uint16 remotePort,
6216   struct rx_debugStats *stat,
6217   afs_uint32 *supportedValues
6218 )
6219 {
6220     struct rx_debugIn in;
6221     afs_int32 rc = 0;
6222 
6223     *supportedValues = 0;
6224 in.type = htonl(RX_DEBUGI_GETSTATS);
6227 rc = MakeDebugCall(socket,
6230 RX_PACKET_TYPE_DEBUG,
6237 * If the call was successful, fixup the version and indicate
6238 * what contents of the stat structure are valid.
6239 * Also do net to host conversion of fields here.
6243 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6244 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6246 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6247 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6249 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6250 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6252 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6253 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6255 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6256 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6258 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6259 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6261 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6262 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6265 stat->nFreePackets = ntohl(stat->nFreePackets);
6266 stat->packetReclaims = ntohl(stat->packetReclaims);
6267 stat->callsExecuted = ntohl(stat->callsExecuted);
6268 stat->nWaiting = ntohl(stat->nWaiting);
6269 	stat->idleThreads = ntohl(stat->idleThreads);
6270     }
6271 
6272     return rc;
6273 }
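/*
 * Caller's-eye sketch of the debug interface above (in the spirit of
 * rxdebug; error handling trimmed, helper name hypothetical): open a
 * datagram socket, query a server, then test the capability bits.
 */
#if 0
static void example_QueryPeerDebug(afs_uint32 host, afs_uint16 port) /* both NBO */
{
    struct rx_debugStats tstats;
    afs_uint32 supported;
    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (rx_GetServerDebug(sock, host, port, &tstats, &supported) >= 0
	&& (supported & RX_SERVER_DEBUG_RX_STATS)) {
	/* this peer can also answer rx_GetServerStats queries */
    }
    close(sock);
}
#endif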
6275 afs_int32 rx_GetServerStats(
6277 afs_uint32 remoteAddr,
6278 afs_uint16 remotePort,
6279 struct rx_stats *stat,
6280 afs_uint32 *supportedValues
6283 struct rx_debugIn in;
6284 afs_int32 *lp = (afs_int32 *) stat;
6289 * supportedValues is currently unused, but added to allow future
6290 * versioning of this function.
6293 *supportedValues = 0;
6294 in.type = htonl(RX_DEBUGI_RXSTATS);
6296 memset(stat, 0, sizeof(*stat));
6298 rc = MakeDebugCall(socket,
6301 RX_PACKET_TYPE_DEBUG,
6310 * Do net to host conversion here
6313     for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
6314 	*lp = ntohl(*lp);
6315     }
6316 
6317     return rc;
6318 }
6321 afs_int32 rx_GetServerVersion(
6323 afs_uint32 remoteAddr,
6324 afs_uint16 remotePort,
6325 size_t version_length,
6330 return MakeDebugCall(socket,
6333 RX_PACKET_TYPE_VERSION,
6340 afs_int32 rx_GetServerConnections(
6342 afs_uint32 remoteAddr,
6343 afs_uint16 remotePort,
6344 afs_int32 *nextConnection,
6346 afs_uint32 debugSupportedValues,
6347 struct rx_debugConn *conn,
6348 afs_uint32 *supportedValues
6351 struct rx_debugIn in;
6356 * supportedValues is currently unused, but added to allow future
6357 * versioning of this function.
6360 *supportedValues = 0;
6361 if (allConnections) {
6362 in.type = htonl(RX_DEBUGI_GETALLCONN);
6364 in.type = htonl(RX_DEBUGI_GETCONN);
6366 in.index = htonl(*nextConnection);
6367 memset(conn, 0, sizeof(*conn));
6369 rc = MakeDebugCall(socket,
6372 RX_PACKET_TYPE_DEBUG,
6379 *nextConnection += 1;
6382 * Convert old connection format to new structure.
6385 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6386 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6387 #define MOVEvL(a) (conn->a = vL->a)
6389 /* any old or unrecognized version... */
6390 for (i=0;i<RX_MAXCALLS;i++) {
6391 MOVEvL(callState[i]);
6392 MOVEvL(callMode[i]);
6393 MOVEvL(callFlags[i]);
6394 MOVEvL(callOther[i]);
6396 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6397 MOVEvL(secStats.type);
6398 MOVEvL(secStats.level);
6399 MOVEvL(secStats.flags);
6400 MOVEvL(secStats.expires);
6401 MOVEvL(secStats.packetsReceived);
6402 MOVEvL(secStats.packetsSent);
6403 MOVEvL(secStats.bytesReceived);
6404 MOVEvL(secStats.bytesSent);
6409 * Do net to host conversion here
6411 * I don't convert host or port since we are most likely
6412 * going to want these in NBO.
6414 conn->cid = ntohl(conn->cid);
6415 conn->serial = ntohl(conn->serial);
6416 for(i=0;i<RX_MAXCALLS;i++) {
6417 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6419 conn->error = ntohl(conn->error);
6420 conn->secStats.flags = ntohl(conn->secStats.flags);
6421 conn->secStats.expires = ntohl(conn->secStats.expires);
6422 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6423 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6424 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6425 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6426 conn->epoch = ntohl(conn->epoch);
6427 	conn->natMTU = ntohl(conn->natMTU);
6428     }
6429 
6430     return rc;
6431 }
6433 afs_int32 rx_GetServerPeers(
6435 afs_uint32 remoteAddr,
6436 afs_uint16 remotePort,
6437 afs_int32 *nextPeer,
6438 afs_uint32 debugSupportedValues,
6439 struct rx_debugPeer *peer,
6440 afs_uint32 *supportedValues
6443 struct rx_debugIn in;
6447 * supportedValues is currently unused, but added to allow future
6448 * versioning of this function.
6451 *supportedValues = 0;
6452 in.type = htonl(RX_DEBUGI_GETPEER);
6453 in.index = htonl(*nextPeer);
6454 memset(peer, 0, sizeof(*peer));
6456 rc = MakeDebugCall(socket,
6459 RX_PACKET_TYPE_DEBUG,
6469 * Do net to host conversion here
6471 * I don't convert host or port since we are most likely
6472 * going to want these in NBO.
6474 peer->ifMTU = ntohs(peer->ifMTU);
6475 peer->idleWhen = ntohl(peer->idleWhen);
6476 peer->refCount = ntohs(peer->refCount);
6477 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6478 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6479 peer->rtt = ntohl(peer->rtt);
6480 peer->rtt_dev = ntohl(peer->rtt_dev);
6481 peer->timeout.sec = ntohl(peer->timeout.sec);
6482 peer->timeout.usec = ntohl(peer->timeout.usec);
6483 peer->nSent = ntohl(peer->nSent);
6484 peer->reSends = ntohl(peer->reSends);
6485 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6486 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6487 peer->rateFlag = ntohl(peer->rateFlag);
6488 peer->natMTU = ntohs(peer->natMTU);
6489 peer->maxMTU = ntohs(peer->maxMTU);
6490 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6491 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6492 peer->MTU = ntohs(peer->MTU);
6493 peer->cwind = ntohs(peer->cwind);
6494 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6495 peer->congestSeq = ntohs(peer->congestSeq);
6496 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6497 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6498 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6499 	peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6500     }
6501 
6502     return rc;
6503 }
6504 #endif /* RXDEBUG */
6506 void shutdown_rx(void)
6507 {
6508     struct rx_serverQueueEntry *np;
6509     register int i, j;
6510     register struct rx_call *call;
6511 register struct rx_serverQueueEntry *sq;
6512 
6513     LOCK_RX_INIT
6514     if (rxinit_status == 1) {
6515 	UNLOCK_RX_INIT
6516 	return; /* Already shutdown. */
6517     }
6521 #ifndef AFS_PTHREAD_ENV
6522 FD_ZERO(&rx_selectMask);
6523 #endif /* AFS_PTHREAD_ENV */
6524 rxi_dataQuota = RX_MAX_QUOTA;
6525 #ifndef AFS_PTHREAD_ENV
6527 #endif /* AFS_PTHREAD_ENV */
6530 #ifndef AFS_PTHREAD_ENV
6531 #ifndef AFS_USE_GETTIMEOFDAY
6533 #endif /* AFS_USE_GETTIMEOFDAY */
6534 #endif /* AFS_PTHREAD_ENV */
6536     while (!queue_IsEmpty(&rx_freeCallQueue)) {
6537 	call = queue_First(&rx_freeCallQueue, rx_call);
6538 	queue_Remove(call);
6539 	rxi_Free(call, sizeof(struct rx_call));
6540     }
6542 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6543 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6549 struct rx_peer **peer_ptr, **peer_end;
6550 for (peer_ptr = &rx_peerHashTable[0],
6551 peer_end = &rx_peerHashTable[rx_hashTableSize];
6552 peer_ptr < peer_end; peer_ptr++) {
6553 struct rx_peer *peer, *next;
6554 	    for (peer = *peer_ptr; peer; peer = next) {
6555 		rx_interface_stat_p rpc_stat, nrpc_stat;
6556 		next = peer->next;
6557 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6558 rx_interface_stat)) {
6559 unsigned int num_funcs;
6560 if (!rpc_stat) break;
6561 queue_Remove(&rpc_stat->queue_header);
6562 queue_Remove(&rpc_stat->all_peers);
6563 num_funcs = rpc_stat->stats[0].func_total;
6564 space = sizeof(rx_interface_stat_t) +
6565 rpc_stat->stats[0].func_total *
6566 sizeof(rx_function_entry_v1_t);
6568 rxi_Free(rpc_stat, space);
6569 MUTEX_ENTER(&rx_rpc_stats);
6570 rxi_rpc_peer_stat_cnt -= num_funcs;
6571 MUTEX_EXIT(&rx_rpc_stats);
6575 MUTEX_ENTER(&rx_stats_mutex);
6576 rx_stats.nPeerStructs--;
6577 MUTEX_EXIT(&rx_stats_mutex);
6581     for (i = 0; i<RX_MAX_SERVICES; i++) {
6582 	if (rx_services[i])
6583 	    rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6584     }
6585 for (i = 0; i < rx_hashTableSize; i++) {
6586 register struct rx_connection *tc, *ntc;
6587 MUTEX_ENTER(&rx_connHashTable_lock);
6588 	for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6589 	    ntc = tc->next;
6590 	    for (j = 0; j < RX_MAXCALLS; j++) {
6591 		if (tc->call[j]) {
6592 		    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6593 		}
6594 	    }
6595 	    rxi_Free(tc, sizeof(*tc));
6596 	}
6597 MUTEX_EXIT(&rx_connHashTable_lock);
6600 MUTEX_ENTER(&freeSQEList_lock);
6602 while ((np = rx_FreeSQEList)) {
6603 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6604 MUTEX_DESTROY(&np->lock);
6605 rxi_Free(np, sizeof(*np));
6608 MUTEX_EXIT(&freeSQEList_lock);
6609 MUTEX_DESTROY(&freeSQEList_lock);
6610 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6611 MUTEX_DESTROY(&rx_connHashTable_lock);
6612 MUTEX_DESTROY(&rx_peerHashTable_lock);
6613 MUTEX_DESTROY(&rx_serverPool_lock);
6615 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6616 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6618 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6619 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6621 rxi_FreeAllPackets();
6623 MUTEX_ENTER(&rx_stats_mutex);
6624 rxi_dataQuota = RX_MAX_QUOTA;
6625 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6626 MUTEX_EXIT(&rx_stats_mutex);
6632 #ifdef RX_ENABLE_LOCKS
6633 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6635 if (!MUTEX_ISMINE(lockaddr))
6636 osi_Panic("Lock not held: %s", msg);
6638 #endif /* RX_ENABLE_LOCKS */
6643  * Routines to implement connection specific data.
6644  */
6645 
6646 int rx_KeyCreate(rx_destructor_t rtn)
6647 {
6648     int key;
6649     MUTEX_ENTER(&rxi_keyCreate_lock);
6650     key = rxi_keyCreate_counter++;
6651     rxi_keyCreate_destructor = (rx_destructor_t *)
6652 	realloc((void *)rxi_keyCreate_destructor,
6653 		(key+1) * sizeof(rx_destructor_t));
6654     rxi_keyCreate_destructor[key] = rtn;
6655     MUTEX_EXIT(&rxi_keyCreate_lock);
6656     return key;
6657 }
6659 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6660 {
6661     int i;
6662     MUTEX_ENTER(&conn->conn_data_lock);
6663     if (!conn->specific) {
6664 	conn->specific = (void **)malloc((key+1)*sizeof(void *));
6665 	for (i = 0 ; i < key ; i++)
6666 	    conn->specific[i] = NULL;
6667 	conn->nSpecific = key+1;
6668 	conn->specific[key] = ptr;
6669     } else if (key >= conn->nSpecific) {
6670 	conn->specific = (void **)
6671 	    realloc(conn->specific,(key+1)*sizeof(void *));
6672 	for (i = conn->nSpecific ; i < key ; i++)
6673 	    conn->specific[i] = NULL;
6674 	conn->nSpecific = key+1;
6675 	conn->specific[key] = ptr;
6676     } else {
6677 	if (conn->specific[key] && rxi_keyCreate_destructor[key])
6678 	    (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6679 	conn->specific[key] = ptr;
6680     }
6681     MUTEX_EXIT(&conn->conn_data_lock);
6682 }
6684 void *rx_GetSpecific(struct rx_connection *conn, int key)
6685 {
6686     void *ptr;
6687     MUTEX_ENTER(&conn->conn_data_lock);
6688     if (key >= conn->nSpecific)
6689 	ptr = NULL;
6690     else
6691 	ptr = conn->specific[key];
6692     MUTEX_EXIT(&conn->conn_data_lock);
6693     return ptr;
6694 }
6696 #endif /* !KERNEL */
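/*
 * Usage sketch for the connection-specific data routines above (the key
 * variable, payload and destructor are hypothetical):
 */
#if 0
static int example_key;

static void example_FreeState(void *state)
{
    free(state);
}

static void example_UseSpecific(struct rx_connection *conn)
{
    int *state;
    example_key = rx_KeyCreate(example_FreeState);
    rx_SetSpecific(conn, example_key, malloc(sizeof(int)));
    /* ... later; the destructor runs when the slot is overwritten ... */
    state = (int *) rx_GetSpecific(conn, example_key);
}
#endif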
6699 * processStats is a queue used to store the statistics for the local
6700 * process. Its contents are similar to the contents of the rpcStats
6701 * queue on a rx_peer structure, but the actual data stored within
6702 * this queue contains totals across the lifetime of the process (assuming
6703 * the stats have not been reset) - unlike the per peer structures
6704 * which can come and go based upon the peer lifetime.
6707 static struct rx_queue processStats = {&processStats,&processStats};
6710 * peerStats is a queue used to store the statistics for all peer structs.
6711 * Its contents are the union of all the peer rpcStats queues.
6714 static struct rx_queue peerStats = {&peerStats,&peerStats};
6717 * rxi_monitor_processStats is used to turn process wide stat collection
6721 static int rxi_monitor_processStats = 0;
6724 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
6727 static int rxi_monitor_peerStats = 0;
6730 * rxi_AddRpcStat - given all of the information for a particular rpc
6731 * call, create (if needed) and update the stat totals for the rpc.
6735 * IN stats - the queue of stats that will be updated with the new value
6737 * IN rxInterface - a unique number that identifies the rpc interface
6739 * IN currentFunc - the index of the function being invoked
6741 * IN totalFunc - the total number of functions in this interface
6743 * IN queueTime - the amount of time this function waited for a thread
6745 * IN execTime - the amount of time this function invocation took to execute
6747 * IN bytesSent - the number bytes sent by this invocation
6749 * IN bytesRcvd - the number bytes received by this invocation
6751 * IN isServer - if true, this invocation was made to a server
6753 * IN remoteHost - the ip address of the remote host
6755 * IN remotePort - the port of the remote host
6757 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6759 * INOUT counter - if a new stats structure is allocated, the counter will
6760 * be updated with the new number of allocated stat structures
6767 static int rxi_AddRpcStat(
6768 struct rx_queue *stats,
6769 afs_uint32 rxInterface,
6770 afs_uint32 currentFunc,
6771 afs_uint32 totalFunc,
6772 struct clock *queueTime,
6773 struct clock *execTime,
6774 afs_hyper_t *bytesSent,
6775     afs_hyper_t *bytesRcvd,
6776     int isServer,
6777     afs_uint32 remoteHost,
6778     afs_uint32 remotePort,
6779     int addToPeerList,
6780     unsigned int *counter)
6781 {
6782     int rc = 0;
6783     rx_interface_stat_p rpc_stat, nrpc_stat;
6786 * See if there's already a structure for this interface
6789 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6790 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6791 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6795 * Didn't find a match so allocate a new structure and add it to the
6799 if (queue_IsEnd(stats, rpc_stat) ||
6800 (rpc_stat == NULL) ||
6801 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6802 (rpc_stat->stats[0].remote_is_server != isServer)) {
6806 space = sizeof(rx_interface_stat_t) + totalFunc *
6807 sizeof(rx_function_entry_v1_t);
6809 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6810 	if (rpc_stat == NULL) {
6811 	    rc = 1;
6812 	    goto fail;
6813 	}
6814 	*counter += totalFunc;
6815 for(i=0;i<totalFunc;i++) {
6816 rpc_stat->stats[i].remote_peer = remoteHost;
6817 rpc_stat->stats[i].remote_port = remotePort;
6818 rpc_stat->stats[i].remote_is_server = isServer;
6819 rpc_stat->stats[i].interfaceId = rxInterface;
6820 rpc_stat->stats[i].func_total = totalFunc;
6821 rpc_stat->stats[i].func_index = i;
6822 hzero(rpc_stat->stats[i].invocations);
6823 hzero(rpc_stat->stats[i].bytes_sent);
6824 hzero(rpc_stat->stats[i].bytes_rcvd);
6825 rpc_stat->stats[i].queue_time_sum.sec = 0;
6826 rpc_stat->stats[i].queue_time_sum.usec = 0;
6827 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6828 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6829 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6830 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6831 rpc_stat->stats[i].queue_time_max.sec = 0;
6832 rpc_stat->stats[i].queue_time_max.usec = 0;
6833 rpc_stat->stats[i].execution_time_sum.sec = 0;
6834 rpc_stat->stats[i].execution_time_sum.usec = 0;
6835 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6836 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6837 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6838 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6839 rpc_stat->stats[i].execution_time_max.sec = 0;
6840 rpc_stat->stats[i].execution_time_max.usec = 0;
6842 queue_Prepend(stats, rpc_stat);
6843 if (addToPeerList) {
6844 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6849 * Increment the stats for this function
6852 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6853 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6854 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6855 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6856 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6857 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6858 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6860 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6861 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6863 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6864 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6865 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6866 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6868 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6869 	rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6870     }
6871 
6872 fail:
6873     return rc;
6874 }
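/*
 * The sums accumulated above are sufficient statistics: with n invocations,
 * mean = sum/n and variance = sum_sqr/n - mean^2.  Sketch in floating point
 * for clarity (clock_Float is the rx_clock.h macro, sqrt needs <math.h>,
 * and only the low word of the invocation count is used for illustration):
 */
#if 0
static double example_QueueTimeStdDev(rx_function_entry_v1_t *fn)
{
    double n = (double) hgetlo(fn->invocations);	/* low 32 bits only */
    double mean, meansq;
    if (n < 1)
	return 0.0;
    mean = clock_Float(&fn->queue_time_sum) / n;
    meansq = clock_Float(&fn->queue_time_sum_sqr) / n;
    return sqrt(meansq - mean * mean);
}
#endif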
6877 * rx_IncrementTimeAndCount - increment the times and count for a particular
6882 * IN peer - the peer who invoked the rpc
6884 * IN rxInterface - a unique number that identifies the rpc interface
6886 * IN currentFunc - the index of the function being invoked
6888 * IN totalFunc - the total number of functions in this interface
6890 * IN queueTime - the amount of time this function waited for a thread
6892 * IN execTime - the amount of time this function invocation took to execute
6894 * IN bytesSent - the number bytes sent by this invocation
6896 * IN bytesRcvd - the number bytes received by this invocation
6898 * IN isServer - if true, this invocation was made to a server
6905 void rx_IncrementTimeAndCount(
6906 struct rx_peer *peer,
6907 afs_uint32 rxInterface,
6908 afs_uint32 currentFunc,
6909 afs_uint32 totalFunc,
6910 struct clock *queueTime,
6911 struct clock *execTime,
6912 afs_hyper_t *bytesSent,
6913 afs_hyper_t *bytesRcvd,
6917 MUTEX_ENTER(&rx_rpc_stats);
6918 MUTEX_ENTER(&peer->peer_lock);
6920 if (rxi_monitor_peerStats) {
6921 rxi_AddRpcStat(&peer->rpcStats,
6933 &rxi_rpc_peer_stat_cnt);
6936 if (rxi_monitor_processStats) {
6937 rxi_AddRpcStat(&processStats,
6949 &rxi_rpc_process_stat_cnt);
6952 MUTEX_EXIT(&peer->peer_lock);
6953     MUTEX_EXIT(&peer->peer_lock);	/* NB: released before rx_rpc_stats below */
6954     MUTEX_EXIT(&rx_rpc_stats);
6955 }
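/*
 * Sketch of a caller, modeled on what rxgen-generated server stubs do
 * (interface id and function index are placeholder values): time the call,
 * then fold the sample into both the peer and process totals.
 */
#if 0
static void example_RecordSample(struct rx_call *call,
				 struct clock *queued, struct clock *exec,
				 afs_hyper_t *sent, afs_hyper_t *rcvd)
{
    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
			     999 /* interface id */, 0 /* func index */,
			     1 /* functions in interface */,
			     queued, exec, sent, rcvd, 1 /* isServer */);
}
#endif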
6958 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6962 * IN callerVersion - the rpc stat version of the caller.
6964 * IN count - the number of entries to marshall.
6966 * IN stats - pointer to stats to be marshalled.
6968 * OUT ptr - Where to store the marshalled data.
6974 void rx_MarshallProcessRPCStats(
6975     afs_uint32 callerVersion,
6976     int count,
6977     rx_function_entry_v1_t *stats,
6978     afs_uint32 **ptrP)
6979 {
6980     int i;
6981     afs_uint32 *ptr;
6982 
6983     /*
6984      * We only support the first version
6985      */
6986     for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6987 *(ptr++) = stats->remote_peer;
6988 *(ptr++) = stats->remote_port;
6989 *(ptr++) = stats->remote_is_server;
6990 *(ptr++) = stats->interfaceId;
6991 *(ptr++) = stats->func_total;
6992 *(ptr++) = stats->func_index;
6993 *(ptr++) = hgethi(stats->invocations);
6994 *(ptr++) = hgetlo(stats->invocations);
6995 *(ptr++) = hgethi(stats->bytes_sent);
6996 *(ptr++) = hgetlo(stats->bytes_sent);
6997 *(ptr++) = hgethi(stats->bytes_rcvd);
6998 *(ptr++) = hgetlo(stats->bytes_rcvd);
6999 *(ptr++) = stats->queue_time_sum.sec;
7000 *(ptr++) = stats->queue_time_sum.usec;
7001 *(ptr++) = stats->queue_time_sum_sqr.sec;
7002 *(ptr++) = stats->queue_time_sum_sqr.usec;
7003 *(ptr++) = stats->queue_time_min.sec;
7004 *(ptr++) = stats->queue_time_min.usec;
7005 *(ptr++) = stats->queue_time_max.sec;
7006 *(ptr++) = stats->queue_time_max.usec;
7007 *(ptr++) = stats->execution_time_sum.sec;
7008 *(ptr++) = stats->execution_time_sum.usec;
7009 *(ptr++) = stats->execution_time_sum_sqr.sec;
7010 *(ptr++) = stats->execution_time_sum_sqr.usec;
7011 *(ptr++) = stats->execution_time_min.sec;
7012 *(ptr++) = stats->execution_time_min.usec;
7013 *(ptr++) = stats->execution_time_max.sec;
7014 	*(ptr++) = stats->execution_time_max.usec;
7015     }
7016 
7017     *ptrP = ptr;
7018 }
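/*
 * Hypothetical inverse of the v1 wire layout written above: consume the
 * 32-bit words in the same order to rebuild one entry.  hset64 is assumed
 * to be available (afs/stds.h); the remaining clock fields follow the same
 * pattern and are elided.
 */
#if 0
static void example_UnmarshallEntry(afs_uint32 **ptrP, rx_function_entry_v1_t *st)
{
    afs_uint32 *ptr = *ptrP;
    st->remote_peer      = *(ptr++);
    st->remote_port      = *(ptr++);
    st->remote_is_server = *(ptr++);
    st->interfaceId      = *(ptr++);
    st->func_total       = *(ptr++);
    st->func_index       = *(ptr++);
    hset64(st->invocations, ptr[0], ptr[1]); ptr += 2;
    hset64(st->bytes_sent,  ptr[0], ptr[1]); ptr += 2;
    hset64(st->bytes_rcvd,  ptr[0], ptr[1]); ptr += 2;
    st->queue_time_sum.sec  = *(ptr++);
    st->queue_time_sum.usec = *(ptr++);
    /* ... sum_sqr, min, max and the execution_time fields likewise ... */
    *ptrP = ptr;
}
#endif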
7020 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
7025 * IN callerVersion - the rpc stat version of the caller
7027 * OUT myVersion - the rpc stat version of this function
7029 * OUT clock_sec - local time seconds
7031 * OUT clock_usec - local time microseconds
7033 * OUT allocSize - the number of bytes allocated to contain stats
7035 * OUT statCount - the number stats retrieved from this process.
7037 * OUT stats - the actual stats retrieved from this process.
7041 * Returns void. If successful, stats will != NULL.
7044 int rx_RetrieveProcessRPCStats(
7045     afs_uint32 callerVersion,
7046     afs_uint32 *myVersion,
7047     afs_uint32 *clock_sec,
7048     afs_uint32 *clock_usec,
7049     size_t *allocSize,
7050     afs_uint32 *statCount,
7051     afs_uint32 **stats)
7052 {
7053     size_t space = 0;
7054     afs_uint32 *ptr;
7055     struct clock now;
7056     int rc = 0;
7057 
7061     *myVersion = RX_STATS_RETRIEVAL_VERSION;
7064 * Check to see if stats are enabled
7067 MUTEX_ENTER(&rx_rpc_stats);
7068 if (!rxi_monitor_processStats) {
7069 MUTEX_EXIT(&rx_rpc_stats);
7073 clock_GetTime(&now);
7074 *clock_sec = now.sec;
7075 *clock_usec = now.usec;
7078 * Allocate the space based upon the caller version
7080 * If the client is at an older version than we are,
7081 * we return the statistic data in the older data format, but
7082 * we still return our version number so the client knows we
7083 * are maintaining more data than it can retrieve.
7086 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
7087 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
7088 *statCount = rxi_rpc_process_stat_cnt;
7091 * This can't happen yet, but in the future version changes
7092 * can be handled by adding additional code here
7096     if (space > (size_t) 0) {
7097 	*allocSize = space;
7098 	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7099 
7100 	if (ptr != NULL) {
7101 	    rx_interface_stat_p rpc_stat, nrpc_stat;
7104 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
7105 rx_interface_stat)) {
7107 * Copy the data based upon the caller version
7109 rx_MarshallProcessRPCStats(callerVersion,
7110 rpc_stat->stats[0].func_total,
7111 rpc_stat->stats, &ptr);
7112 	    }
7113 	} else {
7114 	    rc = ENOMEM;
7115 	}
7116     }
7117     MUTEX_EXIT(&rx_rpc_stats);
7118 
7119     return rc;
7120 }
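/*
 * Caller's view of the retrieval interface (sketch; error handling
 * trimmed): retrieve a snapshot, walk the marshalled entries, then free
 * the buffer with the size that was handed back.
 */
#if 0
static void example_DumpProcessStats(void)
{
    afs_uint32 ver, sec, usec, count;
    size_t alloc;
    afs_uint32 *stats;
    if (rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &ver, &sec,
				   &usec, &alloc, &count, &stats) == 0
	&& stats != NULL) {
	/* count v1 entries here, laid out by rx_MarshallProcessRPCStats */
	rx_FreeRPCStats(stats, alloc);
    }
}
#endif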
7122 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
7126 * IN callerVersion - the rpc stat version of the caller
7128 * OUT myVersion - the rpc stat version of this function
7130 * OUT clock_sec - local time seconds
7132 * OUT clock_usec - local time microseconds
7134 * OUT allocSize - the number of bytes allocated to contain stats
7136 * OUT statCount - the number of stats retrieved from the individual
7139 * OUT stats - the actual stats retrieved from the individual peer structures.
7143 * Returns void. If successful, stats will != NULL.
7146 int rx_RetrievePeerRPCStats(
7147     afs_uint32 callerVersion,
7148     afs_uint32 *myVersion,
7149     afs_uint32 *clock_sec,
7150     afs_uint32 *clock_usec,
7151     size_t *allocSize,
7152     afs_uint32 *statCount,
7153     afs_uint32 **stats)
7154 {
7155     size_t space = 0;
7156     afs_uint32 *ptr;
7157     struct clock now;
7158     int rc = 0;
7159 
7163     *myVersion = RX_STATS_RETRIEVAL_VERSION;
7166 * Check to see if stats are enabled
7169 MUTEX_ENTER(&rx_rpc_stats);
7170 if (!rxi_monitor_peerStats) {
7171 MUTEX_EXIT(&rx_rpc_stats);
7175 clock_GetTime(&now);
7176 *clock_sec = now.sec;
7177 *clock_usec = now.usec;
7180 * Allocate the space based upon the caller version
7182 * If the client is at an older version than we are,
7183 * we return the statistic data in the older data format, but
7184 * we still return our version number so the client knows we
7185 * are maintaining more data than it can retrieve.
7188 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
7189 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
7190 *statCount = rxi_rpc_peer_stat_cnt;
7193 * This can't happen yet, but in the future version changes
7194 * can be handled by adding additional code here
7198     if (space > (size_t) 0) {
7199 	*allocSize = space;
7200 	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7201 
7202 	if (ptr != NULL) {
7203 	    rx_interface_stat_p rpc_stat, nrpc_stat;
7206 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7207 rx_interface_stat)) {
7209 * We have to fix the offset of rpc_stat since we are
7210 * keeping this structure on two rx_queues. The rx_queue
7211 * package assumes that the rx_queue member is the first
7212 * member of the structure. That is, rx_queue assumes that
7213 * any one item is only on one queue at a time. We are
7214 * breaking that assumption and so we have to do a little
7215 * math to fix our pointers.
7218 fix_offset = (char *) rpc_stat;
7219 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7220 rpc_stat = (rx_interface_stat_p) fix_offset;
7223 * Copy the data based upon the caller version
7225 rx_MarshallProcessRPCStats(callerVersion,
7226 rpc_stat->stats[0].func_total,
7227 rpc_stat->stats, &ptr);
7228 	    }
7229 	} else {
7230 	    rc = ENOMEM;
7231 	}
7232     }
7233     MUTEX_EXIT(&rx_rpc_stats);
7234 
7235     return rc;
7236 }
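/*
 * The pointer fixup above is the classic container-of computation; the same
 * idea stated generically (illustrative macro, not used by rx itself):
 */
#if 0
#define EXAMPLE_CONTAINER_OF(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))
/* e.g. rpc_stat = EXAMPLE_CONTAINER_OF(q, rx_interface_stat_t, all_peers); */
#endif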
7238 * rx_FreeRPCStats - free memory allocated by
7239 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7243 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7244 * rx_RetrievePeerRPCStats
7246 * IN allocSize - the number of bytes in stats.
7253 void rx_FreeRPCStats(
7254     afs_uint32 *stats,
7255     size_t allocSize)
7256 {
7257     rxi_Free(stats, allocSize);
7258 }
7261 * rx_queryProcessRPCStats - see if process rpc stat collection is
7262 * currently enabled.
7268 * Returns 0 if stats are not enabled != 0 otherwise
7271 int rx_queryProcessRPCStats()
7272 {
7273     int rc;
7274     MUTEX_ENTER(&rx_rpc_stats);
7275     rc = rxi_monitor_processStats;
7276     MUTEX_EXIT(&rx_rpc_stats);
7277     return rc;
7278 }
7281 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7287 * Returns 0 if stats are not enabled != 0 otherwise
7290 int rx_queryPeerRPCStats()
7291 {
7292     int rc;
7293     MUTEX_ENTER(&rx_rpc_stats);
7294     rc = rxi_monitor_peerStats;
7295     MUTEX_EXIT(&rx_rpc_stats);
7296     return rc;
7297 }
7300 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7309 void rx_enableProcessRPCStats()
7310 {
7311     MUTEX_ENTER(&rx_rpc_stats);
7312     rx_enable_stats = 1;
7313     rxi_monitor_processStats = 1;
7314     MUTEX_EXIT(&rx_rpc_stats);
7315 }
7318 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7327 void rx_enablePeerRPCStats()
7328 {
7329     MUTEX_ENTER(&rx_rpc_stats);
7330     rx_enable_stats = 1;
7331     rxi_monitor_peerStats = 1;
7332     MUTEX_EXIT(&rx_rpc_stats);
7333 }
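/*
 * Typical administrative sequence for the switches above (sketch;
 * AFS_RX_STATS_CLEAR_ALL is the catch-all clear mask from rx.h):
 */
#if 0
    rx_enableProcessRPCStats();
    /* ... run traffic, sample with rx_RetrieveProcessRPCStats ... */
    if (rx_queryProcessRPCStats())
	rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_ALL);
    rx_disableProcessRPCStats();
#endif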
7336 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7345 void rx_disableProcessRPCStats()
7346 {
7347     rx_interface_stat_p rpc_stat, nrpc_stat;
7348     size_t space;
7349 
7350     MUTEX_ENTER(&rx_rpc_stats);
7351 
7352     /*
7353      * Turn off process statistics and if peer stats is also off, turn
7354      * off rx_enable_stats as well
7355      */
7356 
7357     rxi_monitor_processStats = 0;
7358     if (rxi_monitor_peerStats == 0) {
7359 	rx_enable_stats = 0;
7360     }
7361 
7362 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7363 unsigned int num_funcs = 0;
7364 if (!rpc_stat) break;
7365 queue_Remove(rpc_stat);
7366 num_funcs = rpc_stat->stats[0].func_total;
7367 space = sizeof(rx_interface_stat_t) +
7368 rpc_stat->stats[0].func_total *
7369 sizeof(rx_function_entry_v1_t);
7371 	rxi_Free(rpc_stat, space);
7372 	rxi_rpc_process_stat_cnt -= num_funcs;
7373     }
7374     MUTEX_EXIT(&rx_rpc_stats);
7375 }
7378 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7387 void rx_disablePeerRPCStats()
7388 {
7389     struct rx_peer **peer_ptr, **peer_end;
7390     int code;
7391 
7392     MUTEX_ENTER(&rx_rpc_stats);
7393 
7394     /*
7395      * Turn off peer statistics and if process stats is also off, turn
7396      * off rx_enable_stats as well
7397      */
7398 
7399     rxi_monitor_peerStats = 0;
7400     if (rxi_monitor_processStats == 0) {
7401 	rx_enable_stats = 0;
7402     }
7403 
7404 MUTEX_ENTER(&rx_peerHashTable_lock);
7405 for (peer_ptr = &rx_peerHashTable[0],
7406 peer_end = &rx_peerHashTable[rx_hashTableSize];
7407 peer_ptr < peer_end; peer_ptr++) {
7408 struct rx_peer *peer, *next, *prev;
7409 	for (prev = peer = *peer_ptr; peer; peer = next) {
7410 	    next = peer->next;
7411 	    code = MUTEX_TRYENTER(&peer->peer_lock);
7412 	    if (code) {
7413 		rx_interface_stat_p rpc_stat, nrpc_stat;
7414 		size_t space;
7415 		for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7416 rx_interface_stat)) {
7417 unsigned int num_funcs = 0;
7418 if (!rpc_stat) break;
7419 queue_Remove(&rpc_stat->queue_header);
7420 queue_Remove(&rpc_stat->all_peers);
7421 num_funcs = rpc_stat->stats[0].func_total;
7422 space = sizeof(rx_interface_stat_t) +
7423 rpc_stat->stats[0].func_total *
7424 sizeof(rx_function_entry_v1_t);
7426 rxi_Free(rpc_stat, space);
7427 		rxi_rpc_peer_stat_cnt -= num_funcs;
7428 	    }
7429 	    MUTEX_EXIT(&peer->peer_lock);
7430 if (prev == *peer_ptr) {
7442 MUTEX_EXIT(&rx_peerHashTable_lock);
7443 MUTEX_EXIT(&rx_rpc_stats);
7447 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7452 * IN clearFlag - flag indicating which stats to clear
7459 void rx_clearProcessRPCStats(
7460     afs_uint32 clearFlag)
7461 {
7462     rx_interface_stat_p rpc_stat, nrpc_stat;
7463 
7464     MUTEX_ENTER(&rx_rpc_stats);
7466 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7467 unsigned int num_funcs = 0, i;
7468 num_funcs = rpc_stat->stats[0].func_total;
7469 for(i=0;i<num_funcs;i++) {
7470 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7471 hzero(rpc_stat->stats[i].invocations);
7473 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7474 hzero(rpc_stat->stats[i].bytes_sent);
7476 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7477 hzero(rpc_stat->stats[i].bytes_rcvd);
7479 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7480 rpc_stat->stats[i].queue_time_sum.sec = 0;
7481 rpc_stat->stats[i].queue_time_sum.usec = 0;
7483 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7484 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7485 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7487 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7488 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7489 rpc_stat->stats[i].queue_time_min.usec= 9999999;
7491 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7492 rpc_stat->stats[i].queue_time_max.sec = 0;
7493 rpc_stat->stats[i].queue_time_max.usec = 0;
7495 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7496 rpc_stat->stats[i].execution_time_sum.sec = 0;
7497 rpc_stat->stats[i].execution_time_sum.usec = 0;
7499 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7500 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7501 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7503 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7504 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7505 rpc_stat->stats[i].execution_time_min.usec= 9999999;
7507 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7508 rpc_stat->stats[i].execution_time_max.sec = 0;
7509 rpc_stat->stats[i].execution_time_max.usec = 0;
7510 	    }
7511 	}
7512     }
7513 
7514     MUTEX_EXIT(&rx_rpc_stats);
7515 }
7518 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7523 * IN clearFlag - flag indicating which stats to clear
7530 void rx_clearPeerRPCStats(
7531     afs_uint32 clearFlag)
7532 {
7533     rx_interface_stat_p rpc_stat, nrpc_stat;
7534 
7535     MUTEX_ENTER(&rx_rpc_stats);
7537 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7538 unsigned int num_funcs = 0, i;
7541 * We have to fix the offset of rpc_stat since we are
7542 * keeping this structure on two rx_queues. The rx_queue
7543 * package assumes that the rx_queue member is the first
7544 * member of the structure. That is, rx_queue assumes that
7545 * any one item is only on one queue at a time. We are
7546 * breaking that assumption and so we have to do a little
7547 * math to fix our pointers.
7550 fix_offset = (char *) rpc_stat;
7551 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7552 rpc_stat = (rx_interface_stat_p) fix_offset;
7554 num_funcs = rpc_stat->stats[0].func_total;
7555 for(i=0;i<num_funcs;i++) {
7556 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7557 hzero(rpc_stat->stats[i].invocations);
7559 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7560 hzero(rpc_stat->stats[i].bytes_sent);
7562 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7563 hzero(rpc_stat->stats[i].bytes_rcvd);
7565 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7566 rpc_stat->stats[i].queue_time_sum.sec = 0;
7567 rpc_stat->stats[i].queue_time_sum.usec = 0;
7569 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7570 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7571 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7573 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7574 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7575 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7577 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7578 rpc_stat->stats[i].queue_time_max.sec = 0;
7579 rpc_stat->stats[i].queue_time_max.usec = 0;
7581 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7582 rpc_stat->stats[i].execution_time_sum.sec = 0;
7583 rpc_stat->stats[i].execution_time_sum.usec = 0;
7585 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7586 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7587 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7589 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7590 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7591 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7593 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7594 rpc_stat->stats[i].execution_time_max.sec = 0;
7595 rpc_stat->stats[i].execution_time_max.usec = 0;
7596 	    }
7597 	}
7598     }
7599 
7600     MUTEX_EXIT(&rx_rpc_stats);
7601 }
7604 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7605 * is authorized to enable/disable/clear RX statistics.
7607 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7609 void rx_SetRxStatUserOk(
7610     int (*proc)(struct rx_call *call))
7611 {
7612     rxi_rxstat_userok = proc;
7613 }
7615 int rx_RxStatUserOk(
7616     struct rx_call *call)
7617 {
7618     if (!rxi_rxstat_userok)
7619 	return 0;
7620     return rxi_rxstat_userok(call);
7621 }
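/*
 * Sketch of wiring up the authorization hook above (the superuser test and
 * configuration directory shown are hypothetical; any predicate over the
 * calling rx_call works):
 */
#if 0
static struct afsconf_dir *example_confDir;	/* hypothetical config dir */

static int example_StatsUserOk(struct rx_call *call)
{
    return afsconf_SuperUser(example_confDir, call, NULL);
}

    /* ... during service setup ... */
    rx_SetRxStatUserOk(example_StatsUserOk);
#endif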