/*
 * Copyright 2000, International Business Machines Corporation and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

/* RX:  Extended Remote Procedure Call */
#ifdef KERNEL
#include "../afs/param.h"
#include <afsconfig.h>
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#ifndef UKERNEL
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#ifdef AFS_OSF_ENV
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#endif /* AFS_LINUX20_ENV */
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#endif
#ifdef RXDEBUG
#undef RXDEBUG	    /* turn off debugging */
#endif /* RXDEBUG */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#endif
#include "../afsint/afsint.h"
#ifdef AFS_ALPHA_ENV
#undef kmem_alloc
#undef kmem_free
#undef mem_alloc
#undef mem_free
#undef register
#endif /* AFS_ALPHA_ENV */
#else /* !UKERNEL */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#endif /* !UKERNEL */
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define	AFSOP_STOP_RXCALLBACK	210	/* Stop CALLBACK process */
#define	AFSOP_STOP_AFS		211	/* Stop AFS process */
#define	AFSOP_STOP_BKG		212	/* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#ifdef AFS_AIX41_ENV
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
#else /* KERNEL */
# include <afs/param.h>
# include <afsconfig.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
#endif /* KERNEL */
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/*
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_peer_stat_cnt;

/*
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
 */
static unsigned int rxi_rpc_process_stat_cnt;

#if !defined(offsetof)
#include <stddef.h>	/* for definition of offsetof() */
#endif
#ifdef AFS_PTHREAD_ENV
/*
 * Use procedural initialization of mutexes/condition variables
 * to ease NT porting
 */

extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;

extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;

static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
			      (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
			      (const pthread_mutexattr_t*)0)==0);

    assert(pthread_cond_init(&rx_event_handler_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
			     (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
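/* Illustrative sketch (not part of the original source): because the
 * one-time setup funnels through pthread_once, any exported entry point
 * can run it safely from several threads before touching the globals;
 * rx_SomeEntryPoint is a hypothetical name used only for illustration:
 *
 *     int rx_SomeEntryPoint(void)
 *     {
 *         INIT_PTHREAD_LOCKS          // expands to pthread_once(...)
 *         // ...may now take rx_init_mutex, epoch_mutex, etc....
 *     }
 *
 * The first caller runs rxi_InitPthread; every later caller sees fully
 * initialized locks.
 */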
/*
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
 */
#else
#define INIT_PTHREAD_LOCKS
#endif /* AFS_PTHREAD_ENV */
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * requests.
 *
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread has to come while rxi_availProcs and rxi_minDeficit
 * are locked. To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time. A new function, ReturnToServerPool(), returns the allocation.
 *
 * A call can be on several queues (but only one at a time). When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
 */
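/* Illustrative sketch (assumed names, not part of the original source):
 * the queue/lock protocol just described looks roughly like this, where
 * some_queue and some_queue_lock are hypothetical placeholders:
 *
 *     // enqueue, with the call lock held
 *     SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
 *     queue_Append(&some_queue, call);
 *
 *     // dequeue, again with the call lock held
 *     queue_Remove(call);
 *     CLEAR_CALL_QUEUE_LOCK(call);
 *
 * rxi_ResetCall can then inspect call->call_queue_lock to learn which
 * queue lock, if any, must be taken before unlinking the call.
 */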
#ifdef RX_ENABLE_LOCKS
static int rxi_ServerThreadSelectingCall;
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
#endif /* RX_ENABLE_LOCKS */
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * tiers:
 *
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 *	rx_freeCallQueue_lock
 *	rx_connHashTable_lock
 *	rx_peerHashTable_lock - locked under rx_connHashTable_lock
 *	peer_lock - locks peer data fields.
 *	conn_data_lock - ensures that no more than one thread updates a
 *		conn data field at the same time.
 *
 * Do we need a lock to protect the peer field in the conn structure?
 *      conn->peer was previously a constant for all intents and so has no
 *      lock protecting this field. The multihomed client delta introduced
 *      a RX code change : change the peer field in the connection structure
 *      to that remote interface from which the last packet for this
 *      connection was sent out. This may become an issue if further changes
 *      are made.
 */
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */

/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
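/* Illustrative sketch (hypothetical variable, not part of the original
 * source): a security layer such as rxkad might seed the epoch once,
 * after obtaining a suitably random 32-bit value:
 *
 *     afs_uint32 randomEpoch;      // filled in from a random source
 *     rx_SetEpoch(randomEpoch);
 *
 * Every connection created afterwards by rx_NewConnection shares this
 * epoch, so epoch/cid pairs stay unique across connections.
 */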
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rx_epoch
 */
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
#endif /* AFS_PTHREAD_ENV */

void rx_SetEpoch (epoch)
  afs_uint32 epoch;
{
    LOCK_EPOCH
    rx_epoch = epoch;
    UNLOCK_EPOCH
}
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * failure. */
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following global variables:
 * rxinit_status
 */
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
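/* Illustrative sketch (hypothetical port value): a typical server calls
 * rx_Init once before installing services; a nonzero value becomes the
 * default service port, while 0 lets the kernel pick one:
 *
 *     if (rx_Init(htons(7000)) != 0)
 *         exit(1);    // initialization failed
 *
 * Repeated calls are harmless: once rxinit_status is 0, rx_Init simply
 * returns the previous status.
 */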
int rx_Init(u_int port)
{
    int tmp_status;
    char *htable, *ptable;
    struct timeval tv;

#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
    /* Quiet the DJGPP socket library's diagnostic output */
    __djgpp_set_quiet_socket(1);
#endif

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 0) {
	tmp_status = rxinit_status;
	UNLOCK_RX_INIT
	return tmp_status; /* Already started; return previous error code. */
    }

#ifdef AFS_NT40_ENV
    if (afs_winsockInit()<0)
	return -1;
#endif

    /*
     * Initialize anything necessary to provide a non-preemptive threading
     * environment.
     */
    rxi_InitializeThreadSupport();

    /* Allocate and initialize a socket for client and perhaps server
     * connections. */
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
	UNLOCK_RX_INIT
	return RX_ADDRINUSE;
    }
#ifdef RX_ENABLE_LOCKS
#ifdef RX_LOCKS_DB
    rxdb_init();
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
	       MUTEX_DEFAULT,0);
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0;	/* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
    htable = (char *)
	osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));	  /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));

    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#else
    osi_GetTime(&tv);
#endif
    if (port) {
	rx_port = port;
    } else {
#if defined(KERNEL) && !defined(UKERNEL)
	/* Really, this should never happen in a real kernel */
	rx_port = 0;
#else
	struct sockaddr_in addr;
	int addrlen = sizeof(addr);
	if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
	    return -1;
	}
	rx_port = addr.sin_port;
#endif
    }
    rx_stats.minRtt.sec = 9999999;
#ifdef KERNEL
    rx_SetEpoch (tv.tv_sec | 0x80000000);
#else
    rx_SetEpoch (tv.tv_sec);		/* Start time of this package, rxkad
					 * will provide a more random value. */
#endif
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota;	/* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;
    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000;	/* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000;	/* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000;	/* 100 milliseconds */

    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    rx_GetIFInfo();
#endif

    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    rxi_StartListener();

    tmp_status = rxinit_status = 0;
    UNLOCK_RX_INIT
    return tmp_status;
}
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
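/* Worked example (hypothetical numbers, not from the original source):
 * suppose a service has minProcs 2 and maxProcs 4.  A fifth concurrent
 * request always fails the maxProcs test.  If the service already runs 2
 * calls (its minimum), a third may start only when
 * rxi_availProcs > rxi_minDeficit, i.e. when enough idle threads remain
 * for every other service to still reach its own minProcs guarantee.
 */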
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
 */
static int QuotaOK(aservice)
register struct rx_service *aservice;
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
	return 0;
    }

    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
	(rxi_availProcs > rxi_minDeficit)) {
	aservice->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (aservice->nRequestsRunning <= aservice->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	MUTEX_EXIT(&rx_stats_mutex);
	return 1;
    }
    MUTEX_EXIT(&rx_stats_mutex);

    return 0;
}

static void ReturnToServerPool(aservice)
register struct rx_service *aservice;
{
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    rxi_availProcs++;
    MUTEX_EXIT(&rx_stats_mutex);
}
#else /* RX_ENABLE_LOCKS */
static int QuotaOK(aservice)
register struct rx_service *aservice; {
    int rc=0;
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;

    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;

    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
    return rc;
}
#endif /* RX_ENABLE_LOCKS */
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    int nExistingProcs;
{
    register struct rx_service *service;
    register int i;
    int maxdiff = 0;
    int nProcs = 0;

    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that process. Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	int diff;
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	nProcs += service->minProcs;
	diff = service->maxProcs - service->minProcs;
	if (diff > maxdiff) maxdiff = diff;
    }
    nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
    }
}
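/* Worked example (hypothetical numbers, not from the original source):
 * with services A (minProcs 2, maxProcs 5) and B (minProcs 3, maxProcs 4),
 * nProcs = 2 + 3 = 5 and maxdiff = max(5-2, 4-3) = 3, so 5 + 3 = 8 procs
 * are wanted in total; if nExistingProcs is 1, seven new server procs are
 * started.
 */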
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool */
void rx_StartServer(donateMe)
{
    register struct rx_service *service;
    register int i, nProcs=0;
    SPLVAR;

    clock_NewTime();

    NETPRI;
    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
     */
    for (i=0; i<RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0) break;
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_totalMin += service->minProcs;
	/* below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    USERPRI;

    if (donateMe) {
#ifndef AFS_NT40_ENV
#ifndef KERNEL
	char name[32];
#ifdef AFS_PTHREAD_ENV
	pid_t pid;
	pid = (pid_t) pthread_self();
#else /* AFS_PTHREAD_ENV */
	PROCESS pid;
	LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
	sprintf(name,"srv_%d", ++nProcs);
	if (registerProgram)
	    (*registerProgram)(pid, name);
#endif /* KERNEL */
#endif /* AFS_NT40_ENV */
	rx_ServerProc(); /* Never returns */
    }
}
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection. */
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	    /* Server host */
    u_short sport;		    /* Server port */
    u_short sservice;		    /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
{
    int hashindex;
    afs_int32 cid;
    register struct rx_connection *conn;
    SPLVAR;

    clock_NewTime();
    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
	  shost, sport, sservice, securityObject, serviceSecurityIndex));

    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
#endif
    NETPRI;
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
     * with VOID */
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->nSpecific = 0;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;
    conn->error = 0;

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);

    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_connHashTable_lock);
    USERPRI;
    return conn;
}
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
{
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
}
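/* Worked example: rx_SetConnDeadTime(conn, 12) yields secondsUntilDead =
 * 12 and secondsUntilPing = 2, so roughly five keepalive pings can be lost
 * before the connection is declared dead.  Values below the minimum are
 * raised: rx_SetConnDeadTime(conn, 3) still gives a dead time of 6 seconds
 * and a ping interval of 1.
 */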
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
/*
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
 */
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
{
    int i;

    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
      (*conn->service->destroyConnProc)(conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now. rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
     */
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
	conn->peer->idleWhen = clock_Sec();
	if (conn->peer->refCount < 0) {
	    conn->peer->refCount = 0;
	    MUTEX_ENTER(&rx_stats_mutex);
	    rxi_lowPeerRefCount ++;
	    MUTEX_EXIT(&rx_stats_mutex);
	}
    }
    MUTEX_EXIT(&rx_peerHashTable_lock);

    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
      rx_stats.nServerConns--;
    else
      rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);

    if (conn->specific) {
	for (i = 0 ; i < conn->nSpecific ; i++) {
	    if (conn->specific[i] && rxi_keyCreate_destructor[i])
	      (*rxi_keyCreate_destructor[i])(conn->specific[i]);
	    conn->specific[i] = NULL;
	}
	free(conn->specific);
    }
    conn->specific = NULL;
    conn->nSpecific = 0;

    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);

    rxi_FreeConnection(conn);
}
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
	rx_connCleanup_list = rx_connCleanup_list->next;
	MUTEX_EXIT(&rx_connHashTable_lock);
	rxi_CleanupConnection(conn);
    }
#ifdef RX_ENABLE_LOCKS
    else {
	MUTEX_EXIT(&rx_connHashTable_lock);
    }
#endif /* RX_ENABLE_LOCKS */
}
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
{
    register struct rx_peer *peer = conn->peer;
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    int i;
    SPLVAR;

    NETPRI;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
	conn->refCount--;
    else {
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_lowConnRefCount++;
	MUTEX_EXIT(&rx_stats_mutex);
    }

    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
	/* Busy; wait till the last guy before proceeding */
	MUTEX_EXIT(&conn->conn_data_lock);
	USERPRI;
	return;
    }

    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	USERPRI;
	return;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
	register struct rx_call *call = conn->call[i];
	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		MUTEX_ENTER(&call->lock);
		if (call->delayedAckEvent) {
		    /* Push the final acknowledgment out now--there
		     * won't be a subsequent call to acknowledge the
		     * last reply packets */
		    rxevent_Cancel(call->delayedAckEvent, call,
				   RX_CALL_REFCOUNT_DELAY);
		    rxi_AckAll((struct rxevent *)0, call, 0);
		}
		MUTEX_EXIT(&call->lock);
	    }
	}
    }
#ifdef RX_ENABLE_LOCKS
    if (!havecalls) {
	if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
	else {
	    /* Someone is accessing a packet right now. */
	    havecalls = 1;
	}
    }
#endif /* RX_ENABLE_LOCKS */

    if (havecalls) {
	/* Don't destroy the connection if there are any call
	 * structures still in use */
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_DESTROY_ME;
	MUTEX_EXIT(&conn->conn_data_lock);
	USERPRI;
	return;
    }

    if (conn->delayedAbortEvent) {
	rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (packet) {
	    MUTEX_ENTER(&conn->conn_data_lock);
	    rxi_SendConnectionAbort(conn, packet, 0, 1);
	    MUTEX_EXIT(&conn->conn_data_lock);
	    rxi_FreePacket(packet);
	}
    }

    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
					     conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
	rxLastConn = 0;

    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
	rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    }

    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up. This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
    USERPRI;
}
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
{
    SPLVAR;

    NETPRI;
    rxi_DestroyConnection (conn);
    USERPRI;
}
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we found a call in an active
 * state and before we go to sleep.
 */
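/* Illustrative sketch (hypothetical client code, not part of the original
 * source): the usual client-side life cycle around rx_NewCall is
 *
 *     struct rx_call *call = rx_NewCall(conn);
 *     rx_Write(call, request, reqlen);     // marshal the arguments
 *     rx_Read(call, reply, replylen);      // collect the reply
 *     code = rx_EndCall(call, 0);          // returns the call's error code
 *
 * where conn came from rx_NewConnection.  rx_NewCall blocks until one of
 * the connection's RX_MAXCALLS channels becomes free.
 */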
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
{
    register int i;
    register struct rx_call *call;
    struct clock queueTime;
    SPLVAR;

    clock_NewTime();
    dpf (("rx_MakeCall(conn %x)\n", conn));

    NETPRI;
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (;;) {
	for (i=0; i<RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		MUTEX_ENTER(&call->lock);
		if (call->state == RX_STATE_DALLY) {
		    rxi_ResetCall(call, 0);
		    (*call->callNumber)++;
		    break;
		}
		MUTEX_EXIT(&call->lock);
	    }
	    else {
		call = rxi_NewCall(conn, i);
		MUTEX_ENTER(&call->lock);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_MAKECALL_WAITING;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
	osi_rxSleep(conn);
#endif
    }

    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
    USERPRI;

#ifdef	AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
	call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
	osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    }
    if (call->flags & RX_CALL_TQ_CLEARME) {
	rxi_ClearTransmitQueue(call, 0);
	queue_Init(&call->tq);
    }
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    return call;
}
/* Returns non-zero if there are active or precall calls on this connection */
rxi_HasActiveCalls(aconn)
register struct rx_connection *aconn; {
    register int i;
    register struct rx_call *tcall;
    SPLVAR;

    NETPRI;
    for(i=0; i<RX_MAXCALLS; i++) {
      if ((tcall = aconn->call[i])) {
	if ((tcall->state == RX_STATE_ACTIVE)
	    || (tcall->state == RX_STATE_PRECALL)) {
	  USERPRI;
	  return 1;
	}
      }
    }
    USERPRI;
    return 0;
}

/* Copy the connection's current call number vector into aint32s */
rxi_GetCallNumberVector(aconn, aint32s)
register struct rx_connection *aconn;
register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;
    SPLVAR;

    NETPRI;
    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aint32s[i] = aconn->callNumber[i]+1;
	else
	    aint32s[i] = aconn->callNumber[i];
    }
    USERPRI;
    return 0;
}

/* Set the connection's call number vector from aint32s */
rxi_SetCallNumberVector(aconn, aint32s)
register struct rx_connection *aconn;
register afs_int32 *aint32s; {
    register int i;
    register struct rx_call *tcall;
    SPLVAR;

    NETPRI;
    for(i=0; i<RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = aint32s[i] - 1;
	else
	    aconn->callNumber[i] = aint32s[i];
    }
    USERPRI;
    return 0;
}
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on error. */
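/* Illustrative sketch (hypothetical service values, not part of the
 * original source): a server typically installs a service after rx_Init
 * and then hands control to Rx:
 *
 *     struct rx_service *svc;
 *     svc = rx_NewService(0, 52, "sample", secObjs, nSecObjs, SampleProc);
 *     if (svc) {
 *         svc->minProcs = 2;
 *         svc->maxProcs = 8;
 *         rx_StartServer(1);   // donate this thread to the pool
 *     }
 *
 * Here port 0 reuses the port given to rx_Init, 52 is an arbitrary
 * service id, and SampleProc is an assumed request handler of type
 * afs_int32 (*)(struct rx_call *).
 */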
struct rx_service *
rx_NewService(port, serviceId, serviceName, securityObjects,
	      nSecurityObjects, serviceProc)
    u_short port;
    u_short serviceId;
    char *serviceName;	/* Name for identification purposes (e.g. the
			 * service name might be used for probing for
			 * statistics) */
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
{
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    register int i;
    SPLVAR;

    clock_NewTime();

    if (serviceId == 0) {
	(osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }

    tservice = rxi_AllocService();
    NETPRI;
    for (i = 0; i<RX_MAX_SERVICES; i++) {
	register struct rx_service *service = rx_services[i];
	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /* The identical service has already been
		     * installed; if the caller was intending to
		     * change the security classes used by this
		     * service, he/she loses. */
		    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
		    USERPRI;
		    rxi_FreeService(tservice);
		    return service;
		}
		/* Different service, same port: re-use the socket
		 * which is bound to the same port */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {
		/* If we don't already have a socket (from another
		 * service on same port) get a new one */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    USERPRI;
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    rx_services[i] = service;	/* not visible until now */
	    USERPRI;
	    return service;
	}
    }
    USERPRI;
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}
/* Generic request processing loop. This routine should be called
 * by the implementation dependent rx_ServerProc. If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on. If socketp is null, this routine will never
 * return. */
void rxi_ServerProc(threadID, newcall, socketp)
int threadID;
struct rx_call *newcall;
osi_socket *socketp;
{
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;

    for (;;) {
	if (newcall) {
	    call = newcall;
	    newcall = NULL;
	} else {
	    call = rx_GetCall(threadID, tservice, socketp);
	    if (socketp && *socketp != OSI_NULLSOCKET) {
		/* We are now a listener thread */
		return;
	    }
	}

	/* if server is restarting( typically smooth shutdown) then do not
	 * allow any new calls.
	 */
	if ( rx_tranquil && (call != NULL) ) {
	    SPLVAR;

	    NETPRI;
	    MUTEX_ENTER(&call->lock);

	    rxi_CallError(call, RX_RESTARTING);
	    rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);

	    MUTEX_EXIT(&call->lock);
	    USERPRI;
	}

#ifdef	KERNEL
	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
	    AFS_GLOCK();
#endif /* RX_ENABLE_LOCKS */
	    afs_termState = AFSOP_STOP_AFS;
	    afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
	    AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS */
	    return;
	}
#endif

	tservice = call->conn->service;
	if (tservice->beforeProc) (*tservice->beforeProc)(call);
	code = call->conn->service->executeRequestProc(call);
	if (tservice->afterProc) (*tservice->afterProc)(call, code);

	rx_EndCall(call, code);
	MUTEX_ENTER(&rx_stats_mutex);
	rxi_nCalls++;
	MUTEX_EXIT(&rx_stats_mutex);
    }
}
void rx_WakeupServerProcs()
{
    struct rx_serverQueueEntry *np, *tqp;
    SPLVAR;

    NETPRI;
    MUTEX_ENTER(&rx_serverPool_lock);

#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
	CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
	osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
      tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
      CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
      osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
      CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
      osi_rxWakeup(np);
#endif /* RX_ENABLE_LOCKS */
    }
    MUTEX_EXIT(&rx_serverPool_lock);
    USERPRI;
}
/* Meltdown:
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * future options:
 * 1.  schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3.  preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4.  Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 *
 * NB.  Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
 */

/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
int tno;
struct rx_service *cur_service;
osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;
    SPLVAR;

    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
	ReturnToServerPool(cur_service);
    }
    while (1) {
	if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	    register struct rx_call *tcall, *ncall;
	    choice2 = (struct rx_call *) 0;
	    /* Scan for eligible incoming calls.  A call is not eligible
	     * if the maximum number of calls for its service type are
	     * already executing */
	    /* One thread will process calls FCFS (to prevent starvation),
	     * while the other threads may run ahead looking for calls which
	     * have all their input data available immediately.  This helps
	     * keep threads from blocking, waiting for data from the client. */
	    for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
		service = tcall->conn->service;
		if (!QuotaOK(service)) {
		    continue;
		}
		if (!tno || !tcall->queue_item_header.next  ) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1) {
			if (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET)) {
			    call = tcall;
			} else if (rxi_2dchoice && !choice2 &&
				   !(tcall->flags & RX_CALL_CLEARED) &&
				   (tcall->rprev > rxi_HardAckRate)) {
			    choice2 = tcall;
			} else rxi_md2cnt++;
		    }
		}
		if (call) {
		    break;
		} else {
		    ReturnToServerPool(service);
		}
	    }
	}

	if (call) {
	    queue_Remove(call);
	    rxi_ServerThreadSelectingCall = 1;
	    MUTEX_EXIT(&rx_serverPool_lock);
	    MUTEX_ENTER(&call->lock);
	    MUTEX_ENTER(&rx_serverPool_lock);

	    if (queue_IsEmpty(&call->rq) ||
		queue_First(&call->rq, rx_packet)->header.seq != 1)
	      rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	    CLEAR_CALL_QUEUE_LOCK(call);
	    if (call->error) {
		MUTEX_EXIT(&call->lock);
		ReturnToServerPool(service);
		rxi_ServerThreadSelectingCall = 0;
		CV_SIGNAL(&rx_serverPool_cv);
		call = (struct rx_call*)0;
		continue;
	    }
	    call->flags &= (~RX_CALL_WAIT_PROC);
	    MUTEX_ENTER(&rx_stats_mutex);
	    rx_nWaiting--;
	    MUTEX_EXIT(&rx_stats_mutex);
	    rxi_ServerThreadSelectingCall = 0;
	    CV_SIGNAL(&rx_serverPool_cv);
	    MUTEX_EXIT(&rx_serverPool_lock);
	    break;
	}
	else {
	    /* If there are no eligible incoming calls, add this process
	     * to the idle server queue, to wait for one */
	    sq->newcall = 0;
	    sq->tno = tno;
	    if (socketp) {
		*socketp = OSI_NULLSOCKET;
	    }
	    sq->socketp = socketp;
	    queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
	    rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
	    do {
		CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef	KERNEL
		if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		    MUTEX_EXIT(&rx_serverPool_lock);
		    return (struct rx_call *)0;
		}
#endif
	    } while (!(call = sq->newcall) &&
		     !(socketp && *socketp != OSI_NULLSOCKET));
	    MUTEX_EXIT(&rx_serverPool_lock);
	    if (call) {
		MUTEX_ENTER(&call->lock);
	    }
	    break;
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));

	CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
	MUTEX_EXIT(&call->lock);
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    return call;
}
#else /* RX_ENABLE_LOCKS */
struct rx_call *
rx_GetCall(tno, cur_service, socketp)
int tno;
struct rx_service *cur_service;
osi_socket *socketp;
{
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service = NULL;
    SPLVAR;

    NETPRI;
    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList)) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
	MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
	CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    }
    MUTEX_ENTER(&sq->lock);

    if (cur_service != NULL) {
	cur_service->nRequestsRunning--;
	if (cur_service->nRequestsRunning < cur_service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
    }
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	register struct rx_call *tcall, *ncall;
	/* Scan for eligible incoming calls.  A call is not eligible
	 * if the maximum number of calls for its service type are
	 * already executing */
	/* One thread will process calls FCFS (to prevent starvation),
	 * while the other threads may run ahead looking for calls which
	 * have all their input data available immediately.  This helps
	 * keep threads from blocking, waiting for data from the client. */
	choice2 = (struct rx_call *) 0;
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		if (!tno || !tcall->queue_item_header.next  ) {
		    /* If we're thread 0, then we'll just use
		     * this call. If we haven't been able to find an optimal
		     * choice, and we're at the end of the list, then use a
		     * 2d choice if one has been identified.  Otherwise... */
		    call = (choice2 ? choice2 : tcall);
		    service = call->conn->service;
		} else if (!queue_IsEmpty(&tcall->rq)) {
		    struct rx_packet *rp;
		    rp = queue_First(&tcall->rq, rx_packet);
		    if (rp->header.seq == 1
			&& (!meltdown_1pkt ||
			    (rp->header.flags & RX_LAST_PACKET))) {
			call = tcall;
		    } else if (rxi_2dchoice && !choice2 &&
			       !(tcall->flags & RX_CALL_CLEARED) &&
			       (tcall->rprev > rxi_HardAckRate)) {
			choice2 = tcall;
		    } else rxi_md2cnt++;
		}
	    }
	    if (call)
		break;
	}
    }

    if (call) {
	queue_Remove(call);
	/* we can't schedule a call if there's no data!!! */
	/* send an ack if there's no data, if we're missing the
	 * first packet, or we're missing something between first
	 * and last -- there's a "hole" in the incoming data. */
	if (queue_IsEmpty(&call->rq) ||
	    queue_First(&call->rq, rx_packet)->header.seq != 1 ||
	    call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
	  rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);

	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/* just started call in minProcs pool, need fewer to maintain
	 * guarantee */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;

	/* MUTEX_EXIT(&call->lock); */
    }
    else {
	/* If there are no eligible incoming calls, add this process
	 * to the idle server queue, to wait for one */
	sq->newcall = 0;
	if (socketp) {
	    *socketp = OSI_NULLSOCKET;
	}
	sq->socketp = socketp;
	queue_Append(&rx_idleServerQueue, sq);
	do {
	    osi_rxSleep(sq);
#ifdef	KERNEL
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		USERPRI;
		return (struct rx_call *)0;
	    }
#endif
	} while (!(call = sq->newcall) &&
		 !(socketp && *socketp != OSI_NULLSOCKET));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    if (call) {
	clock_GetTime(&call->startTime);
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;

	rxi_calltrace(RX_CALL_START, call);
	dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	     call->conn->service->servicePort,
	     call->conn->service->serviceId, call));
    } else {
	dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
    }

    USERPRI;
    return call;
}
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the
 * call or the call is complete.  Used by multi rx to build a
 * selection function which determines which of several calls is likely
 * to be a good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
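/* Illustrative sketch (hypothetical handler, not part of the original
 * source): multi rx style usage registers a handler right after creating
 * the call; the handler is assumed to be invoked as proc(call, handle,
 * arg) and to fire at most once:
 *
 *     static void SampleArrival(call, handle, arg)
 *         register struct rx_call *call;
 *         register VOID *handle;
 *         register int arg;
 *     {
 *         ((int *) handle)[arg] = 1;   // mark this call as readable
 *     }
 *
 *     rx_SetArrivalProc(call, SampleArrival, (VOID *) readyFlags, idx);
 */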
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    register int arg;
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller */

afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    afs_int32 rc;
{
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    afs_int32 error;
    SPLVAR;

    dpf(("rx_EndCall(call %x)\n", call));

    NETPRI;
    MUTEX_ENTER(&call->lock);

    if (rc == 0 && call->error == 0) {
	call->abortCode = 0;
	call->abortCount = 0;
    }

    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/* Send an abort message to the peer if this error code has
	 * only just been set.  If it was set previously, assume the
	 * peer has already been sent the error code or will request it
	 */
	rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    rxi_WriteProc(call, 0, 0);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rxi_FlushWrite(call);
	}
	service = conn->service;
	rxi_calltrace(RX_CALL_END, call);
	/* Call goes to hold state until reply packets are acknowledged */
	if (call->tfirst + call->nSoftAcked < call->tnext) {
	    call->state = RX_STATE_HOLD;
	} else {
	    call->state = RX_STATE_DALLY;
	    rxi_ClearTransmitQueue(call, 0);
	    rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
	    rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
	}
    }
    else { /* Client connection */
	char dummy;
	/* Make sure server receives input packets, in the case where
	 * no reply arguments are expected */
	if ((call->mode == RX_MODE_SENDING)
	    || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
	    (void) rxi_ReadProc(call, &dummy, 1);
	}
	/* We need to release the call lock since it's lower than the
	 * conn_call_lock and we don't want to hold the conn_call_lock
	 * over the rx_ReadProc call. The conn_call_lock needs to be held
	 * here for the case where rx_NewCall is perusing the calls on
	 * the connection structure. We don't want to signal until
	 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
	 * have checked this call, found it active and by the time it
	 * goes to sleep, will have missed the signal.
	 */
	MUTEX_EXIT(&call->lock);
	MUTEX_ENTER(&conn->conn_call_lock);
	MUTEX_ENTER(&call->lock);
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->flags |= RX_CONN_BUSY;
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
	    MUTEX_EXIT(&conn->conn_data_lock);
#ifdef	RX_ENABLE_LOCKS
	    CV_BROADCAST(&conn->conn_call_cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
#ifdef RX_ENABLE_LOCKS
	else {
	    MUTEX_EXIT(&conn->conn_data_lock);
	}
#endif /* RX_ENABLE_LOCKS */
	call->state = RX_STATE_DALLY;
    }
    error = call->error;

    /* currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = call->curlen = 0;
    }
    else
	call->nLeft = call->nFree = call->curlen = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
	queue_Remove(tp);
	rxi_FreePacket(tp);
    }

    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION) {
	MUTEX_EXIT(&conn->conn_call_lock);
	conn->flags &= ~RX_CONN_BUSY;
    }
    USERPRI;
    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}
#if !defined(KERNEL)

/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;

    INIT_PTHREAD_LOCKS
    LOCK_RX_INIT
    if (rxinit_status == 1) {
	UNLOCK_RX_INIT
	return; /* Already shutdown. */
    }
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
	MUTEX_ENTER(&rx_connHashTable_lock);
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;
	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION) {
		    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
		    conn->refCount++;
		    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
		    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
		    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
		}
	    }
	}
#ifdef RX_ENABLE_LOCKS
	while (rx_connCleanup_list) {
	    struct rx_connection *conn;
	    conn = rx_connCleanup_list;
	    rx_connCleanup_list = rx_connCleanup_list->next;
	    MUTEX_EXIT(&rx_connHashTable_lock);
	    rxi_CleanupConnection(conn);
	    MUTEX_ENTER(&rx_connHashTable_lock);
	}
	MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
    }

    rxinit_status = 1;
    UNLOCK_RX_INIT
}
#endif /* !KERNEL */
/* if we wakeup packet waiter too often, can get in loop with two
    AllocSendPackets each waking each other up (from ReclaimPacket calls) */
void
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
	return;
    }
#ifdef KERNEL
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	return;	/* still over quota */
    }
#endif /* KERNEL */
    rx_waitingForPackets = 0;
#ifdef	RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    return;
}
/* ------------------Internal interfaces------------------------- */

/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
{
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller. */
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
{
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */

    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /*
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
     */
    call = NULL;
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
	if (!(cp->flags & RX_CALL_TQ_BUSY)) {
	    call = cp;
	    break;
	}
    }
    if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	queue_Remove(call);
	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_stats_mutex);
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);
	CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
	/* Now, if TQ wasn't cleared earlier, do it now. */
	if (call->flags & RX_CALL_TQ_CLEARME) {
	    rxi_ClearTransmitQueue(call, 0);
	    queue_Init(&call->tq);
	}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
	/* Bind the call to its connection structure */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);
	CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
	CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
	CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);

	MUTEX_ENTER(&rx_stats_mutex);
	rx_stats.nCallStructs++;
	MUTEX_EXIT(&rx_stats_mutex);
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
	queue_Init(&call->iovq);
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
	rxi_ResetCall(call, 1);
    }
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 * Call is locked upon entry.
 */
#ifdef RX_ENABLE_LOCKS
void rxi_FreeCall(call, haveCTLock)
    int haveCTLock; /* Set if called from rxi_ReapConnections */
#else /* RX_ENABLE_LOCKS */
void rxi_FreeCall(call)
#endif /* RX_ENABLE_LOCKS */
    register struct rx_call *call;
{
    register int channel = call->channel;
    register struct rx_connection *conn = call->conn;

    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
	(*call->callNumber)++;
    rxi_ResetCall(call, 0);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);
    SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* A call may be free even though its transmit queue is still in use.
     * Since we search the call list from head to tail, put busy calls at
     * the head of the list, and idle calls at the tail.
     */
    if (call->flags & RX_CALL_TQ_BUSY)
	queue_Prepend(&rx_freeCallQueue, call);
    else
	queue_Append(&rx_freeCallQueue, call);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nFreeCallStructs++;
    MUTEX_EXIT(&rx_stats_mutex);

    MUTEX_EXIT(&rx_freeCallQueue_lock);

    /* Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections).  Only do this, however, if there are no
     * outstanding calls. Note that for fine grain locking, there appears
     * to be a deadlock in that rxi_FreeCall has a call locked and
     * DestroyConnectionNoLock locks each call in the conn. But note a
     * few lines up where we have removed this call from the conn.
     * If someone else destroys a connection, they either have no
     * call lock held or are going through this section of code.
     */
    if (conn->flags & RX_CONN_DESTROY_ME) {
	MUTEX_ENTER(&conn->conn_data_lock);
	conn->refCount++;
	MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
	if (haveCTLock)
	    rxi_DestroyConnectionNoLock(conn);
	else
	    rxi_DestroyConnection(conn);
#else /* RX_ENABLE_LOCKS */
	rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
    }
}
2071 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2072 char *rxi_Alloc(size)
2073 register size_t size;
2077 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2078 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2081 int glockOwner = ISAFS_GLOCK();
2085 MUTEX_ENTER(&rx_stats_mutex);
2086 rxi_Alloccnt++; rxi_Allocsize += size;
2087 MUTEX_EXIT(&rx_stats_mutex);
2088 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2089 if (size > AFS_SMALLOCSIZ) {
2090 p = (char *) osi_AllocMediumSpace(size);
2092 p = (char *) osi_AllocSmall(size, 1);
2093 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2098 p = (char *) osi_Alloc(size);
2100 if (!p) osi_Panic("rxi_Alloc error");
2105 void rxi_Free(addr, size)
2107 register size_t size;
2109 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2110 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2113 int glockOwner = ISAFS_GLOCK();
2117 MUTEX_ENTER(&rx_stats_mutex);
2118 rxi_Alloccnt--; rxi_Allocsize -= size;
2119 MUTEX_EXIT(&rx_stats_mutex);
2120 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2121 if (size > AFS_SMALLOCSIZ)
2122 osi_FreeMediumSpace(addr);
2124 osi_FreeSmall(addr);
2125 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2130 osi_Free(addr, size);
2134 /* Find the peer process represented by the supplied (host,port)
2135 * combination. If there is no appropriate active peer structure, a
2136 * new one will be allocated and initialized
2137 * The origPeer, if set, is a pointer to a peer structure on which the
2138 * refcount will be decremented. This is used to replace the peer
2139 * structure hanging off a connection structure */
2140 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2141 register afs_uint32 host;
2142 register u_short port;
2143 struct rx_peer *origPeer;
2146 register struct rx_peer *pp;
2148 hashIndex = PEER_HASH(host, port);
2149 MUTEX_ENTER(&rx_peerHashTable_lock);
2150 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2151 if ((pp->host == host) && (pp->port == port)) break;
2155 pp = rxi_AllocPeer(); /* This bzero's *pp */
2156 pp->host = host; /* must be set here; rxi_AllocPeer zeroed *pp */
2158 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2159 queue_Init(&pp->congestionQueue);
2160 queue_Init(&pp->rpcStats);
2161 pp->next = rx_peerHashTable[hashIndex];
2162 rx_peerHashTable[hashIndex] = pp;
2163 rxi_InitPeerParams(pp);
2164 MUTEX_ENTER(&rx_stats_mutex);
2165 rx_stats.nPeerStructs++;
2166 MUTEX_EXIT(&rx_stats_mutex);
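/* New peers are pushed onto the head of their hash chain: insertion
 * is O(1), and recently created peers are found first on lookup. */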
2173 origPeer->refCount--;
2174 MUTEX_EXIT(&rx_peerHashTable_lock);
2179 /* Find the connection at (host, port) started at epoch, and with the
2180 * given connection id. Creates the server connection if necessary.
2181 * The type specifies whether a client connection or a server
2182 * connection is desired. In both cases, (host, port) specify the
2183 * peer's (host, port) pair. Client connections are not made
2184 * automatically by this routine. The parameter socket gives the
2185 * socket descriptor on which the packet was received. This is used,
2186 * in the case of server connections, to check that *new* connections
2187 * come via a valid (port, serviceId). Finally, the securityIndex
2188 * parameter must match the existing index for the connection. If a
2189 * server connection is created, it will be created using the supplied
2190 * index, if the index is valid for this service */
2191 struct rx_connection *
2192 rxi_FindConnection(socket, host, port, serviceId, cid,
2193 epoch, type, securityIndex)
2195 register afs_int32 host;
2196 register u_short port;
2201 u_int securityIndex;
2203 int hashindex, flag;
2204 register struct rx_connection *conn;
2205 struct rx_peer *peer;
2206 hashindex = CONN_HASH(host, port, cid, epoch, type);
2207 MUTEX_ENTER(&rx_connHashTable_lock);
2208 rxLastConn ? (conn = rxLastConn, flag = 0) :
2209 (conn = rx_connHashTable[hashindex], flag = 1);
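/* Probe the one-entry connection cache (rxLastConn) first when it is
 * set; flag distinguishes that probe (flag == 0) from a scan of the
 * hash chain proper (flag == 1), so a cache miss can fall back to
 * the chain below. */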
2211 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2212 && (epoch == conn->epoch)) {
2213 register struct rx_peer *pp = conn->peer;
2214 if (securityIndex != conn->securityIndex) {
2215 /* this isn't supposed to happen, but someone could forge a packet
2216 like this, and there seems to be some CM bug that makes this
2217 happen from time to time -- in which case, the fileserver
2219 MUTEX_EXIT(&rx_connHashTable_lock);
2220 return (struct rx_connection *) 0;
2222 /* If the epoch's high-order bit is set, route on the cid alone
2223 * (for security reasons), not on the host and port fields.
2225 if (conn->epoch & 0x80000000) break;
2226 if (((type == RX_CLIENT_CONNECTION)
2227 || (pp->host == host)) && (pp->port == port))
2232 /* the connection rxLastConn that was used last time is not the
2233 ** one we are looking for now. Hence, start searching in the hash */
2235 conn = rx_connHashTable[hashindex];
2241 struct rx_service *service;
2242 if (type == RX_CLIENT_CONNECTION) {
2243 MUTEX_EXIT(&rx_connHashTable_lock);
2244 return (struct rx_connection *) 0;
2246 service = rxi_FindService(socket, serviceId);
2247 if (!service || (securityIndex >= service->nSecurityObjects)
2248 || (service->securityObjects[securityIndex] == 0)) {
2249 MUTEX_EXIT(&rx_connHashTable_lock);
2250 return (struct rx_connection *) 0;
2252 conn = rxi_AllocConnection(); /* This bzero's the connection */
2253 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2255 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2257 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2258 conn->next = rx_connHashTable[hashindex];
2259 rx_connHashTable[hashindex] = conn;
2260 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2261 conn->type = RX_SERVER_CONNECTION;
2262 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2263 conn->epoch = epoch;
2264 conn->cid = cid & RX_CIDMASK;
2265 /* conn->serial = conn->lastSerial = 0; */
2266 /* conn->timeout = 0; */
2267 conn->ackRate = RX_FAST_ACK_RATE;
2268 conn->service = service;
2269 conn->serviceId = serviceId;
2270 conn->securityIndex = securityIndex;
2271 conn->securityObject = service->securityObjects[securityIndex];
2272 conn->nSpecific = 0;
2273 conn->specific = NULL;
2274 rx_SetConnDeadTime(conn, service->connDeadTime);
2275 /* Notify security object of the new connection */
2276 RXS_NewConnection(conn->securityObject, conn);
2277 /* XXXX Connection timeout? */
2278 if (service->newConnProc) (*service->newConnProc)(conn);
2279 MUTEX_ENTER(&rx_stats_mutex);
2280 rx_stats.nServerConns++;
2281 MUTEX_EXIT(&rx_stats_mutex);
2285 /* Ensure that the peer structure is set up in such a way that
2286 ** replies in this connection go back to that remote interface
2287 ** from which the last packet was sent out. If this packet's
2288 ** source IP address does not match the peer struct for this conn,
2289 ** then drop the refCount on conn->peer and get a new peer structure.
2290 ** We can check the host,port field in the peer structure without the
2291 ** rx_peerHashTable_lock because the peer structure has its refCount
2292 ** incremented and the only time the host,port in the peer struct gets
2293 ** updated is when the peer structure is created.
2295 if (conn->peer->host == host )
2296 peer = conn->peer; /* no change to the peer structure */
2298 peer = rxi_FindPeer(host, port, conn->peer, 1);
2301 MUTEX_ENTER(&conn->conn_data_lock);
2304 MUTEX_EXIT(&conn->conn_data_lock);
2306 rxLastConn = conn; /* store this connection as the last conn used */
2307 MUTEX_EXIT(&rx_connHashTable_lock);
2311 /* There are two packet tracing routines available for testing and monitoring
2312 * Rx. One is called just after every packet is received and the other is
2313 * called just before every packet is sent. Received packets, have had their
2314 * headers decoded, and packets to be sent have not yet had their headers
2315 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2316 * containing the network address. Both can be modified. The return value, if
2317 * non-zero, indicates that the packet should be dropped. */
2319 int (*rx_justReceived)() = 0;
2320 int (*rx_almostSent)() = 0;
2322 /* A packet has been received off the interface. Np is the packet, socket is
2323 * the socket number it was received from (useful in determining which service
2324 * this packet corresponds to), and (host, port) reflect the host,port of the
2325 * sender. This call returns the packet to the caller if it is finished with
2326 * it, rather than de-allocating it, just as a small performance hack */
2328 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2329 register struct rx_packet *np;
2334 struct rx_call **newcallp;
2336 register struct rx_call *call;
2337 register struct rx_connection *conn;
2339 afs_uint32 currentCallNumber;
2345 struct rx_packet *tnp;
2348 /* We don't print out the packet until now because (1) the time may not be
2349 * accurate enough until now in the lwp implementation (rx_Listener only gets
2350 * the time after the packet is read) and (2) from a protocol point of view,
2351 * this is the first time the packet has been seen */
2352 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2353 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2354 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2355 np->header.serial, packetType, host, port, np->header.serviceId,
2356 np->header.epoch, np->header.cid, np->header.callNumber,
2357 np->header.seq, np->header.flags, np));
2360 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2361 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2364 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2365 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2368 /* If an input tracer function is defined, call it with the packet and
2369 * network address. Note this function may modify its arguments. */
2370 if (rx_justReceived) {
2371 struct sockaddr_in addr;
2373 addr.sin_family = AF_INET;
2374 addr.sin_port = port;
2375 addr.sin_addr.s_addr = host;
2376 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2377 addr.sin_len = sizeof(addr);
2378 #endif /* AFS_OSF_ENV */
2379 drop = (*rx_justReceived) (np, &addr);
2380 /* drop packet if return value is non-zero */
2381 if (drop) return np;
2382 port = addr.sin_port; /* in case fcn changed addr */
2383 host = addr.sin_addr.s_addr;
2387 /* If packet was not sent by the client, then *we* must be the client */
2388 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2389 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2391 /* Find the connection (or fabricate one, if we're the server & if
2392 * necessary) associated with this packet */
2393 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2394 np->header.cid, np->header.epoch, type,
2395 np->header.securityIndex);
2398 /* If no connection found or fabricated, just ignore the packet.
2399 * (An argument could be made for sending an abort packet for
2404 MUTEX_ENTER(&conn->conn_data_lock);
2405 if (conn->maxSerial < np->header.serial)
2406 conn->maxSerial = np->header.serial;
2407 MUTEX_EXIT(&conn->conn_data_lock);
2409 /* If the connection is in an error state, send an abort packet and ignore
2410 * the incoming packet */
2412 /* Don't respond to an abort packet--we don't want loops! */
2413 MUTEX_ENTER(&conn->conn_data_lock);
2414 if (np->header.type != RX_PACKET_TYPE_ABORT)
2415 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2417 MUTEX_EXIT(&conn->conn_data_lock);
2421 /* Check for connection-only requests (i.e. not call specific). */
2422 if (np->header.callNumber == 0) {
2423 switch (np->header.type) {
2424 case RX_PACKET_TYPE_ABORT:
2425 /* What if the supplied error is zero? */
2426 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2427 MUTEX_ENTER(&conn->conn_data_lock);
2429 MUTEX_EXIT(&conn->conn_data_lock);
2431 case RX_PACKET_TYPE_CHALLENGE:
2432 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2433 MUTEX_ENTER(&conn->conn_data_lock);
2435 MUTEX_EXIT(&conn->conn_data_lock);
2437 case RX_PACKET_TYPE_RESPONSE:
2438 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2439 MUTEX_ENTER(&conn->conn_data_lock);
2441 MUTEX_EXIT(&conn->conn_data_lock);
2443 case RX_PACKET_TYPE_PARAMS:
2444 case RX_PACKET_TYPE_PARAMS+1:
2445 case RX_PACKET_TYPE_PARAMS+2:
2446 /* ignore these packet types for now */
2447 MUTEX_ENTER(&conn->conn_data_lock);
2449 MUTEX_EXIT(&conn->conn_data_lock);
2454 /* Should not reach here, unless the peer is broken: send an
2456 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2457 MUTEX_ENTER(&conn->conn_data_lock);
2458 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2460 MUTEX_EXIT(&conn->conn_data_lock);
2465 channel = np->header.cid & RX_CHANNELMASK;
2466 call = conn->call[channel];
2467 #ifdef RX_ENABLE_LOCKS
2469 MUTEX_ENTER(&call->lock);
2470 /* Test to see if call struct is still attached to conn. */
2471 if (call != conn->call[channel]) {
2473 MUTEX_EXIT(&call->lock);
2474 if (type == RX_SERVER_CONNECTION) {
2475 call = conn->call[channel];
2476 /* If we started with no call attached and there is one now,
2477 * another thread is also running this routine and has gotten
2478 * the connection channel. We should drop this packet in the tests
2479 * below. If there was a call on this connection and it's now
2480 * gone, then we'll be making a new call below.
2481 * If there was previously a call and it's now different then
2482 * the old call was freed and another thread running this routine
2483 * has created a call on this channel. One of these two threads
2484 * has a packet for the old call and the code below handles those
2488 MUTEX_ENTER(&call->lock);
2491 /* This packet can't be for this call. If the new call address is
2492 * 0 then no call is running on this channel. If there is a call
2493 * then, since this is a client connection we're getting data for,
2494 * it must be for the previous call.
2496 MUTEX_ENTER(&rx_stats_mutex);
2497 rx_stats.spuriousPacketsRead++;
2498 MUTEX_EXIT(&rx_stats_mutex);
2499 MUTEX_ENTER(&conn->conn_data_lock);
2501 MUTEX_EXIT(&conn->conn_data_lock);
2506 currentCallNumber = conn->callNumber[channel];
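/* The demultiplexing below is driven by comparing the packet's call
 * number against the channel's current call number: a smaller value
 * marks a spurious (stale) packet, a larger one begins a new call on
 * this channel, and an equal one belongs to the call in progress. */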
2508 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2509 if (np->header.callNumber < currentCallNumber) {
2510 MUTEX_ENTER(&rx_stats_mutex);
2511 rx_stats.spuriousPacketsRead++;
2512 MUTEX_EXIT(&rx_stats_mutex);
2513 #ifdef RX_ENABLE_LOCKS
2515 MUTEX_EXIT(&call->lock);
2517 MUTEX_ENTER(&conn->conn_data_lock);
2519 MUTEX_EXIT(&conn->conn_data_lock);
2523 call = rxi_NewCall(conn, channel);
2524 MUTEX_ENTER(&call->lock);
2525 *call->callNumber = np->header.callNumber;
2526 call->state = RX_STATE_PRECALL;
2527 clock_GetTime(&call->queueTime);
2528 hzero(call->bytesSent);
2529 hzero(call->bytesRcvd);
2530 rxi_KeepAliveOn(call);
2532 else if (np->header.callNumber != currentCallNumber) {
2533 /* Wait until the transmit queue is idle before deciding
2534 * whether to reset the current call. Chances are that the
2535 * call will be in either DALLY or HOLD state once the TQ_BUSY
2538 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2539 while ((call->state == RX_STATE_ACTIVE) &&
2540 (call->flags & RX_CALL_TQ_BUSY)) {
2541 call->flags |= RX_CALL_TQ_WAIT;
2542 #ifdef RX_ENABLE_LOCKS
2543 CV_WAIT(&call->cv_tq, &call->lock);
2544 #else /* RX_ENABLE_LOCKS */
2545 osi_rxSleep(&call->tq);
2546 #endif /* RX_ENABLE_LOCKS */
2548 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2549 /* If the new call cannot be taken right now send a busy and set
2550 * the error condition in this call, so that it terminates as
2551 * quickly as possible */
2552 if (call->state == RX_STATE_ACTIVE) {
2553 struct rx_packet *tp;
2555 rxi_CallError(call, RX_CALL_DEAD);
2556 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2557 MUTEX_EXIT(&call->lock);
2558 MUTEX_ENTER(&conn->conn_data_lock);
2560 MUTEX_EXIT(&conn->conn_data_lock);
2563 rxi_ResetCall(call, 0);
2564 *call->callNumber = np->header.callNumber;
2565 call->state = RX_STATE_PRECALL;
2566 clock_GetTime(&call->queueTime);
2567 hzero(call->bytesSent);
2568 hzero(call->bytesRcvd);
2570 * If the number of queued calls exceeds the overload
2571 * threshold then abort this call.
2573 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2574 struct rx_packet *tp;
2576 rxi_CallError(call, rx_BusyError);
2577 tp = rxi_SendCallAbort(call, np, 1, 0);
2578 MUTEX_EXIT(&call->lock);
2579 MUTEX_ENTER(&conn->conn_data_lock);
2581 MUTEX_EXIT(&conn->conn_data_lock);
2584 rxi_KeepAliveOn(call);
2587 /* Continuing call; do nothing here. */
2589 } else { /* we're the client */
2590 /* Ignore all incoming acknowledgements for calls in DALLY state */
2591 if ( call && (call->state == RX_STATE_DALLY)
2592 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2593 MUTEX_ENTER(&rx_stats_mutex);
2594 rx_stats.ignorePacketDally++;
2595 MUTEX_EXIT(&rx_stats_mutex);
2596 #ifdef RX_ENABLE_LOCKS
2598 MUTEX_EXIT(&call->lock);
2601 MUTEX_ENTER(&conn->conn_data_lock);
2603 MUTEX_EXIT(&conn->conn_data_lock);
2607 /* Ignore anything that's not relevant to the current call. If there
2608 * isn't a current call, then no packet is relevant. */
2609 if (!call || (np->header.callNumber != currentCallNumber)) {
2610 MUTEX_ENTER(&rx_stats_mutex);
2611 rx_stats.spuriousPacketsRead++;
2612 MUTEX_EXIT(&rx_stats_mutex);
2613 #ifdef RX_ENABLE_LOCKS
2615 MUTEX_EXIT(&call->lock);
2618 MUTEX_ENTER(&conn->conn_data_lock);
2620 MUTEX_EXIT(&conn->conn_data_lock);
2623 /* If the service security object index stamped in the packet does not
2624 * match the connection's security index, ignore the packet */
2625 if (np->header.securityIndex != conn->securityIndex) {
2626 #ifdef RX_ENABLE_LOCKS
2627 MUTEX_EXIT(&call->lock);
2629 MUTEX_ENTER(&conn->conn_data_lock);
2631 MUTEX_EXIT(&conn->conn_data_lock);
2635 /* If we're receiving the response, then all transmit packets are
2636 * implicitly acknowledged. Get rid of them. */
2637 if (np->header.type == RX_PACKET_TYPE_DATA) {
2638 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2639 /* XXX Hack. Because we must release the global rx lock when
2640 * sending packets (osi_NetSend) we drop all acks while we're
2641 * traversing the tq in rxi_Start sending packets out because
2642 * packets may move to the freePacketQueue as a result of being here!
2643 * So we drop these packets until we're safely out of the
2644 * traversal. Really ugly!
2645 * For fine grain RX locking, we set the acked field in the
2646 * packets and let rxi_Start remove them from the transmit queue.
2648 if (call->flags & RX_CALL_TQ_BUSY) {
2649 #ifdef RX_ENABLE_LOCKS
2650 rxi_SetAcksInTransmitQueue(call);
2653 return np; /* xmitting; drop packet */
2657 rxi_ClearTransmitQueue(call, 0);
2659 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2660 rxi_ClearTransmitQueue(call, 0);
2661 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2663 if (np->header.type == RX_PACKET_TYPE_ACK) {
2664 /* now check to see if this is an ack packet acknowledging that the
2665 * server actually *lost* some hard-acked data. If this happens we
2666 * ignore this packet, as it may indicate that the server restarted in
2667 * the middle of a call. It is also possible that this is an old ack
2668 * packet. We don't abort the connection in this case, because this
2669 * *might* just be an old ack packet. The right way to detect a server
2670 * restart in the midst of a call is to notice that the server epoch
2672 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2673 * XXX unacknowledged. I think that this is off-by-one, but
2674 * XXX I don't dare change it just yet, since it will
2675 * XXX interact badly with the server-restart detection
2676 * XXX code in receiveackpacket. */
2677 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2678 MUTEX_ENTER(&rx_stats_mutex);
2679 rx_stats.spuriousPacketsRead++;
2680 MUTEX_EXIT(&rx_stats_mutex);
2681 MUTEX_EXIT(&call->lock);
2682 MUTEX_ENTER(&conn->conn_data_lock);
2684 MUTEX_EXIT(&conn->conn_data_lock);
2688 } /* else not a data packet */
2691 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2692 /* Set remote user defined status from packet */
2693 call->remoteStatus = np->header.userStatus;
2695 /* Note the gap between the expected next packet and the actual
2696 * packet that arrived, when the new packet has a smaller serial number
2697 * than expected. Rioses frequently reorder packets all by themselves,
2698 * so this will be quite important with very large window sizes.
2699 * Skew is checked against 0 here to avoid any dependence on the type of
2700 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2702 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2703 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2704 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2706 MUTEX_ENTER(&conn->conn_data_lock);
2707 skew = conn->lastSerial - np->header.serial;
2708 conn->lastSerial = np->header.serial;
2709 MUTEX_EXIT(&conn->conn_data_lock);
2711 register struct rx_peer *peer;
2713 if (skew > peer->inPacketSkew) {
2714 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2715 peer->inPacketSkew = skew;
2719 /* Now do packet type-specific processing */
2720 switch (np->header.type) {
2721 case RX_PACKET_TYPE_DATA:
2722 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2725 case RX_PACKET_TYPE_ACK:
2726 /* Respond immediately to ack packets requesting acknowledgement
2728 if (np->header.flags & RX_REQUEST_ACK) {
2729 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2730 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2732 np = rxi_ReceiveAckPacket(call, np, 1);
2734 case RX_PACKET_TYPE_ABORT:
2735 /* An abort packet: reset the connection, passing the error up to
2737 /* What if error is zero? */
2738 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2740 case RX_PACKET_TYPE_BUSY:
2743 case RX_PACKET_TYPE_ACKALL:
2744 /* All packets acknowledged, so we can drop all packets previously
2745 * readied for sending */
2746 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2747 /* XXX Hack. Because we must release the global rx lock when
2748 * sending packets (osi_NetSend) we drop all ack pkts while we're
2749 * traversing the tq in rxi_Start sending packets out because
2750 * packets may move to the freePacketQueue as a result of being
2751 * here! So we drop these packets until we're safely out of the
2752 * traversal. Really ugly!
2753 * For fine grain RX locking, we set the acked field in the packets
2754 * and let rxi_Start remove the packets from the transmit queue.
2756 if (call->flags & RX_CALL_TQ_BUSY) {
2757 #ifdef RX_ENABLE_LOCKS
2758 rxi_SetAcksInTransmitQueue(call);
2760 #else /* RX_ENABLE_LOCKS */
2762 return np; /* xmitting; drop packet */
2763 #endif /* RX_ENABLE_LOCKS */
2765 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2766 rxi_ClearTransmitQueue(call, 0);
2769 /* Should not reach here, unless the peer is broken: send an abort
2771 rxi_CallError(call, RX_PROTOCOL_ERROR);
2772 np = rxi_SendCallAbort(call, np, 1, 0);
2775 /* Note when this last legitimate packet was received, for keep-alive
2776 * processing. Note, we delay getting the time until now in the hope that
2777 * the packet will be delivered to the user before any get time is required
2778 * (if not, then the time won't actually be re-evaluated here). */
2779 call->lastReceiveTime = clock_Sec();
2780 MUTEX_EXIT(&call->lock);
2781 MUTEX_ENTER(&conn->conn_data_lock);
2783 MUTEX_EXIT(&conn->conn_data_lock);
2787 /* return true if this is an "interesting" connection from the point of view
2788 of someone trying to debug the system */
2789 int rxi_IsConnInteresting(struct rx_connection *aconn)
2792 register struct rx_call *tcall;
2794 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2796 for(i=0;i<RX_MAXCALLS;i++) {
2797 tcall = aconn->call[i];
2799 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2801 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2809 /* if this is one of the last few packets AND it wouldn't be used by the
2810 receiving call to immediately satisfy a read request, then drop it on
2811 the floor, since accepting it might prevent a lock-holding thread from
2812 making progress in its reading. If a call has been cleared while in
2813 the precall state then ignore all subsequent packets until the call
2814 is assigned to a thread. */
2816 static TooLow(ap, acall)
2817 struct rx_call *acall;
2818 struct rx_packet *ap; {
2820 MUTEX_ENTER(&rx_stats_mutex);
2821 if (((ap->header.seq != 1) &&
2822 (acall->flags & RX_CALL_CLEARED) &&
2823 (acall->state == RX_STATE_PRECALL)) ||
2824 ((rx_nFreePackets < rxi_dataQuota+2) &&
2825 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2826 && (acall->flags & RX_CALL_READER_WAIT)))) {
2829 MUTEX_EXIT(&rx_stats_mutex);
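/* In short: the packet is "too low" either because the call was
 * cleared while still in the PRECALL state (and this is not the first
 * packet), or because the free packet pool is nearly exhausted and
 * this packet would not immediately satisfy a waiting reader. */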
2834 /* try to attach call, if authentication is complete */
2835 static void TryAttach(acall, socket, tnop, newcallp)
2836 register struct rx_call *acall;
2837 register osi_socket socket;
2839 register struct rx_call **newcallp; {
2840 register struct rx_connection *conn;
2842 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2843 /* Don't attach until we have all required authentication. */
2844 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2845 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2846 /* Note: this does not necessarily succeed; there
2847 may not be any proc available */
2850 rxi_ChallengeOn(acall->conn);
2855 /* A data packet has been received off the interface. This packet is
2856 * appropriate to the call (the call is in the right state, etc.). This
2857 * routine can return a packet to the caller, for re-use */
2859 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2860 port, tnop, newcallp)
2861 register struct rx_call *call;
2862 register struct rx_packet *np;
2868 struct rx_call **newcallp;
2874 afs_uint32 seq, serial, flags;
2876 struct rx_packet *tnp;
2878 MUTEX_ENTER(&rx_stats_mutex);
2879 rx_stats.dataPacketsRead++;
2880 MUTEX_EXIT(&rx_stats_mutex);
2883 /* If there are no packet buffers, drop this new packet, unless we can find
2884 * packet buffers from inactive calls */
2886 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2887 MUTEX_ENTER(&rx_freePktQ_lock);
2888 rxi_NeedMorePackets = TRUE;
2889 MUTEX_EXIT(&rx_freePktQ_lock);
2890 MUTEX_ENTER(&rx_stats_mutex);
2891 rx_stats.noPacketBuffersOnRead++;
2892 MUTEX_EXIT(&rx_stats_mutex);
2893 call->rprev = np->header.serial;
2894 rxi_calltrace(RX_TRACE_DROP, call);
2895 dpf (("packet %x dropped on receipt - quota problems", np));
2897 rxi_ClearReceiveQueue(call);
2898 clock_GetTime(&when);
2899 clock_Add(&when, &rx_softAckDelay);
2900 if (!call->delayedAckEvent ||
2901 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2902 rxevent_Cancel(call->delayedAckEvent, call,
2903 RX_CALL_REFCOUNT_DELAY);
2904 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2905 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2908 /* we've damaged this call already, might as well do it in. */
2914 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2915 * packet is one of several packets transmitted as a single
2916 * datagram. Do not send any soft or hard acks until all packets
2917 * in a jumbogram have been processed. Send negative acks right away.
2919 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2920 /* tnp is non-null when there are more packets in the
2921 * current jumbogram */
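/* Hence the loop runs exactly once for an ordinary packet, and once
 * per constituent packet when a jumbogram is being split apart. */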
2928 seq = np->header.seq;
2929 serial = np->header.serial;
2930 flags = np->header.flags;
2932 /* If the call is in an error state, send an abort message */
2934 return rxi_SendCallAbort(call, np, istack, 0);
2936 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2937 * AFS 3.5 jumbogram. */
2938 if (flags & RX_JUMBO_PACKET) {
2939 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2944 if (np->header.spare != 0) {
2945 MUTEX_ENTER(&call->conn->conn_data_lock);
2946 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2947 MUTEX_EXIT(&call->conn->conn_data_lock);
2950 /* The usual case is that this is the expected next packet */
2951 if (seq == call->rnext) {
2953 /* Check to make sure it is not a duplicate of one already queued */
2954 if (queue_IsNotEmpty(&call->rq)
2955 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2956 MUTEX_ENTER(&rx_stats_mutex);
2957 rx_stats.dupPacketsRead++;
2958 MUTEX_EXIT(&rx_stats_mutex);
2959 dpf (("packet %x dropped on receipt - duplicate", np));
2960 rxevent_Cancel(call->delayedAckEvent, call,
2961 RX_CALL_REFCOUNT_DELAY);
2962 np = rxi_SendAck(call, np, seq, serial,
2963 flags, RX_ACK_DUPLICATE, istack);
2969 /* It's the next packet. Stick it on the receive queue
2970 * for this call. Set newPackets to make sure we wake
2971 * the reader once all packets have been processed */
2972 queue_Prepend(&call->rq, np);
2974 np = NULL; /* We can't use this anymore */
2977 /* If an ack is requested then set a flag to make sure we
2978 * send an acknowledgement for this packet */
2979 if (flags & RX_REQUEST_ACK) {
2983 /* Keep track of whether we have received the last packet */
2984 if (flags & RX_LAST_PACKET) {
2985 call->flags |= RX_CALL_HAVE_LAST;
2989 /* Check whether we have all of the packets for this call */
2990 if (call->flags & RX_CALL_HAVE_LAST) {
2991 afs_uint32 tseq; /* temporary sequence number */
2992 struct rx_packet *tp; /* Temporary packet pointer */
2993 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
2995 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2996 if (tseq != tp->header.seq)
2998 if (tp->header.flags & RX_LAST_PACKET) {
2999 call->flags |= RX_CALL_RECEIVE_DONE;
3006 /* Provide asynchronous notification for those who want it
3007 * (e.g. multi rx) */
3008 if (call->arrivalProc) {
3009 (*call->arrivalProc)(call, call->arrivalProcHandle,
3010 call->arrivalProcArg);
3011 call->arrivalProc = (VOID (*)()) 0;
3014 /* Update last packet received */
3017 /* If there is no server process serving this call, grab
3018 * one, if available. We only need to do this once. If a
3019 * server thread is available, this thread becomes a server
3020 * thread and the server thread becomes a listener thread. */
3022 TryAttach(call, socket, tnop, newcallp);
3025 /* This is not the expected next packet. */
3027 /* Determine whether this is a new or old packet, and if it's
3028 * a new one, whether it fits into the current receive window.
3029 * Also figure out whether the packet was delivered in sequence.
3030 * We use the prev variable to determine whether the new packet
3031 * is the successor of its immediate predecessor in the
3032 * receive queue, and the missing flag to determine whether
3033 * any of this packet's predecessors are missing. */
3035 afs_uint32 prev; /* "Previous packet" sequence number */
3036 struct rx_packet *tp; /* Temporary packet pointer */
3037 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3038 int missing; /* Are any predecessors missing? */
3040 /* If the new packet's sequence number has been sent to the
3041 * application already, then this is a duplicate */
3042 if (seq < call->rnext) {
3043 MUTEX_ENTER(&rx_stats_mutex);
3044 rx_stats.dupPacketsRead++;
3045 MUTEX_EXIT(&rx_stats_mutex);
3046 rxevent_Cancel(call->delayedAckEvent, call,
3047 RX_CALL_REFCOUNT_DELAY);
3048 np = rxi_SendAck(call, np, seq, serial,
3049 flags, RX_ACK_DUPLICATE, istack);
3055 /* If the sequence number is greater than what can be
3056 * accommodated by the current window, then send a negative
3057 * acknowledge and drop the packet */
3058 if ((call->rnext + call->rwind) <= seq) {
3059 rxevent_Cancel(call->delayedAckEvent, call,
3060 RX_CALL_REFCOUNT_DELAY);
3061 np = rxi_SendAck(call, np, seq, serial,
3062 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3068 /* Look for the packet in the queue of old received packets */
3069 for (prev = call->rnext - 1, missing = 0,
3070 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3071 /*Check for duplicate packet */
3072 if (seq == tp->header.seq) {
3073 MUTEX_ENTER(&rx_stats_mutex);
3074 rx_stats.dupPacketsRead++;
3075 MUTEX_EXIT(&rx_stats_mutex);
3076 rxevent_Cancel(call->delayedAckEvent, call,
3077 RX_CALL_REFCOUNT_DELAY);
3078 np = rxi_SendAck(call, np, seq, serial,
3079 flags, RX_ACK_DUPLICATE, istack);
3084 /* If we find a higher sequence packet, break out and
3085 * insert the new packet here. */
3086 if (seq < tp->header.seq) break;
3087 /* Check for missing packet */
3088 if (tp->header.seq != prev+1) {
3092 prev = tp->header.seq;
3095 /* Keep track of whether we have received the last packet. */
3096 if (flags & RX_LAST_PACKET) {
3097 call->flags |= RX_CALL_HAVE_LAST;
3100 /* It's within the window: add it to the receive queue.
3101 * tp is left by the previous loop either pointing at the
3102 * packet before which to insert the new packet, or at the
3103 * queue head if the queue is empty or the packet should be
3105 queue_InsertBefore(tp, np);
3109 /* Check whether we have all of the packets for this call */
3110 if ((call->flags & RX_CALL_HAVE_LAST)
3111 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3112 afs_uint32 tseq; /* temporary sequence number */
3114 for (tseq = call->rnext,
3115 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3116 if (tseq != tp->header.seq)
3118 if (tp->header.flags & RX_LAST_PACKET) {
3119 call->flags |= RX_CALL_RECEIVE_DONE;
3126 /* We need to send an ack if the packet is out of sequence,
3127 * or if an ack was requested by the peer. */
3128 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3132 /* Acknowledge the last packet for each call */
3133 if (flags & RX_LAST_PACKET) {
3144 * If the receiver is waiting for an iovec, fill the iovec
3145 * using the data from the receive queue */
3146 if (call->flags & RX_CALL_IOVEC_WAIT) {
3147 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3148 /* the call may have been aborted */
3157 /* Wakeup the reader if any */
3158 if ((call->flags & RX_CALL_READER_WAIT) &&
3159 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3160 (call->iovNext >= call->iovMax) ||
3161 (call->flags & RX_CALL_RECEIVE_DONE))) {
3162 call->flags &= ~RX_CALL_READER_WAIT;
3163 #ifdef RX_ENABLE_LOCKS
3164 CV_BROADCAST(&call->cv_rq);
3166 osi_rxWakeup(&call->rq);
3172 * Send an ack when requested by the peer, or once every
3173 * rxi_SoftAckRate packets until the last packet has been
3174 * received. Always send a soft ack for the last packet in
3175 * the server's reply. */
3177 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3178 np = rxi_SendAck(call, np, seq, serial, flags,
3179 RX_ACK_REQUESTED, istack);
3180 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3181 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3182 np = rxi_SendAck(call, np, seq, serial, flags,
3183 RX_ACK_IDLE, istack);
3184 } else if (call->nSoftAcks) {
3185 clock_GetTime(&when);
3186 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3187 clock_Add(&when, &rx_lastAckDelay);
3189 clock_Add(&when, &rx_softAckDelay);
3191 if (!call->delayedAckEvent ||
3192 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3193 rxevent_Cancel(call->delayedAckEvent, call,
3194 RX_CALL_REFCOUNT_DELAY);
3195 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3196 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3199 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3200 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3207 static void rxi_ComputeRate();
3210 /* The real smarts of the whole thing. */
3211 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3212 register struct rx_call *call;
3213 struct rx_packet *np;
3216 struct rx_ackPacket *ap;
3218 register struct rx_packet *tp;
3219 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3220 register struct rx_connection *conn = call->conn;
3221 struct rx_peer *peer = conn->peer;
3224 /* because there are CM's that are bogus, sending weird values for this. */
3225 afs_uint32 skew = 0;
3230 int newAckCount = 0;
3231 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3232 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3234 MUTEX_ENTER(&rx_stats_mutex);
3235 rx_stats.ackPacketsRead++;
3236 MUTEX_EXIT(&rx_stats_mutex);
3237 ap = (struct rx_ackPacket *) rx_DataOf(np);
3238 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3240 return np; /* truncated ack packet */
3242 /* depends on ack packet struct */
3243 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3244 first = ntohl(ap->firstPacket);
3245 serial = ntohl(ap->serial);
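/* Layout of an ack: every packet with a sequence number below
 * firstPacket is implicitly acknowledged, while acks[0..nAcks-1]
 * give explicit ack/nack dispositions for sequence numbers
 * firstPacket, firstPacket+1, and so on. */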
3246 /* temporarily disabled -- needs to degrade over time
3247 skew = ntohs(ap->maxSkew); */
3249 /* Ignore ack packets received out of order */
3250 if (first < call->tfirst) {
3254 if (np->header.flags & RX_SLOW_START_OK) {
3255 call->flags |= RX_CALL_SLOW_START_OK;
3261 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3262 ap->reason, ntohl(ap->previousPacket),
3263 (unsigned int) np->header.seq, (unsigned int) serial,
3264 (unsigned int) skew, ntohl(ap->firstPacket));
3267 for (offset = 0; offset < nAcks; offset++)
3268 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3274 /* if a server connection has been re-created, it doesn't remember what
3275 serial # it was up to. An ack will tell us, since the serial field
3276 contains the largest serial received by the other side */
3277 MUTEX_ENTER(&conn->conn_data_lock);
3278 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3279 conn->serial = serial+1;
3281 MUTEX_EXIT(&conn->conn_data_lock);
3283 /* Update the outgoing packet skew value to the latest value of
3284 * the peer's incoming packet skew value. The ack packet, of
3285 * course, could arrive out of order, but that won't affect things
3287 MUTEX_ENTER(&peer->peer_lock);
3288 peer->outPacketSkew = skew;
3290 /* Check for packets that no longer need to be transmitted, and
3291 * discard them. This only applies to packets positively
3292 * acknowledged as having been sent to the peer's upper level.
3293 * All other packets must be retained. So only packets with
3294 * sequence numbers < ap->firstPacket are candidates. */
3295 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3296 if (tp->header.seq >= first) break;
3297 call->tfirst = tp->header.seq + 1;
3298 if (tp->header.serial == serial) {
3299 /* Use RTT if not delayed by client. */
3300 if (ap->reason != RX_ACK_DELAY)
3301 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3303 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3306 else if (tp->firstSerial == serial) {
3307 /* Use RTT if not delayed by client. */
3308 if (ap->reason != RX_ACK_DELAY)
3309 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3311 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3314 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3315 /* XXX Hack. Because we have to release the global rx lock when sending
3316 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3317 * in rxi_Start sending packets out because packets may move to the
3318 * freePacketQueue as a result of being here! So we drop these packets until
3319 * we're safely out of the traversal. Really ugly!
3320 * To make it even uglier, if we're using fine grain locking, we can
3321 * set the ack bits in the packets and have rxi_Start remove the packets
3322 * when it's done transmitting.
3327 if (call->flags & RX_CALL_TQ_BUSY) {
3328 #ifdef RX_ENABLE_LOCKS
3330 call->flags |= RX_CALL_TQ_SOME_ACKED;
3331 #else /* RX_ENABLE_LOCKS */
3333 #endif /* RX_ENABLE_LOCKS */
3335 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3338 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3343 /* Give rate detector a chance to respond to ping requests */
3344 if (ap->reason == RX_ACK_PING_RESPONSE) {
3345 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3349 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3351 /* Now go through explicit acks/nacks and record the results in
3352 * the waiting packets. These are packets that can't be released
3353 * yet, even with a positive acknowledge. This positive
3354 * acknowledge only means the packet has been received by the
3355 * peer, not that it will be retained long enough to be sent to
3356 * the peer's upper level. In addition, reset the transmit timers
3357 * of any missing packets (those packets that must be missing
3358 * because this packet was out of sequence) */
3360 call->nSoftAcked = 0;
3361 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3362 /* Update round trip time if the ack was stimulated on receipt
3364 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3365 #ifdef RX_ENABLE_LOCKS
3366 if (tp->header.seq >= first) {
3367 #endif /* RX_ENABLE_LOCKS */
3368 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3369 if (tp->header.serial == serial) {
3370 /* Use RTT if not delayed by client. */
3371 if (ap->reason != RX_ACK_DELAY)
3372 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3374 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3377 else if ((tp->firstSerial == serial)) {
3378 /* Use RTT if not delayed by client. */
3379 if (ap->reason != RX_ACK_DELAY)
3380 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3382 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3385 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3386 #ifdef RX_ENABLE_LOCKS
3388 #endif /* RX_ENABLE_LOCKS */
3389 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3391 /* Set the acknowledge flag per packet based on the
3392 * information in the ack packet. An acknowledged packet can
3393 * be downgraded when the server has discarded a packet it
3394 * soft-acked previously, or when an ack packet is received
3395 * out of sequence. */
3396 if (tp->header.seq < first) {
3397 /* Implicit ack information */
3403 else if (tp->header.seq < first + nAcks) {
3404 /* Explicit ack information: set it in the packet appropriately */
3405 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3425 /* If packet isn't yet acked, and it has been transmitted at least
3426 * once, reset retransmit time using latest timeout
3427 * i.e., this should readjust the retransmit timer for all outstanding
3428 * packets... So we don't just retransmit when we should know better */
3430 if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3431 tp->retryTime = tp->timeSent;
3432 clock_Add(&tp->retryTime, &peer->timeout);
3433 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3434 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
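/* Worked example: a packet with backoff == 3 gets 3 << 8 == 768
 * milliseconds added on top of the peer's smoothed timeout, i.e.
 * three quarter-second backoff units. */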
3438 /* If the window has been extended by this acknowledge packet,
3439 * then wakeup a sender waiting in alloc for window space, or try
3440 * sending packets now, if he's been sitting on packets due to
3441 * lack of window space */
3442 if (call->tnext < (call->tfirst + call->twind)) {
3443 #ifdef RX_ENABLE_LOCKS
3444 CV_SIGNAL(&call->cv_twind);
3446 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3447 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3448 osi_rxWakeup(&call->twind);
3451 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3452 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3456 /* if the ack packet has a receivelen field hanging off it,
3457 * update our state */
3458 if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3461 /* If the ack packet has a "recommended" size that is less than
3462 * what I am using now, reduce my size to match */
3463 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3464 sizeof(afs_int32), &tSize);
3465 tSize = (afs_uint32) ntohl(tSize);
3466 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3468 /* Get the maximum packet size to send to this peer */
3469 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3471 tSize = (afs_uint32)ntohl(tSize);
3472 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3473 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3475 /* sanity check - peer might have restarted with different params.
3476 * If peer says "send less", dammit, send less... Peer should never
3477 * be unable to accept packets of the size that prior AFS versions would
3478 * send without asking. */
3479 if (peer->maxMTU != tSize) {
3480 peer->maxMTU = tSize;
3481 peer->MTU = MIN(tSize, peer->MTU);
3482 call->MTU = MIN(call->MTU, tSize);
3486 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3488 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3489 sizeof(afs_int32), &tSize);
3490 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3491 if (tSize < call->twind) { /* smaller than our send */
3492 call->twind = tSize; /* window, we must send less... */
3493 call->ssthresh = MIN(call->twind, call->ssthresh);
3496 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3497 * network MTU confused with the loopback MTU. Calculate the
3498 * maximum MTU here for use in the slow start code below.
3500 maxMTU = peer->maxMTU;
3501 /* Did peer restart with older RX version? */
3502 if (peer->maxDgramPackets > 1) {
3503 peer->maxDgramPackets = 1;
3505 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3507 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3508 sizeof(afs_int32), &tSize);
3509 tSize = (afs_uint32) ntohl(tSize);
3511 * As of AFS 3.5 we set the send window to match the receive window.
3513 if (tSize < call->twind) {
3514 call->twind = tSize;
3515 call->ssthresh = MIN(call->twind, call->ssthresh);
3516 } else if (tSize > call->twind) {
3517 call->twind = tSize;
3521 * As of AFS 3.5, a jumbogram is more than one fixed size
3522 * packet transmitted in a single UDP datagram. If the remote
3523 * MTU is smaller than our local MTU then never send a datagram
3524 * larger than the natural MTU.
3526 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3527 sizeof(afs_int32), &tSize);
3528 maxDgramPackets = (afs_uint32) ntohl(tSize);
3529 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3530 maxDgramPackets = MIN(maxDgramPackets,
3531 (int)(peer->ifDgramPackets));
3532 maxDgramPackets = MIN(maxDgramPackets, tSize);
3533 if (maxDgramPackets > 1) {
3534 peer->maxDgramPackets = maxDgramPackets;
3535 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3537 peer->maxDgramPackets = 1;
3538 call->MTU = peer->natMTU;
3540 } else if (peer->maxDgramPackets > 1) {
3541 /* Restarted with lower version of RX */
3542 peer->maxDgramPackets = 1;
3544 } else if (peer->maxDgramPackets > 1 ||
3545 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3546 /* Restarted with lower version of RX */
3547 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3548 peer->natMTU = OLD_MAX_PACKET_SIZE;
3549 peer->MTU = OLD_MAX_PACKET_SIZE;
3550 peer->maxDgramPackets = 1;
3551 peer->nDgramPackets = 1;
3553 call->MTU = OLD_MAX_PACKET_SIZE;
3558 * Calculate how many datagrams were successfully received after
3559 * the first missing packet and adjust the negative ack counter
3564 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
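/* The expression above is a ceiling division: e.g. with
 * nDgramPackets == 3, five nacked packets round up to two datagrams. */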
3565 if (call->nNacks < nNacked) {
3566 call->nNacks = nNacked;
3575 if (call->flags & RX_CALL_FAST_RECOVER) {
3577 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3579 call->flags &= ~RX_CALL_FAST_RECOVER;
3580 call->cwind = call->nextCwind;
3581 call->nextCwind = 0;
3584 call->nCwindAcks = 0;
3586 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3587 /* Three negative acks in a row trigger congestion recovery */
3588 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3589 MUTEX_EXIT(&peer->peer_lock);
3590 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3591 /* someone else is waiting to start recovery */
3594 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3595 while (call->flags & RX_CALL_TQ_BUSY) {
3596 call->flags |= RX_CALL_TQ_WAIT;
3597 #ifdef RX_ENABLE_LOCKS
3598 CV_WAIT(&call->cv_tq, &call->lock);
3599 #else /* RX_ENABLE_LOCKS */
3600 osi_rxSleep(&call->tq);
3601 #endif /* RX_ENABLE_LOCKS */
3603 MUTEX_ENTER(&peer->peer_lock);
3604 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3605 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3606 call->flags |= RX_CALL_FAST_RECOVER;
3607 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3608 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3610 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3611 call->nextCwind = call->ssthresh;
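/* Worked example: with cwind == 16 and twind == 32 the new slow
 * start threshold is MAX(4, 16) >> 1 == 8; the recovery window is
 * then ssthresh + rx_nackThreshold (subject to the MIN cap above),
 * and nextCwind restarts the post-recovery window at the threshold. */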
3614 peer->MTU = call->MTU;
3615 peer->cwind = call->nextCwind;
3616 peer->nDgramPackets = call->nDgramPackets;
3618 call->congestSeq = peer->congestSeq;
3619 /* Reset the resend times on the packets that were nacked
3620 * so we will retransmit as soon as the window permits */
3621 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3624 clock_Zero(&tp->retryTime);
3626 } else if (tp->acked) {
3631 /* If cwind is smaller than ssthresh, then increase
3632 * the window one packet for each ack we receive (exponential
3634 * If cwind is greater than or equal to ssthresh then increase
3635 * the congestion window by one packet for each cwind acks we
3636 * receive (linear growth). */
3637 if (call->cwind < call->ssthresh) {
3638 call->cwind = MIN((int)call->ssthresh,
3639 (int)(call->cwind + newAckCount));
3640 call->nCwindAcks = 0;
3642 call->nCwindAcks += newAckCount;
3643 if (call->nCwindAcks >= call->cwind) {
3644 call->nCwindAcks = 0;
3645 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
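/* E.g. with ssthresh == 8: from cwind == 2, each newly acked packet
 * widens the window directly (2 -> 4 -> 8 over successive acks,
 * exponential per round trip); once cwind reaches 8, a full window
 * of acks is needed for each single-packet increase (linear). */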
3649 * If we have received several acknowledgements in a row then
3650 * it is time to increase the size of our datagrams
3652 if ((int)call->nAcks > rx_nDgramThreshold) {
3653 if (peer->maxDgramPackets > 1) {
3654 if (call->nDgramPackets < peer->maxDgramPackets) {
3655 call->nDgramPackets++;
3657 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3658 } else if (call->MTU < peer->maxMTU) {
3659 call->MTU += peer->natMTU;
3660 call->MTU = MIN(call->MTU, peer->maxMTU);
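/* MTU growth is additive: each step adds one natural MTU's worth of
 * payload and is clamped to the largest size the peer has advertised
 * (peer->maxMTU). */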
3666 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3668 /* Servers need to hold the call until all response packets have
3669 * been acknowledged. Soft acks are good enough since clients
3670 * are not allowed to clear their receive queues. */
3671 if (call->state == RX_STATE_HOLD &&
3672 call->tfirst + call->nSoftAcked >= call->tnext) {
3673 call->state = RX_STATE_DALLY;
3674 rxi_ClearTransmitQueue(call, 0);
3675 } else if (!queue_IsEmpty(&call->tq)) {
3676 rxi_Start(0, call, istack);
3681 /* Received a response to a challenge packet */
3682 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3683 register struct rx_connection *conn;
3684 register struct rx_packet *np;
3689 /* Ignore the packet if we're the client */
3690 if (conn->type == RX_CLIENT_CONNECTION) return np;
3692 /* If already authenticated, ignore the packet (it's probably a retry) */
3693 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3696 /* Otherwise, have the security object evaluate the response packet */
3697 error = RXS_CheckResponse(conn->securityObject, conn, np);
3699 /* If the response is invalid, reset the connection, sending
3700 * an abort to the peer */
3704 rxi_ConnectionError(conn, error);
3705 MUTEX_ENTER(&conn->conn_data_lock);
3706 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3707 MUTEX_EXIT(&conn->conn_data_lock);
3711 /* If the response is valid, any calls waiting to attach
3712 * servers can now do so */
3714 for (i=0; i<RX_MAXCALLS; i++) {
3715 struct rx_call *call = conn->call[i];
3717 MUTEX_ENTER(&call->lock);
3718 if (call->state == RX_STATE_PRECALL)
3719 rxi_AttachServerProc(call, -1, NULL, NULL);
3720 MUTEX_EXIT(&call->lock);
3727 /* A client has received an authentication challenge: the security
3728 * object is asked to cough up a respectable response packet to send
3729 * back to the server. The server is responsible for retrying the
3730 * challenge if it fails to get a response. */
3733 rxi_ReceiveChallengePacket(conn, np, istack)
3734 register struct rx_connection *conn;
3735 register struct rx_packet *np;
3740 /* Ignore the challenge if we're the server */
3741 if (conn->type == RX_SERVER_CONNECTION) return np;
3743 /* Ignore the challenge if the connection is otherwise idle; someone's
3744 * trying to use us as an oracle. */
3745 if (!rxi_HasActiveCalls(conn)) return np;
3747 /* Send the security object the challenge packet. It is expected to fill
3748 * in the response. */
3749 error = RXS_GetResponse(conn->securityObject, conn, np);
3751 /* If the security object is unable to return a valid response, reset the
3752 * connection and send an abort to the peer. Otherwise send the response
3753 * packet to the peer connection. */
3755 rxi_ConnectionError(conn, error);
3756 MUTEX_ENTER(&conn->conn_data_lock);
3757 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3758 MUTEX_EXIT(&conn->conn_data_lock);
3761 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3762 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3768 /* Find an available server process to service the current request in
3769 * the given call structure. If one isn't available, queue up this
3770 * call so it eventually gets one */
3772 rxi_AttachServerProc(call, socket, tnop, newcallp)
3773 register struct rx_call *call;
3774 register osi_socket socket;
3776 register struct rx_call **newcallp;
3778 register struct rx_serverQueueEntry *sq;
3779 register struct rx_service *service = call->conn->service;
3780 #ifdef RX_ENABLE_LOCKS
3781 register int haveQuota = 0;
3782 #endif /* RX_ENABLE_LOCKS */
3783 /* May already be attached */
3784 if (call->state == RX_STATE_ACTIVE) return;
3786 MUTEX_ENTER(&rx_serverPool_lock);
3787 #ifdef RX_ENABLE_LOCKS
3788 while(rxi_ServerThreadSelectingCall) {
3789 MUTEX_EXIT(&call->lock);
3790 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3791 MUTEX_EXIT(&rx_serverPool_lock);
3792 MUTEX_ENTER(&call->lock);
3793 MUTEX_ENTER(&rx_serverPool_lock);
3794 /* Call may have been attached */
3795 if (call->state == RX_STATE_ACTIVE) return;
3798 haveQuota = QuotaOK(service);
3799 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3800 /* If there are no processes available to service this call,
3801 * put the call on the incoming call queue (unless it's
3802 * already on the queue).
3805 ReturnToServerPool(service);
3806 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3807 call->flags |= RX_CALL_WAIT_PROC;
3808 MUTEX_ENTER(&rx_stats_mutex);
3810 MUTEX_EXIT(&rx_stats_mutex);
3811 rxi_calltrace(RX_CALL_ARRIVAL, call);
3812 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3813 queue_Append(&rx_incomingCallQueue, call);
3816 #else /* RX_ENABLE_LOCKS */
3817 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3818 /* If there are no processes available to service this call,
3819 * put the call on the incoming call queue (unless it's
3820 * already on the queue).
3822 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3823 call->flags |= RX_CALL_WAIT_PROC;
3825 rxi_calltrace(RX_CALL_ARRIVAL, call);
3826 queue_Append(&rx_incomingCallQueue, call);
3829 #endif /* RX_ENABLE_LOCKS */
3831 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3833 /* If hot threads are enabled, and both newcallp and sq->socketp
3834 * are non-null, then this thread will process the call, and the
3835 * idle server thread will start listening on this thread's socket.
3838 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3841 *sq->socketp = socket;
3842 clock_GetTime(&call->startTime);
3843 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3847 if (call->flags & RX_CALL_WAIT_PROC) {
3848 /* Conservative: I don't think this should happen */
3849 call->flags &= ~RX_CALL_WAIT_PROC;
3850 MUTEX_ENTER(&rx_stats_mutex);
3852 MUTEX_EXIT(&rx_stats_mutex);
3855 call->state = RX_STATE_ACTIVE;
3856 call->mode = RX_MODE_RECEIVING;
3857 if (call->flags & RX_CALL_CLEARED) {
3858 /* send an ack now to start the packet flow up again */
3859 call->flags &= ~RX_CALL_CLEARED;
3860 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3862 #ifdef RX_ENABLE_LOCKS
3865 service->nRequestsRunning++;
3866 if (service->nRequestsRunning <= service->minProcs)
3872 MUTEX_EXIT(&rx_serverPool_lock);
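/* [Editorial sketch] The attach logic above reduces to: if the service is
 * over quota or no server thread is idle, park the call on the incoming
 * queue; otherwise hand it to an idle thread. A minimal, self-contained
 * illustration -- the struct and names below are hypothetical stand-ins,
 * not rx types. */
struct sketch_pool {
    int idle_threads;   /* threads blocked waiting for a call */
    int quota_left;     /* per-service execution quota remaining */
    int queued_calls;   /* calls parked on the incoming-call queue */
};
/* Returns 1 if the call can run immediately, 0 if it must wait. */
static int sketch_attach(struct sketch_pool *pool)
{
    if (pool->quota_left <= 0 || pool->idle_threads == 0) {
        pool->queued_calls++;   /* queue_Append(&rx_incomingCallQueue, ...) */
        return 0;
    }
    pool->idle_threads--;       /* dequeue an entry from rx_idleServerQueue */
    pool->quota_left--;
    return 1;
}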
3875 /* Delay the sending of an acknowledge event for a short while, while
3876 * a new call is being prepared (in the case of a client) or a reply
3877 * is being prepared (in the case of a server). Rather than sending
3878 * an ack packet, an ACKALL packet is sent. */
3879 void rxi_AckAll(event, call, dummy)
3880 struct rxevent *event;
3881 register struct rx_call *call;
3884 #ifdef RX_ENABLE_LOCKS
3886 MUTEX_ENTER(&call->lock);
3887 call->delayedAckEvent = (struct rxevent *) 0;
3888 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3890 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3891 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3893 MUTEX_EXIT(&call->lock);
3894 #else /* RX_ENABLE_LOCKS */
3895 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3896 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3897 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3898 #endif /* RX_ENABLE_LOCKS */
3901 void rxi_SendDelayedAck(event, call, dummy)
3902 struct rxevent *event;
3903 register struct rx_call *call;
3906 #ifdef RX_ENABLE_LOCKS
3908 MUTEX_ENTER(&call->lock);
3909 if (event == call->delayedAckEvent)
3910 call->delayedAckEvent = (struct rxevent *) 0;
3911 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3913 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3915 MUTEX_EXIT(&call->lock);
3916 #else /* RX_ENABLE_LOCKS */
3917 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3918 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3919 #endif /* RX_ENABLE_LOCKS */
3923 #ifdef RX_ENABLE_LOCKS
3924 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3925 * clearing them out.
3927 static void rxi_SetAcksInTransmitQueue(call)
3928 register struct rx_call *call;
3930 register struct rx_packet *p, *tp;
3933 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3940 call->flags |= RX_CALL_TQ_CLEARME;
3941 call->flags |= RX_CALL_TQ_SOME_ACKED;
3944 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3945 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3946 call->tfirst = call->tnext;
3947 call->nSoftAcked = 0;
3949 if (call->flags & RX_CALL_FAST_RECOVER) {
3950 call->flags &= ~RX_CALL_FAST_RECOVER;
3951 call->cwind = call->nextCwind;
3952 call->nextCwind = 0;
3955 CV_SIGNAL(&call->cv_twind);
3957 #endif /* RX_ENABLE_LOCKS */
3959 /* Clear out the transmit queue for the current call (all packets have
3960 * been received by peer) */
3961 void rxi_ClearTransmitQueue(call, force)
3962 register struct rx_call *call;
3965 register struct rx_packet *p, *tp;
3967 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3968 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3970 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3977 call->flags |= RX_CALL_TQ_CLEARME;
3978 call->flags |= RX_CALL_TQ_SOME_ACKED;
3981 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3982 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3988 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3989 call->flags &= ~RX_CALL_TQ_CLEARME;
3991 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3993 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3994 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3995 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3996 call->nSoftAcked = 0;
3998 if (call->flags & RX_CALL_FAST_RECOVER) {
3999 call->flags &= ~RX_CALL_FAST_RECOVER;
4000 call->cwind = call->nextCwind;
4003 #ifdef RX_ENABLE_LOCKS
4004 CV_SIGNAL(&call->cv_twind);
4006 osi_rxWakeup(&call->twind);
4010 void rxi_ClearReceiveQueue(call)
4011 register struct rx_call *call;
4013 register struct rx_packet *p, *tp;
4014 if (queue_IsNotEmpty(&call->rq)) {
4015 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4020 rx_packetReclaims++;
4022 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4024 if (call->state == RX_STATE_PRECALL) {
4025 call->flags |= RX_CALL_CLEARED;
4029 /* Send an abort packet for the specified call */
4030 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4031 register struct rx_call *call;
4032 struct rx_packet *packet;
4042 /* Clients should never delay abort messages */
4043 if (rx_IsClientConn(call->conn))
4046 if (call->abortCode != call->error) {
4047 call->abortCode = call->error;
4048 call->abortCount = 0;
4051 if (force || rxi_callAbortThreshhold == 0 ||
4052 call->abortCount < rxi_callAbortThreshhold) {
4053 if (call->delayedAbortEvent) {
4054 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4056 error = htonl(call->error);
4058 packet = rxi_SendSpecial(call, call->conn, packet,
4059 RX_PACKET_TYPE_ABORT, (char *)&error,
4060 sizeof(error), istack);
4061 } else if (!call->delayedAbortEvent) {
4062 clock_GetTime(&when);
4063 clock_Addmsec(&when, rxi_callAbortDelay);
4064 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4065 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4071 /* Send an abort packet for the specified connection. Packet is an
4072 * optional pointer to a packet that can be used to send the abort.
4073 * Once the number of abort messages reaches the threshold, an
4074 * event is scheduled to send the abort. Setting the force flag
4075 * overrides sending delayed abort messages.
4077 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4078 * to send the abort packet.
4080 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4081 register struct rx_connection *conn;
4082 struct rx_packet *packet;
4092 /* Clients should never delay abort messages */
4093 if (rx_IsClientConn(conn))
4096 if (force || rxi_connAbortThreshhold == 0 ||
4097 conn->abortCount < rxi_connAbortThreshhold) {
4098 if (conn->delayedAbortEvent) {
4099 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4101 error = htonl(conn->error);
4103 MUTEX_EXIT(&conn->conn_data_lock);
4104 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4105 RX_PACKET_TYPE_ABORT, (char *)&error,
4106 sizeof(error), istack);
4107 MUTEX_ENTER(&conn->conn_data_lock);
4108 } else if (!conn->delayedAbortEvent) {
4109 clock_GetTime(&when);
4110 clock_Addmsec(&when, rxi_connAbortDelay);
4111 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
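/* [Editorial sketch] Both rxi_SendCallAbort and rxi_SendConnectionAbort
 * follow the same throttle: the first `threshold` aborts are sent at once;
 * beyond that, a single delayed abort event is scheduled instead. A
 * self-contained illustration; the parameters stand in for the abortCount
 * fields and the rxi_*AbortThreshhold globals. */
static int sketch_send_abort_now(int force, int abort_count, int threshold)
{
    /* A threshold of zero disables throttling entirely. */
    if (force || threshold == 0 || abort_count < threshold)
        return 1;       /* send the abort packet immediately */
    return 0;           /* post a delayed-abort event instead */
}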
4117 /* Associate an error with all of the calls owned by a connection. Called
4118 * with error non-zero. This is only for really fatal things, like
4119 * bad authentication responses. The connection itself is set in
4120 * error at this point, so that future packets received will be discarded. */
4122 void rxi_ConnectionError(conn, error)
4123 register struct rx_connection *conn;
4124 register afs_int32 error;
4128 if (conn->challengeEvent)
4129 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4130 for (i=0; i<RX_MAXCALLS; i++) {
4131 struct rx_call *call = conn->call[i];
4133 MUTEX_ENTER(&call->lock);
4134 rxi_CallError(call, error);
4135 MUTEX_EXIT(&call->lock);
4138 conn->error = error;
4139 MUTEX_ENTER(&rx_stats_mutex);
4140 rx_stats.fatalErrors++;
4141 MUTEX_EXIT(&rx_stats_mutex);
4145 void rxi_CallError(call, error)
4146 register struct rx_call *call;
4149 if (call->error) error = call->error;
4150 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4151 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4152 rxi_ResetCall(call, 0);
4155 rxi_ResetCall(call, 0);
4157 call->error = error;
4158 call->mode = RX_MODE_ERROR;
4161 /* Reset various fields in a call structure, and wakeup waiting
4162 * processes. Some fields aren't changed: state & mode are not
4163 * touched (these must be set by the caller), and bufptr, nLeft, and
4164 * nFree are not reset, since these fields are manipulated by
4165 * unprotected macros, and may only be reset by non-interrupting code.
4168 /* this code requires that call->conn be set properly as a pre-condition. */
4169 #endif /* ADAPT_WINDOW */
4171 void rxi_ResetCall(call, newcall)
4172 register struct rx_call *call;
4173 register int newcall;
4176 register struct rx_peer *peer;
4177 struct rx_packet *packet;
4179 /* Notify anyone who is waiting for asynchronous packet arrival */
4180 if (call->arrivalProc) {
4181 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4182 call->arrivalProc = (VOID (*)()) 0;
4185 if (call->delayedAbortEvent) {
4186 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4187 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4189 rxi_SendCallAbort(call, packet, 0, 1);
4190 rxi_FreePacket(packet);
4195 * Update the peer with the congestion information in this call
4196 * so other calls on this connection can pick up where this call
4197 * left off. If the congestion sequence numbers don't match then
4198 * another call experienced a retransmission.
4200 peer = call->conn->peer;
4201 MUTEX_ENTER(&peer->peer_lock);
4203 if (call->congestSeq == peer->congestSeq) {
4204 peer->cwind = MAX(peer->cwind, call->cwind);
4205 peer->MTU = MAX(peer->MTU, call->MTU);
4206 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4209 call->abortCode = 0;
4210 call->abortCount = 0;
4212 if (peer->maxDgramPackets > 1) {
4213 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4215 call->MTU = peer->MTU;
4217 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4218 call->ssthresh = rx_maxSendWindow;
4219 call->nDgramPackets = peer->nDgramPackets;
4220 call->congestSeq = peer->congestSeq;
4221 MUTEX_EXIT(&peer->peer_lock);
4223 flags = call->flags;
4224 rxi_ClearReceiveQueue(call);
4225 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4226 if (call->flags & RX_CALL_TQ_BUSY) {
4227 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4228 call->flags |= (flags & RX_CALL_TQ_WAIT);
4230 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4232 rxi_ClearTransmitQueue(call, 0);
4233 queue_Init(&call->tq);
4236 queue_Init(&call->rq);
4238 call->rwind = rx_initReceiveWindow;
4239 call->twind = rx_initSendWindow;
4240 call->nSoftAcked = 0;
4241 call->nextCwind = 0;
4244 call->nCwindAcks = 0;
4245 call->nSoftAcks = 0;
4246 call->nHardAcks = 0;
4248 call->tfirst = call->rnext = call->tnext = 1;
4250 call->lastAcked = 0;
4251 call->localStatus = call->remoteStatus = 0;
4253 if (flags & RX_CALL_READER_WAIT) {
4254 #ifdef RX_ENABLE_LOCKS
4255 CV_BROADCAST(&call->cv_rq);
4257 osi_rxWakeup(&call->rq);
4260 if (flags & RX_CALL_WAIT_PACKETS) {
4261 MUTEX_ENTER(&rx_freePktQ_lock);
4262 rxi_PacketsUnWait(); /* XXX */
4263 MUTEX_EXIT(&rx_freePktQ_lock);
4266 #ifdef RX_ENABLE_LOCKS
4267 CV_SIGNAL(&call->cv_twind);
4269 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4270 osi_rxWakeup(&call->twind);
4273 #ifdef RX_ENABLE_LOCKS
4274 /* The following ensures that we don't mess with any queue while some
4275  * other thread might also be doing so. The call_queue_lock field is
4276  * only modified under the call lock. If the call is in the process
4277  * of being removed from a queue, the call is not locked until the
4278  * queue lock is dropped, and only then is the call_queue_lock field
4279  * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4280  * Note that any other routine which removes a call from a queue has to
4281  * obtain the queue lock before examining the queue and removing the call.
4283 if (call->call_queue_lock) {
4284 MUTEX_ENTER(call->call_queue_lock);
4285 if (queue_IsOnQueue(call)) {
4287 if (flags & RX_CALL_WAIT_PROC) {
4288 MUTEX_ENTER(&rx_stats_mutex);
4290 MUTEX_EXIT(&rx_stats_mutex);
4293 MUTEX_EXIT(call->call_queue_lock);
4294 CLEAR_CALL_QUEUE_LOCK(call);
4296 #else /* RX_ENABLE_LOCKS */
4297 if (queue_IsOnQueue(call)) {
4299 if (flags & RX_CALL_WAIT_PROC)
4302 #endif /* RX_ENABLE_LOCKS */
4304 rxi_KeepAliveOff(call);
4305 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4308 /* Send an acknowledge for the indicated packet (seq,serial) of the
4309 * indicated call, for the indicated reason (reason). This
4310 * acknowledge will specifically acknowledge receiving the packet, and
4311 * will also specify which other packets for this call have been
4312 * received. This routine returns to the caller the packet that was
4313 * used; the caller is responsible for freeing it or re-using it.
4314 * This acknowledgement also returns the highest sequence number
4315 * actually read out by the higher level to the sender; the sender
4316 * promises to keep around packets that have not been read by the
4317 * higher level yet (unless, of course, the sender decides to abort
4318 * the call altogether). Any of p, seq, serial, pflags, or reason may
4319 * be set to zero without ill effect. That is, if they are zero, they
4320 * will not convey any information.
4321 * NOW there is a trailer field, after the ack, where it will safely be
4322 * ignored by mundanes, which indicates the maximum size packet this
4323 * host can swallow. */
4324 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4325 register struct rx_call *call;
4326 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4327 int seq; /* Sequence number of the packet we are acking */
4328 int serial; /* Serial number of the packet */
4329 int pflags; /* Flags field from packet header */
4330 int reason; /* Reason an acknowledge was prompted */
4333 struct rx_ackPacket *ap;
4334 register struct rx_packet *rqp;
4335 register struct rx_packet *nxp; /* For queue_Scan */
4336 register struct rx_packet *p;
4341 * Open the receive window once a thread starts reading packets
4343 if (call->rnext > 1) {
4344 call->rwind = rx_maxReceiveWindow;
4347 call->nHardAcks = 0;
4348 call->nSoftAcks = 0;
4349 if (call->rnext > call->lastAcked)
4350 call->lastAcked = call->rnext;
4354 rx_computelen(p, p->length); /* reset length, you never know */
4355 } /* where that's been... */
4357 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4358 /* We won't send the ack, but don't panic. */
4359 return optionalPacket;
4362 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4364 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4365 if (!optionalPacket) rxi_FreePacket(p);
4366 return optionalPacket;
4368 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4369 if (rx_Contiguous(p)<templ) {
4370 if (!optionalPacket) rxi_FreePacket(p);
4371 return optionalPacket;
4373 } /* MTUXXX failing to send an ack is very serious. We should */
4374 /* try as hard as possible to send even a partial ack; it's */
4375 /* better than nothing. */
4377 ap = (struct rx_ackPacket *) rx_DataOf(p);
4378 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4379 ap->reason = reason;
4381 /* The skew computation used to be bogus, I think it's better now. */
4382 /* We should start paying attention to skew. XXX */
4383 ap->serial = htonl(call->conn->maxSerial);
4384 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4386 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4387 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4389 /* No fear of running out of room in the ack packet here, because there can be at most
4390 * one window full of unacknowledged packets. The window size must be constrained
4391 * to be less than the maximum ack size, of course. Also, an ack should always
4392 * fit into a single packet -- it should not ever be fragmented. */
4393 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4394 if (!rqp || !call->rq.next
4395 || (rqp->header.seq > (call->rnext + call->rwind))) {
4396 if (!optionalPacket) rxi_FreePacket(p);
4397 rxi_CallError(call, RX_CALL_DEAD);
4398 return optionalPacket;
4401 while (rqp->header.seq > call->rnext + offset)
4402 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4403 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4405 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4406 if (!optionalPacket) rxi_FreePacket(p);
4407 rxi_CallError(call, RX_CALL_DEAD);
4408 return optionalPacket;
4413 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4415 /* these are new for AFS 3.3 */
4416 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4417 templ = htonl(templ);
4418 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4419 templ = htonl(call->conn->peer->ifMTU);
4420 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4422 /* new for AFS 3.4 */
4423 templ = htonl(call->rwind);
4424 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4426 /* new for AFS 3.5 */
4427 templ = htonl(call->conn->peer->ifDgramPackets);
4428 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4430 p->header.serviceId = call->conn->serviceId;
4431 p->header.cid = (call->conn->cid | call->channel);
4432 p->header.callNumber = *call->callNumber;
4433 p->header.seq = seq;
4434 p->header.securityIndex = call->conn->securityIndex;
4435 p->header.epoch = call->conn->epoch;
4436 p->header.type = RX_PACKET_TYPE_ACK;
4437 p->header.flags = RX_SLOW_START_OK;
4438 if (reason == RX_ACK_PING) {
4439 p->header.flags |= RX_REQUEST_ACK;
4441 clock_GetTime(&call->pingRequestTime);
4444 if (call->conn->type == RX_CLIENT_CONNECTION)
4445 p->header.flags |= RX_CLIENT_INITIATED;
4449 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4450 ap->reason, ntohl(ap->previousPacket),
4451 (unsigned int) p->header.seq, ntohl(ap->firstPacket));
4453 for (offset = 0; offset < ap->nAcks; offset++)
4454 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
4461 register int i, nbytes = p->length;
4463 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4464 if (nbytes <= p->wirevec[i].iov_len) {
4465 register int savelen, saven;
4467 savelen = p->wirevec[i].iov_len;
4469 p->wirevec[i].iov_len = nbytes;
4471 rxi_Send(call, p, istack);
4472 p->wirevec[i].iov_len = savelen;
4476 else nbytes -= p->wirevec[i].iov_len;
4479 MUTEX_ENTER(&rx_stats_mutex);
4480 rx_stats.ackPacketsSent++;
4481 MUTEX_EXIT(&rx_stats_mutex);
4482 if (!optionalPacket) rxi_FreePacket(p);
4483 return optionalPacket; /* Return packet for re-use by caller */
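/* [Editorial sketch] The ack-bitmap loop above walks the receive queue in
 * sequence order, emitting a NACK for every hole below each queued packet
 * and an ACK for the packet itself. Self-contained illustration: `rq` is a
 * sorted array of received sequence numbers standing in for call->rq, and
 * the constants stand in for RX_ACK_TYPE_ACK / RX_ACK_TYPE_NACK. */
#define SKETCH_ACK  1
#define SKETCH_NACK 0
static int sketch_fill_acks(unsigned first, const unsigned *rq, int nrq,
                            unsigned char *acks, int maxacks)
{
    int offset = 0, i;
    for (i = 0; i < nrq && offset < maxacks; i++) {
        while (offset < maxacks && rq[i] > first + (unsigned)offset)
            acks[offset++] = SKETCH_NACK;   /* hole before this packet */
        if (offset < maxacks)
            acks[offset++] = SKETCH_ACK;    /* rq[i] has been received */
    }
    return offset;  /* becomes ap->nAcks */
}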
4486 /* Send all of the packets in the list in single datagram */
4487 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4488 struct rx_call *call;
4489 struct rx_packet **list;
4494 struct clock *retryTime;
4500 struct rx_connection *conn = call->conn;
4501 struct rx_peer *peer = conn->peer;
4503 MUTEX_ENTER(&peer->peer_lock);
4505 if (resending) peer->reSends += len;
4506 MUTEX_ENTER(&rx_stats_mutex);
4507 rx_stats.dataPacketsSent += len;
4508 MUTEX_EXIT(&rx_stats_mutex);
4509 MUTEX_EXIT(&peer->peer_lock);
4511 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4515 /* Set the packet flags and schedule the resend events */
4516 /* Only request an ack for the last packet in the list */
4517 for (i = 0 ; i < len ; i++) {
4518 list[i]->retryTime = *retryTime;
4519 if (list[i]->header.serial) {
4520 /* Exponentially backoff retry times */
4521 if (list[i]->backoff < MAXBACKOFF) {
4522 /* so it can't stay == 0 */
4523 list[i]->backoff = (list[i]->backoff << 1) +1;
4525 else list[i]->backoff++;
4526 clock_Addmsec(&(list[i]->retryTime),
4527 ((afs_uint32) list[i]->backoff) << 8);
4530 /* Wait a little extra for the ack on the last packet */
4531 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4532 clock_Addmsec(&(list[i]->retryTime), 400);
4535 /* Record the time sent */
4536 list[i]->timeSent = *now;
4538 /* Ask for an ack on retransmitted packets, on every other packet
4539 * if the peer doesn't support slow start. Ask for an ack on every
4540 * packet until the congestion window reaches the ack rate. */
4541 if (list[i]->header.serial) {
4543 MUTEX_ENTER(&rx_stats_mutex);
4544 rx_stats.dataPacketsReSent++;
4545 MUTEX_EXIT(&rx_stats_mutex);
4547 /* improved RTO calculation- not Karn */
4548 list[i]->firstSent = *now;
4550 && (call->cwind <= (u_short)(conn->ackRate+1)
4551 || (!(call->flags & RX_CALL_SLOW_START_OK)
4552 && (list[i]->header.seq & 1)))) {
4557 MUTEX_ENTER(&peer->peer_lock);
4559 if (resending) peer->reSends++;
4560 MUTEX_ENTER(&rx_stats_mutex);
4561 rx_stats.dataPacketsSent++;
4562 MUTEX_EXIT(&rx_stats_mutex);
4563 MUTEX_EXIT(&peer->peer_lock);
4565 /* Tag this packet as not being the last in this group,
4566 * for the receiver's benefit */
4567 if (i < len-1 || moreFlag) {
4568 list[i]->header.flags |= RX_MORE_PACKETS;
4571 /* Install the new retransmit time for the packet, and
4572 * record the time sent */
4573 list[i]->timeSent = *now;
4577 list[len-1]->header.flags |= RX_REQUEST_ACK;
4580 /* Since we're about to send a data packet to the peer, it's
4581 * safe to nuke any scheduled end-of-packets ack */
4582 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4584 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4585 MUTEX_EXIT(&call->lock);
4587 rxi_SendPacketList(conn, list, len, istack);
4589 rxi_SendPacket(conn, list[0], istack);
4591 MUTEX_ENTER(&call->lock);
4592 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4594 /* Update last send time for this call (for keep-alive
4595 * processing), and for the connection (so that we can discover
4596 * idle connections) */
4597 conn->lastSendTime = call->lastSendTime = clock_Sec();
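/* [Editorial sketch] The retry-time computation above: a packet that has
 * been transmitted before (header.serial != 0) doubles its backoff counter
 * (the +1 keeps it from sticking at zero), and its retry time moves out by
 * backoff * 256 ms. Self-contained illustration; SKETCH_MAXBACKOFF is a
 * stand-in for MAXBACKOFF. */
#define SKETCH_MAXBACKOFF 125
static unsigned sketch_retry_delay_ms(unsigned char *backoff)
{
    if (*backoff < SKETCH_MAXBACKOFF)
        *backoff = (unsigned char)((*backoff << 1) + 1);  /* 0,1,3,7,15,... */
    else
        (*backoff)++;
    return ((unsigned)*backoff) << 8;   /* milliseconds added to retryTime */
}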
4600 /* When sending packets we need to follow these rules:
4601 * 1. Never send more than maxDgramPackets in a jumbogram.
4602 * 2. Never send a packet with more than two iovecs in a jumbogram.
4603 * 3. Never send a retransmitted packet in a jumbogram.
4604 * 4. Never send more than cwind/4 packets in a jumbogram
4605 * We always keep the last list we should have sent so we
4606 * can set the RX_MORE_PACKETS flags correctly.
4608 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4609 struct rx_call *call;
4610 struct rx_packet **list;
4614 struct clock *retryTime;
4617 int i, cnt, lastCnt = 0;
4618 struct rx_packet **listP, **lastP = 0;
4619 struct rx_peer *peer = call->conn->peer;
4620 int morePackets = 0;
4622 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4623 /* Does the current packet force us to flush the current list? */
4625 && (list[i]->header.serial
4627 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4629 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4630 /* If the call enters an error state stop sending, or if
4631 * we entered congestion recovery mode, stop sending */
4632 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4640 /* Add the current packet to the list if it hasn't been acked.
4641 * Otherwise adjust the list pointer to skip the current packet. */
4642 if (!list[i]->acked) {
4644 /* Do we need to flush the list? */
4645 if (cnt >= (int)peer->maxDgramPackets
4646 || cnt >= (int)call->nDgramPackets
4647 || cnt >= (int)call->cwind
4648 || list[i]->header.serial
4649 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4651 rxi_SendList(call, lastP, lastCnt, istack, 1,
4652 now, retryTime, resending);
4653 /* If the call enters an error state stop sending, or if
4654 * we entered congestion recovery mode, stop sending */
4655 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4665 osi_Panic("rxi_SendList error");
4671 /* Send the whole list when the call is in receive mode, when
4672 * the call is in eof mode, when we are in fast recovery mode,
4673 * and when we have the last packet */
4674 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4675 || call->mode == RX_MODE_RECEIVING
4676 || call->mode == RX_MODE_EOF
4677 || (call->flags & RX_CALL_FAST_RECOVER)) {
4678 /* Check for the case where the current list contains
4679 * an acked packet. Since we always send retransmissions
4680 * in a separate packet, we only need to check the first
4681 * packet in the list */
4682 if (cnt > 0 && !listP[0]->acked) {
4686 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4687 now, retryTime, resending);
4688 /* If the call enters an error state stop sending, or if
4689 * we entered congestion recovery mode, stop sending */
4690 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4694 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4696 } else if (lastCnt > 0) {
4697 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
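/* [Editorial sketch] The flush decision in rxi_SendXmitList above: a packet
 * starts a new datagram when the current one is full, when it would exceed
 * the congestion window, when it is a retransmission (serial != 0), or when
 * it is not a full-size jumbo buffer. Self-contained illustration with
 * hypothetical stand-in fields. */
struct sketch_pkt {
    int serial;     /* nonzero => previously transmitted */
    int length;     /* payload length in bytes */
};
static int sketch_must_flush(int cnt, int max_dgram_pkts, int cwind,
                             const struct sketch_pkt *p, int jumbo_size)
{
    return cnt >= max_dgram_pkts      /* never exceed maxDgramPackets */
        || cnt >= cwind               /* stay within the congestion window */
        || p->serial != 0             /* retransmissions go out alone */
        || p->length != jumbo_size;   /* only full jumbo buffers are packed */
}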
4701 #ifdef RX_ENABLE_LOCKS
4702 /* Call rxi_Start, below, but with the call lock held. */
4703 void rxi_StartUnlocked(event, call, istack)
4704 struct rxevent *event;
4705 register struct rx_call *call;
4708 MUTEX_ENTER(&call->lock);
4709 rxi_Start(event, call, istack);
4710 MUTEX_EXIT(&call->lock);
4712 #endif /* RX_ENABLE_LOCKS */
4714 /* This routine is called when new packets are readied for
4715 * transmission and when retransmission may be necessary, or when the
4716 * transmission window or burst count are favourable. This should be
4717 * better optimized for new packets, the usual case, now that we've
4718 * got rid of queues of send packets. XXXXXXXXXXX */
4719 void rxi_Start(event, call, istack)
4720 struct rxevent *event;
4721 register struct rx_call *call;
4724 struct rx_packet *p;
4725 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4726 struct rx_peer *peer = call->conn->peer;
4727 struct clock now, retryTime;
4731 struct rx_packet **xmitList;
4734 /* If rxi_Start is being called as a result of a resend event,
4735 * then make sure that the event pointer is removed from the call
4736 * structure, since there is no longer a per-call retransmission event. */
4738 if (event && event == call->resendEvent) {
4739 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4740 call->resendEvent = NULL;
4742 if (queue_IsEmpty(&call->tq)) {
4746 /* Timeouts trigger congestion recovery */
4747 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4748 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4749 /* someone else is waiting to start recovery */
4752 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4753 while (call->flags & RX_CALL_TQ_BUSY) {
4754 call->flags |= RX_CALL_TQ_WAIT;
4755 #ifdef RX_ENABLE_LOCKS
4756 CV_WAIT(&call->cv_tq, &call->lock);
4757 #else /* RX_ENABLE_LOCKS */
4758 osi_rxSleep(&call->tq);
4759 #endif /* RX_ENABLE_LOCKS */
4761 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4762 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4763 call->flags |= RX_CALL_FAST_RECOVER;
4764 if (peer->maxDgramPackets > 1) {
4765 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4767 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4769 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4770 call->nDgramPackets = 1;
4772 call->nextCwind = 1;
4775 MUTEX_ENTER(&peer->peer_lock);
4776 peer->MTU = call->MTU;
4777 peer->cwind = call->cwind;
4778 peer->nDgramPackets = 1;
4780 call->congestSeq = peer->congestSeq;
4781 MUTEX_EXIT(&peer->peer_lock);
4782 /* Clear retry times on packets. Otherwise, it's possible for
4783 * some packets in the queue to force resends at rates faster
4784 * than recovery rates.
4786 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4788 clock_Zero(&p->retryTime);
4793 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4794 MUTEX_ENTER(&rx_stats_mutex);
4795 rx_tq_debug.rxi_start_in_error ++;
4796 MUTEX_EXIT(&rx_stats_mutex);
4801 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4802 /* Get clock to compute the re-transmit time for any packets
4803 * in this burst. Note, if we back off, it's reasonable to
4804 * back off all of the packets in the same manner, even if
4805 * some of them have been retransmitted more times than more
4806 * recent additions */
4807 clock_GetTime(&now);
4808 retryTime = now; /* initialize before use */
4809 MUTEX_ENTER(&peer->peer_lock);
4810 clock_Add(&retryTime, &peer->timeout);
4811 MUTEX_EXIT(&peer->peer_lock);
4813 /* Send (or resend) any packets that need it, subject to
4814 * window restrictions and congestion burst control
4815 * restrictions. Ask for an ack on the last packet sent in
4816 * this burst. For now, we're relying upon the window being
4817 * considerably bigger than the largest number of packets that
4818 * are typically sent at once by one initial call to
4819 * rxi_Start. This is probably bogus (perhaps we should ask
4820 * for an ack when we're half way through the current
4821 * window?). Also, for non-file-transfer applications, this
4822 * may end up asking for an ack for every packet. Bogus. XXXX
4825 * But check whether we're here recursively, and let the other guy do the work. */
4828 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4829 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4830 call->flags |= RX_CALL_TQ_BUSY;
4832 call->flags &= ~RX_CALL_NEED_START;
4833 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4835 maxXmitPackets = MIN(call->twind, call->cwind);
4836 xmitList = (struct rx_packet **)
4837 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4838 if (xmitList == NULL)
4839 osi_Panic("rxi_Start, failed to allocate xmit list");
4840 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4841 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4842 /* We shouldn't be sending packets if a thread is waiting
4843 * to initiate congestion recovery */
4846 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4847 /* Only send one packet during fast recovery */
4850 if ((p->header.flags == RX_FREE_PACKET) ||
4851 (!queue_IsEnd(&call->tq, nxp)
4852 && (nxp->header.flags == RX_FREE_PACKET)) ||
4853 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4854 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4855 osi_Panic("rxi_Start: xmit queue clobbered");
4858 MUTEX_ENTER(&rx_stats_mutex);
4859 rx_stats.ignoreAckedPacket++;
4860 MUTEX_EXIT(&rx_stats_mutex);
4861 continue; /* Ignore this packet if it has been acknowledged */
4864 /* Turn off all flags except these ones, which are the same
4865 * on each transmission */
4866 p->header.flags &= RX_PRESET_FLAGS;
4868 if (p->header.seq >= call->tfirst +
4869 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4870 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4871 /* Note: if we're waiting for more window space, we can
4872 * still send retransmits; hence we don't return here, but
4873 * break out to schedule a retransmit event */
4874 dpf(("call %d waiting for window", *(call->callNumber)));
4878 /* Transmit the packet if it needs to be sent. */
4879 if (!clock_Lt(&now, &p->retryTime)) {
4880 if (nXmitPackets == maxXmitPackets) {
4881 osi_Panic("rxi_Start: xmit list overflowed");
4883 xmitList[nXmitPackets++] = p;
4887 /* xmitList now holds pointers to all of the packets that are
4888 * ready to send. Now we loop to send the packets */
4889 if (nXmitPackets > 0) {
4890 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4891 &now, &retryTime, resending);
4893 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4895 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4897 * TQ references no longer protected by this flag; they must remain
4898 * protected by the global lock.
4900 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4901 call->flags &= ~RX_CALL_TQ_BUSY;
4902 if (call->flags & RX_CALL_TQ_WAIT) {
4903 call->flags &= ~RX_CALL_TQ_WAIT;
4904 #ifdef RX_ENABLE_LOCKS
4905 CV_BROADCAST(&call->cv_tq);
4906 #else /* RX_ENABLE_LOCKS */
4907 osi_rxWakeup(&call->tq);
4908 #endif /* RX_ENABLE_LOCKS */
4913 /* We went into the error state while sending packets. Now is
4914 * the time to reset the call. This will also inform the using
4915 * process that the call is in an error state.
4917 MUTEX_ENTER(&rx_stats_mutex);
4918 rx_tq_debug.rxi_start_aborted ++;
4919 MUTEX_EXIT(&rx_stats_mutex);
4920 call->flags &= ~RX_CALL_TQ_BUSY;
4921 if (call->flags & RX_CALL_TQ_WAIT) {
4922 call->flags &= ~RX_CALL_TQ_WAIT;
4923 #ifdef RX_ENABLE_LOCKS
4924 CV_BROADCAST(&call->cv_tq);
4925 #else /* RX_ENABLE_LOCKS */
4926 osi_rxWakeup(&call->tq);
4927 #endif /* RX_ENABLE_LOCKS */
4929 rxi_CallError(call, call->error);
4932 #ifdef RX_ENABLE_LOCKS
4933 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4934 register int missing;
4935 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4936 /* Some packets have received acks. If they all have, we can clear
4937 * the transmit queue.
4939 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4940 if (p->header.seq < call->tfirst && p->acked) {
4948 call->flags |= RX_CALL_TQ_CLEARME;
4950 #endif /* RX_ENABLE_LOCKS */
4951 /* Don't bother doing retransmits if the TQ is cleared. */
4952 if (call->flags & RX_CALL_TQ_CLEARME) {
4953 rxi_ClearTransmitQueue(call, 1);
4955 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4958 /* Always post a resend event, if there is anything in the
4959 * queue, and resend is possible. There should be at least
4960 * one unacknowledged packet in the queue ... otherwise none
4961 * of these packets should be on the queue in the first place.
4963 if (call->resendEvent) {
4964 /* Cancel the existing event and post a new one */
4965 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4968 /* The retry time is the retry time on the first unacknowledged
4969 * packet inside the current window */
4970 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4971 /* Don't set timers for packets outside the window */
4972 if (p->header.seq >= call->tfirst + call->twind) {
4976 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4978 retryTime = p->retryTime;
4983 /* Post a new event to re-run rxi_Start when retries may be needed */
4984 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4985 #ifdef RX_ENABLE_LOCKS
4986 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4987 call->resendEvent = rxevent_Post(&retryTime,
4989 (char *)call, istack);
4990 #else /* RX_ENABLE_LOCKS */
4991 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4992 (char *)call, (void*)(long)istack);
4993 #endif /* RX_ENABLE_LOCKS */
4996 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4997 } while (call->flags & RX_CALL_NEED_START);
4999 * TQ references no longer protected by this flag; they must remain
5000 * protected by the global lock.
5002 call->flags &= ~RX_CALL_TQ_BUSY;
5003 if (call->flags & RX_CALL_TQ_WAIT) {
5004 call->flags &= ~RX_CALL_TQ_WAIT;
5005 #ifdef RX_ENABLE_LOCKS
5006 CV_BROADCAST(&call->cv_tq);
5007 #else /* RX_ENABLE_LOCKS */
5008 osi_rxWakeup(&call->tq);
5009 #endif /* RX_ENABLE_LOCKS */
5012 call->flags |= RX_CALL_NEED_START;
5014 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5016 if (call->resendEvent) {
5017 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
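/* [Editorial sketch] The window test in rxi_Start above: a data packet is
 * eligible to be sent only while its sequence number lies inside the
 * effective window -- the smaller of the advertised transmit window and
 * the congestion window credited with soft acks. Self-contained sketch. */
static int sketch_in_send_window(unsigned seq, unsigned tfirst,
                                 unsigned twind, unsigned nSoftAcked,
                                 unsigned cwind)
{
    unsigned wind = (twind < nSoftAcked + cwind) ? twind
                                                 : nSoftAcked + cwind;
    return seq < tfirst + wind;
}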
5022 /* Also adjusts the keep alive parameters for the call, to reflect
5023 * that we have just sent a packet (so keep alives aren't sent immediately). */
5025 void rxi_Send(call, p, istack)
5026 register struct rx_call *call;
5027 register struct rx_packet *p;
5030 register struct rx_connection *conn = call->conn;
5032 /* Stamp each packet with the user supplied status */
5033 p->header.userStatus = call->localStatus;
5035 /* Allow the security object controlling this call's security to
5036 * make any last-minute changes to the packet */
5037 RXS_SendPacket(conn->securityObject, call, p);
5039 /* Since we're about to send SOME sort of packet to the peer, it's
5040 * safe to nuke any scheduled end-of-packets ack */
5041 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5043 /* Actually send the packet, filling in more connection-specific fields */
5044 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5045 MUTEX_EXIT(&call->lock);
5046 rxi_SendPacket(conn, p, istack);
5047 MUTEX_ENTER(&call->lock);
5048 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5050 /* Update last send time for this call (for keep-alive
5051 * processing), and for the connection (so that we can discover
5052 * idle connections) */
5053 conn->lastSendTime = call->lastSendTime = clock_Sec();
5057 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5058  * that things are fine. Also called periodically to guarantee that nothing
5059  * falls through the cracks (e.g. (error + dally) connections have keepalive
5060  * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, the call may be freed. */
5063 #ifdef RX_ENABLE_LOCKS
5064 int rxi_CheckCall(call, haveCTLock)
5065 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5066 #else /* RX_ENABLE_LOCKS */
5067 int rxi_CheckCall(call)
5068 #endif /* RX_ENABLE_LOCKS */
5069 register struct rx_call *call;
5071 register struct rx_connection *conn = call->conn;
5072 register struct rx_service *tservice;
5074 afs_uint32 deadTime;
5076 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5077 if (call->flags & RX_CALL_TQ_BUSY) {
5078 /* Call is active and will be reset by rxi_Start if it's
5079 * in an error state.
5084 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5085 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5086 ((afs_uint32)conn->peer->rtt >> 3) +
5087 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
5089 /* These are computed to the second (+- 1 second). But that's
5090 * good enough for these values, which should be a significant
5091 * number of seconds. */
5092 if (now > (call->lastReceiveTime + deadTime)) {
5093 if (call->state == RX_STATE_ACTIVE) {
5094 rxi_CallError(call, RX_CALL_DEAD);
5098 #ifdef RX_ENABLE_LOCKS
5099 /* Cancel pending events */
5100 rxevent_Cancel(call->delayedAckEvent, call,
5101 RX_CALL_REFCOUNT_DELAY);
5102 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5103 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5104 if (call->refCount == 0) {
5105 rxi_FreeCall(call, haveCTLock);
5109 #else /* RX_ENABLE_LOCKS */
5112 #endif /* RX_ENABLE_LOCKS */
5114 /* Non-active calls are destroyed if they are not responding
5115 * to pings; active calls are simply flagged in error, so the
5116 * attached process can die reasonably gracefully. */
5118 /* see if we have a non-activity timeout */
5119 tservice = conn->service;
5120 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5121 && tservice->idleDeadTime
5122 && ((call->startWait + tservice->idleDeadTime) < now)) {
5123 if (call->state == RX_STATE_ACTIVE) {
5124 rxi_CallError(call, RX_CALL_TIMEOUT);
5128 /* see if we have a hard timeout */
5129 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5130 if (call->state == RX_STATE_ACTIVE)
5131 rxi_CallError(call, RX_CALL_TIMEOUT);
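/* [Editorial sketch] The deadTime arithmetic in rxi_CheckCall above uses
 * the peer's fixed-point RTT state: rtt is scaled by 8 (1/8 ms units), so
 * rtt >> 3 is milliseconds; rtt_dev is scaled by 4 (1/4 ms units), so
 * rtt_dev << 1 is 8*MDEV in milliseconds; << 10 and >> 10 approximate
 * *1000 and /1000, and the +1023 rounds up to the next whole second.
 * Self-contained sketch. */
static unsigned sketch_dead_time_sec(unsigned secondsUntilDead,
                                     unsigned rtt, unsigned rtt_dev)
{
    return ((secondsUntilDead << 10) + (rtt >> 3) + (rtt_dev << 1)
            + 1023) >> 10;
}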
5138 /* When a call is in progress, this routine is called occasionally to
5139 * make sure that some traffic has arrived (or been sent to) the peer.
5140 * If nothing has arrived in a reasonable amount of time, the call is
5141 * declared dead; if nothing has been sent for a while, we send a
5142 * keep-alive packet (if we're actually trying to keep the call alive)
5144 void rxi_KeepAliveEvent(event, call, dummy)
5145 struct rxevent *event;
5146 register struct rx_call *call;
5148 struct rx_connection *conn;
5151 MUTEX_ENTER(&call->lock);
5152 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5153 if (event == call->keepAliveEvent)
5154 call->keepAliveEvent = (struct rxevent *) 0;
5157 #ifdef RX_ENABLE_LOCKS
5158 if(rxi_CheckCall(call, 0)) {
5159 MUTEX_EXIT(&call->lock);
5162 #else /* RX_ENABLE_LOCKS */
5163 if (rxi_CheckCall(call)) return;
5164 #endif /* RX_ENABLE_LOCKS */
5166 /* Don't try to keep alive dallying calls */
5167 if (call->state == RX_STATE_DALLY) {
5168 MUTEX_EXIT(&call->lock);
5173 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5174 /* Don't try to send keepalives if there is unacknowledged data */
5175 /* the rexmit code should be good enough, this little hack
5176 * doesn't quite work XXX */
5177 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5179 rxi_ScheduleKeepAliveEvent(call);
5180 MUTEX_EXIT(&call->lock);
5184 void rxi_ScheduleKeepAliveEvent(call)
5185 register struct rx_call *call;
5187 if (!call->keepAliveEvent) {
5189 clock_GetTime(&when);
5190 when.sec += call->conn->secondsUntilPing;
5191 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5192 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5196 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5197 void rxi_KeepAliveOn(call)
5198 register struct rx_call *call;
5200 /* Pretend last packet received was received now--i.e. if another
5201 * packet isn't received within the keep alive time, then the call
5202 * will die; Initialize last send time to the current time--even
5203 * if a packet hasn't been sent yet. This will guarantee that a
5204 * keep-alive is sent within the ping time */
5205 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5206 rxi_ScheduleKeepAliveEvent(call);
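/* [Editorial sketch] The keep-alive policy above in one predicate: when the
 * periodic event fires, a ping ack goes out only if nothing has been sent
 * for longer than the ping interval; the event is always rescheduled.
 * Self-contained sketch; times are whole seconds, as with clock_Sec(). */
static int sketch_needs_keepalive(unsigned now, unsigned lastSendTime,
                                  unsigned secondsUntilPing)
{
    return (now - lastSendTime) > secondsUntilPing;
}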
5209 /* This routine is called to send connection abort messages
5210 * that have been delayed to throttle looping clients. */
5211 void rxi_SendDelayedConnAbort(event, conn, dummy)
5212 struct rxevent *event;
5213 register struct rx_connection *conn;
5217 struct rx_packet *packet;
5219 MUTEX_ENTER(&conn->conn_data_lock);
5220 conn->delayedAbortEvent = (struct rxevent *) 0;
5221 error = htonl(conn->error);
5223 MUTEX_EXIT(&conn->conn_data_lock);
5224 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5226 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5227 RX_PACKET_TYPE_ABORT, (char *)&error,
5229 rxi_FreePacket(packet);
5233 /* This routine is called to send call abort messages
5234 * that have been delayed to throttle looping clients. */
5235 void rxi_SendDelayedCallAbort(event, call, dummy)
5236 struct rxevent *event;
5237 register struct rx_call *call;
5241 struct rx_packet *packet;
5243 MUTEX_ENTER(&call->lock);
5244 call->delayedAbortEvent = (struct rxevent *) 0;
5245 error = htonl(call->error);
5247 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5249 packet = rxi_SendSpecial(call, call->conn, packet,
5250 RX_PACKET_TYPE_ABORT, (char *)&error,
5252 rxi_FreePacket(packet);
5254 MUTEX_EXIT(&call->lock);
5257 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5258 * seconds) to ask the client to authenticate itself. The routine
5259 * issues a challenge to the client, which is obtained from the
5260 * security object associated with the connection */
5261 void rxi_ChallengeEvent(event, conn, dummy)
5262 struct rxevent *event;
5263 register struct rx_connection *conn;
5266 conn->challengeEvent = (struct rxevent *) 0;
5267 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5268 register struct rx_packet *packet;
5270 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5272 /* If there's no packet available, do this later. */
5273 RXS_GetChallenge(conn->securityObject, conn, packet);
5274 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5275 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5276 rxi_FreePacket(packet);
5278 clock_GetTime(&when);
5279 when.sec += RX_CHALLENGE_TIMEOUT;
5280 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5284 /* Call this routine to start requesting the client to authenticate
5285 * itself. This will continue until authentication is established,
5286 * the call times out, or an invalid response is returned. The
5287 * security object associated with the connection is asked to create
5288 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5289 * defined earlier. */
5290 void rxi_ChallengeOn(conn)
5291 register struct rx_connection *conn;
5293 if (!conn->challengeEvent) {
5294 RXS_CreateChallenge(conn->securityObject, conn);
5295 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
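/* [Editorial sketch] The challenge loop above: as long as the security
 * object still reports the peer unauthenticated, a challenge is sent and
 * the next one is scheduled RX_CHALLENGE_TIMEOUT seconds out.
 * Self-contained sketch; a return of 0 means no further event is needed. */
static unsigned sketch_next_challenge_time(int authenticated, unsigned now,
                                           unsigned challenge_timeout)
{
    return authenticated ? 0 : now + challenge_timeout;
}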
5300 /* Compute round trip time of the packet provided, in *rttp. */
5303 /* rxi_ComputeRoundTripTime is called with peer locked. */
5304 void rxi_ComputeRoundTripTime(p, sentp, peer)
5305 register struct clock *sentp; /* may be null */
5306 register struct rx_peer *peer; /* may be null */
5307 register struct rx_packet *p;
5309 struct clock thisRtt, *rttp = &thisRtt;
5311 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5312 /* making year 2038 bugs to get this running now - stroucki */
5313 struct timeval temptime;
5315 register int rtt_timeout;
5317 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5318 /* yet again. This was the worst Heisenbug of the port - stroucki */
5319 clock_GetTime(&temptime);
5320 rttp->sec=(afs_int32)temptime.tv_sec;
5321 rttp->usec=(afs_int32)temptime.tv_usec;
5323 clock_GetTime(rttp);
5325 if (clock_Lt(rttp, sentp)) {
5327 return; /* somebody set the clock back, don't count this time. */
5329 clock_Sub(rttp, sentp);
5330 MUTEX_ENTER(&rx_stats_mutex);
5331 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5332 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5333 if (rttp->sec > 60) {
5334 MUTEX_EXIT(&rx_stats_mutex);
5335 return; /* somebody set the clock ahead */
5337 rx_stats.maxRtt = *rttp;
5339 clock_Add(&rx_stats.totalRtt, rttp);
5340 rx_stats.nRttSamples++;
5341 MUTEX_EXIT(&rx_stats_mutex);
5343 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5345 /* Apply VanJacobson round-trip estimations */
5350 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5351 * srtt is stored as fixed point with 3 bits after the binary
5352 * point (i.e., scaled by 8). The following magic is
5353 * equivalent to the smoothing algorithm in rfc793 with an
5354 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5355 * srtt*8 = srtt*8 + rtt - srtt
5356 * srtt = srtt + rtt/8 - srtt/8
5359 delta = MSEC(rttp) - (peer->rtt >> 3);
5363 * We accumulate a smoothed rtt variance (actually, a smoothed
5364 * mean difference), then set the retransmit timer to smoothed
5365 * rtt + 4 times the smoothed variance (was 2x in van's original
5366 * paper, but 4x works better for me, and apparently for him as well).
5368 * rttvar is stored as
5369 * fixed point with 2 bits after the binary point (scaled by
5370 * 4). The following is equivalent to rfc793 smoothing with
5371 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5372 * replaces rfc793's wired-in beta.
5373 * dev*4 = dev*4 + (|actual - expected| - dev)
5379 delta -= (peer->rtt_dev >> 2);
5380 peer->rtt_dev += delta;
5383 /* I don't have a stored RTT so I start with this value. Since I'm
5384 * probably just starting a call, and will be pushing more data down
5385 * this, I expect congestion to increase rapidly. So I fudge a
5386 * little, and I set deviance to half the rtt. In practice,
5387 * deviance tends to approach something a little less than
5388 * half the smoothed rtt. */
5389 peer->rtt = (MSEC(rttp) << 3) + 8;
5390 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5392 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5393 * the other of these connections is usually in a user process, and can
5394 * be switched and/or swapped out. So on fast, reliable networks, the
5395 * timeout would otherwise be too short.
5397 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5398 clock_Zero(&(peer->timeout));
5399 clock_Addmsec(&(peer->timeout), rtt_timeout);
5401 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5402 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5403 (peer->timeout.sec),(peer->timeout.usec)) );
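/* [Editorial sketch] The fixed-point smoothing above in one function: with
 * srtt scaled by 8 and rttvar scaled by 4, the updates below are
 * srtt += (rtt - srtt)/8 and rttvar += (|rtt - srtt| - rttvar)/4 in real
 * units, and the retransmit timeout is srtt + 4*MDEV + 350 ms.
 * Self-contained sketch; rtt_ms is the newly measured sample. */
static int sketch_rtt_update(int *srtt8, int *rttvar4, int rtt_ms)
{
    int delta = rtt_ms - (*srtt8 >> 3);
    *srtt8 += delta;                    /* srtt*8 += rtt - srtt */
    if (delta < 0)
        delta = -delta;
    delta -= (*rttvar4 >> 2);
    *rttvar4 += delta;                  /* dev*4 += |delta| - dev */
    return (*srtt8 >> 3) + *rttvar4 + 350;   /* timeout in milliseconds */
}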
5407 /* Find all server connections that have not been active for a long time, and toss them. */
5409 void rxi_ReapConnections()
5412 clock_GetTime(&now);
5414 /* Find server connection structures that haven't been used for
5415 * greater than rx_idleConnectionTime */
5416 { struct rx_connection **conn_ptr, **conn_end;
5417 int i, havecalls = 0;
5418 MUTEX_ENTER(&rx_connHashTable_lock);
5419 for (conn_ptr = &rx_connHashTable[0],
5420 conn_end = &rx_connHashTable[rx_hashTableSize];
5421 conn_ptr < conn_end; conn_ptr++) {
5422 struct rx_connection *conn, *next;
5423 struct rx_call *call;
5427 for (conn = *conn_ptr; conn; conn = next) {
5428 /* XXX -- Shouldn't the connection be locked? */
5431 for(i=0;i<RX_MAXCALLS;i++) {
5432 call = conn->call[i];
5435 MUTEX_ENTER(&call->lock);
5436 #ifdef RX_ENABLE_LOCKS
5437 result = rxi_CheckCall(call, 1);
5438 #else /* RX_ENABLE_LOCKS */
5439 result = rxi_CheckCall(call);
5440 #endif /* RX_ENABLE_LOCKS */
5441 MUTEX_EXIT(&call->lock);
5443 /* If CheckCall freed the call, it might
5444 * have destroyed the connection as well,
5445 * which screws up the linked lists.
5451 if (conn->type == RX_SERVER_CONNECTION) {
5452 /* This only actually destroys the connection if
5453 * there are no outstanding calls */
5454 MUTEX_ENTER(&conn->conn_data_lock);
5455 if (!havecalls && !conn->refCount &&
5456 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5457 conn->refCount++; /* it will be decr in rx_DestroyConn */
5458 MUTEX_EXIT(&conn->conn_data_lock);
5459 #ifdef RX_ENABLE_LOCKS
5460 rxi_DestroyConnectionNoLock(conn);
5461 #else /* RX_ENABLE_LOCKS */
5462 rxi_DestroyConnection(conn);
5463 #endif /* RX_ENABLE_LOCKS */
5465 #ifdef RX_ENABLE_LOCKS
5467 MUTEX_EXIT(&conn->conn_data_lock);
5469 #endif /* RX_ENABLE_LOCKS */
5473 #ifdef RX_ENABLE_LOCKS
5474 while (rx_connCleanup_list) {
5475 struct rx_connection *conn;
5476 conn = rx_connCleanup_list;
5477 rx_connCleanup_list = rx_connCleanup_list->next;
5478 MUTEX_EXIT(&rx_connHashTable_lock);
5479 rxi_CleanupConnection(conn);
5480 MUTEX_ENTER(&rx_connHashTable_lock);
5482 MUTEX_EXIT(&rx_connHashTable_lock);
5483 #endif /* RX_ENABLE_LOCKS */
5486 /* Find any peer structures that haven't been used (haven't had an
5487 * associated connection) for greater than rx_idlePeerTime */
5488 { struct rx_peer **peer_ptr, **peer_end;
5490 MUTEX_ENTER(&rx_rpc_stats);
5491 MUTEX_ENTER(&rx_peerHashTable_lock);
5492 for (peer_ptr = &rx_peerHashTable[0],
5493 peer_end = &rx_peerHashTable[rx_hashTableSize];
5494 peer_ptr < peer_end; peer_ptr++) {
5495 struct rx_peer *peer, *next, *prev;
5496 for (prev = peer = *peer_ptr; peer; peer = next) {
5498 code = MUTEX_TRYENTER(&peer->peer_lock);
5499 if ((code) && (peer->refCount == 0)
5500 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5501 rx_interface_stat_p rpc_stat, nrpc_stat;
5503 MUTEX_EXIT(&peer->peer_lock);
5504 MUTEX_DESTROY(&peer->peer_lock);
5505 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5506 rx_interface_stat)) {
5507 unsigned int num_funcs;
5508 if (!rpc_stat) break;
5509 queue_Remove(&rpc_stat->queue_header);
5510 queue_Remove(&rpc_stat->all_peers);
5511 num_funcs = rpc_stat->stats[0].func_total;
5512 space = sizeof(rx_interface_stat_t) +
5513 rpc_stat->stats[0].func_total *
5514 sizeof(rx_function_entry_v1_t);
5516 rxi_Free(rpc_stat, space);
5517 rxi_rpc_peer_stat_cnt -= num_funcs;
5520 MUTEX_ENTER(&rx_stats_mutex);
5521 rx_stats.nPeerStructs--;
5522 MUTEX_EXIT(&rx_stats_mutex);
5523 if (prev == *peer_ptr) {
5532 MUTEX_EXIT(&peer->peer_lock);
5538 MUTEX_EXIT(&rx_peerHashTable_lock);
5539 MUTEX_EXIT(&rx_rpc_stats);
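/* [Editorial sketch] The rpc_stat records freed above are variable-length:
 * each rx_interface_stat is allocated with its function-entry array
 * appended, so teardown must recompute the identical size before calling
 * rxi_Free. Self-contained illustration with local stand-in types. */
struct sketch_func_entry { unsigned long invocations; };
struct sketch_if_stat    { unsigned func_total; };
static unsigned long sketch_if_stat_space(unsigned func_total)
{
    return sizeof(struct sketch_if_stat)
         + (unsigned long)func_total * sizeof(struct sketch_func_entry);
}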
5542 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5543 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5544 GC, just below. Really, we shouldn't have to keep moving packets from
5545 one place to another, but instead ought to always know if we can
5546 afford to hold onto a packet in its particular use. */
5547 MUTEX_ENTER(&rx_freePktQ_lock);
5548 if (rx_waitingForPackets) {
5549 rx_waitingForPackets = 0;
5550 #ifdef RX_ENABLE_LOCKS
5551 CV_BROADCAST(&rx_waitingForPackets_cv);
5553 osi_rxWakeup(&rx_waitingForPackets);
5556 MUTEX_EXIT(&rx_freePktQ_lock);
5558 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5559 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5563 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5564 * rx.h is sort of strange, this is better. This is called with a security
5565 * object before it is discarded. Each connection using a security object has
5566 * its own refcount to the object so it won't actually be freed until the last
5567 * connection is destroyed.
5569 * This is the only rxs module call. A hold could also be written, but no one needs it at the moment. */
5572 int rxs_Release (aobj)
5573 struct rx_securityClass *aobj;
5575 return RXS_Close (aobj);
5579 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5580 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5581 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5582 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5584 /* Adjust our estimate of the transmission rate to this peer, given
5585 * that the packet p was just acked. We can adjust peer->timeout and
5586 * call->twind. Pragmatically, this is called
5587 * only with packets of maximal length.
5588 * Called with peer and call locked.
5591 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5592 register struct rx_peer *peer;
5593 register struct rx_call *call;
5594 struct rx_packet *p, *ackp;
5597 afs_int32 xferSize, xferMs;
5598 register afs_int32 minTime;
5601 /* Count down packets */
5602 if (peer->rateFlag > 0) peer->rateFlag--;
5603 /* Do nothing until we're enabled */
5604 if (peer->rateFlag != 0) return;
5605 if (!call->conn) return;
5607 /* Count only when the ack seems legitimate */
5608 switch (ackReason) {
5609 case RX_ACK_REQUESTED:
5610 xferSize = p->length + RX_HEADER_SIZE +
5611 call->conn->securityMaxTrailerSize;
5615 case RX_ACK_PING_RESPONSE:
5616 if (p) /* want the response to ping-request, not data send */
5618 clock_GetTime(&newTO);
5619 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5620 clock_Sub(&newTO, &call->pingRequestTime);
5621 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5625 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5632 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5633 ntohl(peer->host), ntohs(peer->port),
5634 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5635 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5638 /* Track only packets that are big enough. */
5639 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5643 /* absorb RTT data (in milliseconds) for these big packets */
5644 if (peer->smRtt == 0) {
5645 peer->smRtt = xferMs;
5647 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5648 if (!peer->smRtt) peer->smRtt = 1;
5651 if (peer->countDown) {
5655 peer->countDown = 10; /* recalculate only every so often */
5657 /* In practice, we can measure only the RTT for full packets,
5658 * because of the way Rx acks the data that it receives. (If it's
5659 * smaller than a full packet, it often gets implicitly acked
5660 * either by the call response (from a server) or by the next call
5661 * (from a client), and either case confuses transmission times
5662 * with processing times.) Therefore, replace the above
5663 * more-sophisticated processing with a simpler version, where the
5664 * smoothed RTT is kept for full-size packets, and the time to
5665 * transmit a windowful of full-size packets is simply RTT *
5666 * windowSize. Again, we take two steps:
5667 - ensure the timeout is large enough for a single packet's RTT;
5668 - ensure that the window is small enough to fit in the desired timeout.*/
5670 /* First, the timeout check. */
5671 minTime = peer->smRtt;
5672 /* Get a reasonable estimate for a timeout period */
5674 newTO.sec = minTime / 1000;
5675 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5677 /* Increase the timeout period so that we can always do at least
5678 * one packet exchange */
5679 if (clock_Gt(&newTO, &peer->timeout)) {
5681 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5682 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5683 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5686 peer->timeout = newTO;
5689 /* Now, get an estimate for the transmit window size. */
5690 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5691 /* Now, convert to the number of full packets that could fit in a
5692 * reasonable fraction of that interval */
5693 minTime /= (peer->smRtt << 1);
5694 xferSize = minTime; /* (make a copy) */
5696 /* Now clamp the size to reasonable bounds. */
5697 if (minTime <= 1) minTime = 1;
5698 else if (minTime > rx_Window) minTime = rx_Window;
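/* Worked example of the computation above (annotation, illustrative
 * numbers): with smRtt = 100 ms and a peer timeout of 2.000000 sec,
 * minTime = 2000 / (100 << 1) = 10, so about ten full-size packets
 * fit in half the timeout interval; the clamp then bounds the result
 * to the range [1, rx_Window].
 */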
5699 /* if (minTime != peer->maxWindow) {
5700 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5701 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5702 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5704 peer->maxWindow = minTime;
5705 elide... call->twind = minTime;
5709 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5710 * Discern this by calculating the timeout necessary for rx_Window packets.
5712 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5713 /* calculate estimate for transmission interval in milliseconds */
5714 minTime = rx_Window * peer->smRtt;
5715 if (minTime < 1000) {
5716 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5717 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5718 peer->timeout.usec, peer->smRtt,
5721 newTO.sec = 0; /* cut back on timeout by half a second */
5722 newTO.usec = 500000;
5723 clock_Sub(&peer->timeout, &newTO);
5728 } /* end of rxi_ComputeRate */
5729 #endif /* ADAPT_WINDOW */
5737 /* Don't call this debugging routine directly; use dpf */
5739 rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
5740 a11, a12, a13, a14, a15)
5744 clock_GetTime(&now);
5745 fprintf(rx_Log, " %u.%.3u:", (unsigned int) now.sec, (unsigned int) now.usec/1000);
5746 fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15);
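/* Usage note (annotation): dpf takes a doubled parenthesis list so
 * that a printf-style varargs list can pass through a single macro
 * argument, as in the dpf(("CONG peer ...")) calls elsewhere in this
 * file. An illustrative call, with hypothetical arguments:
 *
 *   dpf(("call %lx entered state %d", (unsigned long) call, state));
 *
 * Up to 15 arguments are forwarded to rxi_DebugPrint above.
 */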
5753 * This function is used to process the rx_stats structure that is local
5754 * to a process as well as an rx_stats structure received from a remote
5755 * process (via rxdebug). Therefore, it needs to do minimal version checking.
5758 void rx_PrintTheseStats (file, s, size, freePackets, version)
5761 int size; /* some idea of version control */
5762 afs_int32 freePackets;
5767 if (size != sizeof(struct rx_stats)) {
5769 "Unexpected size of stats structure: was %d, expected %d\n",
5770 size, sizeof(struct rx_stats));
5774 "rx stats: free packets %d, allocs %d, ",
5778 if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
5780 "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
5781 s->receivePktAllocFailures,
5782 s->receiveCbufPktAllocFailures,
5783 s->sendPktAllocFailures,
5784 s->sendCbufPktAllocFailures,
5785 s->specialPktAllocFailures);
5788 "alloc-failures(rcv %d,send %d,ack %d)\n",
5789 s->receivePktAllocFailures,
5790 s->sendPktAllocFailures,
5791 s->specialPktAllocFailures);
5796 "bogusReads %d (last from host %x), "
5802 s->bogusPacketOnRead,
5805 s->noPacketBuffersOnRead,
5809 fprintf(file, " packets read: ");
5810 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5816 fprintf(file, "\n");
5819 " other read counters: data %d, "
5827 s->spuriousPacketsRead,
5828 s->ignorePacketDally);
5830 fprintf(file, " packets sent: ");
5831 for (i = 0; i<RX_N_PACKET_TYPES; i++) {
5837 fprintf(file, "\n");
5840 " other send counters: ack %d, "
5841 "data %d (not resends), "
5844 "acked&ignored %d\n",
5847 s->dataPacketsReSent,
5848 s->dataPacketsPushed,
5849 s->ignoreAckedPacket);
5852 " \t(these should be small) sendFailed %d, "
5855 (int) s->fatalErrors);
5857 if (s->nRttSamples) {
5859 " Average rtt is %0.3f, with %d samples\n",
5860 clock_Float(&s->totalRtt)/s->nRttSamples,
5864 " Minimum rtt is %0.3f, maximum is %0.3f\n",
5865 clock_Float(&s->minRtt),
5866 clock_Float(&s->maxRtt));
5870 " %d server connections, "
5871 "%d client connections, "
5874 "%d free call structs\n",
5879 s->nFreeCallStructs);
5881 #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
5883 " %d clock updates\n",
5889 /* for backward compatibility */
5890 void rx_PrintStats(file)
5893 MUTEX_ENTER(&rx_stats_mutex);
5894 rx_PrintTheseStats (file, &rx_stats, sizeof(rx_stats), rx_nFreePackets, RX_DEBUGI_VERSION);
5895 MUTEX_EXIT(&rx_stats_mutex);
5898 void rx_PrintPeerStats(file, peer)
5900 struct rx_peer *peer;
5905 "burst wait %u.%d.\n",
5908 (int) peer->burstSize,
5909 (int) peer->burstWait.sec,
5910 (int) peer->burstWait.usec);
5914 "retry time %u.%06d, "
5918 (int) peer->timeout.sec,
5919 (int) peer->timeout.usec,
5925 "max in packet skew %d, "
5926 "max out packet skew %d\n",
5928 (int) peer->inPacketSkew,
5929 (int) peer->outPacketSkew);
5932 #ifdef AFS_PTHREAD_ENV
5934 * This mutex protects the following static variables:
5938 #define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
5939 #define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
5941 #define LOCK_RX_DEBUG
5942 #define UNLOCK_RX_DEBUG
5943 #endif /* AFS_PTHREAD_ENV */
5945 static int MakeDebugCall(
5947 afs_uint32 remoteAddr,
5948 afs_uint16 remotePort,
5956 static afs_int32 counter = 100;
5958 struct rx_header theader;
5960 register afs_int32 code;
5962 struct sockaddr_in taddr, faddr;
5967 endTime = time(0) + 20; /* try for 20 seconds */
5971 tp = &tbuffer[sizeof(struct rx_header)];
5972 taddr.sin_family = AF_INET;
5973 taddr.sin_port = remotePort;
5974 taddr.sin_addr.s_addr = remoteAddr;
5976 memset(&theader, 0, sizeof(theader));
5977 theader.epoch = htonl(999);
5979 theader.callNumber = htonl(counter);
5982 theader.type = type;
5983 theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
5984 theader.serviceId = 0;
5986 bcopy(&theader, tbuffer, sizeof(theader));
5987 bcopy(inputData, tp, inputLength);
5988 code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
5989 (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
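/* Wire-format note (annotation): the datagram just sent is a plain
 * rx header immediately followed by the caller's request bytes:
 *
 *   | struct rx_header (epoch 999, callNumber = counter) | inputData |
 *
 * The counter stamped into callNumber is what lets the receive loop
 * below match a reply to this particular request.
 */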
5991 /* see if there's a packet available */
5993 FD_SET(socket, &imask);
5996 code = select(socket+1, &imask, 0, 0, &tv);
5998 /* now receive a packet */
5999 faddrLen = sizeof(struct sockaddr_in);
6000 code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
6001 (struct sockaddr *) &faddr, &faddrLen);
6003 bcopy(tbuffer, &theader, sizeof(struct rx_header));
6004 if (counter == ntohl(theader.callNumber)) break;
6007 /* see if we've timed out */
6008 if (endTime < time(0)) return -1;
6010 code -= sizeof(struct rx_header);
6011 if (code > outputLength) code = outputLength;
6012 bcopy(tp, outputData, code);
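/* Behavioral summary (annotation): MakeDebugCall performs one
 * rxdebug-style exchange over a caller-supplied UDP socket: it builds
 * a debug or version packet, sendto()s it, select()s for a reply, and
 * loops until a reply's callNumber matches the request counter or the
 * 20-second deadline expires (returning -1). On success the reply
 * payload, truncated to outputLength, is copied into outputData.
 */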
6016 afs_int32 rx_GetServerDebug(
6018 afs_uint32 remoteAddr,
6019 afs_uint16 remotePort,
6020 struct rx_debugStats *stat,
6021 afs_uint32 *supportedValues
6024 struct rx_debugIn in;
6027 *supportedValues = 0;
6028 in.type = htonl(RX_DEBUGI_GETSTATS);
6031 rc = MakeDebugCall(socket,
6034 RX_PACKET_TYPE_DEBUG,
6041 * If the call was successful, fix up the version and indicate
6042 * what contents of the stat structure are valid.
6043 * Also do net to host conversion of fields here.
6047 if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
6048 *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
6050 if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
6051 *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
6053 if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
6054 *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
6056 if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
6057 *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
6059 if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
6060 *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
6062 if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
6063 *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
6065 if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
6066 *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
6069 stat->nFreePackets = ntohl(stat->nFreePackets);
6070 stat->packetReclaims = ntohl(stat->packetReclaims);
6071 stat->callsExecuted = ntohl(stat->callsExecuted);
6072 stat->nWaiting = ntohl(stat->nWaiting);
6073 stat->idleThreads = ntohl(stat->idleThreads);
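/* Usage sketch for rx_GetServerDebug (annotation, illustrative only;
 * error handling elided). "sock" is assumed to be an already-bound
 * UDP socket, and the address and port are in network byte order as
 * the sendto() in MakeDebugCall expects:
 *
 *   struct rx_debugStats tstats;
 *   afs_uint32 supported;
 *   afs_int32 code = rx_GetServerDebug(sock, htonl(0x7f000001),
 *                                      htons(7000), &tstats,
 *                                      &supported);
 *   if (code >= 0 && (supported & RX_SERVER_DEBUG_WAITER_CNT))
 *       printf("%d calls waiting for a thread\n", (int) tstats.nWaiting);
 */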
6079 afs_int32 rx_GetServerStats(
6081 afs_uint32 remoteAddr,
6082 afs_uint16 remotePort,
6083 struct rx_stats *stat,
6084 afs_uint32 *supportedValues
6087 struct rx_debugIn in;
6088 afs_int32 *lp = (afs_int32 *) stat;
6093 * supportedValues is currently unused, but added to allow future
6094 * versioning of this function.
6097 *supportedValues = 0;
6098 in.type = htonl(RX_DEBUGI_RXSTATS);
6100 memset(stat, 0, sizeof(*stat));
6102 rc = MakeDebugCall(socket,
6105 RX_PACKET_TYPE_DEBUG,
6114 * Do net to host conversion here
6117 for (i=0;i<sizeof(*stat)/sizeof(afs_int32);i++,lp++) {
*lp = ntohl(*lp);
6125 afs_int32 rx_GetServerVersion(
6127 afs_uint32 remoteAddr,
6128 afs_uint16 remotePort,
6129 size_t version_length,
6134 return MakeDebugCall(socket,
6137 RX_PACKET_TYPE_VERSION,
6144 afs_int32 rx_GetServerConnections(
6146 afs_uint32 remoteAddr,
6147 afs_uint16 remotePort,
6148 afs_int32 *nextConnection,
6150 afs_uint32 debugSupportedValues,
6151 struct rx_debugConn *conn,
6152 afs_uint32 *supportedValues
6155 struct rx_debugIn in;
6160 * supportedValues is currently unused, but added to allow future
6161 * versioning of this function.
6164 *supportedValues = 0;
6165 if (allConnections) {
6166 in.type = htonl(RX_DEBUGI_GETALLCONN);
6168 in.type = htonl(RX_DEBUGI_GETCONN);
6170 in.index = htonl(*nextConnection);
6171 memset(conn, 0, sizeof(*conn));
6173 rc = MakeDebugCall(socket,
6176 RX_PACKET_TYPE_DEBUG,
6183 *nextConnection += 1;
6186 * Convert old connection format to new structure.
6189 if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
6190 struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *)conn;
6191 #define MOVEvL(a) (conn->a = vL->a)
6193 /* any old or unrecognized version... */
6194 for (i=0;i<RX_MAXCALLS;i++) {
6195 MOVEvL(callState[i]);
6196 MOVEvL(callMode[i]);
6197 MOVEvL(callFlags[i]);
6198 MOVEvL(callOther[i]);
6200 if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
6201 MOVEvL(secStats.type);
6202 MOVEvL(secStats.level);
6203 MOVEvL(secStats.flags);
6204 MOVEvL(secStats.expires);
6205 MOVEvL(secStats.packetsReceived);
6206 MOVEvL(secStats.packetsSent);
6207 MOVEvL(secStats.bytesReceived);
6208 MOVEvL(secStats.bytesSent);
6213 * Do net to host conversion here
6215 * I don't convert host or port since we are most likely
6216 * going to want these in network byte order (NBO).
6218 conn->cid = ntohl(conn->cid);
6219 conn->serial = ntohl(conn->serial);
6220 for(i=0;i<RX_MAXCALLS;i++) {
6221 conn->callNumber[i] = ntohl(conn->callNumber[i]);
6223 conn->error = ntohl(conn->error);
6224 conn->secStats.flags = ntohl(conn->secStats.flags);
6225 conn->secStats.expires = ntohl(conn->secStats.expires);
6226 conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
6227 conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
6228 conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
6229 conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
6230 conn->epoch = ntohl(conn->epoch);
6231 conn->natMTU = ntohl(conn->natMTU);
6237 afs_int32 rx_GetServerPeers(
6239 afs_uint32 remoteAddr,
6240 afs_uint16 remotePort,
6241 afs_int32 *nextPeer,
6242 afs_uint32 debugSupportedValues,
6243 struct rx_debugPeer *peer,
6244 afs_uint32 *supportedValues
6247 struct rx_debugIn in;
6251 * supportedValues is currently unused, but added to allow future
6252 * versioning of this function.
6255 *supportedValues = 0;
6256 in.type = htonl(RX_DEBUGI_GETPEER);
6257 in.index = htonl(*nextPeer);
6258 memset(peer, 0, sizeof(*peer));
6260 rc = MakeDebugCall(socket,
6263 RX_PACKET_TYPE_DEBUG,
6273 * Do net to host conversion here
6275 * I don't convert host or port since we are most likely
6276 * going to want these in network byte order (NBO).
6278 peer->ifMTU = ntohs(peer->ifMTU);
6279 peer->idleWhen = ntohl(peer->idleWhen);
6280 peer->refCount = ntohs(peer->refCount);
6281 peer->burstWait.sec = ntohl(peer->burstWait.sec);
6282 peer->burstWait.usec = ntohl(peer->burstWait.usec);
6283 peer->rtt = ntohl(peer->rtt);
6284 peer->rtt_dev = ntohl(peer->rtt_dev);
6285 peer->timeout.sec = ntohl(peer->timeout.sec);
6286 peer->timeout.usec = ntohl(peer->timeout.usec);
6287 peer->nSent = ntohl(peer->nSent);
6288 peer->reSends = ntohl(peer->reSends);
6289 peer->inPacketSkew = ntohl(peer->inPacketSkew);
6290 peer->outPacketSkew = ntohl(peer->outPacketSkew);
6291 peer->rateFlag = ntohl(peer->rateFlag);
6292 peer->natMTU = ntohs(peer->natMTU);
6293 peer->maxMTU = ntohs(peer->maxMTU);
6294 peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
6295 peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
6296 peer->MTU = ntohs(peer->MTU);
6297 peer->cwind = ntohs(peer->cwind);
6298 peer->nDgramPackets = ntohs(peer->nDgramPackets);
6299 peer->congestSeq = ntohs(peer->congestSeq);
6300 peer->bytesSent.high = ntohl(peer->bytesSent.high);
6301 peer->bytesSent.low = ntohl(peer->bytesSent.low);
6302 peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
6303 peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
6308 #endif /* RXDEBUG */
6310 void shutdown_rx(void)
6312 struct rx_serverQueueEntry *np;
6314 register struct rx_call *call;
6315 register struct rx_serverQueueEntry *sq;
6318 if (rxinit_status == 1) {
6320 return; /* Already shutdown. */
6325 #ifndef AFS_PTHREAD_ENV
6326 FD_ZERO(&rx_selectMask);
6327 #endif /* AFS_PTHREAD_ENV */
6328 rxi_dataQuota = RX_MAX_QUOTA;
6329 #ifndef AFS_PTHREAD_ENV
6331 #endif /* AFS_PTHREAD_ENV */
6334 #ifndef AFS_PTHREAD_ENV
6335 #ifndef AFS_USE_GETTIMEOFDAY
6337 #endif /* AFS_USE_GETTIMEOFDAY */
6338 #endif /* AFS_PTHREAD_ENV */
6340 while (!queue_IsEmpty(&rx_freeCallQueue)) {
6341 call = queue_First(&rx_freeCallQueue, rx_call);
6343 rxi_Free(call, sizeof(struct rx_call));
6346 while (!queue_IsEmpty(&rx_idleServerQueue)) {
6347 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
6353 struct rx_peer **peer_ptr, **peer_end;
6354 for (peer_ptr = &rx_peerHashTable[0],
6355 peer_end = &rx_peerHashTable[rx_hashTableSize];
6356 peer_ptr < peer_end; peer_ptr++) {
6357 struct rx_peer *peer, *next;
6358 for (peer = *peer_ptr; peer; peer = next) {
6359 rx_interface_stat_p rpc_stat, nrpc_stat;
6361 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
6362 rx_interface_stat)) {
6363 unsigned int num_funcs;
6364 if (!rpc_stat) break;
6365 queue_Remove(&rpc_stat->queue_header);
6366 queue_Remove(&rpc_stat->all_peers);
6367 num_funcs = rpc_stat->stats[0].func_total;
6368 space = sizeof(rx_interface_stat_t) +
6369 rpc_stat->stats[0].func_total *
6370 sizeof(rx_function_entry_v1_t);
6372 rxi_Free(rpc_stat, space);
6373 MUTEX_ENTER(&rx_rpc_stats);
6374 rxi_rpc_peer_stat_cnt -= num_funcs;
6375 MUTEX_EXIT(&rx_rpc_stats);
6379 MUTEX_ENTER(&rx_stats_mutex);
6380 rx_stats.nPeerStructs--;
6381 MUTEX_EXIT(&rx_stats_mutex);
6385 for (i = 0; i<RX_MAX_SERVICES; i++) {
6387 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
6389 for (i = 0; i < rx_hashTableSize; i++) {
6390 register struct rx_connection *tc, *ntc;
6391 MUTEX_ENTER(&rx_connHashTable_lock);
6392 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
6394 for (j = 0; j < RX_MAXCALLS; j++) {
6396 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
6399 rxi_Free(tc, sizeof(*tc));
6401 MUTEX_EXIT(&rx_connHashTable_lock);
6404 MUTEX_ENTER(&freeSQEList_lock);
6406 while ((np = rx_FreeSQEList)) {
6407 rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
6408 MUTEX_DESTROY(&np->lock);
6409 rxi_Free(np, sizeof(*np));
6412 MUTEX_EXIT(&freeSQEList_lock);
6413 MUTEX_DESTROY(&freeSQEList_lock);
6414 MUTEX_DESTROY(&rx_freeCallQueue_lock);
6415 MUTEX_DESTROY(&rx_connHashTable_lock);
6416 MUTEX_DESTROY(&rx_peerHashTable_lock);
6417 MUTEX_DESTROY(&rx_serverPool_lock);
6419 osi_Free(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6420 osi_Free(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6422 UNPIN(rx_connHashTable, rx_hashTableSize*sizeof(struct rx_connection *));
6423 UNPIN(rx_peerHashTable, rx_hashTableSize*sizeof(struct rx_peer *));
6425 rxi_FreeAllPackets();
6427 MUTEX_ENTER(&rx_stats_mutex);
6428 rxi_dataQuota = RX_MAX_QUOTA;
6429 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
6430 MUTEX_EXIT(&rx_stats_mutex);
6436 #ifdef RX_ENABLE_LOCKS
6437 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)
6439 if (!MUTEX_ISMINE(lockaddr))
6440 osi_Panic("Lock not held: %s", msg);
6442 #endif /* RX_ENABLE_LOCKS */
6447 * Routines to implement connection specific data.
6450 int rx_KeyCreate(rx_destructor_t rtn)
6453 MUTEX_ENTER(&rxi_keyCreate_lock);
6454 key = rxi_keyCreate_counter++;
6455 rxi_keyCreate_destructor = (rx_destructor_t *)
6456 realloc((void *)rxi_keyCreate_destructor,
6457 (key+1) * sizeof(rx_destructor_t));
6458 rxi_keyCreate_destructor[key] = rtn;
6459 MUTEX_EXIT(&rxi_keyCreate_lock);
6463 void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
6466 MUTEX_ENTER(&conn->conn_data_lock);
6467 if (!conn->specific) {
6468 conn->specific = (void **)malloc((key+1)*sizeof(void *));
6469 for (i = 0 ; i < key ; i++)
6470 conn->specific[i] = NULL;
6471 conn->nSpecific = key+1;
6472 conn->specific[key] = ptr;
6473 } else if (key >= conn->nSpecific) {
6474 conn->specific = (void **)
6475 realloc(conn->specific,(key+1)*sizeof(void *));
6476 for (i = conn->nSpecific ; i < key ; i++)
6477 conn->specific[i] = NULL;
6478 conn->nSpecific = key+1;
6479 conn->specific[key] = ptr;
6481 if (conn->specific[key] && rxi_keyCreate_destructor[key])
6482 (*rxi_keyCreate_destructor[key])(conn->specific[key]);
6483 conn->specific[key] = ptr;
6485 MUTEX_EXIT(&conn->conn_data_lock);
6488 void *rx_GetSpecific(struct rx_connection *conn, int key)
6491 MUTEX_ENTER(&conn->conn_data_lock);
6492 if (key >= conn->nSpecific)
6495 ptr = conn->specific[key];
6496 MUTEX_EXIT(&conn->conn_data_lock);
6500 #endif /* !KERNEL */
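/* Usage sketch for the connection-specific-data routines above
 * (annotation, illustrative only; "conn" stands for a live
 * struct rx_connection * and my_cleanup for a caller-supplied
 * destructor):
 *
 *   static void my_cleanup(void *p) { free(p); }
 *   ...
 *   int key = rx_KeyCreate(my_cleanup);
 *   rx_SetSpecific(conn, key, strdup("per-connection state"));
 *   char *state = (char *) rx_GetSpecific(conn, key);
 *
 * As the code above shows, the registered destructor runs when
 * rx_SetSpecific replaces an existing non-NULL value for the key.
 */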
6503 * processStats is a queue used to store the statistics for the local
6504 * process. Its contents are similar to the contents of the rpcStats
6505 * queue on a rx_peer structure, but the actual data stored within
6506 * this queue contains totals across the lifetime of the process (assuming
6507 * the stats have not been reset) - unlike the per peer structures
6508 * which can come and go based upon the peer lifetime.
6511 static struct rx_queue processStats = {&processStats,&processStats};
6514 * peerStats is a queue used to store the statistics for all peer structs.
6515 * Its contents are the union of all the peer rpcStats queues.
6518 static struct rx_queue peerStats = {&peerStats,&peerStats};
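/* Annotation on the initializers above: a struct rx_queue is a
 * circular doubly-linked list, so a head whose two links both point
 * back at itself is the canonical empty queue. Initializing
 * processStats and peerStats this way makes them valid, empty queues
 * before any rx code runs.
 */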
6521 * rxi_monitor_processStats is used to turn process wide stat collection
6525 static int rxi_monitor_processStats = 0;
6528 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
6531 static int rxi_monitor_peerStats = 0;
6534 * rxi_AddRpcStat - given all of the information for a particular rpc
6535 * call, create (if needed) and update the stat totals for the rpc.
6539 * IN stats - the queue of stats that will be updated with the new value
6541 * IN rxInterface - a unique number that identifies the rpc interface
6543 * IN currentFunc - the index of the function being invoked
6545 * IN totalFunc - the total number of functions in this interface
6547 * IN queueTime - the amount of time this function waited for a thread
6549 * IN execTime - the amount of time this function invocation took to execute
6551 * IN bytesSent - the number of bytes sent by this invocation
6553 * IN bytesRcvd - the number of bytes received by this invocation
6555 * IN isServer - if true, this invocation was made to a server
6557 * IN remoteHost - the ip address of the remote host
6559 * IN remotePort - the port of the remote host
6561 * IN addToPeerList - if != 0, add newly created stat to the global peer list
6563 * INOUT counter - if a new stats structure is allocated, the counter will
6564 * be updated with the new number of allocated stat structures
6571 static int rxi_AddRpcStat(
6572 struct rx_queue *stats,
6573 afs_uint32 rxInterface,
6574 afs_uint32 currentFunc,
6575 afs_uint32 totalFunc,
6576 struct clock *queueTime,
6577 struct clock *execTime,
6578 afs_hyper_t *bytesSent,
6579 afs_hyper_t *bytesRcvd,
6581 afs_uint32 remoteHost,
6582 afs_uint32 remotePort,
6584 unsigned int *counter)
6587 rx_interface_stat_p rpc_stat, nrpc_stat;
6590 * See if there's already a structure for this interface
6593 for(queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
6594 if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
6595 (rpc_stat->stats[0].remote_is_server == isServer)) break;
6599 * Didn't find a match so allocate a new structure and add it to the
6603 if (queue_IsEnd(stats, rpc_stat) ||
6604 (rpc_stat == NULL) ||
6605 (rpc_stat->stats[0].interfaceId != rxInterface) ||
6606 (rpc_stat->stats[0].remote_is_server != isServer)) {
6610 space = sizeof(rx_interface_stat_t) + totalFunc *
6611 sizeof(rx_function_entry_v1_t);
6613 rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
6614 if (rpc_stat == NULL) {
6618 *counter += totalFunc;
6619 for(i=0;i<totalFunc;i++) {
6620 rpc_stat->stats[i].remote_peer = remoteHost;
6621 rpc_stat->stats[i].remote_port = remotePort;
6622 rpc_stat->stats[i].remote_is_server = isServer;
6623 rpc_stat->stats[i].interfaceId = rxInterface;
6624 rpc_stat->stats[i].func_total = totalFunc;
6625 rpc_stat->stats[i].func_index = i;
6626 hzero(rpc_stat->stats[i].invocations);
6627 hzero(rpc_stat->stats[i].bytes_sent);
6628 hzero(rpc_stat->stats[i].bytes_rcvd);
6629 rpc_stat->stats[i].queue_time_sum.sec = 0;
6630 rpc_stat->stats[i].queue_time_sum.usec = 0;
6631 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
6632 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
6633 rpc_stat->stats[i].queue_time_min.sec = 9999999;
6634 rpc_stat->stats[i].queue_time_min.usec = 9999999;
6635 rpc_stat->stats[i].queue_time_max.sec = 0;
6636 rpc_stat->stats[i].queue_time_max.usec = 0;
6637 rpc_stat->stats[i].execution_time_sum.sec = 0;
6638 rpc_stat->stats[i].execution_time_sum.usec = 0;
6639 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
6640 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
6641 rpc_stat->stats[i].execution_time_min.sec = 9999999;
6642 rpc_stat->stats[i].execution_time_min.usec = 9999999;
6643 rpc_stat->stats[i].execution_time_max.sec = 0;
6644 rpc_stat->stats[i].execution_time_max.usec = 0;
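/* Annotation: the 9999999 sentinels above guarantee that the first
 * real sample wins the clock_Lt() minimum comparisons below, just as
 * the zeroed maxima guarantee it wins the clock_Gt() comparisons. */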
6646 queue_Prepend(stats, rpc_stat);
6647 if (addToPeerList) {
6648 queue_Prepend(&peerStats, &rpc_stat->all_peers);
6653 * Increment the stats for this function
6656 hadd32(rpc_stat->stats[currentFunc].invocations, 1);
6657 hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
6658 hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
6659 clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum,queueTime);
6660 clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr,queueTime);
6661 if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
6662 rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
6664 if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
6665 rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
6667 clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum,execTime);
6668 clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr,execTime);
6669 if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
6670 rpc_stat->stats[currentFunc].execution_time_min = *execTime;
6672 if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
6673 rpc_stat->stats[currentFunc].execution_time_max = *execTime;
6681 * rx_IncrementTimeAndCount - increment the times and count for a particular
6686 * IN peer - the peer who invoked the rpc
6688 * IN rxInterface - a unique number that identifies the rpc interface
6690 * IN currentFunc - the index of the function being invoked
6692 * IN totalFunc - the total number of functions in this interface
6694 * IN queueTime - the amount of time this function waited for a thread
6696 * IN execTime - the amount of time this function invocation took to execute
6698 * IN bytesSent - the number of bytes sent by this invocation
6700 * IN bytesRcvd - the number of bytes received by this invocation
6702 * IN isServer - if true, this invocation was made to a server
6709 void rx_IncrementTimeAndCount(
6710 struct rx_peer *peer,
6711 afs_uint32 rxInterface,
6712 afs_uint32 currentFunc,
6713 afs_uint32 totalFunc,
6714 struct clock *queueTime,
6715 struct clock *execTime,
6716 afs_hyper_t *bytesSent,
6717 afs_hyper_t *bytesRcvd,
6721 MUTEX_ENTER(&rx_rpc_stats);
6722 MUTEX_ENTER(&peer->peer_lock);
6724 if (rxi_monitor_peerStats) {
6725 rxi_AddRpcStat(&peer->rpcStats,
6737 &rxi_rpc_peer_stat_cnt);
6740 if (rxi_monitor_processStats) {
6741 rxi_AddRpcStat(&processStats,
6753 &rxi_rpc_process_stat_cnt);
6756 MUTEX_EXIT(&peer->peer_lock);
6757 MUTEX_EXIT(&rx_rpc_stats);
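/* Usage sketch (annotation, illustrative only): this routine is
 * normally driven for the caller when an rpc completes, but a direct
 * call would look like the following, where queue/exec are struct
 * clock intervals and sent/rcvd are afs_hyper_t byte counts already
 * computed by the caller; MY_INTERFACE_ID, opcode, and OPCODE_COUNT
 * are placeholders for the caller's interface id, function index,
 * and function count:
 *
 *   rx_IncrementTimeAndCount(conn->peer, MY_INTERFACE_ID, opcode,
 *                            OPCODE_COUNT, &queue, &exec,
 *                            &sent, &rcvd, 1);
 */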
6762 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
6766 * IN callerVersion - the rpc stat version of the caller.
6768 * IN count - the number of entries to marshall.
6770 * IN stats - pointer to stats to be marshalled.
6772 * OUT ptr - Where to store the marshalled data.
6778 void rx_MarshallProcessRPCStats(
6779 afs_uint32 callerVersion,
6781 rx_function_entry_v1_t *stats,
6788 * We only support the first version
6790 for (ptr = *ptrP, i = 0 ; i < count ; i++, stats++) {
6791 *(ptr++) = stats->remote_peer;
6792 *(ptr++) = stats->remote_port;
6793 *(ptr++) = stats->remote_is_server;
6794 *(ptr++) = stats->interfaceId;
6795 *(ptr++) = stats->func_total;
6796 *(ptr++) = stats->func_index;
6797 *(ptr++) = hgethi(stats->invocations);
6798 *(ptr++) = hgetlo(stats->invocations);
6799 *(ptr++) = hgethi(stats->bytes_sent);
6800 *(ptr++) = hgetlo(stats->bytes_sent);
6801 *(ptr++) = hgethi(stats->bytes_rcvd);
6802 *(ptr++) = hgetlo(stats->bytes_rcvd);
6803 *(ptr++) = stats->queue_time_sum.sec;
6804 *(ptr++) = stats->queue_time_sum.usec;
6805 *(ptr++) = stats->queue_time_sum_sqr.sec;
6806 *(ptr++) = stats->queue_time_sum_sqr.usec;
6807 *(ptr++) = stats->queue_time_min.sec;
6808 *(ptr++) = stats->queue_time_min.usec;
6809 *(ptr++) = stats->queue_time_max.sec;
6810 *(ptr++) = stats->queue_time_max.usec;
6811 *(ptr++) = stats->execution_time_sum.sec;
6812 *(ptr++) = stats->execution_time_sum.usec;
6813 *(ptr++) = stats->execution_time_sum_sqr.sec;
6814 *(ptr++) = stats->execution_time_sum_sqr.usec;
6815 *(ptr++) = stats->execution_time_min.sec;
6816 *(ptr++) = stats->execution_time_min.usec;
6817 *(ptr++) = stats->execution_time_max.sec;
6818 *(ptr++) = stats->execution_time_max.usec;
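/* Annotation: each rx_function_entry_v1_t flattens to exactly 28
 * afs_uint32 words above (6 identity words, 3 64-bit counters split
 * hi/lo, and 8 struct clock values split sec/usec), so a reader on
 * the other end can walk the marshalled buffer in 28-word strides
 * per function entry.
 */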
6824 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
6829 * IN callerVersion - the rpc stat version of the caller
6831 * OUT myVersion - the rpc stat version of this function
6833 * OUT clock_sec - local time seconds
6835 * OUT clock_usec - local time microseconds
6837 * OUT allocSize - the number of bytes allocated to contain stats
6839 * OUT statCount - the number of stats retrieved from this process.
6841 * OUT stats - the actual stats retrieved from this process.
6845 * Returns 0 on success, in which case *stats will be non-NULL; an error code otherwise.
6848 int rx_RetrieveProcessRPCStats(
6849 afs_uint32 callerVersion,
6850 afs_uint32 *myVersion,
6851 afs_uint32 *clock_sec,
6852 afs_uint32 *clock_usec,
6854 afs_uint32 *statCount,
6865 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6868 * Check to see if stats are enabled
6871 MUTEX_ENTER(&rx_rpc_stats);
6872 if (!rxi_monitor_processStats) {
6873 MUTEX_EXIT(&rx_rpc_stats);
6877 clock_GetTime(&now);
6878 *clock_sec = now.sec;
6879 *clock_usec = now.usec;
6882 * Allocate the space based upon the caller version
6884 * If the client is at an older version than we are,
6885 * we return the statistic data in the older data format, but
6886 * we still return our version number so the client knows we
6887 * are maintaining more data than it can retrieve.
6890 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6891 space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
6892 *statCount = rxi_rpc_process_stat_cnt;
6895 * This can't happen yet, but in the future, version changes
6896 * can be handled by adding additional code here.
6900 if (space > (size_t) 0) {
6902 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
6905 rx_interface_stat_p rpc_stat, nrpc_stat;
6908 for(queue_Scan(&processStats, rpc_stat, nrpc_stat,
6909 rx_interface_stat)) {
6911 * Copy the data based upon the caller version
6913 rx_MarshallProcessRPCStats(callerVersion,
6914 rpc_stat->stats[0].func_total,
6915 rpc_stat->stats, &ptr);
6921 MUTEX_EXIT(&rx_rpc_stats);
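/* Usage sketch (annotation, illustrative only; the allocSize and
 * stats parameter declarations are elided above, so their types here
 * are assumed from the rxi_Alloc(space)/rx_FreeRPCStats pairing):
 *
 *   afs_uint32 ver, sec, usec, count;
 *   size_t bytes;
 *   afs_uint32 *statp;
 *   if (rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &ver,
 *                                  &sec, &usec, &bytes, &count,
 *                                  &statp) == 0 && statp != NULL) {
 *       ... walk "count" marshalled entries starting at statp ...
 *       rx_FreeRPCStats(statp, bytes);
 *   }
 */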
6926 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
6930 * IN callerVersion - the rpc stat version of the caller
6932 * OUT myVersion - the rpc stat version of this function
6934 * OUT clock_sec - local time seconds
6936 * OUT clock_usec - local time microseconds
6938 * OUT allocSize - the number of bytes allocated to contain stats
6940 * OUT statCount - the number of stats retrieved from the individual peer structures.
6943 * OUT stats - the actual stats retrieved from the individual peer structures.
6947 * Returns 0 on success, in which case *stats will be non-NULL; an error code otherwise.
6950 int rx_RetrievePeerRPCStats(
6951 afs_uint32 callerVersion,
6952 afs_uint32 *myVersion,
6953 afs_uint32 *clock_sec,
6954 afs_uint32 *clock_usec,
6956 afs_uint32 *statCount,
6967 *myVersion = RX_STATS_RETRIEVAL_VERSION;
6970 * Check to see if stats are enabled
6973 MUTEX_ENTER(&rx_rpc_stats);
6974 if (!rxi_monitor_peerStats) {
6975 MUTEX_EXIT(&rx_rpc_stats);
6979 clock_GetTime(&now);
6980 *clock_sec = now.sec;
6981 *clock_usec = now.usec;
6984 * Allocate the space based upon the caller version
6986 * If the client is at an older version than we are,
6987 * we return the statistic data in the older data format, but
6988 * we still return our version number so the client knows we
6989 * are maintaining more data than it can retrieve.
6992 if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
6993 space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
6994 *statCount = rxi_rpc_peer_stat_cnt;
6997 * This can't happen yet, but in the future, version changes
6998 * can be handled by adding additional code here.
7002 if (space > (size_t) 0) {
7004 ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
7007 rx_interface_stat_p rpc_stat, nrpc_stat;
7010 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat,
7011 rx_interface_stat)) {
7013 * We have to fix the offset of rpc_stat since we are
7014 * keeping this structure on two rx_queues. The rx_queue
7015 * package assumes that the rx_queue member is the first
7016 * member of the structure. That is, rx_queue assumes that
7017 * any one item is only on one queue at a time. We are
7018 * breaking that assumption and so we have to do a little
7019 * math to fix our pointers.
7022 fix_offset = (char *) rpc_stat;
7023 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7024 rpc_stat = (rx_interface_stat_p) fix_offset;
7027 * Copy the data based upon the caller version
7029 rx_MarshallProcessRPCStats(callerVersion,
7030 rpc_stat->stats[0].func_total,
7031 rpc_stat->stats, &ptr);
7037 MUTEX_EXIT(&rx_rpc_stats);
7042 * rx_FreeRPCStats - free memory allocated by
7043 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
7047 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
7048 * rx_RetrievePeerRPCStats
7050 * IN allocSize - the number of bytes in stats.
7057 void rx_FreeRPCStats(
7061 rxi_Free(stats, allocSize);
7065 * rx_queryProcessRPCStats - see if process rpc stat collection is
7066 * currently enabled.
7072 * Returns 0 if stats are not enabled; non-zero otherwise.
7075 int rx_queryProcessRPCStats()
7078 MUTEX_ENTER(&rx_rpc_stats);
7079 rc = rxi_monitor_processStats;
7080 MUTEX_EXIT(&rx_rpc_stats);
7085 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
7091 * Returns 0 if stats are not enabled; non-zero otherwise.
7094 int rx_queryPeerRPCStats()
7097 MUTEX_ENTER(&rx_rpc_stats);
7098 rc = rxi_monitor_peerStats;
7099 MUTEX_EXIT(&rx_rpc_stats);
7104 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
7113 void rx_enableProcessRPCStats()
7115 MUTEX_ENTER(&rx_rpc_stats);
7116 rx_enable_stats = 1;
7117 rxi_monitor_processStats = 1;
7118 MUTEX_EXIT(&rx_rpc_stats);
7122 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
7131 void rx_enablePeerRPCStats()
7133 MUTEX_ENTER(&rx_rpc_stats);
7134 rx_enable_stats = 1;
7135 rxi_monitor_peerStats = 1;
7136 MUTEX_EXIT(&rx_rpc_stats);
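/* Typical lifecycle (annotation): enable one or both collectors at
 * startup, let the process run, snapshot with the retrieval routines
 * above, and disable when done. Illustrative sequence:
 *
 *   rx_enableProcessRPCStats();
 *   rx_enablePeerRPCStats();
 *   ... run traffic, periodically retrieve and free snapshots ...
 *   rx_disablePeerRPCStats();
 *   rx_disableProcessRPCStats();
 */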
7140 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
7149 void rx_disableProcessRPCStats()
7151 rx_interface_stat_p rpc_stat, nrpc_stat;
7154 MUTEX_ENTER(&rx_rpc_stats);
7157 * Turn off process statistics and, if peer stats is also off, turn off everything.
7161 rxi_monitor_processStats = 0;
7162 if (rxi_monitor_peerStats == 0) {
7163 rx_enable_stats = 0;
7166 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7167 unsigned int num_funcs = 0;
7168 if (!rpc_stat) break;
7169 queue_Remove(rpc_stat);
7170 num_funcs = rpc_stat->stats[0].func_total;
7171 space = sizeof(rx_interface_stat_t) +
7172 rpc_stat->stats[0].func_total *
7173 sizeof(rx_function_entry_v1_t);
7175 rxi_Free(rpc_stat, space);
7176 rxi_rpc_process_stat_cnt -= num_funcs;
7178 MUTEX_EXIT(&rx_rpc_stats);
7182 * rx_disablePeerRPCStats - stop rpc stat collection for peers
7191 void rx_disablePeerRPCStats()
7193 struct rx_peer **peer_ptr, **peer_end;
7196 MUTEX_ENTER(&rx_rpc_stats);
7199 * Turn off peer statistics and, if process stats is also off, turn off everything.
7203 rxi_monitor_peerStats = 0;
7204 if (rxi_monitor_processStats == 0) {
7205 rx_enable_stats = 0;
7208 MUTEX_ENTER(&rx_peerHashTable_lock);
7209 for (peer_ptr = &rx_peerHashTable[0],
7210 peer_end = &rx_peerHashTable[rx_hashTableSize];
7211 peer_ptr < peer_end; peer_ptr++) {
7212 struct rx_peer *peer, *next, *prev;
7213 for (prev = peer = *peer_ptr; peer; peer = next) {
7215 code = MUTEX_TRYENTER(&peer->peer_lock);
7217 rx_interface_stat_p rpc_stat, nrpc_stat;
7219 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
7220 rx_interface_stat)) {
7221 unsigned int num_funcs = 0;
7222 if (!rpc_stat) break;
7223 queue_Remove(&rpc_stat->queue_header);
7224 queue_Remove(&rpc_stat->all_peers);
7225 num_funcs = rpc_stat->stats[0].func_total;
7226 space = sizeof(rx_interface_stat_t) +
7227 rpc_stat->stats[0].func_total *
7228 sizeof(rx_function_entry_v1_t);
7230 rxi_Free(rpc_stat, space);
7231 rxi_rpc_peer_stat_cnt -= num_funcs;
7233 MUTEX_EXIT(&peer->peer_lock);
7234 if (prev == *peer_ptr) {
7246 MUTEX_EXIT(&rx_peerHashTable_lock);
7247 MUTEX_EXIT(&rx_rpc_stats);
7251 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
7256 * IN clearFlag - flag indicating which stats to clear
7263 void rx_clearProcessRPCStats(
7264 afs_uint32 clearFlag)
7266 rx_interface_stat_p rpc_stat, nrpc_stat;
7268 MUTEX_ENTER(&rx_rpc_stats);
7270 for(queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7271 unsigned int num_funcs = 0, i;
7272 num_funcs = rpc_stat->stats[0].func_total;
7273 for(i=0;i<num_funcs;i++) {
7274 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7275 hzero(rpc_stat->stats[i].invocations);
7277 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7278 hzero(rpc_stat->stats[i].bytes_sent);
7280 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7281 hzero(rpc_stat->stats[i].bytes_rcvd);
7283 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7284 rpc_stat->stats[i].queue_time_sum.sec = 0;
7285 rpc_stat->stats[i].queue_time_sum.usec = 0;
7287 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7288 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7289 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7291 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7292 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7293 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7295 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7296 rpc_stat->stats[i].queue_time_max.sec = 0;
7297 rpc_stat->stats[i].queue_time_max.usec = 0;
7299 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7300 rpc_stat->stats[i].execution_time_sum.sec = 0;
7301 rpc_stat->stats[i].execution_time_sum.usec = 0;
7303 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7304 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7305 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7307 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7308 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7309 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7311 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7312 rpc_stat->stats[i].execution_time_max.sec = 0;
7313 rpc_stat->stats[i].execution_time_max.usec = 0;
7318 MUTEX_EXIT(&rx_rpc_stats);
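/* Annotation: clearFlag is a bitmask, so several counters can be
 * reset in one pass by or'ing flags together, e.g.:
 *
 *   rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_INVOCATIONS |
 *                           AFS_RX_STATS_CLEAR_BYTES_SENT |
 *                           AFS_RX_STATS_CLEAR_BYTES_RCVD);
 */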
7322 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
7327 * IN clearFlag - flag indicating which stats to clear
7334 void rx_clearPeerRPCStats(
7335 afs_uint32 clearFlag)
7337 rx_interface_stat_p rpc_stat, nrpc_stat;
7339 MUTEX_ENTER(&rx_rpc_stats);
7341 for(queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
7342 unsigned int num_funcs = 0, i;
7345 * We have to fix the offset of rpc_stat since we are
7346 * keeping this structure on two rx_queues. The rx_queue
7347 * package assumes that the rx_queue member is the first
7348 * member of the structure. That is, rx_queue assumes that
7349 * any one item is only on one queue at a time. We are
7350 * breaking that assumption and so we have to do a little
7351 * math to fix our pointers.
7354 fix_offset = (char *) rpc_stat;
7355 fix_offset -= offsetof(rx_interface_stat_t, all_peers);
7356 rpc_stat = (rx_interface_stat_p) fix_offset;
7358 num_funcs = rpc_stat->stats[0].func_total;
7359 for(i=0;i<num_funcs;i++) {
7360 if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
7361 hzero(rpc_stat->stats[i].invocations);
7363 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
7364 hzero(rpc_stat->stats[i].bytes_sent);
7366 if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
7367 hzero(rpc_stat->stats[i].bytes_rcvd);
7369 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
7370 rpc_stat->stats[i].queue_time_sum.sec = 0;
7371 rpc_stat->stats[i].queue_time_sum.usec = 0;
7373 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
7374 rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
7375 rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
7377 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
7378 rpc_stat->stats[i].queue_time_min.sec = 9999999;
7379 rpc_stat->stats[i].queue_time_min.usec = 9999999;
7381 if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
7382 rpc_stat->stats[i].queue_time_max.sec = 0;
7383 rpc_stat->stats[i].queue_time_max.usec = 0;
7385 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
7386 rpc_stat->stats[i].execution_time_sum.sec = 0;
7387 rpc_stat->stats[i].execution_time_sum.usec = 0;
7389 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
7390 rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
7391 rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
7393 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
7394 rpc_stat->stats[i].execution_time_min.sec = 9999999;
7395 rpc_stat->stats[i].execution_time_min.usec = 9999999;
7397 if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
7398 rpc_stat->stats[i].execution_time_max.sec = 0;
7399 rpc_stat->stats[i].execution_time_max.usec = 0;
7404 MUTEX_EXIT(&rx_rpc_stats);
7408 * rxi_rxstat_userok points to a routine that returns 1 if the caller
7409 * is authorized to enable/disable/clear RX statistics.
7411 static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;
7413 void rx_SetRxStatUserOk(
7414 int (*proc)(struct rx_call *call))
7416 rxi_rxstat_userok = proc;
7419 int rx_RxStatUserOk(
7420 struct rx_call *call)
7422 if (!rxi_rxstat_userok) return 0;
7424 return rxi_rxstat_userok(call);
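/* Usage sketch (annotation, illustrative only): a server that wants
 * to gate remote enable/disable/clear operations registers a
 * predicate once at startup. The trivial callback below authorizes
 * every caller and stands in for a real check of the call's
 * connection and security context:
 *
 *   static int allow_all(struct rx_call *call) { return 1; }
 *   ...
 *   rx_SetRxStatUserOk(allow_all);
 */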