/*
 * Copyright 2000, International Business Machines Corporation and others.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */
/* RX:  Extended Remote Procedure Call */

#include <afsconfig.h>
#include "../afs/param.h"
#include <afs/param.h>

#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG			/* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define	AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define	AFSOP_STOP_AFS		211	/* Stop AFS process */
#define	AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#ifdef AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>

int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted;	/* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;
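/*
 * Illustrative sketch only (an assumption, not code from this file): a
 * stats-retrieval routine would size its reply buffer from these counters,
 * e.g.
 *
 *	space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
 *	ptr = rxi_Alloc(space);
 *	... copy each allocated peer stat structure into ptr ...
 *
 * which is why the counters must track every allocation exactly.
 */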
#if !defined(offsetof)
#include <stddef.h>		/* for definition of offsetof() */
#endif

#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 * to ease NT porting.
 */
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}

pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
	assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
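/*
 * Illustrative usage sketch (an assumption about callers, not a quote from
 * this file): every externally callable entry point runs the one-time
 * initializer before touching any of the locks above, e.g.
 *
 *	void rx_SomeEntryPoint(void) {
 *	    INIT_PTHREAD_LOCKS
 *	    ...
 *	}
 *
 * pthread_once guarantees rxi_InitPthread runs exactly once even when
 * several threads race into their first rx operation.
 */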
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue; the sketch below illustrates the protocol.
 */
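/*
 * Illustrative sketch of the call_queue_lock protocol (assumed shapes, not
 * quotes from this file):
 *
 *	// enqueue, with call->lock held
 *	MUTEX_ENTER(queue_lock);
 *	queue_Append(some_queue, call);
 *	SET_CALL_QUEUE_LOCK(call, queue_lock);
 *	MUTEX_EXIT(queue_lock);
 *
 *	// rxi_ResetCall side: discover which queue lock covers the call
 *	afs_kmutex_t *qlock = call->call_queue_lock;
 *	if (qlock) {
 *	    MUTEX_ENTER(qlock);
 *	    queue_Remove(call);
 *	    CLEAR_CALL_QUEUE_LOCK(call);
 *	    MUTEX_EXIT(qlock);
 *	}
 */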
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */

/* We keep a "last conn pointer" in rxi_FindConnection.  The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
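/*
 * Illustrative fast path this cache enables (hypothetical shape, not a
 * quote from rxi_FindConnection):
 *
 *	conn = rxLastConn;
 *	if (conn && conn->epoch == epoch && conn->cid == (cid & RX_CIDMASK))
 *	    return conn;		// hit: the hash lookup is skipped
 *	// miss: fall through to the rx_connHashTable search
 */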
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * tiers:
 *
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *	rx_freeCallQueue_lock
 *	rx_connHashTable_lock
 *	rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *	peer_lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread is updating a conn
 *		    data field at the same time.
 *
 * Do we need a lock to protect the peer field in the conn structure?
 *	conn->peer was previously a constant for all intents and so has no
 *	lock protecting this field.  The multihomed client delta introduced
 *	an RX code change: change the peer field in the connection structure
 *	to the remote interface from which the last packet for this
 *	connection was sent out.  This may become an issue if further changes
 *	are made.
 */
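/*
 * Illustrative ordering sketch (an assumption drawn from the hierarchy
 * above, not a quote): locks are taken top-down, e.g.
 *
 *	MUTEX_ENTER(&conn->conn_call_lock);	// tier 1
 *	MUTEX_ENTER(&call->lock);		// tier 2
 *	...
 *	MUTEX_EXIT(&call->lock);
 *	MUTEX_EXIT(&conn->conn_call_lock);
 *
 * Acquiring them in the opposite order risks deadlock against rx_NewCall,
 * which holds conn_call_lock while it examines each call's lock.
 */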
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL

#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */

static void rxi_DestroyConnectionNoLock();
struct rx_serverQueueEntry *rx_waitForPacket = 0;

/* ------------Exported Interfaces------------- */
/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */

#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
    afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * failure. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
int rx_Init(u_int port)
{
    char *htable, *ptable;
    struct timeval tv;
    int tmp_status;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    __djgpp_set_quiet_socket(1);
#endif

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	UNLOCK_RX_INIT
	return tmp_status;	/* Already started; return previous error code. */
    }
#ifdef AFS_NT40_ENV
    if (afs_winsockInit() < 0)
	return -1;
#endif

#ifndef KERNEL
    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();
#endif

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */

    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	UNLOCK_RX_INIT
	return RX_ADDRINUSE;
    }

#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT, 0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv", CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    if (!uniprocessor)
	rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;		/* reset flag */
    memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));	    /* XXXXX */
    memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#endif
#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
    rx_port = 0;
#else
    struct sockaddr_in addr;
    int addrlen = sizeof(addr);
    if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	rx_Finalize();
	return -1;
    }
    rx_port = addr.sin_port;
#endif

    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);	/* Start time of this package; rxkad
				 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to reserve */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
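/*
 * Illustrative caller of rx_Init (an assumption, not code from this file).
 * Port 0 lets the kernel choose; calling again later is harmless and just
 * returns the first call's status:
 *
 *	if (rx_Init(htons(MY_PORT)) < 0)	// MY_PORT is a placeholder
 *	    exit(1);
 */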
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
    register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);
    return 0;
}

static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}

#else /* RX_ENABLE_LOCKS */
static int QuotaOK(aservice)
    register struct rx_service *aservice; {
    int rc = 0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
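/*
 * Illustrative pairing of the two routines above under RX_ENABLE_LOCKS
 * (assumed caller shape, not a quote from this file):
 *
 *	MUTEX_ENTER(&rx_serverPool_lock);
 *	if (QuotaOK(service)) {		// reserves one thread's worth of quota
 *	    ... dispatch the call to a server thread ...
 *	    // on any failure path before the thread actually runs:
 *	    ReturnToServerPool(service);
 *	}
 *	MUTEX_EXIT(&rx_serverPool_lock);
 */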
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;
    int diff;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i < nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i, nProcs = 0;

    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe) {
#ifndef AFS_NT40_ENV
	char name[32];
#ifdef AFS_PTHREAD_ENV
	pid_t pid;
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	PROCESS pid;
	LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
	sprintf(name, "srv_%d", ++nProcs);
	if (registerProgram)
	    (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
	rx_ServerProc();	/* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	    /* Server host */
    u_short sport;		    /* Server port */
    u_short sservice;		    /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;

    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    NETPRI;
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++;	/* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    USERPRI;
    return conn;
}
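/*
 * Illustrative client usage of rx_NewConnection (hypothetical caller, not
 * part of this file); rxnull_NewClientSecurityObject supplies the stock
 * "null" security class, and MY_SERVICE_ID is a placeholder:
 *
 *	struct rx_securityClass *so = rxnull_NewClientSecurityObject();
 *	struct rx_connection *conn =
 *	    rx_NewConnection(htonl(0x7f000001), htons(1234),
 *			     MY_SERVICE_ID, so, 0);
 */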
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
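/*
 * Worked example of the arithmetic above: with the default rx_connDeadTime
 * of 12 set in rx_Init, secondsUntilDead = MAX(12, 6) = 12 and
 * secondsUntilPing = 12/6 = 2, so several consecutive keepalive pings can
 * be lost before the connection is declared dead.
 */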
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

#ifndef KERNEL
    if (conn->specific) {
	for (i = 0; i < conn->nSpecific; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
		(*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;
    conn->nSpecific = 0;
#endif /* !KERNEL */

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    register struct rx_peer *peer = conn->peer;
    struct rx_packet *packet;
    int i;

    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	USERPRI;
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	USERPRI;
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    if (call->state == RX_STATE_PRECALL ||
			call->state == RX_STATE_ACTIVE) {
			rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
		    } else {
			rxi_AckAll((struct rxevent *)0, call, 0);
		    }
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	} else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */
    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	USERPRI;
	return;
    }

    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }
    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
					   conn->epoch, conn->type)];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if (rxLastConn == conn)
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent)
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    if (conn->checkReachEvent)
	rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    NETPRI;
    rxi_DestroyConnection(conn);
    USERPRI;
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_NewCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we found a call in an active
 * state and before we go to sleep.
 */
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;

    dpf(("rx_NewCall(conn %x)\n", conn));

    NETPRI;
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);

    /*
     * Check if there are others waiting for a new call.
     * If so, let them go first to avoid starving them.
     * This is a fairly simple scheme, and might not be
     * a complete solution for large numbers of waiters.
     */
    if (conn->makeCallWaiters) {
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }
    for (;;) {
	for (i = 0; i < RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    } else {
		call = rxi_NewCall(conn, i);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);

	conn->makeCallWaiters++;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
	conn->makeCallWaiters--;
    }

    /*
     * Wake up anyone else who might be giving us a chance to
     * run (see code above that avoids resource starvation).
     */
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&conn->conn_call_cv);
#else
    osi_rxWakeup(conn);
#endif
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
int rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;

    NETPRI;
    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i])) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		USERPRI;
		return 1;
	    }
	}
    }
    USERPRI;
    return 0;
}

int rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    NETPRI;
    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    USERPRI;
    return 0;
}

int rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;

    NETPRI;
    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    USERPRI;
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on error. */
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
	      nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;	/* Name for identification purposes (e.g. the
			 * service name might be used for probing for
			 * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }
    tservice = rxi_AllocService();
    NETPRI;
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    USERPRI;
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    USERPRI;
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    service->checkReach = 0;
	    rx_services[i] = service;	/* not visible until now */
	    USERPRI;
	    return service;
	}
    }
    USERPRI;
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
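/*
 * Illustrative server-side usage (hypothetical caller, not part of this
 * file); MY_SERVICE_ID and MyRequestProc are placeholders, and the
 * rx_SetMinProcs/rx_SetMaxProcs setters are assumed from rx.h:
 *
 *	struct rx_securityClass *sc[1];
 *	struct rx_service *svc;
 *	sc[0] = rxnull_NewServerSecurityObject();
 *	svc = rx_NewService(0, MY_SERVICE_ID, "demo", sc, 1, MyRequestProc);
 *	if (svc) {
 *	    rx_SetMinProcs(svc, 2);
 *	    rx_SetMaxProcs(svc, 4);
 *	    rx_StartServer(1);		// donate this thread; never returns
 *	}
 */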
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
    int threadID;
    struct rx_call *newcall;
    osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = (struct rx_call *) 0;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting (typically a smooth shutdown), do not
	 * allow any new calls.
	 */
	if (rx_tranquil && (call != NULL)) {
	    NETPRI;
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	    USERPRI;
	    continue;
	}
#ifdef KERNEL
	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}
#endif

	tservice = call->conn->service;

	if (tservice->beforeProc) (*tservice->beforeProc)(call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;

    NETPRI;
    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
	tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
	osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
    USERPRI;
}
/* meltdown:
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * future options:
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}
	if (call) {
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
		rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);

	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call*)0;
	    } else {
		call->flags &= (~RX_CALL_WAIT_PROC);
		MUTEX_ENTER(&rx_stats_mutex);
		rx_nWaiting--;
		MUTEX_EXIT(&rx_stats_mutex);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		MUTEX_EXIT(&rx_serverPool_lock);
		break;
	    }
	}
	else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    sq->tno = tno;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */

	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef KERNEL
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
#endif
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
    int tno;
    struct rx_service *cur_service;
    osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;

    NETPRI;
    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {	/* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
	    }
	    if (call) break;
	}
    }
    if (call) {
	queue_Remove(call);
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	/* MUTEX_EXIT(&call->lock); */
    } else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
#ifdef KERNEL
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		USERPRI;
		return (struct rx_call *)0;
	    }
#endif
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    USERPRI;

    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty.
 */
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register VOID *arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
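/*
 * Illustrative multi-rx-style usage (hypothetical caller, not part of this
 * file): register the hook right after rx_NewCall and let it fire once on
 * first arrival.
 *
 *	static VOID MyArrival(call, handle, arg)
 *	    register struct rx_call *call;
 *	    register VOID *handle;
 *	    register VOID *arg;
 *	{
 *	    ... record that this call is now readable ...
 *	}
 *
 *	call = rx_NewCall(conn);
 *	rx_SetArrivalProc(call, MyArrival, (VOID *)mystate, (VOID *)0);
 */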
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp;	/* Temporary packet pointer */
    register struct rx_packet *nxp;	/* Next packet pointer, for queue_Scan */
    afs_int32 error;

    dpf(("rx_EndCall(call %x)\n", call));

    NETPRI;
    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else {	/* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call.  The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure.  We don't want to signal until
	 * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;
    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    } else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }
    USERPRI;
    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
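/*
 * Illustrative end-to-end client call built from the routines above
 * (hypothetical caller, not part of this file):
 *
 *	struct rx_call *call = rx_NewCall(conn);
 *	afs_int32 code;
 *	rx_Write(call, request, reqlen);	// marshal the request
 *	rx_Read(call, reply, replylen);		// collect the response
 *	code = rx_EndCall(call, 0);		// 0: no local error to report
 *	// code is the conversation's final error, in local errno form
 */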
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return;	/* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }
/* if we wake up the packet waiter too often, we can get into a loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
	return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;	/* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}

/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller.  Returns the call with mutex locked. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
2006 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
2008 MUTEX_EXIT(&rx_freeCallQueue_lock);
2009 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
2010 MUTEX_ENTER(&call->lock);
2011 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
2012 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
2013 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
2015 MUTEX_ENTER(&rx_stats_mutex);
2016 rx_stats.nCallStructs++;
2017 MUTEX_EXIT(&rx_stats_mutex);
2018 /* Initialize once-only items */
2019 queue_Init(&call->tq);
2020 queue_Init(&call->rq);
2021 queue_Init(&call->iovq);
2022 /* Bind the call to its connection structure (prereq for reset) */
2024 rxi_ResetCall(call, 1);
2026 call->channel = channel;
2027 call->callNumber = &conn->callNumber[channel];
2028 /* Note that the next expected call number is retained (in
2029 * conn->callNumber[i]), even if we reallocate the call structure
2031 conn->call[channel] = call;
2032 /* if the channel's never been used (== 0), we should start at 1, otherwise
2033 the call number is valid from the last time this channel was used */
2034 if (*call->callNumber == 0) *call->callNumber = 1;
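/* Usage sketch (illustrative only; it mirrors the PRECALL setup in
 * rxi_ReceivePacket below): the caller owns the returned call's lock
 * and must set the call's state itself, e.g.
 *
 *     call = rxi_NewCall(conn, channel);
 *     *call->callNumber = np->header.callNumber;
 *     call->state = RX_STATE_PRECALL;
 *     ...
 *     MUTEX_EXIT(&call->lock);
 */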
2039 /* A call has been inactive long enough that we can throw away
2040 * state, including the call structure, which is placed on the call
2042 * Call is locked upon entry.
2044 #ifdef RX_ENABLE_LOCKS
2045 void rxi_FreeCall(call, haveCTLock)
2046 int haveCTLock; /* Set if called from rxi_ReapConnections */
2047 #else /* RX_ENABLE_LOCKS */
2048 void rxi_FreeCall(call)
2049 #endif /* RX_ENABLE_LOCKS */
2050 register struct rx_call *call;
2052 register int channel = call->channel;
2053 register struct rx_connection *conn = call->conn;
2056 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2057 (*call->callNumber)++;
2058 rxi_ResetCall(call, 0);
2059 call->conn->call[channel] = (struct rx_call *) 0;
2061 MUTEX_ENTER(&rx_freeCallQueue_lock);
2062 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2063 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2064 /* A call may be free even though its transmit queue is still in use.
2065 * Since we search the call list from head to tail, put busy calls at
2066 * the head of the list, and idle calls at the tail.
2068 if (call->flags & RX_CALL_TQ_BUSY)
2069 queue_Prepend(&rx_freeCallQueue, call);
2071 queue_Append(&rx_freeCallQueue, call);
2072 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2073 queue_Append(&rx_freeCallQueue, call);
2074 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2075 MUTEX_ENTER(&rx_stats_mutex);
2076 rx_stats.nFreeCallStructs++;
2077 MUTEX_EXIT(&rx_stats_mutex);
2079 MUTEX_EXIT(&rx_freeCallQueue_lock);
2081 /* Destroy the connection if it was previously slated for
2082 * destruction, i.e. the Rx client code previously called
2083 * rx_DestroyConnection (client connections), or
2084 * rxi_ReapConnections called the same routine (server
2085 * connections). Only do this, however, if there are no
2086 * outstanding calls. Note that for fine grain locking, there appears
2087 * to be a deadlock in that rxi_FreeCall has a call locked and
2088 * DestroyConnectionNoLock locks each call in the conn. But note a
2089 * few lines up where we have removed this call from the conn.
2090 * If someone else destroys a connection, they either have no
2091 * call lock held or are going through this section of code.
2093 if (conn->flags & RX_CONN_DESTROY_ME) {
2094 MUTEX_ENTER(&conn->conn_data_lock);
2096 MUTEX_EXIT(&conn->conn_data_lock);
2097 #ifdef RX_ENABLE_LOCKS
2099 rxi_DestroyConnectionNoLock(conn);
2101 rxi_DestroyConnection(conn);
2102 #else /* RX_ENABLE_LOCKS */
2103 rxi_DestroyConnection(conn);
2104 #endif /* RX_ENABLE_LOCKS */
2108 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2109 char *rxi_Alloc(size)
2110 register size_t size;
2114 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2115 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2118 int glockOwner = ISAFS_GLOCK();
2122 MUTEX_ENTER(&rx_stats_mutex);
2123 rxi_Alloccnt++; rxi_Allocsize += size;
2124 MUTEX_EXIT(&rx_stats_mutex);
2125 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2126 if (size > AFS_SMALLOCSIZ) {
2127 p = (char *) osi_AllocMediumSpace(size);
2129 p = (char *) osi_AllocSmall(size, 1);
2130 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2135 p = (char *) osi_Alloc(size);
2137 if (!p) osi_Panic("rxi_Alloc error");
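/* Usage sketch (illustrative only): rxi_Allocsize is adjusted by the
 * caller-supplied size on both the alloc and free paths, so every
 * rxi_Alloc must be paired with an rxi_Free of the same size, e.g.
 *
 *     char *buf = rxi_Alloc(128);
 *     ...
 *     rxi_Free(buf, 128);
 */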
2142 void rxi_Free(addr, size)
2144 register size_t size;
2146 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2147 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2150 int glockOwner = ISAFS_GLOCK();
2154 MUTEX_ENTER(&rx_stats_mutex);
2155 rxi_Alloccnt--; rxi_Allocsize -= size;
2156 MUTEX_EXIT(&rx_stats_mutex);
2157 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2158 if (size > AFS_SMALLOCSIZ)
2159 osi_FreeMediumSpace(addr);
2161 osi_FreeSmall(addr);
2162 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2167 osi_Free(addr, size);
2171 /* Find the peer process represented by the supplied (host,port)
2172 * combination. If there is no appropriate active peer structure, a
2173 * new one will be allocated and initialized
2174 * The origPeer, if set, is a pointer to a peer structure on which the
2175 * refCount will be decremented. This is used to replace the peer
2176 * structure hanging off a connection structure */
2177 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2178 register afs_uint32 host;
2179 register u_short port;
2180 struct rx_peer *origPeer;
2183 register struct rx_peer *pp;
2185 hashIndex = PEER_HASH(host, port);
2186 MUTEX_ENTER(&rx_peerHashTable_lock);
2187 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2188 if ((pp->host == host) && (pp->port == port)) break;
2192 pp = rxi_AllocPeer(); /* This bzero's *pp */
2193 pp->host = host; /* set here or in InitPeerParams is zero */
2195 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2196 queue_Init(&pp->congestionQueue);
2197 queue_Init(&pp->rpcStats);
2198 pp->next = rx_peerHashTable[hashIndex];
2199 rx_peerHashTable[hashIndex] = pp;
2200 rxi_InitPeerParams(pp);
2201 MUTEX_ENTER(&rx_stats_mutex);
2202 rx_stats.nPeerStructs++;
2203 MUTEX_EXIT(&rx_stats_mutex);
2210 origPeer->refCount--;
2211 MUTEX_EXIT(&rx_peerHashTable_lock);
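/* Usage sketch (illustrative only): the origPeer argument lets a caller
 * swap the peer hanging off a connection in one step, as done in
 * rxi_FindConnection below:
 *
 *     peer = rxi_FindPeer(host, port, conn->peer, 1);
 *
 * The old peer's refCount is dropped under the hash table lock. */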
2216 /* Find the connection at (host, port) started at epoch, and with the
2217 * given connection id. Creates the server connection if necessary.
2218 * The type specifies whether a client connection or a server
2219 * connection is desired. In both cases, (host, port) specify the
2220 * peer's (host, port) pair. Client connections are not made
2221 * automatically by this routine. The parameter socket gives the
2222 * socket descriptor on which the packet was received. This is used,
2223 * in the case of server connections, to check that *new* connections
2224 * come via a valid (port, serviceId). Finally, the securityIndex
2225 * parameter must match the existing index for the connection. If a
2226 * server connection is created, it will be created using the supplied
2227 * index, if the index is valid for this service */
2228 struct rx_connection *
2229 rxi_FindConnection(socket, host, port, serviceId, cid,
2230 epoch, type, securityIndex)
2232 register afs_int32 host;
2233 register u_short port;
2238 u_int securityIndex;
2240 int hashindex, flag;
2241 register struct rx_connection *conn;
2242 struct rx_peer *peer;
2243 hashindex = CONN_HASH(host, port, cid, epoch, type);
2244 MUTEX_ENTER(&rx_connHashTable_lock);
2245 rxLastConn ? (conn = rxLastConn, flag = 0) :
2246 (conn = rx_connHashTable[hashindex], flag = 1);
2248 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2249 && (epoch == conn->epoch)) {
2250 register struct rx_peer *pp = conn->peer;
2251 if (securityIndex != conn->securityIndex) {
2252 /* this isn't supposed to happen, but someone could forge a packet
2253 like this, and there seems to be some CM bug that makes this
2254 happen from time to time -- in which case, the fileserver
2256 MUTEX_EXIT(&rx_connHashTable_lock);
2257 return (struct rx_connection *) 0;
2259 /* if the epoch's high-order bit is set, route for security reasons
2260 * only on the cid, not on the host and port fields.
2262 if (conn->epoch & 0x80000000) break;
2263 if (((type == RX_CLIENT_CONNECTION)
2264 || (pp->host == host)) && (pp->port == port))
2269 /* the connection rxLastConn that was used last time is not the
2270 ** one we are looking for now; start searching in the hash table */
2272 conn = rx_connHashTable[hashindex];
2278 struct rx_service *service;
2279 if (type == RX_CLIENT_CONNECTION) {
2280 MUTEX_EXIT(&rx_connHashTable_lock);
2281 return (struct rx_connection *) 0;
2283 service = rxi_FindService(socket, serviceId);
2284 if (!service || (securityIndex >= service->nSecurityObjects)
2285 || (service->securityObjects[securityIndex] == 0)) {
2286 MUTEX_EXIT(&rx_connHashTable_lock);
2287 return (struct rx_connection *) 0;
2289 conn = rxi_AllocConnection(); /* This bzero's the connection */
2290 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2292 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2294 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2295 conn->next = rx_connHashTable[hashindex];
2296 rx_connHashTable[hashindex] = conn;
2297 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2298 conn->type = RX_SERVER_CONNECTION;
2299 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2300 conn->epoch = epoch;
2301 conn->cid = cid & RX_CIDMASK;
2302 /* conn->serial = conn->lastSerial = 0; */
2303 /* conn->timeout = 0; */
2304 conn->ackRate = RX_FAST_ACK_RATE;
2305 conn->service = service;
2306 conn->serviceId = serviceId;
2307 conn->securityIndex = securityIndex;
2308 conn->securityObject = service->securityObjects[securityIndex];
2309 conn->nSpecific = 0;
2310 conn->specific = NULL;
2311 rx_SetConnDeadTime(conn, service->connDeadTime);
2312 /* Notify security object of the new connection */
2313 RXS_NewConnection(conn->securityObject, conn);
2314 /* XXXX Connection timeout? */
2315 if (service->newConnProc) (*service->newConnProc)(conn);
2316 MUTEX_ENTER(&rx_stats_mutex);
2317 rx_stats.nServerConns++;
2318 MUTEX_EXIT(&rx_stats_mutex);
2322 /* Ensure that the peer structure is set up in such a way that
2323 ** replies in this connection go back to that remote interface
2324 ** from which the last packet was sent out. If this packet's
2325 ** source IP address does not match the peer struct for this conn,
2326 ** then drop the refCount on conn->peer and get a new peer structure.
2327 ** We can check the host,port field in the peer structure without the
2328 ** rx_peerHashTable_lock because the peer structure has its refCount
2329 ** incremented and the only time the host,port in the peer struct gets
2330 ** updated is when the peer structure is created.
2332 if (conn->peer->host == host )
2333 peer = conn->peer; /* no change to the peer structure */
2335 peer = rxi_FindPeer(host, port, conn->peer, 1);
2338 MUTEX_ENTER(&conn->conn_data_lock);
2341 MUTEX_EXIT(&conn->conn_data_lock);
2343 rxLastConn = conn; /* store this connection as the last conn used */
2344 MUTEX_EXIT(&rx_connHashTable_lock);
2348 /* There are two packet tracing routines available for testing and monitoring
2349 * Rx. One is called just after every packet is received and the other is
2350 * called just before every packet is sent. Received packets, have had their
2351 * headers decoded, and packets to be sent have not yet had their headers
2352 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2353 * containing the network address. Both can be modified. The return value, if
2354 * non-zero, indicates that the packet should be dropped. */
2356 int (*rx_justReceived)() = 0;
2357 int (*rx_almostSent)() = 0;
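/* Usage sketch (illustrative only; the filter condition and all names
 * here are hypothetical): a tracer returns non-zero to have the packet
 * dropped, and may modify the packet or the address in place.
 *
 *     static afs_uint32 exampleBlockedHost;
 *     static int exampleTracer(np, addr)
 *         register struct rx_packet *np;
 *         struct sockaddr_in *addr;
 *     {
 *         if (addr->sin_addr.s_addr == exampleBlockedHost)
 *             return 1;            (non-zero: drop this packet)
 *         return 0;
 *     }
 *
 *     and during initialization:  rx_justReceived = exampleTracer;
 */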
2359 /* A packet has been received off the interface. Np is the packet, socket is
2360 * the socket number it was received from (useful in determining which service
2361 * this packet corresponds to), and (host, port) reflect the host,port of the
2362 * sender. This call returns the packet to the caller if it is finished with
2363 * it, rather than de-allocating it, just as a small performance hack */
2365 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2366 register struct rx_packet *np;
2371 struct rx_call **newcallp;
2373 register struct rx_call *call;
2374 register struct rx_connection *conn;
2376 afs_uint32 currentCallNumber;
2382 struct rx_packet *tnp;
2385 /* We don't print out the packet until now because (1) the time may not be
2386 * accurate enough until now in the lwp implementation (rx_Listener only gets
2387 * the time after the packet is read) and (2) from a protocol point of view,
2388 * this is the first time the packet has been seen */
2389 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2390 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2391 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2392 np->header.serial, packetType, host, port, np->header.serviceId,
2393 np->header.epoch, np->header.cid, np->header.callNumber,
2394 np->header.seq, np->header.flags, np));
2397 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2398 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2401 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2402 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2405 /* If an input tracer function is defined, call it with the packet and
2406 * network address. Note this function may modify its arguments. */
2407 if (rx_justReceived) {
2408 struct sockaddr_in addr;
2410 addr.sin_family = AF_INET;
2411 addr.sin_port = port;
2412 addr.sin_addr.s_addr = host;
2413 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2414 addr.sin_len = sizeof(addr);
2415 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2416 drop = (*rx_justReceived) (np, &addr);
2417 /* drop packet if return value is non-zero */
2418 if (drop) return np;
2419 port = addr.sin_port; /* in case fcn changed addr */
2420 host = addr.sin_addr.s_addr;
2424 /* If packet was not sent by the client, then *we* must be the client */
2425 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2426 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
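    /* e.g. a server sees RX_CLIENT_INITIATED set on incoming request
     * packets, so type computes to RX_SERVER_CONNECTION; our role is
     * always the opposite of the sender's. */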
2428 /* Find the connection (or fabricate one, if we're the server & if
2429 * necessary) associated with this packet */
2430 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2431 np->header.cid, np->header.epoch, type,
2432 np->header.securityIndex);
2435 /* If no connection found or fabricated, just ignore the packet.
2436 * (An argument could be made for sending an abort packet for
2441 MUTEX_ENTER(&conn->conn_data_lock);
2442 if (conn->maxSerial < np->header.serial)
2443 conn->maxSerial = np->header.serial;
2444 MUTEX_EXIT(&conn->conn_data_lock);
2446 /* If the connection is in an error state, send an abort packet and ignore
2447 * the incoming packet */
2449 /* Don't respond to an abort packet--we don't want loops! */
2450 MUTEX_ENTER(&conn->conn_data_lock);
2451 if (np->header.type != RX_PACKET_TYPE_ABORT)
2452 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2454 MUTEX_EXIT(&conn->conn_data_lock);
2458 /* Check for connection-only requests (i.e. not call specific). */
2459 if (np->header.callNumber == 0) {
2460 switch (np->header.type) {
2461 case RX_PACKET_TYPE_ABORT:
2462 /* What if the supplied error is zero? */
2463 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2464 MUTEX_ENTER(&conn->conn_data_lock);
2466 MUTEX_EXIT(&conn->conn_data_lock);
2468 case RX_PACKET_TYPE_CHALLENGE:
2469 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2470 MUTEX_ENTER(&conn->conn_data_lock);
2472 MUTEX_EXIT(&conn->conn_data_lock);
2474 case RX_PACKET_TYPE_RESPONSE:
2475 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2476 MUTEX_ENTER(&conn->conn_data_lock);
2478 MUTEX_EXIT(&conn->conn_data_lock);
2480 case RX_PACKET_TYPE_PARAMS:
2481 case RX_PACKET_TYPE_PARAMS+1:
2482 case RX_PACKET_TYPE_PARAMS+2:
2483 /* ignore these packet types for now */
2484 MUTEX_ENTER(&conn->conn_data_lock);
2486 MUTEX_EXIT(&conn->conn_data_lock);
2491 /* Should not reach here, unless the peer is broken: send an
2493 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2494 MUTEX_ENTER(&conn->conn_data_lock);
2495 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2497 MUTEX_EXIT(&conn->conn_data_lock);
2502 channel = np->header.cid & RX_CHANNELMASK;
2503 call = conn->call[channel];
2504 #ifdef RX_ENABLE_LOCKS
2506 MUTEX_ENTER(&call->lock);
2507 /* Test to see if call struct is still attached to conn. */
2508 if (call != conn->call[channel]) {
2510 MUTEX_EXIT(&call->lock);
2511 if (type == RX_SERVER_CONNECTION) {
2512 call = conn->call[channel];
2513 /* If we started with no call attached and there is one now,
2514 * another thread is also running this routine and has gotten
2515 * the connection channel. We should drop this packet in the tests
2516 * below. If there was a call on this connection and it's now
2517 * gone, then we'll be making a new call below.
2518 * If there was previously a call and it's now different, then
2519 * the old call was freed and another thread running this routine
2520 * has created a call on this channel. One of these two threads
2521 * has a packet for the old call and the code below handles those
2525 MUTEX_ENTER(&call->lock);
2528 /* This packet can't be for this call. If the new call address is
2529 * 0 then no call is running on this channel. If there is a call
2530 * then, since this is a client connection we're getting data for,
2531 * it must be for the previous call.
2533 MUTEX_ENTER(&rx_stats_mutex);
2534 rx_stats.spuriousPacketsRead++;
2535 MUTEX_EXIT(&rx_stats_mutex);
2536 MUTEX_ENTER(&conn->conn_data_lock);
2538 MUTEX_EXIT(&conn->conn_data_lock);
2543 currentCallNumber = conn->callNumber[channel];
2545 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2546 if (np->header.callNumber < currentCallNumber) {
2547 MUTEX_ENTER(&rx_stats_mutex);
2548 rx_stats.spuriousPacketsRead++;
2549 MUTEX_EXIT(&rx_stats_mutex);
2550 #ifdef RX_ENABLE_LOCKS
2552 MUTEX_EXIT(&call->lock);
2554 MUTEX_ENTER(&conn->conn_data_lock);
2556 MUTEX_EXIT(&conn->conn_data_lock);
2560 MUTEX_ENTER(&conn->conn_call_lock);
2561 call = rxi_NewCall(conn, channel);
2562 MUTEX_EXIT(&conn->conn_call_lock);
2563 *call->callNumber = np->header.callNumber;
2564 call->state = RX_STATE_PRECALL;
2565 clock_GetTime(&call->queueTime);
2566 hzero(call->bytesSent);
2567 hzero(call->bytesRcvd);
2568 rxi_KeepAliveOn(call);
2570 else if (np->header.callNumber != currentCallNumber) {
2571 /* Wait until the transmit queue is idle before deciding
2572 * whether to reset the current call. Chances are that the
2573 * call will be in either DALLY or HOLD state once the TQ_BUSY
2576 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2577 while ((call->state == RX_STATE_ACTIVE) &&
2578 (call->flags & RX_CALL_TQ_BUSY)) {
2579 call->flags |= RX_CALL_TQ_WAIT;
2580 #ifdef RX_ENABLE_LOCKS
2581 CV_WAIT(&call->cv_tq, &call->lock);
2582 #else /* RX_ENABLE_LOCKS */
2583 osi_rxSleep(&call->tq);
2584 #endif /* RX_ENABLE_LOCKS */
2586 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2587 /* If the new call cannot be taken right now send a busy and set
2588 * the error condition in this call, so that it terminates as
2589 * quickly as possible */
2590 if (call->state == RX_STATE_ACTIVE) {
2591 struct rx_packet *tp;
2593 rxi_CallError(call, RX_CALL_DEAD);
2594 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2595 MUTEX_EXIT(&call->lock);
2596 MUTEX_ENTER(&conn->conn_data_lock);
2598 MUTEX_EXIT(&conn->conn_data_lock);
2601 rxi_ResetCall(call, 0);
2602 *call->callNumber = np->header.callNumber;
2603 call->state = RX_STATE_PRECALL;
2604 clock_GetTime(&call->queueTime);
2605 hzero(call->bytesSent);
2606 hzero(call->bytesRcvd);
2608 * If the number of queued calls exceeds the overload
2609 * threshold then abort this call.
2611 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2612 struct rx_packet *tp;
2614 rxi_CallError(call, rx_BusyError);
2615 tp = rxi_SendCallAbort(call, np, 1, 0);
2616 MUTEX_EXIT(&call->lock);
2617 MUTEX_ENTER(&conn->conn_data_lock);
2619 MUTEX_EXIT(&conn->conn_data_lock);
2622 rxi_KeepAliveOn(call);
2625 /* Continuing call; do nothing here. */
2627 } else { /* we're the client */
2628 /* Ignore all incoming acknowledgements for calls in DALLY state */
2629 if ( call && (call->state == RX_STATE_DALLY)
2630 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2631 MUTEX_ENTER(&rx_stats_mutex);
2632 rx_stats.ignorePacketDally++;
2633 MUTEX_EXIT(&rx_stats_mutex);
2634 #ifdef RX_ENABLE_LOCKS
2636 MUTEX_EXIT(&call->lock);
2639 MUTEX_ENTER(&conn->conn_data_lock);
2641 MUTEX_EXIT(&conn->conn_data_lock);
2645 /* Ignore anything that's not relevant to the current call. If there
2646 * isn't a current call, then no packet is relevant. */
2647 if (!call || (np->header.callNumber != currentCallNumber)) {
2648 MUTEX_ENTER(&rx_stats_mutex);
2649 rx_stats.spuriousPacketsRead++;
2650 MUTEX_EXIT(&rx_stats_mutex);
2651 #ifdef RX_ENABLE_LOCKS
2653 MUTEX_EXIT(&call->lock);
2656 MUTEX_ENTER(&conn->conn_data_lock);
2658 MUTEX_EXIT(&conn->conn_data_lock);
2661 /* If the service security object index stamped in the packet does not
2662 * match the connection's security index, ignore the packet */
2663 if (np->header.securityIndex != conn->securityIndex) {
2664 #ifdef RX_ENABLE_LOCKS
2665 MUTEX_EXIT(&call->lock);
2667 MUTEX_ENTER(&conn->conn_data_lock);
2669 MUTEX_EXIT(&conn->conn_data_lock);
2673 /* If we're receiving the response, then all transmit packets are
2674 * implicitly acknowledged. Get rid of them. */
2675 if (np->header.type == RX_PACKET_TYPE_DATA) {
2676 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2677 /* XXX Hack. Because we must release the global rx lock when
2678 * sending packets (osi_NetSend), we drop all acks while rxi_Start
2679 * is traversing the tq and sending packets out, since packets
2680 * may move to the freePacketQueue as a result of being here!
2681 * So we drop these packets until we're safely out of the
2682 * traversal. Really ugly!
2683 * For fine grain RX locking, we set the acked field in the
2684 * packets and let rxi_Start remove them from the transmit queue.
2686 if (call->flags & RX_CALL_TQ_BUSY) {
2687 #ifdef RX_ENABLE_LOCKS
2688 rxi_SetAcksInTransmitQueue(call);
2691 return np; /* xmitting; drop packet */
2695 rxi_ClearTransmitQueue(call, 0);
2697 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2698 rxi_ClearTransmitQueue(call, 0);
2699 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2701 if (np->header.type == RX_PACKET_TYPE_ACK) {
2702 /* now check to see if this is an ack packet acknowledging that the
2703 * server actually *lost* some hard-acked data. If this happens we
2704 * ignore this packet, as it may indicate that the server restarted in
2705 * the middle of a call. It is also possible that this is an old ack
2706 * packet. We don't abort the connection in this case, because this
2707 * *might* just be an old ack packet. The right way to detect a server
2708 * restart in the midst of a call is to notice that the server epoch
2710 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2711 * XXX unacknowledged. I think that this is off-by-one, but
2712 * XXX I don't dare change it just yet, since it will
2713 * XXX interact badly with the server-restart detection
2714 * XXX code in receiveackpacket. */
2715 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2716 MUTEX_ENTER(&rx_stats_mutex);
2717 rx_stats.spuriousPacketsRead++;
2718 MUTEX_EXIT(&rx_stats_mutex);
2719 MUTEX_EXIT(&call->lock);
2720 MUTEX_ENTER(&conn->conn_data_lock);
2722 MUTEX_EXIT(&conn->conn_data_lock);
2726 } /* else not a data packet */
2729 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2730 /* Set remote user defined status from packet */
2731 call->remoteStatus = np->header.userStatus;
2733 /* Note the gap between the expected next packet and the actual
2734 * packet that arrived, when the new packet has a smaller serial number
2735 * than expected. Rioses frequently reorder packets all by themselves,
2736 * so this will be quite important with very large window sizes.
2737 * Skew is checked against 0 here to avoid any dependence on the type of
2738 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always true.
2740 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2741 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2742 * I think using a beta of 1/8 is probably appropriate. 93.04.21
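 * As a sketch only (the code below still just tracks the maximum), a
 * beta of 1/8 would update the smoothed value once per sample as
 *     smoothedSkew += (skew - smoothedSkew) / 8;
 * so each new sample contributes an eighth of its difference. */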
2744 MUTEX_ENTER(&conn->conn_data_lock);
2745 skew = conn->lastSerial - np->header.serial;
2746 conn->lastSerial = np->header.serial;
2747 MUTEX_EXIT(&conn->conn_data_lock);
2749 register struct rx_peer *peer;
2751 if (skew > peer->inPacketSkew) {
2752 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2753 peer->inPacketSkew = skew;
2757 /* Now do packet type-specific processing */
2758 switch (np->header.type) {
2759 case RX_PACKET_TYPE_DATA:
2760 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2763 case RX_PACKET_TYPE_ACK:
2764 /* Respond immediately to ack packets requesting acknowledgement
2766 if (np->header.flags & RX_REQUEST_ACK) {
2767 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2768 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2770 np = rxi_ReceiveAckPacket(call, np, 1);
2772 case RX_PACKET_TYPE_ABORT:
2773 /* An abort packet: reset the connection, passing the error up to
2775 /* What if error is zero? */
2776 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2778 case RX_PACKET_TYPE_BUSY:
2781 case RX_PACKET_TYPE_ACKALL:
2782 /* All packets acknowledged, so we can drop all packets previously
2783 * readied for sending */
2784 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2785 /* XXX Hack. Because we can't release the global rx lock when
2786 * sending packets (osi_NetSend), we drop all ack pkts while rxi_Start
2787 * is traversing the tq and sending packets out, since packets
2788 * may move to the freePacketQueue as a result of being
2789 * here! So we drop these packets until we're safely out of the
2790 * traversal. Really ugly!
2791 * For fine grain RX locking, we set the acked field in the packets
2792 * and let rxi_Start remove the packets from the transmit queue.
2794 if (call->flags & RX_CALL_TQ_BUSY) {
2795 #ifdef RX_ENABLE_LOCKS
2796 rxi_SetAcksInTransmitQueue(call);
2798 #else /* RX_ENABLE_LOCKS */
2800 return np; /* xmitting; drop packet */
2801 #endif /* RX_ENABLE_LOCKS */
2803 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2804 rxi_ClearTransmitQueue(call, 0);
2807 /* Should not reach here, unless the peer is broken: send an abort
2809 rxi_CallError(call, RX_PROTOCOL_ERROR);
2810 np = rxi_SendCallAbort(call, np, 1, 0);
2813 /* Note when this last legitimate packet was received, for keep-alive
2814 * processing. Note, we delay getting the time until now in the hope that
2815 * the packet will be delivered to the user before any get time is required
2816 * (if not, then the time won't actually be re-evaluated here). */
2817 call->lastReceiveTime = clock_Sec();
2818 MUTEX_EXIT(&call->lock);
2819 MUTEX_ENTER(&conn->conn_data_lock);
2821 MUTEX_EXIT(&conn->conn_data_lock);
2825 /* return true if this is an "interesting" connection from the point of view
2826 of someone trying to debug the system */
2827 int rxi_IsConnInteresting(struct rx_connection *aconn)
2830 register struct rx_call *tcall;
2832 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2834 for(i=0;i<RX_MAXCALLS;i++) {
2835 tcall = aconn->call[i];
2837 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2839 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2847 /* if this is one of the last few packets AND it wouldn't be used by the
2848 receiving call to immediately satisfy a read request, then drop it on
2849 the floor, since accepting it might prevent a lock-holding thread from
2850 making progress in its reading. If a call has been cleared while in
2851 the precall state then ignore all subsequent packets until the call
2852 is assigned to a thread. */
2854 static int TooLow(ap, acall)
2855 struct rx_call *acall;
2856 struct rx_packet *ap; {
2858 MUTEX_ENTER(&rx_stats_mutex);
2859 if (((ap->header.seq != 1) &&
2860 (acall->flags & RX_CALL_CLEARED) &&
2861 (acall->state == RX_STATE_PRECALL)) ||
2862 ((rx_nFreePackets < rxi_dataQuota+2) &&
2863 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2864 && (acall->flags & RX_CALL_READER_WAIT)))) {
2867 MUTEX_EXIT(&rx_stats_mutex);
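/* Reading of the test above (descriptive note): a packet is "too low"
 * either when it arrives for a cleared call still in the precall state
 * with a sequence number other than 1, or when free packets are nearly
 * exhausted (rx_nFreePackets < rxi_dataQuota + 2) and no waiting reader
 * could consume this packet immediately. */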
2872 static void rxi_CheckReachEvent(event, conn, acall)
2873 struct rxevent *event;
2874 struct rx_connection *conn;
2875 struct rx_call *acall;
2877 struct rx_call *call = acall;
2881 MUTEX_ENTER(&conn->conn_call_lock);
2882 MUTEX_ENTER(&conn->conn_data_lock);
2883 conn->checkReachEvent = (struct rxevent *) 0;
2884 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2885 if (event) conn->refCount--;
2886 MUTEX_EXIT(&conn->conn_data_lock);
2890 for (i=0; i<RX_MAXCALLS; i++) {
2891 struct rx_call *tc = conn->call[i];
2892 if (tc && tc->state == RX_STATE_PRECALL) {
2899 if (call != acall) MUTEX_ENTER(&call->lock);
2900 rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
2901 if (call != acall) MUTEX_EXIT(&call->lock);
2903 MUTEX_ENTER(&conn->conn_data_lock);
2905 MUTEX_EXIT(&conn->conn_data_lock);
2906 clock_GetTime(&when);
2907 when.sec += RX_CHECKREACH_TIMEOUT;
2908 conn->checkReachEvent =
2909 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2912 MUTEX_EXIT(&conn->conn_call_lock);
2915 static int rxi_CheckConnReach(conn, call)
2916 struct rx_connection *conn;
2917 struct rx_call *call;
2919 struct rx_service *service = conn->service;
2920 struct rx_peer *peer = conn->peer;
2921 afs_uint32 now, lastReach;
2923 if (service->checkReach == 0)
2927 MUTEX_ENTER(&peer->peer_lock);
2928 lastReach = peer->lastReachTime;
2929 MUTEX_EXIT(&peer->peer_lock);
2930 if (now - lastReach < RX_CHECKREACH_TTL)
2933 MUTEX_ENTER(&conn->conn_data_lock);
2934 if (conn->flags & RX_CONN_ATTACHWAIT) {
2935 MUTEX_EXIT(&conn->conn_data_lock);
2938 conn->flags |= RX_CONN_ATTACHWAIT;
2939 MUTEX_EXIT(&conn->conn_data_lock);
2940 if (!conn->checkReachEvent)
2941 rxi_CheckReachEvent((struct rxevent *)0, conn, call);
2946 /* try to attach call, if authentication is complete */
2947 static void TryAttach(acall, socket, tnop, newcallp, reachOverride)
2948 register struct rx_call *acall;
2949 register osi_socket socket;
2951 register struct rx_call **newcallp;
2954 struct rx_connection *conn = acall->conn;
2956 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2957 /* Don't attach until we have any required authentication. */
2958 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2959 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2960 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2961 /* Note: this does not necessarily succeed; there
2962 * may not be any proc available
2966 rxi_ChallengeOn(acall->conn);
2971 /* A data packet has been received off the interface. This packet is
2972 * appropriate to the call (the call is in the right state, etc.). This
2973 * routine can return a packet to the caller, for re-use */
2975 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2976 port, tnop, newcallp)
2977 register struct rx_call *call;
2978 register struct rx_packet *np;
2984 struct rx_call **newcallp;
2990 afs_uint32 seq, serial, flags;
2992 struct rx_packet *tnp;
2994 MUTEX_ENTER(&rx_stats_mutex);
2995 rx_stats.dataPacketsRead++;
2996 MUTEX_EXIT(&rx_stats_mutex);
2999 /* If there are no packet buffers, drop this new packet, unless we can find
3000 * packet buffers from inactive calls */
3002 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
3003 MUTEX_ENTER(&rx_freePktQ_lock);
3004 rxi_NeedMorePackets = TRUE;
3005 MUTEX_EXIT(&rx_freePktQ_lock);
3006 MUTEX_ENTER(&rx_stats_mutex);
3007 rx_stats.noPacketBuffersOnRead++;
3008 MUTEX_EXIT(&rx_stats_mutex);
3009 call->rprev = np->header.serial;
3010 rxi_calltrace(RX_TRACE_DROP, call);
3011 dpf (("packet %x dropped on receipt - quota problems", np));
3013 rxi_ClearReceiveQueue(call);
3014 clock_GetTime(&when);
3015 clock_Add(&when, &rx_softAckDelay);
3016 if (!call->delayedAckEvent ||
3017 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3018 rxevent_Cancel(call->delayedAckEvent, call,
3019 RX_CALL_REFCOUNT_DELAY);
3020 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3021 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3024 /* we've damaged this call already, might as well do it in. */
3030 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
3031 * packet is one of several packets transmitted as a single
3032 * datagram. Do not send any soft or hard acks until all packets
3033 * in a jumbogram have been processed. Send negative acks right away.
3035 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
3036 /* tnp is non-null when there are more packets in the
3037 * current jumbogram */
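	/* e.g. a three-packet jumbogram makes three passes here: the
	 * first because isFirst is set, the later ones while
	 * rxi_SplitJumboPacket has left another packet in tnp. */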
3044 seq = np->header.seq;
3045 serial = np->header.serial;
3046 flags = np->header.flags;
3048 /* If the call is in an error state, send an abort message */
3050 return rxi_SendCallAbort(call, np, istack, 0);
3052 /* The RX_JUMBO_PACKET is set in all but the last packet in each
3053 * AFS 3.5 jumbogram. */
3054 if (flags & RX_JUMBO_PACKET) {
3055 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3060 if (np->header.spare != 0) {
3061 MUTEX_ENTER(&call->conn->conn_data_lock);
3062 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3063 MUTEX_EXIT(&call->conn->conn_data_lock);
3066 /* The usual case is that this is the expected next packet */
3067 if (seq == call->rnext) {
3069 /* Check to make sure it is not a duplicate of one already queued */
3070 if (queue_IsNotEmpty(&call->rq)
3071 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3072 MUTEX_ENTER(&rx_stats_mutex);
3073 rx_stats.dupPacketsRead++;
3074 MUTEX_EXIT(&rx_stats_mutex);
3075 dpf (("packet %x dropped on receipt - duplicate", np));
3076 rxevent_Cancel(call->delayedAckEvent, call,
3077 RX_CALL_REFCOUNT_DELAY);
3078 np = rxi_SendAck(call, np, seq, serial,
3079 flags, RX_ACK_DUPLICATE, istack);
3085 /* It's the next packet. Stick it on the receive queue
3086 * for this call. Set newPackets to make sure we wake
3087 * the reader once all packets have been processed */
3088 queue_Prepend(&call->rq, np);
3090 np = NULL; /* We can't use this anymore */
3093 /* If an ack is requested then set a flag to make sure we
3094 * send an acknowledgement for this packet */
3095 if (flags & RX_REQUEST_ACK) {
3099 /* Keep track of whether we have received the last packet */
3100 if (flags & RX_LAST_PACKET) {
3101 call->flags |= RX_CALL_HAVE_LAST;
3105 /* Check whether we have all of the packets for this call */
3106 if (call->flags & RX_CALL_HAVE_LAST) {
3107 afs_uint32 tseq; /* temporary sequence number */
3108 struct rx_packet *tp; /* Temporary packet pointer */
3109 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3111 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3112 if (tseq != tp->header.seq)
3114 if (tp->header.flags & RX_LAST_PACKET) {
3115 call->flags |= RX_CALL_RECEIVE_DONE;
3122 /* Provide asynchronous notification for those who want it
3123 * (e.g. multi rx) */
3124 if (call->arrivalProc) {
3125 (*call->arrivalProc)(call, call->arrivalProcHandle,
3126 call->arrivalProcArg);
3127 call->arrivalProc = (VOID (*)()) 0;
3130 /* Update last packet received */
3133 /* If there is no server process serving this call, grab
3134 * one, if available. We only need to do this once. If a
3135 * server thread is available, this thread becomes a server
3136 * thread and the server thread becomes a listener thread. */
3138 TryAttach(call, socket, tnop, newcallp, 0);
3141 /* This is not the expected next packet. */
3143 /* Determine whether this is a new or old packet, and if it's
3144 * a new one, whether it fits into the current receive window.
3145 * Also figure out whether the packet was delivered in sequence.
3146 * We use the prev variable to determine whether the new packet
3147 * is the successor of its immediate predecessor in the
3148 * receive queue, and the missing flag to determine whether
3149 * any of this packet's predecessors are missing. */
3151 afs_uint32 prev; /* "Previous packet" sequence number */
3152 struct rx_packet *tp; /* Temporary packet pointer */
3153 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3154 int missing; /* Are any predecessors missing? */
3156 /* If the new packet's sequence number has been sent to the
3157 * application already, then this is a duplicate */
3158 if (seq < call->rnext) {
3159 MUTEX_ENTER(&rx_stats_mutex);
3160 rx_stats.dupPacketsRead++;
3161 MUTEX_EXIT(&rx_stats_mutex);
3162 rxevent_Cancel(call->delayedAckEvent, call,
3163 RX_CALL_REFCOUNT_DELAY);
3164 np = rxi_SendAck(call, np, seq, serial,
3165 flags, RX_ACK_DUPLICATE, istack);
3171 /* If the sequence number is greater than what can be
3172 * accommodated by the current window, then send a negative
3173 * acknowledge and drop the packet */
3174 if ((call->rnext + call->rwind) <= seq) {
3175 rxevent_Cancel(call->delayedAckEvent, call,
3176 RX_CALL_REFCOUNT_DELAY);
3177 np = rxi_SendAck(call, np, seq, serial,
3178 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3184 /* Look for the packet in the queue of old received packets */
3185 for (prev = call->rnext - 1, missing = 0,
3186 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3187 /* Check for duplicate packet */
3188 if (seq == tp->header.seq) {
3189 MUTEX_ENTER(&rx_stats_mutex);
3190 rx_stats.dupPacketsRead++;
3191 MUTEX_EXIT(&rx_stats_mutex);
3192 rxevent_Cancel(call->delayedAckEvent, call,
3193 RX_CALL_REFCOUNT_DELAY);
3194 np = rxi_SendAck(call, np, seq, serial,
3195 flags, RX_ACK_DUPLICATE, istack);
3200 /* If we find a higher sequence packet, break out and
3201 * insert the new packet here. */
3202 if (seq < tp->header.seq) break;
3203 /* Check for missing packet */
3204 if (tp->header.seq != prev+1) {
3208 prev = tp->header.seq;
3211 /* Keep track of whether we have received the last packet. */
3212 if (flags & RX_LAST_PACKET) {
3213 call->flags |= RX_CALL_HAVE_LAST;
3216 /* It's within the window: add it to the receive queue.
3217 * tp is left by the previous loop either pointing at the
3218 * packet before which to insert the new packet, or at the
3219 * queue head if the queue is empty or the packet should be
3221 queue_InsertBefore(tp, np);
3225 /* Check whether we have all of the packets for this call */
3226 if ((call->flags & RX_CALL_HAVE_LAST)
3227 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3228 afs_uint32 tseq; /* temporary sequence number */
3230 for (tseq = call->rnext,
3231 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3232 if (tseq != tp->header.seq)
3234 if (tp->header.flags & RX_LAST_PACKET) {
3235 call->flags |= RX_CALL_RECEIVE_DONE;
3242 /* We need to send an ack if the packet is out of sequence,
3243 * or if an ack was requested by the peer. */
3244 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3248 /* Acknowledge the last packet for each call */
3249 if (flags & RX_LAST_PACKET) {
3260 * If the receiver is waiting for an iovec, fill the iovec
3261 * using the data from the receive queue */
3262 if (call->flags & RX_CALL_IOVEC_WAIT) {
3263 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3264 /* the call may have been aborted */
3273 /* Wakeup the reader if any */
3274 if ((call->flags & RX_CALL_READER_WAIT) &&
3275 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3276 (call->iovNext >= call->iovMax) ||
3277 (call->flags & RX_CALL_RECEIVE_DONE))) {
3278 call->flags &= ~RX_CALL_READER_WAIT;
3279 #ifdef RX_ENABLE_LOCKS
3280 CV_BROADCAST(&call->cv_rq);
3282 osi_rxWakeup(&call->rq);
3288 * Send an ack when requested by the peer, or once every
3289 * rxi_SoftAckRate packets until the last packet has been
3290 * received. Always send a soft ack for the last packet in
3291 * the server's reply. */
3293 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3294 np = rxi_SendAck(call, np, seq, serial, flags,
3295 RX_ACK_REQUESTED, istack);
3296 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3297 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3298 np = rxi_SendAck(call, np, seq, serial, flags,
3299 RX_ACK_IDLE, istack);
3300 } else if (call->nSoftAcks) {
3301 clock_GetTime(&when);
3302 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3303 clock_Add(&when, &rx_lastAckDelay);
3305 clock_Add(&when, &rx_softAckDelay);
3307 if (!call->delayedAckEvent ||
3308 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3309 rxevent_Cancel(call->delayedAckEvent, call,
3310 RX_CALL_REFCOUNT_DELAY);
3311 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3312 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3315 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3316 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3323 static void rxi_ComputeRate();
3326 static void rxi_UpdatePeerReach(conn, acall)
3327 struct rx_connection *conn;
3328 struct rx_call *acall;
3330 struct rx_peer *peer = conn->peer;
3332 MUTEX_ENTER(&peer->peer_lock);
3333 peer->lastReachTime = clock_Sec();
3334 MUTEX_EXIT(&peer->peer_lock);
3336 MUTEX_ENTER(&conn->conn_call_lock);
3337 MUTEX_ENTER(&conn->conn_data_lock);
3338 if (conn->flags & RX_CONN_ATTACHWAIT) {
3341 conn->flags &= ~RX_CONN_ATTACHWAIT;
3342 MUTEX_EXIT(&conn->conn_data_lock);
3344 for (i=0; i<RX_MAXCALLS; i++) {
3345 struct rx_call *call = conn->call[i];
3347 if (call != acall) MUTEX_ENTER(&call->lock);
3348 TryAttach(call, -1, NULL, NULL, 1);
3349 if (call != acall) MUTEX_EXIT(&call->lock);
3353 MUTEX_EXIT(&conn->conn_data_lock);
3354 MUTEX_EXIT(&conn->conn_call_lock);
3357 /* The real smarts of the whole thing. */
3358 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3359 register struct rx_call *call;
3360 struct rx_packet *np;
3363 struct rx_ackPacket *ap;
3365 register struct rx_packet *tp;
3366 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3367 register struct rx_connection *conn = call->conn;
3368 struct rx_peer *peer = conn->peer;
3371 /* because there are CMs that are bogus and send weird values for this. */
3372 afs_uint32 skew = 0;
3377 int newAckCount = 0;
3378 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3379 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3381 MUTEX_ENTER(&rx_stats_mutex);
3382 rx_stats.ackPacketsRead++;
3383 MUTEX_EXIT(&rx_stats_mutex);
3384 ap = (struct rx_ackPacket *) rx_DataOf(np);
3385 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3387 return np; /* truncated ack packet */
3389 /* depends on ack packet struct */
3390 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
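    /* defensive clamp (descriptive note): never trust ap->nAcks beyond
     * the bytes actually present in a possibly truncated packet. */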
3391 first = ntohl(ap->firstPacket);
3392 serial = ntohl(ap->serial);
3393 /* temporarily disabled -- needs to degrade over time
3394 skew = ntohs(ap->maxSkew); */
3396 /* Ignore ack packets received out of order */
3397 if (first < call->tfirst) {
3401 if (np->header.flags & RX_SLOW_START_OK) {
3402 call->flags |= RX_CALL_SLOW_START_OK;
3405 if (ap->reason == RX_ACK_PING_RESPONSE)
3406 rxi_UpdatePeerReach(conn, call);
3411 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3412 ap->reason, ntohl(ap->previousPacket),
3413 (unsigned int) np->header.seq, (unsigned int) serial,
3414 (unsigned int) skew, ntohl(ap->firstPacket));
3417 for (offset = 0; offset < nAcks; offset++)
3418 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3424 /* if a server connection has been re-created, it doesn't remember what
3425 serial # it was up to. An ack will tell us, since the serial field
3426 contains the largest serial received by the other side */
3427 MUTEX_ENTER(&conn->conn_data_lock);
3428 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3429 conn->serial = serial+1;
3431 MUTEX_EXIT(&conn->conn_data_lock);
3433 /* Update the outgoing packet skew value to the latest value of
3434 * the peer's incoming packet skew value. The ack packet, of
3435 * course, could arrive out of order, but that won't affect things
3437 MUTEX_ENTER(&peer->peer_lock);
3438 peer->outPacketSkew = skew;
3440 /* Check for packets that no longer need to be transmitted, and
3441 * discard them. This only applies to packets positively
3442 * acknowledged as having been sent to the peer's upper level.
3443 * All other packets must be retained. So only packets with
3444 * sequence numbers < ap->firstPacket are candidates. */
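    /* e.g. with firstPacket == 10, packets 1..9 are implicitly
     * hard-acked and are candidates for being freed here; packet 10 and
     * later stay on the tq for the explicit ack/nack pass below. */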
3445 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3446 if (tp->header.seq >= first) break;
3447 call->tfirst = tp->header.seq + 1;
3448 if (tp->header.serial == serial) {
3449 /* Use RTT if not delayed by client. */
3450 if (ap->reason != RX_ACK_DELAY)
3451 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3453 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3456 else if (tp->firstSerial == serial) {
3457 /* Use RTT if not delayed by client. */
3458 if (ap->reason != RX_ACK_DELAY)
3459 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3461 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3464 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3465 /* XXX Hack. Because we have to release the global rx lock when sending
3466 * packets (osi_NetSend), we drop all acks while rxi_Start is traversing
3467 * the tq and sending packets out, since packets may move to the
3468 * freePacketQueue as a result of being here! So we drop these packets until
3469 * we're safely out of the traversal. Really ugly!
3470 * To make it even uglier, if we're using fine grain locking, we can
3471 * set the ack bits in the packets and have rxi_Start remove the packets
3472 * when it's done transmitting.
3474 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3477 if (call->flags & RX_CALL_TQ_BUSY) {
3478 #ifdef RX_ENABLE_LOCKS
3479 tp->flags |= RX_PKTFLAG_ACKED;
3480 call->flags |= RX_CALL_TQ_SOME_ACKED;
3481 #else /* RX_ENABLE_LOCKS */
3483 #endif /* RX_ENABLE_LOCKS */
3485 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3488 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3493 /* Give rate detector a chance to respond to ping requests */
3494 if (ap->reason == RX_ACK_PING_RESPONSE) {
3495 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3499 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3501 /* Now go through explicit acks/nacks and record the results in
3502 * the waiting packets. These are packets that can't be released
3503 * yet, even with a positive acknowledge. This positive
3504 * acknowledge only means the packet has been received by the
3505 * peer, not that it will be retained long enough to be sent to
3506 * the peer's upper level. In addition, reset the transmit timers
3507 * of any missing packets (those packets that must be missing
3508 * because this packet was out of sequence) */
3510 call->nSoftAcked = 0;
3511 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3512 /* Update round trip time if the ack was stimulated on receipt
3514 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3515 #ifdef RX_ENABLE_LOCKS
3516 if (tp->header.seq >= first) {
3517 #endif /* RX_ENABLE_LOCKS */
3518 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3519 if (tp->header.serial == serial) {
3520 /* Use RTT if not delayed by client. */
3521 if (ap->reason != RX_ACK_DELAY)
3522 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3524 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3527 else if ((tp->firstSerial == serial)) {
3528 /* Use RTT if not delayed by client. */
3529 if (ap->reason != RX_ACK_DELAY)
3530 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3532 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3535 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3536 #ifdef RX_ENABLE_LOCKS
3538 #endif /* RX_ENABLE_LOCKS */
3539 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3541 /* Set the acknowledge flag per packet based on the
3542 * information in the ack packet. An acknowledged packet can
3543 * be downgraded when the server has discarded a packet it
3544 * soft-acked previously, or when an ack packet is received
3545 * out of sequence. */
3546 if (tp->header.seq < first) {
3547 /* Implicit ack information */
3548 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3551 tp->flags |= RX_PKTFLAG_ACKED;
3553 else if (tp->header.seq < first + nAcks) {
3554 /* Explicit ack information: set it in the packet appropriately */
3555 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3556 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3558 tp->flags |= RX_PKTFLAG_ACKED;
3566 tp->flags &= ~RX_PKTFLAG_ACKED;
3571 tp->flags &= ~RX_PKTFLAG_ACKED;
3575 /* If packet isn't yet acked, and it has been transmitted at least
3576 * once, reset retransmit time using latest timeout
3577 * i.e., this should readjust the retransmit timer for all outstanding
3578 * packets... So we don't just retransmit when we should know better*/
3580 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3581 tp->retryTime = tp->timeSent;
3582 clock_Add(&tp->retryTime, &peer->timeout);
3583 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3584 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
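	    /* e.g. a backoff of 3 adds 3 << 8 = 768 msec on top of the
	     * peer's timeout; each backoff step costs ~256 msec. */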
3588 /* If the window has been extended by this acknowledge packet,
3589 * then wake up a sender waiting in alloc for window space, or try
3590 * sending packets now, if he's been sitting on packets due to
3591 * lack of window space */
3592 if (call->tnext < (call->tfirst + call->twind)) {
3593 #ifdef RX_ENABLE_LOCKS
3594 CV_SIGNAL(&call->cv_twind);
3596 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3597 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3598 osi_rxWakeup(&call->twind);
3601 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3602 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3606 /* if the ack packet has a receivelen field hanging off it,
3607 * update our state */
3608 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3611 /* If the ack packet has a "recommended" size that is less than
3612 * what I am using now, reduce my size to match */
3613 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3614 sizeof(afs_int32), &tSize);
3615 tSize = (afs_uint32) ntohl(tSize);
3616 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3618 /* Get the maximum packet size to send to this peer */
3619 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3621 tSize = (afs_uint32)ntohl(tSize);
3622 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3623 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3625 /* sanity check - peer might have restarted with different params.
3626 * If peer says "send less", dammit, send less... Peer should never
3627 * be unable to accept packets of the size that prior AFS versions would
3628 * send without asking. */
3629 if (peer->maxMTU != tSize) {
3630 peer->maxMTU = tSize;
3631 peer->MTU = MIN(tSize, peer->MTU);
3632 call->MTU = MIN(call->MTU, tSize);
3636 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3638 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3639 sizeof(afs_int32), &tSize);
3640 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3641 if (tSize < call->twind) { /* smaller than our send */
3642 call->twind = tSize; /* window, we must send less... */
3643 call->ssthresh = MIN(call->twind, call->ssthresh);
3646 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3647 * network MTU confused with the loopback MTU. Calculate the
3648 * maximum MTU here for use in the slow start code below.
3650 maxMTU = peer->maxMTU;
3651 /* Did peer restart with older RX version? */
3652 if (peer->maxDgramPackets > 1) {
3653 peer->maxDgramPackets = 1;
3655 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3657 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3658 sizeof(afs_int32), &tSize);
3659 tSize = (afs_uint32) ntohl(tSize);
3661 * As of AFS 3.5 we set the send window to match the receive window.
3663 if (tSize < call->twind) {
3664 call->twind = tSize;
3665 call->ssthresh = MIN(call->twind, call->ssthresh);
3666 } else if (tSize > call->twind) {
3667 call->twind = tSize;
3671 * As of AFS 3.5, a jumbogram is more than one fixed size
3672 * packet transmitted in a single UDP datagram. If the remote
3673 * MTU is smaller than our local MTU then never send a datagram
3674 * larger than the natural MTU.
3676 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3677 sizeof(afs_int32), &tSize);
3678 maxDgramPackets = (afs_uint32) ntohl(tSize);
3679 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3680 maxDgramPackets = MIN(maxDgramPackets,
3681 (int)(peer->ifDgramPackets));
3682 maxDgramPackets = MIN(maxDgramPackets, tSize);
3683 if (maxDgramPackets > 1) {
3684 peer->maxDgramPackets = maxDgramPackets;
3685 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3687 peer->maxDgramPackets = 1;
3688 call->MTU = peer->natMTU;
3690 } else if (peer->maxDgramPackets > 1) {
3691 /* Restarted with lower version of RX */
3692 peer->maxDgramPackets = 1;
3694 } else if (peer->maxDgramPackets > 1 ||
3695 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3696 /* Restarted with lower version of RX */
3697 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3698 peer->natMTU = OLD_MAX_PACKET_SIZE;
3699 peer->MTU = OLD_MAX_PACKET_SIZE;
3700 peer->maxDgramPackets = 1;
3701 peer->nDgramPackets = 1;
3703 call->MTU = OLD_MAX_PACKET_SIZE;
3708 * Calculate how many datagrams were successfully received after
3709 * the first missing packet and adjust the negative ack counter
3714 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
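	/* a ceiling divide: e.g. 5 nacked packets with nDgramPackets == 4
	 * count as 2 datagrams, not 1. */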
3715 if (call->nNacks < nNacked) {
3716 call->nNacks = nNacked;
3725 if (call->flags & RX_CALL_FAST_RECOVER) {
3727 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3729 call->flags &= ~RX_CALL_FAST_RECOVER;
3730 call->cwind = call->nextCwind;
3731 call->nextCwind = 0;
3734 call->nCwindAcks = 0;
3736 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3737 /* Three negative acks in a row trigger congestion recovery */
3738 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3739 MUTEX_EXIT(&peer->peer_lock);
3740 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3741 /* someone else is waiting to start recovery */
3744 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3745 while (call->flags & RX_CALL_TQ_BUSY) {
3746 call->flags |= RX_CALL_TQ_WAIT;
3747 #ifdef RX_ENABLE_LOCKS
3748 CV_WAIT(&call->cv_tq, &call->lock);
3749 #else /* RX_ENABLE_LOCKS */
3750 osi_rxSleep(&call->tq);
3751 #endif /* RX_ENABLE_LOCKS */
3753 MUTEX_ENTER(&peer->peer_lock);
3754 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3755 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3756 call->flags |= RX_CALL_FAST_RECOVER;
3757 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3758 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3760 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3761 call->nextCwind = call->ssthresh;
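	/* Worked example (illustrative): with cwind 32 and twind 64,
	 * ssthresh becomes 32 >> 1 = 16; with rx_nackThreshold 3 the
	 * recovery cwind starts from ssthresh + rx_nackThreshold = 19
	 * (still subject to the MIN clamp above), and transmission
	 * resumes from nextCwind == ssthresh afterwards. */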
3764 peer->MTU = call->MTU;
3765 peer->cwind = call->nextCwind;
3766 peer->nDgramPackets = call->nDgramPackets;
3768 call->congestSeq = peer->congestSeq;
3769 /* Reset the resend times on the packets that were nacked
3770 * so we will retransmit as soon as the window permits*/
3771 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3773 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3774 clock_Zero(&tp->retryTime);
3776 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3781 /* If cwind is smaller than ssthresh, then increase
3782 * the window one packet for each ack we receive (exponential
3784 * If cwind is greater than or equal to ssthresh then increase
3785 * the congestion window by one packet for each cwind acks we
3786 * receive (linear growth). */
3787 if (call->cwind < call->ssthresh) {
3788 call->cwind = MIN((int)call->ssthresh,
3789 (int)(call->cwind + newAckCount));
3790 call->nCwindAcks = 0;
3792 call->nCwindAcks += newAckCount;
3793 if (call->nCwindAcks >= call->cwind) {
3794 call->nCwindAcks = 0;
3795 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
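    /* Worked example (illustrative): with ssthresh 16 and cwind 4, four
     * new acks grow cwind straight to 8 (slow start); once cwind reaches
     * ssthresh, it takes a full cwind's worth of acks (16) to add a
     * single packet (congestion avoidance). */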
3799 * If we have received several acknowledgements in a row then
3800 * it is time to increase the size of our datagrams
3802 if ((int)call->nAcks > rx_nDgramThreshold) {
3803 if (peer->maxDgramPackets > 1) {
3804 if (call->nDgramPackets < peer->maxDgramPackets) {
3805 call->nDgramPackets++;
3807 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3808 } else if (call->MTU < peer->maxMTU) {
3809 call->MTU += peer->natMTU;
3810 call->MTU = MIN(call->MTU, peer->maxMTU);
3816 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3818 /* Servers need to hold the call until all response packets have
3819 * been acknowledged. Soft acks are good enough since clients
3820 * are not allowed to clear their receive queues. */
3821 if (call->state == RX_STATE_HOLD &&
3822 call->tfirst + call->nSoftAcked >= call->tnext) {
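	/* e.g. with tfirst == 5 and tnext == 9 there are four response
	 * packets outstanding; once nSoftAcked reaches 4 the condition
	 * holds and the call can move from HOLD to DALLY. */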
3823 call->state = RX_STATE_DALLY;
3824 rxi_ClearTransmitQueue(call, 0);
3825 } else if (!queue_IsEmpty(&call->tq)) {
3826 rxi_Start(0, call, istack);
3831 /* Received a response to a challenge packet */
3832 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3833 register struct rx_connection *conn;
3834 register struct rx_packet *np;
3839 /* Ignore the packet if we're the client */
3840 if (conn->type == RX_CLIENT_CONNECTION) return np;
3842 /* If already authenticated, ignore the packet (it's probably a retry) */
3843 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3846 /* Otherwise, have the security object evaluate the response packet */
3847 error = RXS_CheckResponse(conn->securityObject, conn, np);
3849 /* If the response is invalid, reset the connection, sending
3850 * an abort to the peer */
3854 rxi_ConnectionError(conn, error);
3855 MUTEX_ENTER(&conn->conn_data_lock);
3856 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3857 MUTEX_EXIT(&conn->conn_data_lock);
3861 /* If the response is valid, any calls waiting to attach
3862 * servers can now do so */
3865 for (i=0; i<RX_MAXCALLS; i++) {
3866 struct rx_call *call = conn->call[i];
3868 MUTEX_ENTER(&call->lock);
3869 if (call->state == RX_STATE_PRECALL)
3870 rxi_AttachServerProc(call, -1, NULL, NULL);
3871 MUTEX_EXIT(&call->lock);
3875 /* Update the peer reachability information, just in case
3876 * some calls went into attach-wait while we were waiting
3877 * for authentication.
3878 */
3879 rxi_UpdatePeerReach(conn, NULL);
3884 /* A client has received an authentication challenge: the security
3885 * object is asked to cough up a respectable response packet to send
3886 * back to the server. The server is responsible for retrying the
3887 * challenge if it fails to get a response. */
3890 rxi_ReceiveChallengePacket(conn, np, istack)
3891 register struct rx_connection *conn;
3892 register struct rx_packet *np;
3897 /* Ignore the challenge if we're the server */
3898 if (conn->type == RX_SERVER_CONNECTION) return np;
3900 /* Ignore the challenge if the connection is otherwise idle; someone's
3901 * trying to use us as an oracle. */
3902 if (!rxi_HasActiveCalls(conn)) return np;
3904 /* Send the security object the challenge packet. It is expected to fill
3905 * in the response. */
3906 error = RXS_GetResponse(conn->securityObject, conn, np);
3908 /* If the security object is unable to return a valid response, reset the
3909 * connection and send an abort to the peer. Otherwise send the response
3910 * packet to the peer connection. */
3912 rxi_ConnectionError(conn, error);
3913 MUTEX_ENTER(&conn->conn_data_lock);
3914 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3915 MUTEX_EXIT(&conn->conn_data_lock);
3918 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3919 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3925 /* Find an available server process to service the current request in
3926 * the given call structure. If one isn't available, queue up this
3927 * call so it eventually gets one */
3929 rxi_AttachServerProc(call, socket, tnop, newcallp)
3930 register struct rx_call *call;
3931 register osi_socket socket;
3933 register struct rx_call **newcallp;
3935 register struct rx_serverQueueEntry *sq;
3936 register struct rx_service *service = call->conn->service;
3937 #ifdef RX_ENABLE_LOCKS
3938 register int haveQuota = 0;
3939 #endif /* RX_ENABLE_LOCKS */
3940 /* May already be attached */
3941 if (call->state == RX_STATE_ACTIVE) return;
3943 MUTEX_ENTER(&rx_serverPool_lock);
3944 #ifdef RX_ENABLE_LOCKS
3945 while(rxi_ServerThreadSelectingCall) {
3946 MUTEX_EXIT(&call->lock);
3947 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3948 MUTEX_EXIT(&rx_serverPool_lock);
3949 MUTEX_ENTER(&call->lock);
3950 MUTEX_ENTER(&rx_serverPool_lock);
3951 /* Call may have been attached */
3952 if (call->state == RX_STATE_ACTIVE) return;
3955 haveQuota = QuotaOK(service);
3956 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3957 /* If there are no processes available to service this call,
3958 * put the call on the incoming call queue (unless it's
3959 * already on the queue).
3960 */
3962 ReturnToServerPool(service);
3963 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3964 call->flags |= RX_CALL_WAIT_PROC;
3965 MUTEX_ENTER(&rx_stats_mutex);
3967 MUTEX_EXIT(&rx_stats_mutex);
3968 rxi_calltrace(RX_CALL_ARRIVAL, call);
3969 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3970 queue_Append(&rx_incomingCallQueue, call);
3973 #else /* RX_ENABLE_LOCKS */
3974 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3975 /* If there are no processes available to service this call,
3976 * put the call on the incoming call queue (unless it's
3977 * already on the queue).
3978 */
3979 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3980 call->flags |= RX_CALL_WAIT_PROC;
3982 rxi_calltrace(RX_CALL_ARRIVAL, call);
3983 queue_Append(&rx_incomingCallQueue, call);
3986 #endif /* RX_ENABLE_LOCKS */
3988 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3990 /* If hot threads are enabled, and both newcallp and sq->socketp
3991 * are non-null, then this thread will process the call, and the
3992 * idle server thread will start listening on this thread's socket.
3993 */
3995 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3998 *sq->socketp = socket;
3999 clock_GetTime(&call->startTime);
4000 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
4004 if (call->flags & RX_CALL_WAIT_PROC) {
4005 /* Conservative: I don't think this should happen */
4006 call->flags &= ~RX_CALL_WAIT_PROC;
4007 MUTEX_ENTER(&rx_stats_mutex);
4009 MUTEX_EXIT(&rx_stats_mutex);
4012 call->state = RX_STATE_ACTIVE;
4013 call->mode = RX_MODE_RECEIVING;
4014 if (call->flags & RX_CALL_CLEARED) {
4015 /* send an ack now to start the packet flow up again */
4016 call->flags &= ~RX_CALL_CLEARED;
4017 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
4019 #ifdef RX_ENABLE_LOCKS
4022 service->nRequestsRunning++;
4023 if (service->nRequestsRunning <= service->minProcs)
4029 MUTEX_EXIT(&rx_serverPool_lock);
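/* [Editor's note] Summary of the hot-thread path above: when
 * rx_enable_hot_thread is set and both newcallp and sq->socketp are
 * non-null, the thread that received the packet keeps the call for
 * itself and hands its listening socket to the idle server thread,
 * avoiding a context switch on the common request path. */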
4032 /* Delay the sending of an acknowledge event for a short while, while
4033 * a new call is being prepared (in the case of a client) or a reply
4034 * is being prepared (in the case of a server). Rather than sending
4035 * an ack packet, an ACKALL packet is sent. */
4036 void rxi_AckAll(event, call, dummy)
4037 struct rxevent *event;
4038 register struct rx_call *call;
4041 #ifdef RX_ENABLE_LOCKS
4043 MUTEX_ENTER(&call->lock);
4044 call->delayedAckEvent = (struct rxevent *) 0;
4045 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
4047 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
4048 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
4050 MUTEX_EXIT(&call->lock);
4051 #else /* RX_ENABLE_LOCKS */
4052 if (event) call->delayedAckEvent = (struct rxevent *) 0;
4053 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
4054 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
4055 #endif /* RX_ENABLE_LOCKS */
4058 void rxi_SendDelayedAck(event, call, dummy)
4059 struct rxevent *event;
4060 register struct rx_call *call;
4063 #ifdef RX_ENABLE_LOCKS
4065 MUTEX_ENTER(&call->lock);
4066 if (event == call->delayedAckEvent)
4067 call->delayedAckEvent = (struct rxevent *) 0;
4068 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
4070 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4072 MUTEX_EXIT(&call->lock);
4073 #else /* RX_ENABLE_LOCKS */
4074 if (event) call->delayedAckEvent = (struct rxevent *) 0;
4075 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
4076 #endif /* RX_ENABLE_LOCKS */
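/* [Editor's note] The (event == call->delayedAckEvent) comparison above
 * guards against a stale event: if the delayed ack was cancelled and
 * rescheduled after this event fired but before it ran, the pointers no
 * longer match and the newer event's registration is left intact. */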
4080 #ifdef RX_ENABLE_LOCKS
4081 /* Set ack in all packets in transmit queue. rxi_Start will deal with
4082 * clearing them out.
4083 */
4084 static void rxi_SetAcksInTransmitQueue(call)
4085 register struct rx_call *call;
4087 register struct rx_packet *p, *tp;
4090 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4093 p->flags |= RX_PKTFLAG_ACKED;
4097 call->flags |= RX_CALL_TQ_CLEARME;
4098 call->flags |= RX_CALL_TQ_SOME_ACKED;
4101 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4102 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4103 call->tfirst = call->tnext;
4104 call->nSoftAcked = 0;
4106 if (call->flags & RX_CALL_FAST_RECOVER) {
4107 call->flags &= ~RX_CALL_FAST_RECOVER;
4108 call->cwind = call->nextCwind;
4109 call->nextCwind = 0;
4112 CV_SIGNAL(&call->cv_twind);
4114 #endif /* RX_ENABLE_LOCKS */
4116 /* Clear out the transmit queue for the current call (all packets have
4117 * been received by peer) */
4118 void rxi_ClearTransmitQueue(call, force)
4119 register struct rx_call *call;
4122 register struct rx_packet *p, *tp;
4124 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4125 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4127 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4130 p->flags |= RX_PKTFLAG_ACKED;
4134 call->flags |= RX_CALL_TQ_CLEARME;
4135 call->flags |= RX_CALL_TQ_SOME_ACKED;
4138 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4139 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4145 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4146 call->flags &= ~RX_CALL_TQ_CLEARME;
4148 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4150 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4151 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4152 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4153 call->nSoftAcked = 0;
4155 if (call->flags & RX_CALL_FAST_RECOVER) {
4156 call->flags &= ~RX_CALL_FAST_RECOVER;
4157 call->cwind = call->nextCwind;
4160 #ifdef RX_ENABLE_LOCKS
4161 CV_SIGNAL(&call->cv_twind);
4163 osi_rxWakeup(&call->twind);
4167 void rxi_ClearReceiveQueue(call)
4168 register struct rx_call *call;
4170 register struct rx_packet *p, *tp;
4171 if (queue_IsNotEmpty(&call->rq)) {
4172 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4177 rx_packetReclaims++;
4179 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4181 if (call->state == RX_STATE_PRECALL) {
4182 call->flags |= RX_CALL_CLEARED;
4186 /* Send an abort packet for the specified call */
4187 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4188 register struct rx_call *call;
4189 struct rx_packet *packet;
4199 /* Clients should never delay abort messages */
4200 if (rx_IsClientConn(call->conn))
4203 if (call->abortCode != call->error) {
4204 call->abortCode = call->error;
4205 call->abortCount = 0;
4208 if (force || rxi_callAbortThreshhold == 0 ||
4209 call->abortCount < rxi_callAbortThreshhold) {
4210 if (call->delayedAbortEvent) {
4211 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4213 error = htonl(call->error);
4215 packet = rxi_SendSpecial(call, call->conn, packet,
4216 RX_PACKET_TYPE_ABORT, (char *)&error,
4217 sizeof(error), istack);
4218 } else if (!call->delayedAbortEvent) {
4219 clock_GetTime(&when);
4220 clock_Addmsec(&when, rxi_callAbortDelay);
4221 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4222 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4228 /* Send an abort packet for the specified connection. Packet is an
4229 * optional pointer to a packet that can be used to send the abort.
4230 * Once the number of abort messages reaches the threshold, an
4231 * event is scheduled to send the abort. Setting the force flag
4232 * overrides sending delayed abort messages.
4234 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4235 * to send the abort packet.
4236 */
4237 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4238 register struct rx_connection *conn;
4239 struct rx_packet *packet;
4249 /* Clients should never delay abort messages */
4250 if (rx_IsClientConn(conn))
4253 if (force || rxi_connAbortThreshhold == 0 ||
4254 conn->abortCount < rxi_connAbortThreshhold) {
4255 if (conn->delayedAbortEvent) {
4256 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4258 error = htonl(conn->error);
4260 MUTEX_EXIT(&conn->conn_data_lock);
4261 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4262 RX_PACKET_TYPE_ABORT, (char *)&error,
4263 sizeof(error), istack);
4264 MUTEX_ENTER(&conn->conn_data_lock);
4265 } else if (!conn->delayedAbortEvent) {
4266 clock_GetTime(&when);
4267 clock_Addmsec(&when, rxi_connAbortDelay);
4268 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4274 /* Associate an error with all of the calls owned by a connection. Called
4275 * with error non-zero. This is only for really fatal things, like
4276 * bad authentication responses. The connection itself is set in
4277 * error at this point, so that future packets received will be
4278 * discarded. */
4279 void rxi_ConnectionError(conn, error)
4280 register struct rx_connection *conn;
4281 register afs_int32 error;
4285 MUTEX_ENTER(&conn->conn_data_lock);
4286 if (conn->challengeEvent)
4287 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4288 if (conn->checkReachEvent) {
4289 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4290 conn->checkReachEvent = 0;
4293 MUTEX_EXIT(&conn->conn_data_lock);
4294 for (i=0; i<RX_MAXCALLS; i++) {
4295 struct rx_call *call = conn->call[i];
4297 MUTEX_ENTER(&call->lock);
4298 rxi_CallError(call, error);
4299 MUTEX_EXIT(&call->lock);
4302 conn->error = error;
4303 MUTEX_ENTER(&rx_stats_mutex);
4304 rx_stats.fatalErrors++;
4305 MUTEX_EXIT(&rx_stats_mutex);
4309 void rxi_CallError(call, error)
4310 register struct rx_call *call;
4313 if (call->error) error = call->error;
4314 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4315 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4316 rxi_ResetCall(call, 0);
4319 rxi_ResetCall(call, 0);
4321 call->error = error;
4322 call->mode = RX_MODE_ERROR;
4325 /* Reset various fields in a call structure, and wakeup waiting
4326 * processes. Some fields aren't changed: state & mode are not
4327 * touched (these must be set by the caller), and bufptr, nLeft, and
4328 * nFree are not reset, since these fields are manipulated by
4329 * unprotected macros, and may only be reset by non-interrupting code.
4330 */
4331 #ifdef ADAPT_WINDOW
4332 /* this code requires that call->conn be set properly as a pre-condition. */
4333 #endif /* ADAPT_WINDOW */
4335 void rxi_ResetCall(call, newcall)
4336 register struct rx_call *call;
4337 register int newcall;
4340 register struct rx_peer *peer;
4341 struct rx_packet *packet;
4343 /* Notify anyone who is waiting for asynchronous packet arrival */
4344 if (call->arrivalProc) {
4345 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4346 call->arrivalProc = (VOID (*)()) 0;
4349 if (call->delayedAbortEvent) {
4350 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4351 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4353 rxi_SendCallAbort(call, packet, 0, 1);
4354 rxi_FreePacket(packet);
4359 * Update the peer with the congestion information in this call
4360 * so other calls on this connection can pick up where this call
4361 * left off. If the congestion sequence numbers don't match then
4362 * another call experienced a retransmission.
4363 */
4364 peer = call->conn->peer;
4365 MUTEX_ENTER(&peer->peer_lock);
4367 if (call->congestSeq == peer->congestSeq) {
4368 peer->cwind = MAX(peer->cwind, call->cwind);
4369 peer->MTU = MAX(peer->MTU, call->MTU);
4370 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4373 call->abortCode = 0;
4374 call->abortCount = 0;
4376 if (peer->maxDgramPackets > 1) {
4377 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4379 call->MTU = peer->MTU;
4381 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4382 call->ssthresh = rx_maxSendWindow;
4383 call->nDgramPackets = peer->nDgramPackets;
4384 call->congestSeq = peer->congestSeq;
4385 MUTEX_EXIT(&peer->peer_lock);
4387 flags = call->flags;
4388 rxi_ClearReceiveQueue(call);
4389 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4390 if (call->flags & RX_CALL_TQ_BUSY) {
4391 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4392 call->flags |= (flags & RX_CALL_TQ_WAIT);
4394 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4396 rxi_ClearTransmitQueue(call, 0);
4397 queue_Init(&call->tq);
4400 queue_Init(&call->rq);
4402 call->rwind = rx_initReceiveWindow;
4403 call->twind = rx_initSendWindow;
4404 call->nSoftAcked = 0;
4405 call->nextCwind = 0;
4408 call->nCwindAcks = 0;
4409 call->nSoftAcks = 0;
4410 call->nHardAcks = 0;
4412 call->tfirst = call->rnext = call->tnext = 1;
4414 call->lastAcked = 0;
4415 call->localStatus = call->remoteStatus = 0;
4417 if (flags & RX_CALL_READER_WAIT) {
4418 #ifdef RX_ENABLE_LOCKS
4419 CV_BROADCAST(&call->cv_rq);
4421 osi_rxWakeup(&call->rq);
4424 if (flags & RX_CALL_WAIT_PACKETS) {
4425 MUTEX_ENTER(&rx_freePktQ_lock);
4426 rxi_PacketsUnWait(); /* XXX */
4427 MUTEX_EXIT(&rx_freePktQ_lock);
4430 #ifdef RX_ENABLE_LOCKS
4431 CV_SIGNAL(&call->cv_twind);
4433 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4434 osi_rxWakeup(&call->twind);
4437 #ifdef RX_ENABLE_LOCKS
4438 /* The following ensures that we don't mess with any queue while some
4439 * other thread might also be doing so. The call_queue_lock field is
4440 * only modified under the call lock. If the call is in the process
4441 * of being removed from a queue, the call is not locked until the
4442 * queue lock is dropped and only then is the call_queue_lock field
4443 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4444 * Note that any other routine which removes a call from a queue has to
4445 * obtain the queue lock before examining the queue and removing the call.
4446 */
4447 if (call->call_queue_lock) {
4448 MUTEX_ENTER(call->call_queue_lock);
4449 if (queue_IsOnQueue(call)) {
4451 if (flags & RX_CALL_WAIT_PROC) {
4452 MUTEX_ENTER(&rx_stats_mutex);
4454 MUTEX_EXIT(&rx_stats_mutex);
4457 MUTEX_EXIT(call->call_queue_lock);
4458 CLEAR_CALL_QUEUE_LOCK(call);
4460 #else /* RX_ENABLE_LOCKS */
4461 if (queue_IsOnQueue(call)) {
4463 if (flags & RX_CALL_WAIT_PROC)
4466 #endif /* RX_ENABLE_LOCKS */
4468 rxi_KeepAliveOff(call);
4469 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4472 /* Send an acknowledge for the indicated packet (seq,serial) of the
4473 * indicated call, for the indicated reason (reason). This
4474 * acknowledge will specifically acknowledge receiving the packet, and
4475 * will also specify which other packets for this call have been
4476 * received. This routine returns the packet that was used to the
4477 * caller. The caller is responsible for freeing it or re-using it.
4478 * This acknowledgement also returns the highest sequence number
4479 * actually read out by the higher level to the sender; the sender
4480 * promises to keep around packets that have not been read by the
4481 * higher level yet (unless, of course, the sender decides to abort
4482 * the call altogether). Any of p, seq, serial, pflags, or reason may
4483 * be set to zero without ill effect. That is, if they are zero, they
4484 * will not convey any information.
4485 * NOW there is a trailer field, after the ack where it will safely be
4486 * ignored by mundanes, which indicates the maximum size packet this
4487 * host can swallow. */
4488 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4489 register struct rx_call *call;
4490 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4491 int seq; /* Sequence number of the packet we are acking */
4492 int serial; /* Serial number of the packet */
4493 int pflags; /* Flags field from packet header */
4494 int reason; /* Reason an acknowledge was prompted */
4497 struct rx_ackPacket *ap;
4498 register struct rx_packet *rqp;
4499 register struct rx_packet *nxp; /* For queue_Scan */
4500 register struct rx_packet *p;
4505 /* Open the receive window once a thread starts reading packets */
4507 if (call->rnext > 1) {
4508 call->rwind = rx_maxReceiveWindow;
4511 call->nHardAcks = 0;
4512 call->nSoftAcks = 0;
4513 if (call->rnext > call->lastAcked)
4514 call->lastAcked = call->rnext;
4518 rx_computelen(p, p->length); /* reset length, you never know */
4519 } /* where that's been... */
4521 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4522 /* We won't send the ack, but don't panic. */
4523 return optionalPacket;
4526 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4528 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4529 if (!optionalPacket) rxi_FreePacket(p);
4530 return optionalPacket;
4532 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4533 if (rx_Contiguous(p)<templ) {
4534 if (!optionalPacket) rxi_FreePacket(p);
4535 return optionalPacket;
4537 } /* MTUXXX failing to send an ack is very serious. We should */
4538 /* try as hard as possible to send even a partial ack; it's */
4539 /* better than nothing. */
4541 ap = (struct rx_ackPacket *) rx_DataOf(p);
4542 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4543 ap->reason = reason;
4545 /* The skew computation used to be bogus, I think it's better now. */
4546 /* We should start paying attention to skew. XXX */
4547 ap->serial = htonl(call->conn->maxSerial);
4548 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4550 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4551 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4553 /* No fear of running out of room in the ack packet here, because there can be at most
4554 * one window full of unacknowledged packets. The window size must be constrained
4555 * to be less than the maximum ack size, of course. Also, an ack should always
4556 * fit into a single packet -- it should not ever be fragmented. */
4557 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4558 if (!rqp || !call->rq.next
4559 || (rqp->header.seq > (call->rnext + call->rwind))) {
4560 if (!optionalPacket) rxi_FreePacket(p);
4561 rxi_CallError(call, RX_CALL_DEAD);
4562 return optionalPacket;
4565 while (rqp->header.seq > call->rnext + offset)
4566 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4567 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4569 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4570 if (!optionalPacket) rxi_FreePacket(p);
4571 rxi_CallError(call, RX_CALL_DEAD);
4572 return optionalPacket;
4577 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4579 /* these are new for AFS 3.3 */
4580 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4581 templ = htonl(templ);
4582 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4583 templ = htonl(call->conn->peer->ifMTU);
4584 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4586 /* new for AFS 3.4 */
4587 templ = htonl(call->rwind);
4588 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4590 /* new for AFS 3.5 */
4591 templ = htonl(call->conn->peer->ifDgramPackets);
4592 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
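/* [Editor's note] Recap of the ack trailer assembled above, laid out as
 * four afs_int32 values (network byte order) starting at
 * rx_AckDataSize(offset):
 *   +0: adjusted maximum MTU (AFS 3.3)
 *   +1: interface MTU (AFS 3.3)
 *   +2: receive window (AFS 3.4)
 *   +3: ifDgramPackets (AFS 3.5)
 * Older peers simply ignore the trailer fields they don't understand. */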
4594 p->header.serviceId = call->conn->serviceId;
4595 p->header.cid = (call->conn->cid | call->channel);
4596 p->header.callNumber = *call->callNumber;
4597 p->header.seq = seq;
4598 p->header.securityIndex = call->conn->securityIndex;
4599 p->header.epoch = call->conn->epoch;
4600 p->header.type = RX_PACKET_TYPE_ACK;
4601 p->header.flags = RX_SLOW_START_OK;
4602 if (reason == RX_ACK_PING) {
4603 p->header.flags |= RX_REQUEST_ACK;
4605 clock_GetTime(&call->pingRequestTime);
4608 if (call->conn->type == RX_CLIENT_CONNECTION)
4609 p->header.flags |= RX_CLIENT_INITIATED;
4613 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4614 ap->reason, ntohl(ap->previousPacket),
4615 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4617 for (offset = 0; offset < ap->nAcks; offset++)
4618 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4625 register int i, nbytes = p->length;
4627 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4628 if (nbytes <= p->wirevec[i].iov_len) {
4629 register int savelen, saven;
4631 savelen = p->wirevec[i].iov_len;
4633 p->wirevec[i].iov_len = nbytes;
4635 rxi_Send(call, p, istack);
4636 p->wirevec[i].iov_len = savelen;
4640 else nbytes -= p->wirevec[i].iov_len;
4643 MUTEX_ENTER(&rx_stats_mutex);
4644 rx_stats.ackPacketsSent++;
4645 MUTEX_EXIT(&rx_stats_mutex);
4646 if (!optionalPacket) rxi_FreePacket(p);
4647 return optionalPacket; /* Return packet for re-use by caller */
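/* [Editor's note] Usage sketch for the free-or-reuse contract described
 * above: a caller with no packet on hand passes a null optionalPacket
 * and ignores the (null) return, e.g.
 *     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
 * while a caller that supplies its own packet gets that packet back and
 * remains responsible for eventually freeing or re-using it. */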
4650 /* Send all of the packets in the list in a single datagram */
4651 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4652 struct rx_call *call;
4653 struct rx_packet **list;
4658 struct clock *retryTime;
4664 struct rx_connection *conn = call->conn;
4665 struct rx_peer *peer = conn->peer;
4667 MUTEX_ENTER(&peer->peer_lock);
4669 if (resending) peer->reSends += len;
4670 MUTEX_ENTER(&rx_stats_mutex);
4671 rx_stats.dataPacketsSent += len;
4672 MUTEX_EXIT(&rx_stats_mutex);
4673 MUTEX_EXIT(&peer->peer_lock);
4675 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4679 /* Set the packet flags and schedule the resend events */
4680 /* Only request an ack for the last packet in the list */
4681 for (i = 0 ; i < len ; i++) {
4682 list[i]->retryTime = *retryTime;
4683 if (list[i]->header.serial) {
4684 /* Exponentially back off retry times */
4685 if (list[i]->backoff < MAXBACKOFF) {
4686 /* so it can't stay == 0 */
4687 list[i]->backoff = (list[i]->backoff << 1) +1;
4689 else list[i]->backoff++;
4690 clock_Addmsec(&(list[i]->retryTime),
4691 ((afs_uint32) list[i]->backoff) << 8);
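/* [Editor's note] Worked example of the backoff above: backoff steps
 * 0 -> 1 -> 3 -> 7 -> 15 ... via (b << 1) + 1, so successive
 * retransmissions push retryTime out by backoff << 8 msec: roughly
 * 256ms, 768ms, 1792ms, 3840ms, ... until backoff reaches MAXBACKOFF
 * and merely increments thereafter. */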
4694 /* Wait a little extra for the ack on the last packet */
4695 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4696 clock_Addmsec(&(list[i]->retryTime), 400);
4699 /* Record the time sent */
4700 list[i]->timeSent = *now;
4702 /* Ask for an ack on retransmitted packets, on every other packet
4703 * if the peer doesn't support slow start. Ask for an ack on every
4704 * packet until the congestion window reaches the ack rate. */
4705 if (list[i]->header.serial) {
4707 MUTEX_ENTER(&rx_stats_mutex);
4708 rx_stats.dataPacketsReSent++;
4709 MUTEX_EXIT(&rx_stats_mutex);
4711 /* improved RTO calculation- not Karn */
4712 list[i]->firstSent = *now;
4714 && (call->cwind <= (u_short)(conn->ackRate+1)
4715 || (!(call->flags & RX_CALL_SLOW_START_OK)
4716 && (list[i]->header.seq & 1)))) {
4721 MUTEX_ENTER(&peer->peer_lock);
4723 if (resending) peer->reSends++;
4724 MUTEX_ENTER(&rx_stats_mutex);
4725 rx_stats.dataPacketsSent++;
4726 MUTEX_EXIT(&rx_stats_mutex);
4727 MUTEX_EXIT(&peer->peer_lock);
4729 /* Tag this packet as not being the last in this group,
4730 * for the receiver's benefit */
4731 if (i < len-1 || moreFlag) {
4732 list[i]->header.flags |= RX_MORE_PACKETS;
4735 /* Install the new retransmit time for the packet, and
4736 * record the time sent */
4737 list[i]->timeSent = *now;
4741 list[len-1]->header.flags |= RX_REQUEST_ACK;
4744 /* Since we're about to send a data packet to the peer, it's
4745 * safe to nuke any scheduled end-of-packets ack */
4746 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4748 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4749 MUTEX_EXIT(&call->lock);
4751 rxi_SendPacketList(conn, list, len, istack);
4753 rxi_SendPacket(conn, list[0], istack);
4755 MUTEX_ENTER(&call->lock);
4756 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4758 /* Update last send time for this call (for keep-alive
4759 * processing), and for the connection (so that we can discover
4760 * idle connections) */
4761 conn->lastSendTime = call->lastSendTime = clock_Sec();
4764 /* When sending packets we need to follow these rules:
4765 * 1. Never send more than maxDgramPackets in a jumbogram.
4766 * 2. Never send a packet with more than two iovecs in a jumbogram.
4767 * 3. Never send a retransmitted packet in a jumbogram.
4768 * 4. Never send more than cwind/4 packets in a jumbogram
4769 * We always keep the last list we should have sent so we
4770 * can set the RX_MORE_PACKETS flags correctly.
4771 */
4772 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4773 struct rx_call *call;
4774 struct rx_packet **list;
4778 struct clock *retryTime;
4781 int i, cnt, lastCnt = 0;
4782 struct rx_packet **listP, **lastP = 0;
4783 struct rx_peer *peer = call->conn->peer;
4784 int morePackets = 0;
4786 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4787 /* Does the current packet force us to flush the current list? */
4789 && (list[i]->header.serial
4790 || (list[i]->flags & RX_PKTFLAG_ACKED)
4791 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4793 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4794 /* If the call enters an error state stop sending, or if
4795 * we entered congestion recovery mode, stop sending */
4796 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4804 /* Add the current packet to the list if it hasn't been acked.
4805 * Otherwise adjust the list pointer to skip the current packet. */
4806 if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
4808 /* Do we need to flush the list? */
4809 if (cnt >= (int)peer->maxDgramPackets
4810 || cnt >= (int)call->nDgramPackets
4811 || cnt >= (int)call->cwind
4812 || list[i]->header.serial
4813 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4815 rxi_SendList(call, lastP, lastCnt, istack, 1,
4816 now, retryTime, resending);
4817 /* If the call enters an error state stop sending, or if
4818 * we entered congestion recovery mode, stop sending */
4819 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4829 osi_Panic("rxi_SendList error");
4835 /* Send the whole list when the call is in receive mode, when
4836 * the call is in eof mode, when we are in fast recovery mode,
4837 * and when we have the last packet */
4838 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4839 || call->mode == RX_MODE_RECEIVING
4840 || call->mode == RX_MODE_EOF
4841 || (call->flags & RX_CALL_FAST_RECOVER)) {
4842 /* Check for the case where the current list contains
4843 * an acked packet. Since we always send retransmissions
4844 * in a separate packet, we only need to check the first
4845 * packet in the list */
4846 if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
4850 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4851 now, retryTime, resending);
4852 /* If the call enters an error state stop sending, or if
4853 * we entered congestion recovery mode, stop sending */
4854 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4858 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4860 } else if (lastCnt > 0) {
4861 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4865 #ifdef RX_ENABLE_LOCKS
4866 /* Call rxi_Start, below, but with the call lock held. */
4867 void rxi_StartUnlocked(event, call, istack)
4868 struct rxevent *event;
4869 register struct rx_call *call;
4872 MUTEX_ENTER(&call->lock);
4873 rxi_Start(event, call, istack);
4874 MUTEX_EXIT(&call->lock);
4876 #endif /* RX_ENABLE_LOCKS */
4878 /* This routine is called when new packets are readied for
4879 * transmission and when retransmission may be necessary, or when the
4880 * transmission window or burst count are favourable. This should be
4881 * better optimized for new packets, the usual case, now that we've
4882 * got rid of queues of send packets. XXXXXXXXXXX */
4883 void rxi_Start(event, call, istack)
4884 struct rxevent *event;
4885 register struct rx_call *call;
4888 struct rx_packet *p;
4889 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4890 struct rx_peer *peer = call->conn->peer;
4891 struct clock now, retryTime;
4895 struct rx_packet **xmitList;
4898 /* If rxi_Start is being called as a result of a resend event,
4899 * then make sure that the event pointer is removed from the call
4900 * structure, since there is no longer a per-call retransmission
4901 * event pending. */
4902 if (event && event == call->resendEvent) {
4903 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4904 call->resendEvent = NULL;
4906 if (queue_IsEmpty(&call->tq)) {
4910 /* Timeouts trigger congestion recovery */
4911 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4912 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4913 /* someone else is waiting to start recovery */
4914 return;
4915 }
4916 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4917 while (call->flags & RX_CALL_TQ_BUSY) {
4918 call->flags |= RX_CALL_TQ_WAIT;
4919 #ifdef RX_ENABLE_LOCKS
4920 CV_WAIT(&call->cv_tq, &call->lock);
4921 #else /* RX_ENABLE_LOCKS */
4922 osi_rxSleep(&call->tq);
4923 #endif /* RX_ENABLE_LOCKS */
4925 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4926 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4927 call->flags |= RX_CALL_FAST_RECOVER;
4928 if (peer->maxDgramPackets > 1) {
4929 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4931 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4933 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4934 call->nDgramPackets = 1;
4936 call->nextCwind = 1;
4939 MUTEX_ENTER(&peer->peer_lock);
4940 peer->MTU = call->MTU;
4941 peer->cwind = call->cwind;
4942 peer->nDgramPackets = 1;
4944 call->congestSeq = peer->congestSeq;
4945 MUTEX_EXIT(&peer->peer_lock);
4946 /* Clear retry times on packets. Otherwise, it's possible for
4947 * some packets in the queue to force resends at rates faster
4948 * than recovery rates.
4949 */
4950 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4951 if (!(p->flags & RX_PKTFLAG_ACKED)) {
4952 clock_Zero(&p->retryTime);
4957 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4958 MUTEX_ENTER(&rx_stats_mutex);
4959 rx_tq_debug.rxi_start_in_error ++;
4960 MUTEX_EXIT(&rx_stats_mutex);
4965 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4966 /* Get clock to compute the re-transmit time for any packets
4967 * in this burst. Note, if we back off, it's reasonable to
4968 * back off all of the packets in the same manner, even if
4969 * some of them have been retransmitted more times than more
4970 * recent additions */
4971 clock_GetTime(&now);
4972 retryTime = now; /* initialize before use */
4973 MUTEX_ENTER(&peer->peer_lock);
4974 clock_Add(&retryTime, &peer->timeout);
4975 MUTEX_EXIT(&peer->peer_lock);
4977 /* Send (or resend) any packets that need it, subject to
4978 * window restrictions and congestion burst control
4979 * restrictions. Ask for an ack on the last packet sent in
4980 * this burst. For now, we're relying upon the window being
4981 * considerably bigger than the largest number of packets that
4982 * are typically sent at once by one initial call to
4983 * rxi_Start. This is probably bogus (perhaps we should ask
4984 * for an ack when we're half way through the current
4985 * window?). Also, for non file transfer applications, this
4986 * may end up asking for an ack for every packet. Bogus. XXXX
4989 * But check whether we're here recursively, and if so let the other
4990 * invocation do the work. */
4992 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4993 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4994 call->flags |= RX_CALL_TQ_BUSY;
4996 call->flags &= ~RX_CALL_NEED_START;
4997 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4999 maxXmitPackets = MIN(call->twind, call->cwind);
5000 xmitList = (struct rx_packet **)
5001 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
5002 if (xmitList == NULL)
5003 osi_Panic("rxi_Start, failed to allocate xmit list");
5004 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
5005 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
5006 /* We shouldn't be sending packets if a thread is waiting
5007 * to initiate congestion recovery */
5010 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
5011 /* Only send one packet during fast recovery */
5014 if ((p->flags & RX_PKTFLAG_FREE) ||
5015 (!queue_IsEnd(&call->tq, nxp)
5016 && (nxp->flags & RX_PKTFLAG_FREE)) ||
5017 (p == (struct rx_packet *)&rx_freePacketQueue) ||
5018 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
5019 osi_Panic("rxi_Start: xmit queue clobbered");
5021 if (p->flags & RX_PKTFLAG_ACKED) {
5022 MUTEX_ENTER(&rx_stats_mutex);
5023 rx_stats.ignoreAckedPacket++;
5024 MUTEX_EXIT(&rx_stats_mutex);
5025 continue; /* Ignore this packet if it has been acknowledged */
5028 /* Turn off all flags except these ones, which are the same
5029 * on each transmission */
5030 p->header.flags &= RX_PRESET_FLAGS;
5032 if (p->header.seq >= call->tfirst +
5033 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
5034 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
5035 /* Note: if we're waiting for more window space, we can
5036 * still send retransmits; hence we don't return here, but
5037 * break out to schedule a retransmit event */
5038 dpf(("call %d waiting for window", *(call->callNumber)));
5042 /* Transmit the packet if it needs to be sent. */
5043 if (!clock_Lt(&now, &p->retryTime)) {
5044 if (nXmitPackets == maxXmitPackets) {
5045 osi_Panic("rxi_Start: xmit list overflowed");
5047 xmitList[nXmitPackets++] = p;
5051 /* xmitList now hold pointers to all of the packets that are
5052 * ready to send. Now we loop to send the packets */
5053 if (nXmitPackets > 0) {
5054 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
5055 &now, &retryTime, resending);
5057 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
5059 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5060 /*
5061 * TQ references no longer protected by this flag; they must remain
5062 * protected by the global lock.
5063 */
5064 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
5065 call->flags &= ~RX_CALL_TQ_BUSY;
5066 if (call->flags & RX_CALL_TQ_WAIT) {
5067 call->flags &= ~RX_CALL_TQ_WAIT;
5068 #ifdef RX_ENABLE_LOCKS
5069 CV_BROADCAST(&call->cv_tq);
5070 #else /* RX_ENABLE_LOCKS */
5071 osi_rxWakeup(&call->tq);
5072 #endif /* RX_ENABLE_LOCKS */
5077 /* We went into the error state while sending packets. Now is
5078 * the time to reset the call. This will also inform the using
5079 * process that the call is in an error state.
5080 */
5081 MUTEX_ENTER(&rx_stats_mutex);
5082 rx_tq_debug.rxi_start_aborted ++;
5083 MUTEX_EXIT(&rx_stats_mutex);
5084 call->flags &= ~RX_CALL_TQ_BUSY;
5085 if (call->flags & RX_CALL_TQ_WAIT) {
5086 call->flags &= ~RX_CALL_TQ_WAIT;
5087 #ifdef RX_ENABLE_LOCKS
5088 CV_BROADCAST(&call->cv_tq);
5089 #else /* RX_ENABLE_LOCKS */
5090 osi_rxWakeup(&call->tq);
5091 #endif /* RX_ENABLE_LOCKS */
5093 rxi_CallError(call, call->error);
5096 #ifdef RX_ENABLE_LOCKS
5097 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
5098 register int missing;
5099 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
5100 /* Some packets have received acks. If they all have, we can clear
5101 * the transmit queue.
5102 */
5103 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5104 if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
5112 call->flags |= RX_CALL_TQ_CLEARME;
5114 #endif /* RX_ENABLE_LOCKS */
5115 /* Don't bother doing retransmits if the TQ is cleared. */
5116 if (call->flags & RX_CALL_TQ_CLEARME) {
5117 rxi_ClearTransmitQueue(call, 1);
5119 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5122 /* Always post a resend event, if there is anything in the
5123 * queue, and resend is possible. There should be at least
5124 * one unacknowledged packet in the queue ... otherwise none
5125 * of these packets should be on the queue in the first place.
5126 */
5127 if (call->resendEvent) {
5128 /* Cancel the existing event and post a new one */
5129 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5132 /* The retry time is the retry time on the first unacknowledged
5133 * packet inside the current window */
5134 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
5135 /* Don't set timers for packets outside the window */
5136 if (p->header.seq >= call->tfirst + call->twind) {
5140 if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
5142 retryTime = p->retryTime;
5147 /* Post a new event to re-run rxi_Start when retries may be needed */
5148 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
5149 #ifdef RX_ENABLE_LOCKS
5150 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
5151 call->resendEvent = rxevent_Post(&retryTime,
5153 (char *)call, istack);
5154 #else /* RX_ENABLE_LOCKS */
5155 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
5156 (char *)call, (void*)(long)istack);
5157 #endif /* RX_ENABLE_LOCKS */
5160 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5161 } while (call->flags & RX_CALL_NEED_START);
5162 /*
5163 * TQ references no longer protected by this flag; they must remain
5164 * protected by the global lock.
5165 */
5166 call->flags &= ~RX_CALL_TQ_BUSY;
5167 if (call->flags & RX_CALL_TQ_WAIT) {
5168 call->flags &= ~RX_CALL_TQ_WAIT;
5169 #ifdef RX_ENABLE_LOCKS
5170 CV_BROADCAST(&call->cv_tq);
5171 #else /* RX_ENABLE_LOCKS */
5172 osi_rxWakeup(&call->tq);
5173 #endif /* RX_ENABLE_LOCKS */
5176 call->flags |= RX_CALL_NEED_START;
5178 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5180 if (call->resendEvent) {
5181 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5186 /* Also adjusts the keep alive parameters for the call, to reflect
5187 * that we have just sent a packet (so keep alives aren't sent
5188 * immediately) */
5189 void rxi_Send(call, p, istack)
5190 register struct rx_call *call;
5191 register struct rx_packet *p;
5194 register struct rx_connection *conn = call->conn;
5196 /* Stamp each packet with the user supplied status */
5197 p->header.userStatus = call->localStatus;
5199 /* Allow the security object controlling this call's security to
5200 * make any last-minute changes to the packet */
5201 RXS_SendPacket(conn->securityObject, call, p);
5203 /* Since we're about to send SOME sort of packet to the peer, it's
5204 * safe to nuke any scheduled end-of-packets ack */
5205 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5207 /* Actually send the packet, filling in more connection-specific fields */
5208 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5209 MUTEX_EXIT(&call->lock);
5210 rxi_SendPacket(conn, p, istack);
5211 MUTEX_ENTER(&call->lock);
5212 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5214 /* Update last send time for this call (for keep-alive
5215 * processing), and for the connection (so that we can discover
5216 * idle connections) */
5217 conn->lastSendTime = call->lastSendTime = clock_Sec();
5221 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5222 * that things are fine. Also called periodically to guarantee that nothing
5223 * falls through the cracks (e.g. (error + dally) connections have keepalive
5224 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5225 * may be freed!
5226 */
5227 #ifdef RX_ENABLE_LOCKS
5228 int rxi_CheckCall(call, haveCTLock)
5229 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5230 #else /* RX_ENABLE_LOCKS */
5231 int rxi_CheckCall(call)
5232 #endif /* RX_ENABLE_LOCKS */
5233 register struct rx_call *call;
5235 register struct rx_connection *conn = call->conn;
5236 register struct rx_service *tservice;
5238 afs_uint32 deadTime;
5240 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5241 if (call->flags & RX_CALL_TQ_BUSY) {
5242 /* Call is active and will be reset by rxi_Start if it's
5243 * in an error state.
5244 */
5245 return 0;
5246 }
5247 #endif
5248 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5249 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5250 ((afs_uint32)conn->peer->rtt >> 3) +
5251 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
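/* [Editor's note] Unpacking the fixed point above: peer->rtt is scaled
 * by 8 (eighth-milliseconds) and peer->rtt_dev by 4, so (rtt >> 3) is
 * the smoothed RTT in msec and (rtt_dev << 1) is 8*MDEV in msec.
 * (secondsUntilDead << 10) treats 1024 such units as one second, and
 * the +1023 before the final >> 10 makes the division round up, giving
 * deadTime in whole seconds. */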
5253 /* These are computed to the second (+- 1 second). But that's
5254 * good enough for these values, which should be a significant
5255 * number of seconds. */
5256 if (now > (call->lastReceiveTime + deadTime)) {
5257 if (call->state == RX_STATE_ACTIVE) {
5258 rxi_CallError(call, RX_CALL_DEAD);
5262 #ifdef RX_ENABLE_LOCKS
5263 /* Cancel pending events */
5264 rxevent_Cancel(call->delayedAckEvent, call,
5265 RX_CALL_REFCOUNT_DELAY);
5266 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5267 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5268 if (call->refCount == 0) {
5269 rxi_FreeCall(call, haveCTLock);
5273 #else /* RX_ENABLE_LOCKS */
5276 #endif /* RX_ENABLE_LOCKS */
5278 /* Non-active calls are destroyed if they are not responding
5279 * to pings; active calls are simply flagged in error, so the
5280 * attached process can die reasonably gracefully. */
5282 /* see if we have a non-activity timeout */
5283 tservice = conn->service;
5284 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5285 && tservice->idleDeadTime
5286 && ((call->startWait + tservice->idleDeadTime) < now)) {
5287 if (call->state == RX_STATE_ACTIVE) {
5288 rxi_CallError(call, RX_CALL_TIMEOUT);
5292 /* see if we have a hard timeout */
5293 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5294 if (call->state == RX_STATE_ACTIVE)
5295 rxi_CallError(call, RX_CALL_TIMEOUT);
5302 /* When a call is in progress, this routine is called occasionally to
5303 * make sure that some traffic has arrived (or been sent to) the peer.
5304 * If nothing has arrived in a reasonable amount of time, the call is
5305 * declared dead; if nothing has been sent for a while, we send a
5306 * keep-alive packet (if we're actually trying to keep the call alive)
5307 */
5308 void rxi_KeepAliveEvent(event, call, dummy)
5309 struct rxevent *event;
5310 register struct rx_call *call;
5312 struct rx_connection *conn;
5315 MUTEX_ENTER(&call->lock);
5316 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5317 if (event == call->keepAliveEvent)
5318 call->keepAliveEvent = (struct rxevent *) 0;
5321 #ifdef RX_ENABLE_LOCKS
5322 if(rxi_CheckCall(call, 0)) {
5323 MUTEX_EXIT(&call->lock);
5326 #else /* RX_ENABLE_LOCKS */
5327 if (rxi_CheckCall(call)) return;
5328 #endif /* RX_ENABLE_LOCKS */
5330 /* Don't try to keep alive dallying calls */
5331 if (call->state == RX_STATE_DALLY) {
5332 MUTEX_EXIT(&call->lock);
5337 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5338 /* Don't try to send keepalives if there is unacknowledged data */
5339 /* the rexmit code should be good enough, this little hack
5340 * doesn't quite work XXX */
5341 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5343 rxi_ScheduleKeepAliveEvent(call);
5344 MUTEX_EXIT(&call->lock);
5348 void rxi_ScheduleKeepAliveEvent(call)
5349 register struct rx_call *call;
5351 if (!call->keepAliveEvent) {
5353 clock_GetTime(&when);
5354 when.sec += call->conn->secondsUntilPing;
5355 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5356 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5360 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5361 void rxi_KeepAliveOn(call)
5362 register struct rx_call *call;
5364 /* Pretend last packet received was received now--i.e. if another
5365 * packet isn't received within the keep alive time, then the call
5366 * will die. Initialize last send time to the current time--even
5367 * if a packet hasn't been sent yet. This will guarantee that a
5368 * keep-alive is sent within the ping time */
5369 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5370 rxi_ScheduleKeepAliveEvent(call);
5373 /* This routine is called to send connection abort messages
5374 * that have been delayed to throttle looping clients. */
5375 void rxi_SendDelayedConnAbort(event, conn, dummy)
5376 struct rxevent *event;
5377 register struct rx_connection *conn;
5381 struct rx_packet *packet;
5383 MUTEX_ENTER(&conn->conn_data_lock);
5384 conn->delayedAbortEvent = (struct rxevent *) 0;
5385 error = htonl(conn->error);
5387 MUTEX_EXIT(&conn->conn_data_lock);
5388 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5390 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5391 RX_PACKET_TYPE_ABORT, (char *)&error,
5393 rxi_FreePacket(packet);
5397 /* This routine is called to send call abort messages
5398 * that have been delayed to throttle looping clients. */
5399 void rxi_SendDelayedCallAbort(event, call, dummy)
5400 struct rxevent *event;
5401 register struct rx_call *call;
5405 struct rx_packet *packet;
5407 MUTEX_ENTER(&call->lock);
5408 call->delayedAbortEvent = (struct rxevent *) 0;
5409 error = htonl(call->error);
5411 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5413 packet = rxi_SendSpecial(call, call->conn, packet,
5414 RX_PACKET_TYPE_ABORT, (char *)&error,
5416 rxi_FreePacket(packet);
5418 MUTEX_EXIT(&call->lock);
5421 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5422 * seconds) to ask the client to authenticate itself. The routine
5423 * issues a challenge to the client, which is obtained from the
5424 * security object associated with the connection */
5425 void rxi_ChallengeEvent(event, conn, atries)
5426 struct rxevent *event;
5427 register struct rx_connection *conn;
5430 int tries = (int) atries;
5431 conn->challengeEvent = (struct rxevent *) 0;
5432 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5433 register struct rx_packet *packet;
5437 /* We've failed to authenticate for too long.
5438 * Reset any calls waiting for authentication;
5439 * they are all in RX_STATE_PRECALL.
5440 */
5443 MUTEX_ENTER(&conn->conn_call_lock);
5444 for (i=0; i<RX_MAXCALLS; i++) {
5445 struct rx_call *call = conn->call[i];
5447 MUTEX_ENTER(&call->lock);
5448 if (call->state == RX_STATE_PRECALL) {
5449 rxi_CallError(call, RX_CALL_DEAD);
5450 rxi_SendCallAbort(call, NULL, 0, 0);
5452 MUTEX_EXIT(&call->lock);
5455 MUTEX_EXIT(&conn->conn_call_lock);
5459 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5461 /* If there's no packet available, do this later. */
5462 RXS_GetChallenge(conn->securityObject, conn, packet);
5463 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5464 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5465 rxi_FreePacket(packet);
5467 clock_GetTime(&when);
5468 when.sec += RX_CHALLENGE_TIMEOUT;
5469 conn->challengeEvent =
5470 rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
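/* [Editor's note] Net effect of the scheduling above: the challenge is
 * re-issued every RX_CHALLENGE_TIMEOUT seconds with tries counting down
 * from RX_CHALLENGE_MAXTRIES (see rxi_ChallengeOn below); once tries is
 * exhausted, calls still waiting in RX_STATE_PRECALL are aborted with
 * RX_CALL_DEAD. */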
5474 /* Call this routine to start requesting the client to authenticate
5475 * itself. This will continue until authentication is established,
5476 * the call times out, or an invalid response is returned. The
5477 * security object associated with the connection is asked to create
5478 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5479 * defined earlier. */
5480 void rxi_ChallengeOn(conn)
5481 register struct rx_connection *conn;
5483 if (!conn->challengeEvent) {
5484 RXS_CreateChallenge(conn->securityObject, conn);
5485 rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
5490 /* Compute round trip time of the packet provided, in *rttp.
5491 */
5493 /* rxi_ComputeRoundTripTime is called with peer locked. */
5494 void rxi_ComputeRoundTripTime(p, sentp, peer)
5495 register struct clock *sentp; /* may be null */
5496 register struct rx_peer *peer; /* may be null */
5497 register struct rx_packet *p;
5499 struct clock thisRtt, *rttp = &thisRtt;
5501 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5502 /* making year 2038 bugs to get this running now - stroucki */
5503 struct timeval temptime;
5505 register int rtt_timeout;
5507 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5508 /* yet again. This was the worst Heisenbug of the port - stroucki */
5509 clock_GetTime(&temptime);
5510 rttp->sec=(afs_int32)temptime.tv_sec;
5511 rttp->usec=(afs_int32)temptime.tv_usec;
5513 clock_GetTime(rttp);
5515 if (clock_Lt(rttp, sentp)) {
5517 return; /* somebody set the clock back, don't count this time. */
5519 clock_Sub(rttp, sentp);
5520 MUTEX_ENTER(&rx_stats_mutex);
5521 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5522 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5523 if (rttp->sec > 60) {
5524 MUTEX_EXIT(&rx_stats_mutex);
5525 return; /* somebody set the clock ahead */
5527 rx_stats.maxRtt = *rttp;
5529 clock_Add(&rx_stats.totalRtt, rttp);
5530 rx_stats.nRttSamples++;
5531 MUTEX_EXIT(&rx_stats_mutex);
5533 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5535 /* Apply VanJacobson round-trip estimations */
5539 /*
5540 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5541 * srtt is stored as fixed point with 3 bits after the binary
5542 * point (i.e., scaled by 8). The following magic is
5543 * equivalent to the smoothing algorithm in rfc793 with an
5544 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5545 * srtt*8 = srtt*8 + rtt - srtt
5546 * srtt = srtt + rtt/8 - srtt/8
5547 */
5549 delta = MSEC(rttp) - (peer->rtt >> 3);
5550 peer->rtt += delta;
5552 /*
5553 * We accumulate a smoothed rtt variance (actually, a smoothed
5554 * mean difference), then set the retransmit timer to smoothed
5555 * rtt + 4 times the smoothed variance (was 2x in van's original
5556 * paper, but 4x works better for me, and apparently for him as
5557 * well).
5558 * rttvar is stored as
5559 * fixed point with 2 bits after the binary point (scaled by
5560 * 4). The following is equivalent to rfc793 smoothing with
5561 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5562 * replaces rfc793's wired-in beta.
5563 * dev*4 = dev*4 + (|actual - expected| - dev)
5564 */
5565 if (delta < 0)
5566 delta = -delta;
5569 delta -= (peer->rtt_dev >> 2);
5570 peer->rtt_dev += delta;
5573 /* I don't have a stored RTT so I start with this value. Since I'm
5574 * probably just starting a call, and will be pushing more data down
5575 * this, I expect congestion to increase rapidly. So I fudge a
5576 * little, and I set deviance to half the rtt. In practice,
5577 * deviance tends to approach something a little less than
5578 * half the smoothed rtt. */
5579 peer->rtt = (MSEC(rttp) << 3) + 8;
5580 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5582 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5583 * the other of these connections is usually in a user process, and can
5584 * be switched and/or swapped out. So on fast, reliable networks, the
5585 * timeout would otherwise be too short.
5586 */
5587 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5588 clock_Zero(&(peer->timeout));
5589 clock_Addmsec(&(peer->timeout), rtt_timeout);
5591 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5592 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5593 (peer->timeout.sec),(peer->timeout.usec)) );
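/*
 * [Editor's sketch] Minimal standalone model of the Van Jacobson
 * estimator used above, with the same fixed-point scaling (srtt scaled
 * by 8, rttvar scaled by 4, samples in msec); an illustration, not part
 * of rx itself.
 */
static void example_VJUpdate(int *srtt8, int *rttvar4, int sample_ms)
{
    int delta = sample_ms - (*srtt8 >> 3);
    *srtt8 += delta;            /* srtt += delta/8 in real units */
    if (delta < 0)
        delta = -delta;
    delta -= (*rttvar4 >> 2);
    *rttvar4 += delta;          /* rttvar += (|delta| - rttvar)/4 */
    /* retransmit timeout as computed above:
     * (*srtt8 >> 3) + *rttvar4 + 350 msec, since rttvar4 == 4*MDEV */
}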
5597 /* Find all server connections that have not been active for a long time, and
5598 * toss them */
5599 void rxi_ReapConnections()
5602 clock_GetTime(&now);
5604 /* Find server connection structures that haven't been used for
5605 * greater than rx_idleConnectionTime */
5606 { struct rx_connection **conn_ptr, **conn_end;
5607 int i, havecalls = 0;
5608 MUTEX_ENTER(&rx_connHashTable_lock);
5609 for (conn_ptr = &rx_connHashTable[0],
5610 conn_end = &rx_connHashTable[rx_hashTableSize];
5611 conn_ptr < conn_end; conn_ptr++) {
5612 struct rx_connection *conn, *next;
5613 struct rx_call *call;
5617 for (conn = *conn_ptr; conn; conn = next) {
5618 /* XXX -- Shouldn't the connection be locked? */
5621 for(i=0;i<RX_MAXCALLS;i++) {
5622 call = conn->call[i];
5625 MUTEX_ENTER(&call->lock);
5626 #ifdef RX_ENABLE_LOCKS
5627 result = rxi_CheckCall(call, 1);
5628 #else /* RX_ENABLE_LOCKS */
5629 result = rxi_CheckCall(call);
5630 #endif /* RX_ENABLE_LOCKS */
5631 MUTEX_EXIT(&call->lock);
5633 /* If CheckCall freed the call, it might
5634 * have destroyed the connection as well,
5635 * which screws up the linked lists.
5636 */
5641 if (conn->type == RX_SERVER_CONNECTION) {
5642 /* This only actually destroys the connection if
5643 * there are no outstanding calls */
5644 MUTEX_ENTER(&conn->conn_data_lock);
5645 if (!havecalls && !conn->refCount &&
5646 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5647 conn->refCount++; /* it will be decr in rx_DestroyConn */
5648 MUTEX_EXIT(&conn->conn_data_lock);
5649 #ifdef RX_ENABLE_LOCKS
5650 rxi_DestroyConnectionNoLock(conn);
5651 #else /* RX_ENABLE_LOCKS */
5652 rxi_DestroyConnection(conn);
5653 #endif /* RX_ENABLE_LOCKS */
5655 #ifdef RX_ENABLE_LOCKS
5657 MUTEX_EXIT(&conn->conn_data_lock);
5659 #endif /* RX_ENABLE_LOCKS */
5663 #ifdef RX_ENABLE_LOCKS
5664 while (rx_connCleanup_list) {
5665 struct rx_connection *conn;
5666 conn = rx_connCleanup_list;
5667 rx_connCleanup_list = rx_connCleanup_list->next;
5668 MUTEX_EXIT(&rx_connHashTable_lock);
5669 rxi_CleanupConnection(conn);
5670 MUTEX_ENTER(&rx_connHashTable_lock);
5672 MUTEX_EXIT(&rx_connHashTable_lock);
5673 #endif /* RX_ENABLE_LOCKS */
5676 /* Find any peer structures that haven't been used (haven't had an
5677 * associated connection) for greater than rx_idlePeerTime */
5678 { struct rx_peer **peer_ptr, **peer_end;
5680 MUTEX_ENTER(&rx_rpc_stats);
5681 MUTEX_ENTER(&rx_peerHashTable_lock);
5682 for (peer_ptr = &rx_peerHashTable[0],
5683 peer_end = &rx_peerHashTable[rx_hashTableSize];
5684 peer_ptr < peer_end; peer_ptr++) {
5685 struct rx_peer *peer, *next, *prev;
5686 for (prev = peer = *peer_ptr; peer; peer = next) {
5688 code = MUTEX_TRYENTER(&peer->peer_lock);
5689 if ((code) && (peer->refCount == 0)
5690 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5691 rx_interface_stat_p rpc_stat, nrpc_stat;
5693 MUTEX_EXIT(&peer->peer_lock);
5694 MUTEX_DESTROY(&peer->peer_lock);
5695 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5696 rx_interface_stat)) {
5697 unsigned int num_funcs;
5698 if (!rpc_stat) break;
5699 queue_Remove(&rpc_stat->queue_header);
5700 queue_Remove(&rpc_stat->all_peers);
5701 num_funcs = rpc_stat->stats[0].func_total;
5702 space = sizeof(rx_interface_stat_t) +
5703 rpc_stat->stats[0].func_total *
5704 sizeof(rx_function_entry_v1_t);
5706 rxi_Free(rpc_stat, space);
5707 rxi_rpc_peer_stat_cnt -= num_funcs;
5710 MUTEX_ENTER(&rx_stats_mutex);
5711 rx_stats.nPeerStructs--;
5712 MUTEX_EXIT(&rx_stats_mutex);
5713 if (prev == *peer_ptr) {
5722 MUTEX_EXIT(&peer->peer_lock);
5728 MUTEX_EXIT(&rx_peerHashTable_lock);
5729 MUTEX_EXIT(&rx_rpc_stats);
    /* This is a temporary hack: if the race condition in rxi_AllocSendPacket
     * hits, it will be handled at the next conn GC, just below.  Really we
     * shouldn't have to keep moving packets from one place to another, but
     * instead ought to always know whether we can afford to hold onto a
     * packet in its particular use. */
5737 MUTEX_ENTER(&rx_freePktQ_lock);
5738 if (rx_waitingForPackets) {
5739 rx_waitingForPackets = 0;
5740 #ifdef RX_ENABLE_LOCKS
	CV_BROADCAST(&rx_waitingForPackets_cv);
#else
	osi_rxWakeup(&rx_waitingForPackets);
#endif
    }
    MUTEX_EXIT(&rx_freePktQ_lock);

    now.sec += RX_REAP_TIME;	/* Check every RX_REAP_TIME seconds */
    rxevent_Post(&now, rxi_ReapConnections, 0, 0);
}
5753 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5754 * rx.h is sort of strange this is better. This is called with a security
5755 * object before it is discarded. Each connection using a security object has
5756 * its own refcount to the object so it won't actually be freed until the last
5757 * connection is destroyed.
 * This is the only rxs module call.  A hold could also be written but no one
 * needs it. */

int rxs_Release (aobj)
  struct rx_securityClass *aobj;
{
    return RXS_Close (aobj);
}
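/* Editorial sketch (not part of the original source): typical client-side
 * lifetime of a security object.  rxnull_NewClientSecurityObject() is the
 * stock null-security constructor; any rxs class is handled the same way.
 *
 *    struct rx_securityClass *so = rxnull_NewClientSecurityObject();
 *    struct rx_connection *conn =
 *        rx_NewConnection(host, port, serviceId, so, 0);
 *    ...use the connection...
 *    rx_DestroyConnection(conn);
 *    rxs_Release(so);     releases the creator's reference; the object is
 *                         freed once no connection still holds a reference
 */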
#ifdef ADAPT_WINDOW
#define RXRATE_PKT_OH	(RX_HEADER_SIZE + RX_IPUDP_SIZE)
5770 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5771 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5772 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5774 /* Adjust our estimate of the transmission rate to this peer, given
5775 * that the packet p was just acked. We can adjust peer->timeout and
5776 * call->twind. Pragmatically, this is called
5777 * only with packets of maximal length.
5778 * Called with peer and call locked.
5781 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5782 register struct rx_peer *peer;
5783 register struct rx_call *call;
    struct rx_packet *p, *ackp;
    u_char ackReason;
{
    afs_int32 xferSize, xferMs;
    register afs_int32 minTime;
    struct clock newTO;
5791 /* Count down packets */
5792 if (peer->rateFlag > 0) peer->rateFlag--;
5793 /* Do nothing until we're enabled */
5794 if (peer->rateFlag != 0) return;
5795 if (!call->conn) return;
5797 /* Count only when the ack seems legitimate */
    /* Count only when the ack seems legitimate */
    switch (ackReason) {
    case RX_ACK_REQUESTED:
	xferSize = p->length + RX_HEADER_SIZE +
	  call->conn->securityMaxTrailerSize;
	xferMs = peer->rtt;
	break;

    case RX_ACK_PING_RESPONSE:
	if (p)	/* want the response to ping-request, not data send */
	    return;
	clock_GetTime(&newTO);
	if (clock_Gt(&newTO, &call->pingRequestTime)) {
	    clock_Sub(&newTO, &call->pingRequestTime);
	    xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
	} else {
	    return;	/* don't update the rate until we get a ping back */
	}
	xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
	break;

    default:
	return;
    }
    dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
	 ntohl(peer->host), ntohs(peer->port),
	 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
	 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	 peer->ifMTU));

    /* Track only packets that are big enough. */
    if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
	rx_AckDataSize(rx_Window) + RX_HEADER_SIZE)
	return;
    /* absorb RTT data (in milliseconds) for these big packets */
    if (peer->smRtt == 0) {
	peer->smRtt = xferMs;
    } else {
	peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
	if (!peer->smRtt) peer->smRtt = 1;
    }

    if (peer->countDown) {
	peer->countDown--;
	return;
    }
    peer->countDown = 10;	/* recalculate only every so often */
5847 /* In practice, we can measure only the RTT for full packets,
5848 * because of the way Rx acks the data that it receives. (If it's
5849 * smaller than a full packet, it often gets implicitly acked
5850 * either by the call response (from a server) or by the next call
5851 * (from a client), and either case confuses transmission times
5852 * with processing times.) Therefore, replace the above
5853 * more-sophisticated processing with a simpler version, where the
5854 * smoothed RTT is kept for full-size packets, and the time to
5855 * transmit a windowful of full-size packets is simply RTT *
5856 * windowSize. Again, we take two steps:
5857 - ensure the timeout is large enough for a single packet's RTT;
5858 - ensure that the window is small enough to fit in the desired timeout.*/
5860 /* First, the timeout check. */
5861 minTime = peer->smRtt;
    /* Get a reasonable estimate for a timeout period */
    minTime += minTime;
5864 newTO.sec = minTime / 1000;
5865 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5867 /* Increase the timeout period so that we can always do at least
5868 * one packet exchange */
5869 if (clock_Gt(&newTO, &peer->timeout)) {
	dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
	     peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
	     peer->ifMTU));

	peer->timeout = newTO;
    }
5879 /* Now, get an estimate for the transmit window size. */
5880 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5881 /* Now, convert to the number of full packets that could fit in a
5882 * reasonable fraction of that interval */
5883 minTime /= (peer->smRtt << 1);
5884 xferSize = minTime; /* (make a copy) */
5886 /* Now clamp the size to reasonable bounds. */
5887 if (minTime <= 1) minTime = 1;
5888 else if (minTime > rx_Window) minTime = rx_Window;
/*  if (minTime != peer->maxWindow) {
	dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
	     peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	     peer->ifMTU));
	peer->maxWindow = minTime;
	elide... call->twind = minTime;
    }
*/
    /* Cut back on the peer timeout if it had earlier grown unreasonably.
     * Discern this by calculating the timeout necessary for rx_Window
     * packets. */
    if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5903 /* calculate estimate for transmission interval in milliseconds */
5904 minTime = rx_Window * peer->smRtt;
5905 if (minTime < 1000) {
	    dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
		 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
		 peer->timeout.usec, peer->smRtt,
		 peer->ifMTU));

	    newTO.sec = 0;	/* cut back on timeout by half a second */
	    newTO.usec = 500000;
	    clock_Sub(&peer->timeout, &newTO);
	}
    }
} /* end of rxi_ComputeRate */
5919 #endif /* ADAPT_WINDOW */
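/* Editorial note (not in the original source): the smoothed-RTT update in
 * rxi_ComputeRate above, peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4,
 * is an exponentially weighted moving average with gain 1/16 (the "+ 4"
 * biases the truncating shift toward round-to-nearest).  The same filter in
 * isolation, with smooth_rtt a hypothetical name:
 *
 *    static afs_int32 smooth_rtt(afs_int32 smRtt, afs_int32 sampleMs)
 *    {
 *        if (smRtt == 0)
 *            return sampleMs;                first sample primes the filter
 *        smRtt = ((smRtt * 15) + sampleMs + 4) >> 4;
 *        return smRtt ? smRtt : 1;           never let the estimate reach 0
 *    }
 *
 * Example: smRtt = 100 ms and sample = 20 ms gives (1500 + 20 + 4) >> 4 = 95 ms.
 */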
#ifdef RXDEBUG
/* Don't call this debugging routine directly; use dpf */
void rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
		    a11, a12, a13, a14, a15)
    char *format;
{
    struct clock now;
    clock_GetTime(&now);
    fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
    fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
    putc('\n', rx_Log);
}

/*
5943 * This function is used to process the rx_stats structure that is local
5944 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug).  Therefore, it needs to do minimal version
 * checking.
 */
void rx_PrintTheseStats (file, s, size, freePackets, version)
  FILE *file;
  struct rx_stats *s;
  int size;				/* some idea of version control */
  afs_int32 freePackets;
  char version;
{
    int i;

    if (size != sizeof(struct rx_stats)) {
	fprintf(file,
		"Unexpected size of stats structure: was %d, expected %d\n",
		size, sizeof(struct rx_stats));
    }

    fprintf(file,
	    "rx stats: free packets %d, allocs %d, ",
	    (int) freePackets,
	    s->packetRequests);

    if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
	fprintf(file,
		"alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
		s->receivePktAllocFailures,
		s->receiveCbufPktAllocFailures,
		s->sendPktAllocFailures,
		s->sendCbufPktAllocFailures,
		s->specialPktAllocFailures);
    } else {
	fprintf(file,
		"alloc-failures(rcv %d,send %d,ack %d)\n",
		s->receivePktAllocFailures,
		s->sendPktAllocFailures,
		s->specialPktAllocFailures);
    }

    fprintf(file,
	    "   greedy %d, "
	    "bogusReads %d (last from host %x), "
	    "noPackets %d, "
	    "noBuffers %d, "
	    "selects %d, "
	    "sendSelects %d\n",
	    s->socketGreedy,
	    s->bogusPacketOnRead,
	    s->bogusHost,
	    s->noPacketOnRead,
	    s->noPacketBuffersOnRead,
	    s->selects,
	    s->sendSelects);
5999 fprintf(file, " packets read: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
	fprintf(file,
		"%s %d ",
		rx_packetTypes[i],
		s->packetsRead[i]);
    }
6006 fprintf(file, "\n");
    fprintf(file,
	    "   other read counters: data %d, "
	    "ack %d, "
	    "dup %d "
	    "spurious %d "
	    "dally %d\n",
	    s->dataPacketsRead,
	    s->ackPacketsRead,
	    s->dupPacketsRead,
	    s->spuriousPacketsRead,
	    s->ignorePacketDally);
6020 fprintf(file, " packets sent: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
	fprintf(file,
		"%s %d ",
		rx_packetTypes[i],
		s->packetsSent[i]);
    }
6027 fprintf(file, "\n");
    fprintf(file,
	    "   other send counters: ack %d, "
	    "data %d (not resends), "
	    "resends %d, "
	    "pushed %d, "
	    "acked&ignored %d\n",
	    s->ackPacketsSent,
	    s->dataPacketsSent,
	    s->dataPacketsReSent,
	    s->dataPacketsPushed,
	    s->ignoreAckedPacket);

    fprintf(file,
	    "   \t(these should be small) sendFailed %d, "
	    "connAborted %d\n",
	    s->netSendFailures,
	    (int) s->fatalErrors);
    if (s->nRttSamples) {
	fprintf(file,
		"   Average rtt is %0.3f, with %d samples\n",
		clock_Float(&s->totalRtt)/s->nRttSamples,
		s->nRttSamples);

	fprintf(file,
		"   Minimum rtt is %0.3f, maximum is %0.3f\n",
		clock_Float(&s->minRtt),
		clock_Float(&s->maxRtt));
    }
    fprintf(file,
	    "   %d server connections, "
	    "%d client connections, "
	    "%d peer structs, "
	    "%d call structs, "
	    "%d free call structs\n",
	    s->nServerConns,
	    s->nClientConns,
	    s->nPeerStructs,
	    s->nCallStructs,
	    s->nFreeCallStructs);

#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
    fprintf(file,
	    "   %d clock updates\n",
	    clock_nUpdates);
#endif
}
/* for backward compatibility */
void rx_PrintStats(file)
  FILE *file;
{
    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
}
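/* Editorial sketch (not part of the original source): a debugging client can
 * dump the process-wide counters to any stdio stream:
 *
 *    rx_PrintStats(stderr);
 *
 * or, for an rx_stats block fetched from a remote process with
 * rx_GetServerStats below, call rx_PrintTheseStats directly with the size,
 * free-packet count and version previously obtained from the remote side,
 * so the layout checks still apply:
 *
 *    rx_PrintTheseStats(stderr, &remote_stats, sizeof(remote_stats),
 *                       nFreePackets, version);
 */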
void rx_PrintPeerStats(file, peer)
  FILE *file;
  struct rx_peer *peer;
{
    fprintf(file,
	    "Peer %x.%d.  "
	    "Burst size %d, "
	    "burst wait %u.%d.\n",
	    ntohl(peer->host),
	    (int) peer->port,
	    (int) peer->burstSize,
	    (int) peer->burstWait.sec,
	    (int) peer->burstWait.usec);

    fprintf(file,
	    "   Rtt %d, "
	    "retry time %u.%06d, "
	    "total sent %d, "
	    "resent %d\n",
	    peer->rtt,
	    (int) peer->timeout.sec,
	    (int) peer->timeout.usec,
	    peer->nSent,
	    peer->reSends);

    fprintf(file,
	    "   Packet size %d, "
	    "max in packet skew %d, "
	    "max out packet skew %d\n",
	    (int) peer->ifMTU,
	    (int) peer->inPacketSkew,
	    (int) peer->outPacketSkew);
}
6122 #ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 * counter
 */
6128 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
6129 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else
#define LOCK_RX_DEBUG
6132 #define UNLOCK_RX_DEBUG
6133 #endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  u_char type,
  void *inputData,
  size_t inputLength,
  void *outputData,
  size_t outputLength
)
{
    static afs_int32 counter = 100;
    time_t endTime;
    struct rx_header theader;
    char tbuffer[1500];
    register afs_int32 code;
    struct timeval tv;
    struct sockaddr_in taddr, faddr;
    int faddrLen;
    fd_set imask;
    register char *tp;

    endTime = time(0) + 20;	/* try for 20 seconds */
    counter++;
6161 tp = &tbuffer[sizeof(struct rx_header)];
6162 taddr.sin_family = AF_INET;
6163 taddr.sin_port = remotePort;
6164 taddr.sin_addr.s_addr = remoteAddr;
6165 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
    taddr.sin_len = sizeof(struct sockaddr_in);
#endif
6169 memset(&theader, 0, sizeof(theader));
6170 theader.epoch = htonl(999);
6172 theader.callNumber = htonl(counter);
6175 theader.type = type;
6176 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
6177 theader.serviceId = 0;
6179 memcpy(tbuffer, &theader, sizeof(theader));
6180 memcpy(tp, inputData, inputLength);
6181 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
		  (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

    while (1) {
	/* see if there's a packet available */
	FD_ZERO(&imask);
	FD_SET(socket, &imask);
	tv.tv_sec = 1;
	tv.tv_usec = 0;
	code = select(socket+1, &imask, 0, 0, &tv);
	if (code > 0) {
6191 /* now receive a packet */
6192 faddrLen = sizeof(struct sockaddr_in);
6193 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6194 (struct sockaddr *) &faddr, &faddrLen);
6196 memcpy(&theader, tbuffer, sizeof(struct rx_header));
	    if (counter == ntohl(theader.callNumber)) break;
	}

	/* see if we've timed out */
	if (endTime < time(0)) return -1;
    }

    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    memcpy(outputData, tp, code);
    return code;
}
afs_int32 rx_GetServerDebug(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_debugStats *stat,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);
    in.index = 0;

    rc = MakeDebugCall(socket,
		       remoteAddr,
		       remotePort,
		       RX_PACKET_TYPE_DEBUG,
		       &in,
		       sizeof(in),
		       stat,
		       sizeof(*stat));

    /*
6234 * If the call was successful, fixup the version and indicate
6235 * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */

    if (rc >= 0) {
	if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
	    *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
	    *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
	    *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
	    *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
	    *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
	    *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
	}
	if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
	    *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
	}

	stat->nFreePackets = ntohl(stat->nFreePackets);
6263 stat->packetReclaims = ntohl(stat->packetReclaims);
6264 stat->callsExecuted = ntohl(stat->callsExecuted);
6265 stat->nWaiting = ntohl(stat->nWaiting);
	stat->idleThreads = ntohl(stat->idleThreads);
    }

    return rc;
}
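/* Editorial sketch (not part of the original source): a typical rxdebug-style
 * client opens its own UDP socket and points these helpers at a remote rx
 * instance.  Hypothetical usage, error handling elided:
 *
 *    struct rx_debugStats tstats;
 *    afs_uint32 supported;
 *    int sock = socket(AF_INET, SOCK_DGRAM, 0);
 *    afs_int32 rc = rx_GetServerDebug(sock, htonl(0x7f000001), htons(7000),
 *                                     &tstats, &supported);
 *    if (rc >= 0 && (supported & RX_SERVER_DEBUG_WAITER_CNT))
 *        printf("%d calls waiting for a thread\n", tstats.nWaiting);
 *
 * Note that remoteAddr and remotePort are passed in network byte order,
 * matching the sin_addr/sin_port assignments in MakeDebugCall above.
 */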
afs_int32 rx_GetServerStats(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  struct rx_stats *stat,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 *lp = (afs_int32 *) stat;
    int i;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_RXSTATS);
    in.index = 0;
    memset(stat, 0, sizeof(*stat));

    rc = MakeDebugCall(socket,
		       remoteAddr,
		       remotePort,
		       RX_PACKET_TYPE_DEBUG,
		       &in,
		       sizeof(in),
		       stat,
		       sizeof(*stat));

    /*
     * Do net to host conversion here
     */

    if (rc >= 0) {
	for (i = 0; i < sizeof(*stat)/sizeof(afs_int32); i++, lp++) {
	    *lp = ntohl(*lp);
	}
    }

    return rc;
}
afs_int32 rx_GetServerVersion(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  size_t version_length,
  char *version
)
{
    char a[1] = {0};

    return MakeDebugCall(socket,
			 remoteAddr,
			 remotePort,
			 RX_PACKET_TYPE_VERSION,
			 a,
			 1,
			 version,
			 version_length);
}
afs_int32 rx_GetServerConnections(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextConnection,
  int allConnections,
  afs_uint32 debugSupportedValues,
  struct rx_debugConn *conn,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;
    int i;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    if (allConnections) {
	in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
	in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket,
		       remoteAddr,
		       remotePort,
		       RX_PACKET_TYPE_DEBUG,
		       &in,
		       sizeof(in),
		       conn,
		       sizeof(*conn));

    if (rc >= 0) {
	*nextConnection += 1;

	/*
	 * Convert old connection format to new structure.
	 */
6382 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6383 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6384 #define MOVEvL(a) (conn->a = vL->a)
6386 /* any old or unrecognized version... */
6387 for (i=0;i<RX_MAXCALLS;i++) {
6388 MOVEvL(callState[i]);
6389 MOVEvL(callMode[i]);
6390 MOVEvL(callFlags[i]);
		MOVEvL(callOther[i]);
	    }
6393 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6394 MOVEvL(secStats.type);
6395 MOVEvL(secStats.level);
6396 MOVEvL(secStats.flags);
6397 MOVEvL(secStats.expires);
6398 MOVEvL(secStats.packetsReceived);
6399 MOVEvL(secStats.packetsSent);
6400 MOVEvL(secStats.bytesReceived);
		MOVEvL(secStats.bytesSent);
	    }
	}

	/*
	 * Do net to host conversion here
	 * I don't convert host or port since we are most likely
	 * going to want these in NBO.
	 */
6411 conn->cid = ntohl(conn->cid);
6412 conn->serial = ntohl(conn->serial);
6413 for(i=0;i<RX_MAXCALLS;i++) {
	    conn->callNumber[i] = ntohl(conn->callNumber[i]);
	}
6416 conn->error = ntohl(conn->error);
6417 conn->secStats.flags = ntohl(conn->secStats.flags);
6418 conn->secStats.expires = ntohl(conn->secStats.expires);
6419 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6420 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6421 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6422 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6423 conn->epoch = ntohl(conn->epoch);
	conn->natMTU = ntohl(conn->natMTU);
    }

    return rc;
}
afs_int32 rx_GetServerPeers(
  int socket,
  afs_uint32 remoteAddr,
  afs_uint16 remotePort,
  afs_int32 *nextPeer,
  afs_uint32 debugSupportedValues,
  struct rx_debugPeer *peer,
  afs_uint32 *supportedValues
)
{
    struct rx_debugIn in;
    afs_int32 rc = 0;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket,
		       remoteAddr,
		       remotePort,
		       RX_PACKET_TYPE_DEBUG,
		       &in,
		       sizeof(in),
		       peer,
		       sizeof(*peer));

    if (rc >= 0) {
	*nextPeer += 1;

	/*
	 * Do net to host conversion here
	 * I don't convert host or port since we are most likely
	 * going to want these in NBO.
	 */
6471 peer->ifMTU = ntohs(peer->ifMTU);
6472 peer->idleWhen = ntohl(peer->idleWhen);
6473 peer->refCount = ntohs(peer->refCount);
6474 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6475 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6476 peer->rtt = ntohl(peer->rtt);
6477 peer->rtt_dev = ntohl(peer->rtt_dev);
6478 peer->timeout.sec = ntohl(peer->timeout.sec);
6479 peer->timeout.usec = ntohl(peer->timeout.usec);
6480 peer->nSent = ntohl(peer->nSent);
6481 peer->reSends = ntohl(peer->reSends);
6482 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6483 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6484 peer->rateFlag = ntohl(peer->rateFlag);
6485 peer->natMTU = ntohs(peer->natMTU);
6486 peer->maxMTU = ntohs(peer->maxMTU);
6487 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6488 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6489 peer->MTU = ntohs(peer->MTU);
6490 peer->cwind = ntohs(peer->cwind);
6491 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6492 peer->congestSeq = ntohs(peer->congestSeq);
6493 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6494 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6495 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
	peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
    }

    return rc;
}
6501 #endif /* RXDEBUG */
void shutdown_rx(void)
{
    struct rx_serverQueueEntry *np;
    register int i, j;
    register struct rx_call *call;
    register struct rx_serverQueueEntry *sq;

    LOCK_RXINIT;
    if (rxinit_status == 1) {
	UNLOCK_RXINIT;
	return;			/* Already shutdown. */
    }
6518 #ifndef AFS_PTHREAD_ENV
6519 FD_ZERO(&rx_selectMask);
6520 #endif /* AFS_PTHREAD_ENV */
6521 rxi_dataQuota = RX_MAX_QUOTA;
6522 #ifndef AFS_PTHREAD_ENV
6524 #endif /* AFS_PTHREAD_ENV */
6527 #ifndef AFS_PTHREAD_ENV
6528 #ifndef AFS_USE_GETTIMEOFDAY
6530 #endif /* AFS_USE_GETTIMEOFDAY */
6531 #endif /* AFS_PTHREAD_ENV */
    while (!queue_IsEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
	queue_Remove(call);
	rxi_Free(call, sizeof(struct rx_call));
    }

    while (!queue_IsEmpty(&rx_idleServerQueue)) {
	sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
	queue_Remove(sq);
	rxi_Free(sq, sizeof(*sq));
    }

    {
6546 struct rx_peer **peer_ptr, **peer_end;
6547 for (peer_ptr = &rx_peerHashTable[0],
6548 peer_end = &rx_peerHashTable[rx_hashTableSize];
6549 peer_ptr < peer_end; peer_ptr++) {
6550 struct rx_peer *peer, *next;
6551 for (peer = *peer_ptr; peer; peer = next) {
		rx_interface_stat_p rpc_stat, nrpc_stat;
		next = peer->next;
6554 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6555 rx_interface_stat)) {
6556 unsigned int num_funcs;
6557 if (!rpc_stat) break;
6558 queue_Remove(&rpc_stat->queue_header);
6559 queue_Remove(&rpc_stat->all_peers);
6560 num_funcs = rpc_stat->stats[0].func_total;
6561 space = sizeof(rx_interface_stat_t) +
6562 rpc_stat->stats[0].func_total *
6563 sizeof(rx_function_entry_v1_t);
6565 rxi_Free(rpc_stat, space);
6566 MUTEX_ENTER(&rx_rpc_stats);
6567 rxi_rpc_peer_stat_cnt -= num_funcs;
		    MUTEX_EXIT(&rx_rpc_stats);
		}
		rxi_FreePeer(peer);
6572 MUTEX_ENTER(&rx_stats_mutex);
6573 rx_stats.nPeerStructs--;
		MUTEX_EXIT(&rx_stats_mutex);
	    }
	}
    }

    for (i = 0; i < RX_MAX_SERVICES; i++) {
	if (rx_services[i])
	    rxi_Free(rx_services[i], sizeof(*rx_services[i]));
    }
6582 for (i = 0; i < rx_hashTableSize; i++) {
6583 register struct rx_connection *tc, *ntc;
6584 MUTEX_ENTER(&rx_connHashTable_lock);
	for (tc = rx_connHashTable[i]; tc; tc = ntc) {
	    ntc = tc->next;
	    for (j = 0; j < RX_MAXCALLS; j++) {
		if (tc->call[j]) {
		    rxi_Free(tc->call[j], sizeof(*tc->call[j]));
		}
	    }
	    rxi_Free(tc, sizeof(*tc));
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
6597 MUTEX_ENTER(&freeSQEList_lock);
6599 while ((np = rx_FreeSQEList)) {
6600 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6601 MUTEX_DESTROY(&np->lock);
	rxi_Free(np, sizeof(*np));
    }
6605 MUTEX_EXIT(&freeSQEList_lock);
6606 MUTEX_DESTROY(&freeSQEList_lock);
6607 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6608 MUTEX_DESTROY(&rx_connHashTable_lock);
6609 MUTEX_DESTROY(&rx_peerHashTable_lock);
6610 MUTEX_DESTROY(&rx_serverPool_lock);
#ifndef KERNEL
    osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#else
    UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
#endif
6618 rxi_FreeAllPackets();
6620 MUTEX_ENTER(&rx_stats_mutex);
6621 rxi_dataQuota = RX_MAX_QUOTA;
6622 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);

    rxinit_status = 1;
    UNLOCK_RXINIT;
}
6629 #ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
{
    if (!MUTEX_ISMINE(lockaddr))
	osi_Panic("Lock not held: %s", msg);
}
6635 #endif /* RX_ENABLE_LOCKS */
#ifndef KERNEL

/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)
{
    int key;
    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
			       realloc((void *)rxi_keyCreate_destructor,
				       (key+1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);
    return key;
}

void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
{
    int i;
6659 MUTEX_ENTER(&conn->conn_data_lock);
6660 if (!conn->specific) {
6661 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6662 for (i = 0 ; i < key ; i++)
6663 conn->specific[i] = NULL;
6664 conn->nSpecific = key+1;
6665 conn->specific[key] = ptr;
6666 } else if (key >= conn->nSpecific) {
6667 conn->specific = (void **)
6668 realloc(conn->specific,(key+1)*sizeof(void *));
6669 for (i = conn->nSpecific ; i < key ; i++)
6670 conn->specific[i] = NULL;
6671 conn->nSpecific = key+1;
	conn->specific[key] = ptr;
    } else {
6674 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6675 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6676 conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);
}

void *rx_GetSpecific(struct rx_connection *conn, int key)
{
    void *ptr;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
	ptr = NULL;
    else
	ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;
}
6693 #endif /* !KERNEL */
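/* Editorial sketch (not part of the original source): the specific-data API
 * works like pthread keys, but per connection.  A module allocates one key
 * for itself and hangs private state off any connection; the destructor
 * registered with rx_KeyCreate runs when rx_SetSpecific overwrites a
 * previously stored value.  my_state_t, my_free and my_key are hypothetical:
 *
 *    static int my_key;
 *    static void my_free(void *p) { free(p); }
 *
 *    void my_init(void) { my_key = rx_KeyCreate(my_free); }
 *
 *    void my_attach(struct rx_connection *conn) {
 *        my_state_t *s = (my_state_t *) malloc(sizeof(*s));
 *        rx_SetSpecific(conn, my_key, s);
 *    }
 *
 *    my_state_t *my_lookup(struct rx_connection *conn) {
 *        return (my_state_t *) rx_GetSpecific(conn, my_key);
 *    }
 */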
/*
 * processStats is a queue used to store the statistics for the local
 * process.  Its contents are similar to the contents of the rpcStats
 * queue on a rx_peer structure, but the actual data stored within
 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset) - unlike the per peer structures
 * which can come and go based upon the peer lifetime.
 */

static struct rx_queue processStats = {&processStats,&processStats};

/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats,&peerStats};

/*
 * rxi_monitor_processStats is used to turn process wide stat collection
 * on and off.
 */

static int rxi_monitor_processStats = 0;

/*
 * rxi_monitor_peerStats is used to turn per peer stat collection on and off.
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
 * call, create (if needed) and update the stat totals for the rpc.
 *
 * PARAMETERS
 *
 * IN stats - the queue of stats that will be updated with the new value
 *
 * IN rxInterface - a unique number that identifies the rpc interface
 *
 * IN currentFunc - the index of the function being invoked
 *
 * IN totalFunc - the total number of functions in this interface
 *
 * IN queueTime - the amount of time this function waited for a thread
 *
 * IN execTime - the amount of time this function invocation took to execute
 *
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
 *
 * IN isServer - if true, this invocation was made to a server
 *
 * IN remoteHost - the ip address of the remote host
 *
 * IN remotePort - the port of the remote host
 *
 * IN addToPeerList - if != 0, add newly created stat to the global peer list
 *
 * INOUT counter - if a new stats structure is allocated, the counter will
 * be updated with the new number of allocated stat structures
 *
 * RETURN CODES
 *
 * Returns 0 on success, non-zero if a new stat structure could not be
 * allocated.
 */
static int rxi_AddRpcStat(
	struct rx_queue *stats,
	afs_uint32 rxInterface,
	afs_uint32 currentFunc,
	afs_uint32 totalFunc,
	struct clock *queueTime,
	struct clock *execTime,
	afs_hyper_t *bytesSent,
	afs_hyper_t *bytesRcvd,
	int isServer,
	afs_uint32 remoteHost,
	afs_uint32 remotePort,
	int addToPeerList,
	unsigned int *counter)
{
    int rc = 0;
    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */
6786 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6787 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
	    (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }

    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */
6796 if (queue_IsEnd(stats, rpc_stat) ||
6797 (rpc_stat == NULL) ||
6798 (rpc_stat->stats[0].interfaceId != rxInterface) ||
	(rpc_stat->stats[0].remote_is_server != isServer)) {
	int i;
	size_t space;
6803 space = sizeof(rx_interface_stat_t) + totalFunc *
6804 sizeof(rx_function_entry_v1_t);
6806 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
	if (rpc_stat == NULL) {
	    rc = 1;
	    goto fail;
	}
	*counter += totalFunc;
6812 for(i=0;i<totalFunc;i++) {
6813 rpc_stat->stats[i].remote_peer = remoteHost;
6814 rpc_stat->stats[i].remote_port = remotePort;
6815 rpc_stat->stats[i].remote_is_server = isServer;
6816 rpc_stat->stats[i].interfaceId = rxInterface;
6817 rpc_stat->stats[i].func_total = totalFunc;
6818 rpc_stat->stats[i].func_index = i;
6819 hzero(rpc_stat->stats[i].invocations);
6820 hzero(rpc_stat->stats[i].bytes_sent);
6821 hzero(rpc_stat->stats[i].bytes_rcvd);
6822 rpc_stat->stats[i].queue_time_sum.sec = 0;
6823 rpc_stat->stats[i].queue_time_sum.usec = 0;
6824 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6825 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6826 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6827 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6828 rpc_stat->stats[i].queue_time_max.sec = 0;
6829 rpc_stat->stats[i].queue_time_max.usec = 0;
6830 rpc_stat->stats[i].execution_time_sum.sec = 0;
6831 rpc_stat->stats[i].execution_time_sum.usec = 0;
6832 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6833 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6834 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6835 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6836 rpc_stat->stats[i].execution_time_max.sec = 0;
	    rpc_stat->stats[i].execution_time_max.usec = 0;
	}
6839 queue_Prepend(stats, rpc_stat);
6840 if (addToPeerList) {
	    queue_Prepend(&peerStats, &rpc_stat->all_peers);
	}
    }

    /*
     * Increment the stats for this function
     */
6849 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6850 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6851 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6852 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6853 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6854 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6855 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6857 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6858 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6860 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6861 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6862 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6863 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6865 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
	rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }

fail:
    return rc;
}

/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
 * rpc function.
 *
 * PARAMETERS
 *
 * IN peer - the peer who invoked the rpc
 *
 * IN rxInterface - a unique number that identifies the rpc interface
 *
 * IN currentFunc - the index of the function being invoked
 *
 * IN totalFunc - the total number of functions in this interface
 *
 * IN queueTime - the amount of time this function waited for a thread
 *
 * IN execTime - the amount of time this function invocation took to execute
 *
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
 *
 * IN isServer - if true, this invocation was made to a server
 *
 * RETURN CODES
 *
 * Returns void.
 */
void rx_IncrementTimeAndCount(
	struct rx_peer *peer,
	afs_uint32 rxInterface,
	afs_uint32 currentFunc,
	afs_uint32 totalFunc,
	struct clock *queueTime,
	struct clock *execTime,
	afs_hyper_t *bytesSent,
	afs_hyper_t *bytesRcvd,
	int isServer)
{
    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
	rxi_AddRpcStat(&peer->rpcStats,
		       rxInterface,
		       currentFunc,
		       totalFunc,
		       queueTime,
		       execTime,
		       bytesSent,
		       bytesRcvd,
		       isServer,
		       peer->host,
		       peer->port,
		       1,
		       &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
	rxi_AddRpcStat(&processStats,
		       rxInterface,
		       currentFunc,
		       totalFunc,
		       queueTime,
		       execTime,
		       bytesSent,
		       bytesRcvd,
		       isServer,
		       0xffffffff,
		       0,
		       0,
		       &rxi_rpc_process_stat_cnt);
    }

    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}
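/* Editorial sketch (not part of the original source): this is the hook that
 * rxgen-generated ExecuteRequest stubs call once per completed rpc.  A server
 * doing the same bookkeeping by hand might look like the following, where
 * MYIF_STATINDEX, MYIF_OPCODE, MYIF_NFUNCS and the elapsed-time values are
 * hypothetical:
 *
 *    struct clock queueTime, execTime;
 *    afs_hyper_t bytesSent, bytesRcvd;
 *    ...fill in the elapsed times and byte counts for this call...
 *    rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
 *                             MYIF_STATINDEX, MYIF_OPCODE, MYIF_NFUNCS,
 *                             &queueTime, &execTime,
 *                             &bytesSent, &bytesRcvd,
 *                             1);
 *
 * The final argument marks the invocation as a server-side rpc.
 */

/*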
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller.
 *
 * IN count - the number of entries to marshall.
 *
 * IN stats - pointer to stats to be marshalled.
 *
 * OUT ptr - Where to store the marshalled data.
 *
 * RETURN CODES
 *
 * Returns void.
 */
void rx_MarshallProcessRPCStats(
	afs_uint32 callerVersion,
	int count,
	rx_function_entry_v1_t *stats,
	afs_uint32 **ptrP)
{
    int i;
    afs_uint32 *ptr;

    /*
     * We only support the first version
     */
6983 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6984 *(ptr++) = stats->remote_peer;
6985 *(ptr++) = stats->remote_port;
6986 *(ptr++) = stats->remote_is_server;
6987 *(ptr++) = stats->interfaceId;
6988 *(ptr++) = stats->func_total;
6989 *(ptr++) = stats->func_index;
6990 *(ptr++) = hgethi(stats->invocations);
6991 *(ptr++) = hgetlo(stats->invocations);
6992 *(ptr++) = hgethi(stats->bytes_sent);
6993 *(ptr++) = hgetlo(stats->bytes_sent);
6994 *(ptr++) = hgethi(stats->bytes_rcvd);
6995 *(ptr++) = hgetlo(stats->bytes_rcvd);
6996 *(ptr++) = stats->queue_time_sum.sec;
6997 *(ptr++) = stats->queue_time_sum.usec;
6998 *(ptr++) = stats->queue_time_sum_sqr.sec;
6999 *(ptr++) = stats->queue_time_sum_sqr.usec;
7000 *(ptr++) = stats->queue_time_min.sec;
7001 *(ptr++) = stats->queue_time_min.usec;
7002 *(ptr++) = stats->queue_time_max.sec;
7003 *(ptr++) = stats->queue_time_max.usec;
7004 *(ptr++) = stats->execution_time_sum.sec;
7005 *(ptr++) = stats->execution_time_sum.usec;
7006 *(ptr++) = stats->execution_time_sum_sqr.sec;
7007 *(ptr++) = stats->execution_time_sum_sqr.usec;
7008 *(ptr++) = stats->execution_time_min.sec;
7009 *(ptr++) = stats->execution_time_min.usec;
7010 *(ptr++) = stats->execution_time_max.sec;
	*(ptr++) = stats->execution_time_max.usec;
    }
    *ptrP = ptr;
}
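/* Editorial note (not in the original source): each rx_function_entry_v1_t
 * marshalls to exactly 28 afs_uint32 words in the order written above: six
 * identification words, three 64-bit counters split into high/low halves,
 * and eight clock values split into sec/usec pairs.  Any unmarshaller must
 * consume the words in exactly this order; a sketch, with readEntry a
 * hypothetical name:
 *
 *    void readEntry(afs_uint32 **pp, rx_function_entry_v1_t *s)
 *    {
 *        afs_uint32 *p = *pp;
 *        s->remote_peer = *(p++);
 *        s->remote_port = *(p++);
 *        ...and so on, mirroring every assignment above...
 *        *pp = p;
 *    }
 */

/*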
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * the local process
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller
 *
 * OUT myVersion - the rpc stat version of this function
 *
 * OUT clock_sec - local time seconds
 *
 * OUT clock_usec - local time microseconds
 *
 * OUT allocSize - the number of bytes allocated to contain stats
 *
 * OUT statCount - the number of stats retrieved from this process.
 *
 * OUT stats - the actual stats retrieved from this process.
 *
 * RETURN CODES
 *
 * Returns 0 on success.  If successful, stats will != NULL.
 */
int rx_RetrieveProcessRPCStats(
	afs_uint32 callerVersion,
	afs_uint32 *myVersion,
	afs_uint32 *clock_sec,
	afs_uint32 *clock_usec,
	size_t *allocSize,
	afs_uint32 *statCount,
	afs_uint32 **stats)
{
    size_t space;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */
7064 MUTEX_ENTER(&rx_rpc_stats);
7065 if (!rxi_monitor_processStats) {
	MUTEX_EXIT(&rx_rpc_stats);
	return rc;
    }
7070 clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */
7083 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
	space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
	*statCount = rxi_rpc_process_stat_cnt;
    } else {
	/*
	 * This can't happen yet, but in the future version changes
	 * can be handled by adding additional code here
	 */
    }

    if (space > (size_t) 0) {
	*allocSize = space;
	ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

	if (ptr != NULL) {
	    rx_interface_stat_p rpc_stat, nrpc_stat;

	    for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
			   rx_interface_stat)) {
		/*
		 * Copy the data based upon the caller version
		 */
		rx_MarshallProcessRPCStats(callerVersion,
					   rpc_stat->stats[0].func_total,
					   rpc_stat->stats, &ptr);
	    }
	} else {
	    rc = ENOMEM;
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);

    return rc;
}

/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 * connected to a process
 *
 * PARAMETERS
 *
 * IN callerVersion - the rpc stat version of the caller
 *
 * OUT myVersion - the rpc stat version of this function
 *
 * OUT clock_sec - local time seconds
 *
 * OUT clock_usec - local time microseconds
 *
 * OUT allocSize - the number of bytes allocated to contain stats
 *
 * OUT statCount - the number of stats retrieved from the individual
 * peer structures.
 *
 * OUT stats - the actual stats retrieved from the individual peer structures.
 *
 * RETURN CODES
 *
 * Returns 0 on success.  If successful, stats will != NULL.
 */
int rx_RetrievePeerRPCStats(
	afs_uint32 callerVersion,
	afs_uint32 *myVersion,
	afs_uint32 *clock_sec,
	afs_uint32 *clock_usec,
	size_t *allocSize,
	afs_uint32 *statCount,
	afs_uint32 **stats)
{
    size_t space;
    afs_uint32 *ptr;
    struct clock now;
    int rc = 0;

    *stats = 0;
    *allocSize = 0;
    *statCount = 0;
    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */
7166 MUTEX_ENTER(&rx_rpc_stats);
7167 if (!rxi_monitor_peerStats) {
	MUTEX_EXIT(&rx_rpc_stats);
	return rc;
    }
7172 clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */
7185 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
7186 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
7187 *statCount = rxi_rpc_peer_stat_cnt;
7190 * This can't happen yet, but in the future version changes
7191 * can be handled by adding additional code here
7195 if (space > (size_t) 0) {
7197 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7200 rx_interface_stat_p rpc_stat, nrpc_stat;
7203 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7204 rx_interface_stat)) {
7206 * We have to fix the offset of rpc_stat since we are
7207 * keeping this structure on two rx_queues. The rx_queue
7208 * package assumes that the rx_queue member is the first
7209 * member of the structure. That is, rx_queue assumes that
7210 * any one item is only on one queue at a time. We are
7211 * breaking that assumption and so we have to do a little
7212 * math to fix our pointers.
7215 fix_offset = (char *) rpc_stat;
7216 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7217 rpc_stat = (rx_interface_stat_p) fix_offset;
7220 * Copy the data based upon the caller version
7222 rx_MarshallProcessRPCStats(callerVersion,
7223 rpc_stat->stats[0].func_total,
7224 rpc_stat->stats, &ptr);
7230 MUTEX_EXIT(&rx_rpc_stats);
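/* Editorial note (not in the original source): the fix_offset arithmetic
 * above is the classic container_of idiom.  all_peers is embedded in the
 * middle of rx_interface_stat_t, so a queue pointer to it must be pulled
 * back by offsetof() to recover the enclosing structure.  In isolation:
 *
 *    struct inner { struct inner *next, *prev; };
 *    struct outer {
 *        struct inner head;       first member: pointer converts directly
 *        int payload;
 *        struct inner tail;       embedded member: must subtract its offset
 *    };
 *
 *    struct outer *outer_from_tail(struct inner *p)
 *    {
 *        return (struct outer *)((char *)p - offsetof(struct outer, tail));
 *    }
 */

/*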
 * rx_FreeRPCStats - free memory allocated by
 *		     rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
 *
 * PARAMETERS
 *
 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
 *	      rx_RetrievePeerRPCStats
 *
 * IN allocSize - the number of bytes in stats.
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_FreeRPCStats(
	afs_uint32 *stats,
	size_t allocSize)
{
    rxi_Free(stats, allocSize);
}
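/* Editorial sketch (not part of the original source): the retrieve/free pair
 * is used together; the caller owns the returned buffer.  Error handling
 * elided:
 *
 *    afs_uint32 myVer, sec, usec, nStats, *statp;
 *    size_t bytes;
 *
 *    if (rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &myVer,
 *                                   &sec, &usec, &bytes, &nStats,
 *                                   &statp) == 0 && statp != NULL) {
 *        ...decode nStats marshalled entries from statp...
 *        rx_FreeRPCStats(statp, bytes);
 *    }
 */

/*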
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryProcessRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}

/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 *
 * RETURN CODES
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryPeerRPCStats()
{
    int rc;
    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
}

/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enableProcessRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_enablePeerRPCStats()
{
    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disableProcessRPCStats()
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and, if peer stats is also off, turn
     * off everything
     */

    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
	rx_enable_stats = 0;
    }
7359 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7360 unsigned int num_funcs = 0;
7361 if (!rpc_stat) break;
7362 queue_Remove(rpc_stat);
7363 num_funcs = rpc_stat->stats[0].func_total;
7364 space = sizeof(rx_interface_stat_t) +
7365 rpc_stat->stats[0].func_total *
7366 sizeof(rx_function_entry_v1_t);
7368 rxi_Free(rpc_stat, space);
	rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_disablePeerRPCStats()
{
    struct rx_peer **peer_ptr, **peer_end;
    int code;
    size_t space;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and, if process stats is also off, turn
     * off everything
     */

    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
	rx_enable_stats = 0;
    }
7401 MUTEX_ENTER(&rx_peerHashTable_lock);
7402 for (peer_ptr = &rx_peerHashTable[0],
7403 peer_end = &rx_peerHashTable[rx_hashTableSize];
7404 peer_ptr < peer_end; peer_ptr++) {
7405 struct rx_peer *peer, *next, *prev;
	for (prev = peer = *peer_ptr; peer; peer = next) {
	    next = peer->next;
	    code = MUTEX_TRYENTER(&peer->peer_lock);
	    if (code) {
7410 rx_interface_stat_p rpc_stat, nrpc_stat;
7412 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7413 rx_interface_stat)) {
7414 unsigned int num_funcs = 0;
7415 if (!rpc_stat) break;
7416 queue_Remove(&rpc_stat->queue_header);
7417 queue_Remove(&rpc_stat->all_peers);
7418 num_funcs = rpc_stat->stats[0].func_total;
7419 space = sizeof(rx_interface_stat_t) +
7420 rpc_stat->stats[0].func_total *
7421 sizeof(rx_function_entry_v1_t);
7423 rxi_Free(rpc_stat, space);
		    rxi_rpc_peer_stat_cnt -= num_funcs;
		}
7426 MUTEX_EXIT(&peer->peer_lock);
		if (prev == *peer_ptr) {
		    *peer_ptr = next;
		    prev = next;
		} else {
		    prev->next = next;
		}
	    } else {
		prev = peer;
	    }
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flag
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearProcessRPCStats(
	afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);
7463 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7464 unsigned int num_funcs = 0, i;
7465 num_funcs = rpc_stat->stats[0].func_total;
7466 for(i=0;i<num_funcs;i++) {
	    if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
		hzero(rpc_stat->stats[i].invocations);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
		hzero(rpc_stat->stats[i].bytes_sent);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
		hzero(rpc_stat->stats[i].bytes_rcvd);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
		rpc_stat->stats[i].queue_time_sum.sec = 0;
		rpc_stat->stats[i].queue_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
		rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
		rpc_stat->stats[i].queue_time_min.sec = 9999999;
		rpc_stat->stats[i].queue_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
		rpc_stat->stats[i].queue_time_max.sec = 0;
		rpc_stat->stats[i].queue_time_max.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
		rpc_stat->stats[i].execution_time_sum.sec = 0;
		rpc_stat->stats[i].execution_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
		rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
		rpc_stat->stats[i].execution_time_min.sec = 9999999;
		rpc_stat->stats[i].execution_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
		rpc_stat->stats[i].execution_time_max.sec = 0;
		rpc_stat->stats[i].execution_time_max.usec = 0;
	    }
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
}
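/* Editorial sketch (not part of the original source): clearFlag is a bitmask,
 * so callers OR together the AFS_RX_STATS_CLEAR_* values for the fields they
 * want zeroed, e.g. resetting just the timing minima/maxima while keeping the
 * invocation and byte counters:
 *
 *    rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN |
 *                            AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX |
 *                            AFS_RX_STATS_CLEAR_EXEC_TIME_MIN |
 *                            AFS_RX_STATS_CLEAR_EXEC_TIME_MAX);
 *
 * An all-bits-set mask (AFS_RX_STATS_CLEAR_ALL, where the headers in use
 * provide it) clears every field at once.
 */

/*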
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flag
 *
 * PARAMETERS
 *
 * IN clearFlag - flag indicating which stats to clear
 *
 * RETURN CODES
 *
 * Returns void.
 */

void rx_clearPeerRPCStats(
	afs_uint32 clearFlag)
{
    rx_interface_stat_p rpc_stat, nrpc_stat;
    char *fix_offset;

    MUTEX_ENTER(&rx_rpc_stats);
7534 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
	unsigned int num_funcs = 0, i;

	/*
7538 * We have to fix the offset of rpc_stat since we are
7539 * keeping this structure on two rx_queues. The rx_queue
7540 * package assumes that the rx_queue member is the first
7541 * member of the structure. That is, rx_queue assumes that
7542 * any one item is only on one queue at a time. We are
7543 * breaking that assumption and so we have to do a little
	 * math to fix our pointers.
	 */

	fix_offset = (char *) rpc_stat;
	fix_offset -= offsetof(rx_interface_stat_t, all_peers);
	rpc_stat = (rx_interface_stat_p) fix_offset;
7551 num_funcs = rpc_stat->stats[0].func_total;
7552 for(i=0;i<num_funcs;i++) {
	    if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
		hzero(rpc_stat->stats[i].invocations);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
		hzero(rpc_stat->stats[i].bytes_sent);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
		hzero(rpc_stat->stats[i].bytes_rcvd);
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
		rpc_stat->stats[i].queue_time_sum.sec = 0;
		rpc_stat->stats[i].queue_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
		rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
		rpc_stat->stats[i].queue_time_min.sec = 9999999;
		rpc_stat->stats[i].queue_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
		rpc_stat->stats[i].queue_time_max.sec = 0;
		rpc_stat->stats[i].queue_time_max.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
		rpc_stat->stats[i].execution_time_sum.sec = 0;
		rpc_stat->stats[i].execution_time_sum.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
		rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
		rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
		rpc_stat->stats[i].execution_time_min.sec = 9999999;
		rpc_stat->stats[i].execution_time_min.usec = 9999999;
	    }
	    if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
		rpc_stat->stats[i].execution_time_max.sec = 0;
		rpc_stat->stats[i].execution_time_max.usec = 0;
	    }
	}
    }

    MUTEX_EXIT(&rx_rpc_stats);
}

/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
void rx_SetRxStatUserOk(
	int (*proc)(struct rx_call *call))
{
    rxi_rxstat_userok = proc;
}

int rx_RxStatUserOk(
	struct rx_call *call)
{
    if (!rxi_rxstat_userok)
	return 0;
    return rxi_rxstat_userok(call);
}
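/* Editorial sketch (not part of the original source): a server that wants to
 * gate the remote stat-control RPCs registers a predicate at startup, and the
 * rxstat package then consults rx_RxStatUserOk on each request.  A
 * hypothetical policy hook, with my_is_superuser an assumed helper:
 *
 *    static int my_stat_userok(struct rx_call *call)
 *    {
 *        return my_is_superuser(rx_ConnectionOf(call));
 *    }
 *
 *    void server_init(void)
 *    {
 *        rx_SetRxStatUserOk(my_stat_userok);
 *    }
 *
 * With no callback registered, rx_RxStatUserOk denies everyone (returns 0).
 */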