 * Copyright 2000, International Business Machines Corporation and others.
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
/* RX:  Extended Remote Procedure Call */
#include "../afs/param.h"
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../h/types.h"
#include "../h/time.h"
#include "../h/stat.h"
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#ifdef AFS_LINUX20_ENV
#include "../h/socket.h"
#include "../netinet/in.h"
#include "../afs/afs_args.h"
#include "../afs/afs_osi.h"
#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#include "../h/systm.h"
#undef RXDEBUG /* turn off debugging */
#if defined(AFS_SGI_ENV)
#include "../sys/debug.h"
#include "../afsint/afsint.h"
#endif /* AFS_ALPHA_ENV */
#include "../afs/sysincludes.h"
#include "../afs/afsincludes.h"
#include "../afs/lock.h"
#include "../rx/rx_kmutex.h"
#include "../rx/rx_kernel.h"
#include "../rx/rx_clock.h"
#include "../rx/rx_queue.h"
#include "../rx/rx_globals.h"
#include "../rx/rx_trace.h"
#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
#define AFSOP_STOP_AFS 211 /* Stop AFS process */
#define AFSOP_STOP_BKG 212 /* Stop BKG process */
#include "../afsint/afsint.h"
extern afs_int32 afs_termState;
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
# include "../afsint/rxgen_consts.h"
# include <afs/param.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/file.h>
# include <sys/stat.h>
# include <netinet/in.h>
# include <sys/time.h>
# include "rx_clock.h"
# include "rx_queue.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include "rx_internal.h"
# include <afs/rxgen_consts.h>
extern afs_uint32 LWP_ThreadId();
int (*registerProgram)() = 0;
int (*swapNameProgram)() = 0;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
afs_int32 rxi_start_in_error;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
 * currently allocated within rx.  This number is used to allocate the
 * memory required to return the statistics when queried.
static unsigned int rxi_rpc_peer_stat_cnt;
 * rxi_rpc_process_stat_cnt counts the total number of local process stat
 * structures currently allocated within rx.  The number is used to allocate
 * the memory required to return the statistics when queried.
static unsigned int rxi_rpc_process_stat_cnt;
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#ifdef AFS_PTHREAD_ENV
 * Use procedural initialization of mutexes/condition variables
extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
extern pthread_mutex_t rxi_connCacheMutex;
extern pthread_mutex_t rx_event_mutex;
extern pthread_mutex_t osi_malloc_mutex;
extern pthread_mutex_t event_handler_mutex;
extern pthread_mutex_t listener_mutex;
extern pthread_mutex_t rx_if_init_mutex;
extern pthread_mutex_t rx_if_mutex;
extern pthread_mutex_t rxkad_client_uid_mutex;
extern pthread_mutex_t rxkad_random_mutex;
extern pthread_cond_t rx_event_handler_cond;
extern pthread_cond_t rx_listener_cond;
static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
static void rxi_InitPthread(void) {
    assert(pthread_mutex_init(&rx_clock_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxi_connCacheMutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&epoch_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_event_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&des_random_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&osi_malloc_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&event_handler_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&listener_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_init_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_if_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_client_uid_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_random_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rxkad_stats_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_mutex_init(&rx_debug_mutex,
                              (const pthread_mutexattr_t*)0)==0);
    assert(pthread_cond_init(&rx_event_handler_cond,
                             (const pthread_condattr_t*)0)==0);
    assert(pthread_cond_init(&rx_listener_cond,
                             (const pthread_condattr_t*)0)==0);
    assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
 * The rx_stats_mutex mutex protects the following global variables:
 * rxi_lowConnRefCount
 * rxi_lowPeerRefCount
#define INIT_PTHREAD_LOCKS
extern void rxi_DeleteCachedConnections(void);
/* Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * For fine grain locking to work, the quota check and the reservation of
 * a server thread have to come while rxi_availProcs and rxi_minDeficit
 * are locked.  To this end, the code has been modified under #ifdef
 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
 * same time.  A new function, ReturnToServerPool(), returns the allocation.
 * A call can be on several queues (but only one at a time).  When
 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
 * that no one else is touching the queue.  To this end, we store the address
 * of the queue lock in the call structure (under the call lock) when we
 * put the call on a queue, and we clear the call_queue_lock when the
 * call is removed from a queue (once the call lock has been obtained).
 * This allows rxi_ResetCall to safely synchronize with others wishing
 * to manipulate the queue.
extern void rxi_Delay(int);
static int rxi_ServerThreadSelectingCall;
#ifdef RX_ENABLE_LOCKS
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked();
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
** pretty good that the next packet coming in is from the same connection
** as the last packet, since we send multiple packets in a transmit window.
struct rx_connection *rxLastConn = 0;
#ifdef RX_ENABLE_LOCKS
/* The locking hierarchy for rx fine grain locking is composed of five
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
 * call->lock - locks call data fields.
 * Most any other lock - these are all independent of each other.....
 * rx_freeCallQueue_lock
 * rx_connHashTable_lock
 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
 * peer_lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread updates a conn data
 * field at the same time.
 * Do we need a lock to protect the peer field in the conn structure?
 * conn->peer was previously a constant for all intents and so has no
 * lock protecting this field.  The multihomed client delta introduced
 * an RX code change: change the peer field in the connection structure
 * to that remote interface from which the last packet for this
 * connection was sent out.  This may become an issue if further changes
#define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
#define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
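/* Illustrative sketch (not in the original source): the intended pairing of
 * these macros.  When a call is placed on a queue while the call lock and
 * the queue's lock are held:
 *
 *	SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
 *	queue_Append(&some_queue, call);
 *
 * and when the call is removed (once the call lock has been re-obtained):
 *
 *	CLEAR_CALL_QUEUE_LOCK(call);
 *
 * some_queue/some_queue_lock are hypothetical names for this example;
 * rxi_ResetCall consults call->call_queue_lock to find the real lock. */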
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX;
#endif /* RX_LOCKS_DB */
static void rxi_SetAcksInTransmitQueue();
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
#else /* RX_ENABLE_LOCKS */
#define SET_CALL_QUEUE_LOCK(C, L)
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock();
void rxi_DestroyConnection();
void rxi_CleanupConnection();
struct rx_serverQueueEntry *rx_waitForPacket = 0;
/* ------------Exported Interfaces------------- */
/* This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principal purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do. */
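/* Usage sketch (illustrative, not part of the original source): rxkad might
 * install a suitably random epoch once, before new connections are made:
 *
 *	afs_uint32 e = get_random_word();	(hypothetical entropy source)
 *	rx_SetEpoch(e);
 *
 * After this, rx_NewConnection stamps every new connection with that epoch,
 * so epoch/cid pairs stay unique across rxnull and rxkad connections. */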
#ifdef AFS_PTHREAD_ENV
 * This mutex protects the following global variables:
#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
#endif /* AFS_PTHREAD_ENV */
void rx_SetEpoch (epoch)
/* Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
static int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
 * This mutex protects the following global variables:
#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
#define UNLOCK_RX_INIT
int rx_Init(u_int port)
    char *htable, *ptable;
    if (rxinit_status == 0) {
        tmp_status = rxinit_status;
        return tmp_status; /* Already started; return previous error code. */
    if (afs_winsockInit()<0)
     * Initialize anything necessary to provide a non-preemptive threading
    rxi_InitializeThreadSupport();
    /* Allocate and initialize a socket for client and perhaps server
    rx_socket = rxi_GetUDPSocket((u_short)port);
    if (rx_socket == OSI_NULLSOCKET) {
#ifdef RX_ENABLE_LOCKS
#endif /* RX_LOCKS_DB */
    MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
    CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
    MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
    MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
    rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
#else /* RX_ENABLE_LOCKS */
#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
    mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
    rx_connDeadTime = 12;
    rx_tranquil = 0; /* reset flag */
    bzero((char *)&rx_stats, sizeof(struct rx_stats));
        osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
    bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
    bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
    /* Malloc up a bunch of packets & buffers */
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
    queue_Init(&rx_freePacketQueue);
    rxi_NeedMorePackets = FALSE;
    rxi_MorePackets(rx_nPackets);
#if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
    tv.tv_sec = clock_now.sec;
    tv.tv_usec = clock_now.usec;
    srand((unsigned int) tv.tv_usec);
#if defined(KERNEL) && !defined(UKERNEL)
    /* Really, this should never happen in a real kernel */
        struct sockaddr_in addr;
        int addrlen = sizeof(addr);
        if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
        rx_port = addr.sin_port;
    rx_stats.minRtt.sec = 9999999;
    rx_SetEpoch (tv.tv_sec | 0x80000000);
    rx_SetEpoch (tv.tv_sec);		/* Start time of this package, rxkad
					 * will provide a more random value. */
    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
    MUTEX_EXIT(&rx_stats_mutex);
    /* *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;
    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
    rx_hardAckDelay.sec = 0;
    rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
    rx_softAckDelay.sec = 0;
    rx_softAckDelay.usec = 100000; /* 100 milliseconds */
    rxevent_Init(20, rxi_ReScheduleEvents);
    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
    /* Initialize our list of usable IP addresses. */
    /* Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space) */
    tmp_status = rxinit_status = 0;
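/* Startup sketch (illustrative, not part of the original source):
 *
 *	if (rx_Init(0) < 0)	-- 0 lets the kernel choose a UDP port
 *	    exit(1);
 *	... install services and/or create client connections ...
 *
 * A second rx_Init call is harmless: the rxinit_status check above makes it
 * simply return the status of the first initialization. */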
/* called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
#ifdef RX_ENABLE_LOCKS
/* This version of QuotaOK reserves quota if it's ok while the
 * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
static int QuotaOK(aservice)
    register struct rx_service *aservice;
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) {
    /* under min quota, we're OK */
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
    MUTEX_ENTER(&rx_stats_mutex);
    if ((aservice->nRequestsRunning < aservice->minProcs) ||
        (rxi_availProcs > rxi_minDeficit)) {
        aservice->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
        if (aservice->nRequestsRunning <= aservice->minProcs)
        MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_stats_mutex);
static void ReturnToServerPool(aservice)
    register struct rx_service *aservice;
    aservice->nRequestsRunning--;
    MUTEX_ENTER(&rx_stats_mutex);
    if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
    MUTEX_EXIT(&rx_stats_mutex);
#else /* RX_ENABLE_LOCKS */
static QuotaOK(aservice)
    register struct rx_service *aservice; {
    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs) return 1;
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
    /* otherwise, can use only if there are enough to allow everyone
     * to go to their min quota after this guy starts.
    if (rxi_availProcs > rxi_minDeficit) rc = 1;
#endif /* RX_ENABLE_LOCKS */
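/* Pairing sketch (illustrative, not part of the original source).  Under
 * RX_ENABLE_LOCKS, a successful QuotaOK reserves a slot, so a thread that
 * checks quota but does not end up taking a call must give the slot back:
 *
 *	MUTEX_ENTER(&rx_serverPool_lock);
 *	if (QuotaOK(service)) {
 *	    ... try to pick up an incoming call for this service ...
 *	    if (no call was taken)
 *	        ReturnToServerPool(service);
 *	}
 *	MUTEX_EXIT(&rx_serverPool_lock);
 *
 * rx_GetCall below follows essentially this pattern. */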
/* Called by rx_StartServer to start up lwp's to service calls.
   NExistingProcs gives the number of procs already existing, and which
   therefore needn't be created. */
void rxi_StartServerProcs(nExistingProcs)
    register struct rx_service *service;
    /* For each service, reserve N processes, where N is the "minimum"
       number of processes that MUST be able to execute a request in parallel,
       at any time, for that service.  Also compute the maximum difference
       between any service's maximum number of processes that can run
       (i.e. the maximum number that ever will be run, and a guarantee
       that this number will run if other services aren't running), and its
       minimum number.  The result is the extra number of processes that
       we need in order to provide the latter guarantee */
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        nProcs += service->minProcs;
        diff = service->maxProcs - service->minProcs;
        if (diff > maxdiff) maxdiff = diff;
    nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
    nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
    for (i = 0; i<nProcs; i++) {
        rxi_StartServerProc(rx_ServerProc, rx_stackSize);
/* This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
void rx_StartServer(donateMe)
    register struct rx_service *service;
    register int i, nProcs;
    /* Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created by rx_StartServerProcs.
    rxi_StartServerProcs(donateMe);
    /* count up the # of threads in minProcs, and set the min deficit to
     * be that value, too.
    for (i=0; i<RX_MAX_SERVICES; i++) {
        service = rx_services[i];
        if (service == (struct rx_service *) 0) break;
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_totalMin += service->minProcs;
        /* below works even if a thread is running, since minDeficit would
         * still have been decremented and later re-incremented.
        rxi_minDeficit += service->minProcs;
        MUTEX_EXIT(&rx_stats_mutex);
    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();
#ifdef AFS_PTHREAD_ENV
        pid = pthread_self();
#else /* AFS_PTHREAD_ENV */
        code = LWP_CurrentProcess(&pid);
#endif /* AFS_PTHREAD_ENV */
        sprintf(name,"srv_%d", ++nProcs);
        (*registerProgram)(pid, name);
#endif /* AFS_NT40_ENV */
    rx_ServerProc(); /* Never returns */
/* Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
struct rx_connection *
rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
    register afs_uint32 shost;	    /* Server host */
    u_short sport;		    /* Server port */
    u_short sservice;		    /* Server service id */
    register struct rx_securityClass *securityObject;
    int serviceSecurityIndex;
    register struct rx_connection *conn;
    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
         shost, sport, sservice, securityObject, serviceSecurityIndex));
    /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
     * the case of kmem_alloc? */
    conn = rxi_AllocConnection();
#ifdef RX_ENABLE_LOCKS
    MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
    MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
    CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
    MUTEX_ENTER(&rx_connHashTable_lock);
    cid = (rx_nextCid += RX_MAXCALLS);
    conn->type = RX_CLIENT_CONNECTION;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    /* This doesn't work in all compilers with void (they're buggy), so fake it
    conn->securityData = (VOID *) 0;
    conn->securityIndex = serviceSecurityIndex;
    rx_SetConnDeadTime(conn, rx_connDeadTime);
    conn->ackRate = RX_FAST_ACK_RATE;
    conn->specific = NULL;
    conn->challengeEvent = (struct rxevent *)0;
    conn->delayedAbortEvent = (struct rxevent *)0;
    conn->abortCount = 0;
    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
    conn->refCount++; /* no lock required since only this thread knows... */
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    MUTEX_ENTER(&rx_stats_mutex);
    rx_stats.nClientConns++;
    MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_connHashTable_lock);
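/* Client-side usage sketch (illustrative, not part of the original source),
 * assuming an rxnull security object; the address, port, and service id are
 * made-up values for the example:
 *
 *	struct rx_securityClass *so = rxnull_NewClientSecurityObject();
 *	struct rx_connection *tconn =
 *	    rx_NewConnection(htonl(0x7f000001), htons(7000), 52, so, 0);
 *
 * The connection can then be handed to rx_NewCall (below) to start calls. */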
void rx_SetConnDeadTime(conn, seconds)
    register struct rx_connection *conn;
    register int seconds;
    /* The idea is to set the dead time to a value that allows several
     * keepalives to be dropped without timing out the connection. */
    conn->secondsUntilDead = MAX(seconds, 6);
    conn->secondsUntilPing = conn->secondsUntilDead/6;
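/* For example, rx_SetConnDeadTime(conn, 50) gives secondsUntilDead = 50 and
 * secondsUntilPing = 8 (integer division), so roughly six keepalives can be
 * lost before the connection times out; arguments below 6 are clamped to 6. */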
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
 * NOTE: must not be called with rx_connHashTable_lock held.
void rxi_CleanupConnection(conn)
    struct rx_connection *conn;
    /* Notify the service exporter, if requested, that this connection
     * is being destroyed */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
        (*conn->service->destroyConnProc)(conn);
    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);
    /* If this is the last connection using the rx_peer struct, set its
     * idle time to now.  rxi_ReapConnections will reap it if it's still
     * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
    MUTEX_ENTER(&rx_peerHashTable_lock);
    if (--conn->peer->refCount <= 0) {
        conn->peer->idleWhen = clock_Sec();
        if (conn->peer->refCount < 0) {
            conn->peer->refCount = 0;
            MUTEX_ENTER(&rx_stats_mutex);
            rxi_lowPeerRefCount++;
            MUTEX_EXIT(&rx_stats_mutex);
    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_ENTER(&rx_stats_mutex);
    if (conn->type == RX_SERVER_CONNECTION)
        rx_stats.nServerConns--;
        rx_stats.nClientConns--;
    MUTEX_EXIT(&rx_stats_mutex);
    if (conn->specific) {
        for (i = 0 ; i < conn->nSpecific ; i++) {
            if (conn->specific[i] && rxi_keyCreate_destructor[i])
                (*rxi_keyCreate_destructor[i])(conn->specific[i]);
            conn->specific[i] = NULL;
        free(conn->specific);
        conn->specific = NULL;
    MUTEX_DESTROY(&conn->conn_call_lock);
    MUTEX_DESTROY(&conn->conn_data_lock);
    CV_DESTROY(&conn->conn_call_cv);
    rxi_FreeConnection(conn);
/* Destroy the specified connection */
void rxi_DestroyConnection(conn)
    register struct rx_connection *conn;
    MUTEX_ENTER(&rx_connHashTable_lock);
    rxi_DestroyConnectionNoLock(conn);
    /* conn should be at the head of the cleanup list */
    if (conn == rx_connCleanup_list) {
        rx_connCleanup_list = rx_connCleanup_list->next;
        MUTEX_EXIT(&rx_connHashTable_lock);
        rxi_CleanupConnection(conn);
#ifdef RX_ENABLE_LOCKS
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
static void rxi_DestroyConnectionNoLock(conn)
    register struct rx_connection *conn;
    register struct rx_connection **conn_ptr;
    register int havecalls = 0;
    struct rx_packet *packet;
    MUTEX_ENTER(&conn->conn_data_lock);
    if (conn->refCount > 0)
        MUTEX_ENTER(&rx_stats_mutex);
        rxi_lowConnRefCount++;
        MUTEX_EXIT(&rx_stats_mutex);
    if (conn->refCount > 0) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
    /* If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes. */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
        (conn->flags & RX_CONN_MAKECALL_WAITING)) {
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
    MUTEX_EXIT(&conn->conn_data_lock);
    /* Check for extant references to this connection */
    for (i = 0; i<RX_MAXCALLS; i++) {
        register struct rx_call *call = conn->call[i];
            if (conn->type == RX_CLIENT_CONNECTION) {
                MUTEX_ENTER(&call->lock);
                if (call->delayedAckEvent) {
                    /* Push the final acknowledgment out now--there
                     * won't be a subsequent call to acknowledge the
                     * last reply packets */
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    rxi_AckAll((struct rxevent *)0, call, 0);
                MUTEX_EXIT(&call->lock);
#ifdef RX_ENABLE_LOCKS
        if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
            MUTEX_EXIT(&conn->conn_data_lock);
            /* Someone is accessing a packet right now. */
#endif /* RX_ENABLE_LOCKS */
        /* Don't destroy the connection if there are any call
         * structures still in use */
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_DESTROY_ME;
        MUTEX_EXIT(&conn->conn_data_lock);
    if (conn->delayedAbortEvent) {
        rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
            MUTEX_ENTER(&conn->conn_data_lock);
            rxi_SendConnectionAbort(conn, packet, 0, 1);
            MUTEX_EXIT(&conn->conn_data_lock);
            rxi_FreePacket(packet);
    /* Remove from connection hash table before proceeding */
    conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
                                             conn->epoch, conn->type) ];
    for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
        if (*conn_ptr == conn) {
            *conn_ptr = conn->next;
    /* if the conn that we are destroying was the last connection, then we
     * clear rxLastConn as well */
    if ( rxLastConn == conn )
    /* Make sure the connection is completely reset before deleting it. */
    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent) {
        rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
    /* Add the connection to the list of destroyed connections that
     * need to be cleaned up.  This is necessary to avoid deadlocks
     * in the routines we call to inform others that this connection is
     * being destroyed. */
    conn->next = rx_connCleanup_list;
    rx_connCleanup_list = conn;
/* Externally available version */
void rx_DestroyConnection(conn)
    register struct rx_connection *conn;
    rxi_DestroyConnection (conn);
/* Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 * For fine grain locking, we hold the conn_call_lock in order
 * to ensure that we don't get signalled after we find a call in an active
 * state and before we go to sleep.
struct rx_call *rx_NewCall(conn)
    register struct rx_connection *conn;
    register struct rx_call *call;
    struct clock queueTime;
    dpf (("rx_MakeCall(conn %x)\n", conn));
    clock_GetTime(&queueTime);
    MUTEX_ENTER(&conn->conn_call_lock);
    for (i=0; i<RX_MAXCALLS; i++) {
        call = conn->call[i];
            MUTEX_ENTER(&call->lock);
            if (call->state == RX_STATE_DALLY) {
                rxi_ResetCall(call, 0);
                (*call->callNumber)++;
            MUTEX_EXIT(&call->lock);
        call = rxi_NewCall(conn, i);
        MUTEX_ENTER(&call->lock);
    if (i < RX_MAXCALLS) {
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
        MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
    CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;
    /* remember start time for call in case we have hard dead time limit */
    call->queueTime = queueTime;
    clock_GetTime(&call->startTime);
    hzero(call->bytesSent);
    hzero(call->bytesRcvd);
    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);
    MUTEX_EXIT(&call->lock);
    MUTEX_EXIT(&conn->conn_call_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    /* Now, if TQ wasn't cleared earlier, do it now. */
    MUTEX_ENTER(&call->lock);
    while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
#ifdef RX_ENABLE_LOCKS
        CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
    if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
    MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
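/* Call lifecycle sketch (illustrative, not part of the original source):
 * a client obtains a call, sends its request, reads the reply, and ends the
 * call; rx_EndCall returns the final error code for the conversation:
 *
 *	struct rx_call *tcall = rx_NewCall(tconn);
 *	rx_Write(tcall, request, reqlen);
 *	rx_Read(tcall, reply, replylen);
 *	code = rx_EndCall(tcall, 0);
 *
 * request/reply and their lengths are placeholders for this example. */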
rxi_HasActiveCalls(aconn)
    register struct rx_connection *aconn; {
    register struct rx_call *tcall;
    for(i=0; i<RX_MAXCALLS; i++) {
        if (tcall = aconn->call[i]) {
            if ((tcall->state == RX_STATE_ACTIVE)
                || (tcall->state == RX_STATE_PRECALL)) {
rxi_GetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register struct rx_call *tcall;
    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aint32s[i] = aconn->callNumber[i]+1;
            aint32s[i] = aconn->callNumber[i];
rxi_SetCallNumberVector(aconn, aint32s)
    register struct rx_connection *aconn;
    register afs_int32 *aint32s; {
    register struct rx_call *tcall;
    for(i=0; i<RX_MAXCALLS; i++) {
        if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
            aconn->callNumber[i] = aint32s[i] - 1;
            aconn->callNumber[i] = aint32s[i];
/* Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
rx_NewService(port, serviceId, serviceName, securityObjects,
              nSecurityObjects, serviceProc)
    char *serviceName;	/* Name for identification purposes (e.g. the
                         * service name might be used for probing for
    struct rx_securityClass **securityObjects;
    int nSecurityObjects;
    afs_int32 (*serviceProc)();
    osi_socket socket = OSI_NULLSOCKET;
    register struct rx_service *tservice;
    if (serviceId == 0) {
        (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
        (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
    tservice = rxi_AllocService();
    for (i = 0; i<RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
            if (port == service->servicePort) {
                if (service->serviceId == serviceId) {
                    /* The identical service has already been
                     * installed; if the caller was intending to
                     * change the security classes used by this
                     * service, he/she loses. */
                    (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
                    rxi_FreeService(tservice);
                /* Different service, same port: re-use the socket
                 * which is bound to the same port */
                socket = service->socket;
            if (socket == OSI_NULLSOCKET) {
                /* If we don't already have a socket (from another
                 * service on same port) get a new one */
                socket = rxi_GetUDPSocket(port);
                if (socket == OSI_NULLSOCKET) {
                    rxi_FreeService(tservice);
            service->socket = socket;
            service->servicePort = port;
            service->serviceId = serviceId;
            service->serviceName = serviceName;
            service->nSecurityObjects = nSecurityObjects;
            service->securityObjects = securityObjects;
            service->minProcs = 0;
            service->maxProcs = 1;
            service->idleDeadTime = 60;
            service->connDeadTime = rx_connDeadTime;
            service->executeRequestProc = serviceProc;
            rx_services[i] = service;	/* not visible until now */
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
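/* Server-side setup sketch (illustrative, not part of the original source);
 * the service name, id, and request procedure are hypothetical:
 *
 *	struct rx_securityClass *so[1];
 *	struct rx_service *tservice;
 *	so[0] = rxnull_NewServerSecurityObject();
 *	tservice = rx_NewService(0, 52, "example", so, 1, ExampleRequestProc);
 *	if (tservice) rx_StartServer(1);	-- donate this thread
 *
 * Port 0 reuses the default port that was given to rx_Init. */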
/* Generic request processing loop.  This routine should be called
 * by the implementation dependent rx_ServerProc.  If socketp is
 * non-null, it will be set to the file descriptor that this thread
 * is now listening on.  If socketp is null, this routine will never
void rxi_ServerProc(threadID, newcall, socketp)
    struct rx_call *newcall;
    osi_socket *socketp;
    register struct rx_call *call;
    register afs_int32 code;
    register struct rx_service *tservice = NULL;
        call = rx_GetCall(threadID, tservice, socketp);
        if (socketp && *socketp != OSI_NULLSOCKET) {
            /* We are now a listener thread */
        /* if the server is restarting (typically a smooth shutdown) then do
         * not allow any new calls.
        if ( rx_tranquil && (call != NULL) ) {
            MUTEX_ENTER(&call->lock);
            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
            MUTEX_EXIT(&call->lock);
        if (afs_termState == AFSOP_STOP_RXCALLBACK) {
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
            afs_termState = AFSOP_STOP_AFS;
            afs_osi_Wakeup(&afs_termState);
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
        tservice = call->conn->service;
        if (tservice->beforeProc) (*tservice->beforeProc)(call);
        code = call->conn->service->executeRequestProc(call);
        if (tservice->afterProc) (*tservice->afterProc)(call, code);
        rx_EndCall(call, code);
        MUTEX_ENTER(&rx_stats_mutex);
        MUTEX_EXIT(&rx_stats_mutex);
void rx_WakeupServerProcs()
    struct rx_serverQueueEntry *np, *tqp;
    MUTEX_ENTER(&rx_serverPool_lock);
#ifdef RX_ENABLE_LOCKS
    if (rx_waitForPacket)
        CV_BROADCAST(&rx_waitForPacket->cv);
#else /* RX_ENABLE_LOCKS */
    if (rx_waitForPacket)
        osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
    MUTEX_ENTER(&freeSQEList_lock);
    for (np = rx_FreeSQEList; np; np = tqp) {
        tqp = *(struct rx_serverQueueEntry **)np;
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
    MUTEX_EXIT(&freeSQEList_lock);
    for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
#ifdef RX_ENABLE_LOCKS
        CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
#endif /* RX_ENABLE_LOCKS */
    MUTEX_EXIT(&rx_serverPool_lock);
 * One thing that seems to happen is that all the server threads get
 * tied up on some empty or slow call, and then a whole bunch of calls
 * arrive at once, using up the packet pool, so now there are more
 * empty calls.  The most critical resources here are server threads
 * and the free packet pool.  The "doreclaim" code seems to help in
 * general.  I think that eventually we arrive in this state: there
 * are lots of pending calls which do have all their packets present,
 * so they won't be reclaimed, are multi-packet calls, so they won't
 * be scheduled until later, and thus are tying up most of the free
 * packet pool for a very long time.
 * 1. schedule multi-packet calls if all the packets are present.
 * Probably CPU-bound operation, useful to return packets to pool.
 * Do what if there is a full window, but the last packet isn't here?
 * 3. preserve one thread which *only* runs "best" calls, otherwise
 * it sleeps and waits for that type of call.
 * 4. Don't necessarily reserve a whole window for each thread.  In fact,
 * the current dataquota business is badly broken.  The quota isn't adjusted
 * to reflect how many packets are presently queued for a running call.
 * So, when we schedule a queued call with a full window of packets queued
 * up for it, that *should* free up a window full of packets for other 2d-class
 * calls to be able to use from the packet pool.  But it doesn't.
 * NB. Most of the time, this code doesn't run -- since idle server threads
 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
 * as a new call arrives.
/* Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read. */
#ifdef RX_ENABLE_LOCKS
rx_GetCall(tno, cur_service, socketp)
    struct rx_service *cur_service;
    osi_socket *socketp;
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;
    MUTEX_ENTER(&freeSQEList_lock);
    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    MUTEX_ENTER(&rx_serverPool_lock);
    if (cur_service != NULL) {
        ReturnToServerPool(cur_service);
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
            register struct rx_call *tcall, *ncall;
            choice2 = (struct rx_call *) 0;
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
            /* One thread will process calls FCFS (to prevent starvation),
             * while the other threads may run ahead looking for calls which
             * have all their input data available immediately.  This helps
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
                service = tcall->conn->service;
                if (!QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1) {
                        if (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET)) {
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                    } else rxi_md2cnt++;
                ReturnToServerPool(service);
        rxi_ServerThreadSelectingCall = 1;
        MUTEX_EXIT(&rx_serverPool_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&rx_serverPool_lock);
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
        CLEAR_CALL_QUEUE_LOCK(call);
            MUTEX_EXIT(&call->lock);
            ReturnToServerPool(service);
            rxi_ServerThreadSelectingCall = 0;
            CV_SIGNAL(&rx_serverPool_cv);
            call = (struct rx_call*)0;
        call->flags &= (~RX_CALL_WAIT_PROC);
        MUTEX_ENTER(&rx_stats_mutex);
        MUTEX_EXIT(&rx_stats_mutex);
        rxi_ServerThreadSelectingCall = 0;
        CV_SIGNAL(&rx_serverPool_cv);
        MUTEX_EXIT(&rx_serverPool_lock);
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
            *socketp = OSI_NULLSOCKET;
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
#ifndef AFS_AIX41_ENV
        rx_waitForPacket = sq;
#endif /* AFS_AIX41_ENV */
            CV_WAIT(&sq->cv, &rx_serverPool_lock);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                MUTEX_EXIT(&rx_serverPool_lock);
                return (struct rx_call *)0;
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    MUTEX_EXIT(&rx_serverPool_lock);
        MUTEX_ENTER(&call->lock);
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;
        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));
        CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
        MUTEX_EXIT(&call->lock);
    dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
#else /* RX_ENABLE_LOCKS */
rx_GetCall(tno, cur_service, socketp)
    struct rx_service *cur_service;
    osi_socket *socketp;
    struct rx_serverQueueEntry *sq;
    register struct rx_call *call = (struct rx_call *) 0, *choice2;
    struct rx_service *service;
    MUTEX_ENTER(&freeSQEList_lock);
    if (sq = rx_FreeSQEList) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
        MUTEX_EXIT(&freeSQEList_lock);
    } else {    /* otherwise allocate a new one and return that */
        MUTEX_EXIT(&freeSQEList_lock);
        sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
        MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
        CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
    MUTEX_ENTER(&sq->lock);
    if (cur_service != NULL) {
        cur_service->nRequestsRunning--;
        if (cur_service->nRequestsRunning < cur_service->minProcs)
    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
        register struct rx_call *tcall, *ncall;
        /* Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing */
        /* One thread will process calls FCFS (to prevent starvation),
         * while the other threads may run ahead looking for calls which
         * have all their input data available immediately.  This helps
         * keep threads from blocking, waiting for data from the client. */
        choice2 = (struct rx_call *) 0;
        for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
            service = tcall->conn->service;
            if (QuotaOK(service)) {
                if (!tno || !tcall->queue_item_header.next ) {
                    /* If we're thread 0, then we'll just use
                     * this call. If we haven't been able to find an optimal
                     * choice, and we're at the end of the list, then use a
                     * 2d choice if one has been identified.  Otherwise... */
                    call = (choice2 ? choice2 : tcall);
                    service = call->conn->service;
                } else if (!queue_IsEmpty(&tcall->rq)) {
                    struct rx_packet *rp;
                    rp = queue_First(&tcall->rq, rx_packet);
                    if (rp->header.seq == 1
                        && (!meltdown_1pkt ||
                            (rp->header.flags & RX_LAST_PACKET))) {
                    } else if (rxi_2dchoice && !choice2 &&
                               !(tcall->flags & RX_CALL_CLEARED) &&
                               (tcall->rprev > rxi_HardAckRate)) {
                    } else rxi_md2cnt++;
        /* we can't schedule a call if there's no data!!! */
        /* send an ack if there's no data, if we're missing the
         * first packet, or we're missing something between first
         * and last -- there's a "hole" in the incoming data. */
        if (queue_IsEmpty(&call->rq) ||
            queue_First(&call->rq, rx_packet)->header.seq != 1 ||
            call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
            rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
        call->flags &= (~RX_CALL_WAIT_PROC);
        service->nRequestsRunning++;
        /* just started call in minProcs pool, need fewer to maintain
        if (service->nRequestsRunning <= service->minProcs)
        /* MUTEX_EXIT(&call->lock); */
        /* If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one */
            *socketp = OSI_NULLSOCKET;
        sq->socketp = socketp;
        queue_Append(&rx_idleServerQueue, sq);
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
                return (struct rx_call *)0;
        } while (!(call = sq->newcall) &&
                 !(socketp && *socketp != OSI_NULLSOCKET));
    MUTEX_EXIT(&sq->lock);
    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);
        clock_GetTime(&call->startTime);
        call->state = RX_STATE_ACTIVE;
        call->mode = RX_MODE_RECEIVING;
        rxi_calltrace(RX_CALL_START, call);
        dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
             call->conn->service->servicePort,
             call->conn->service->serviceId, call));
    dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
#endif /* RX_ENABLE_LOCKS */
/* Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the call or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
void rx_SetArrivalProc(call, proc, handle, arg)
    register struct rx_call *call;
    register VOID (*proc)();
    register VOID *handle;
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
/* Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
afs_int32 rx_EndCall(call, rc)
    register struct rx_call *call;
    register struct rx_connection *conn = call->conn;
    register struct rx_service *service;
    register struct rx_packet *tp; /* Temporary packet pointer */
    register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
    dpf(("rx_EndCall(call %x)\n", call));
    MUTEX_ENTER(&call->lock);
    if (rc == 0 && call->error == 0) {
        call->abortCode = 0;
        call->abortCount = 0;
    call->arrivalProc = (VOID (*)()) 0;
    if (rc && call->error == 0) {
        rxi_CallError(call, rc);
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
        rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
    if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
            rxi_WriteProc(call, 0, 0);
        if (call->mode == RX_MODE_SENDING) {
            rxi_FlushWrite(call);
        service = conn->service;
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
        if (call->tfirst + call->nSoftAcked < call->tnext) {
            call->state = RX_STATE_HOLD;
            call->state = RX_STATE_DALLY;
            rxi_ClearTransmitQueue(call, 0);
            rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
            rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
    else { /* Client connection */
        /* Make sure server receives input packets, in the case where
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
            (void) rxi_ReadProc(call, &dummy, 1);
        /* We need to release the call lock since it's lower than the
         * conn_call_lock and we don't want to hold the conn_call_lock
         * over the rx_ReadProc call.  The conn_call_lock needs to be held
         * here for the case where rx_NewCall is perusing the calls on
         * the connection structure.  We don't want to signal until
         * rx_NewCall is in a stable state.  Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
        MUTEX_ENTER(&call->lock);
        MUTEX_ENTER(&conn->conn_data_lock);
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
            conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
#ifdef RX_ENABLE_LOCKS
            MUTEX_EXIT(&conn->conn_data_lock);
#endif /* RX_ENABLE_LOCKS */
        call->state = RX_STATE_DALLY;
    error = call->error;
    /* currentPacket, nLeft, and nFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency. */
    if (call->currentPacket) {
        rxi_FreePacket(call->currentPacket);
        call->currentPacket = (struct rx_packet *) 0;
        call->nLeft = call->nFree = call->curlen = 0;
        call->nLeft = call->nFree = call->curlen = 0;
    /* Free any packets from the last call to ReadvProc/WritevProc */
    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
    CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
    MUTEX_EXIT(&call->lock);
    if (conn->type == RX_CLIENT_CONNECTION)
        MUTEX_EXIT(&conn->conn_call_lock);
     * Map errors to the local host's errno.h format.
    error = ntoh_syserr_conv(error);
#if !defined(KERNEL)
/* Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client.
 * This is not quite right, since some calls may still be ongoing and
 * we can't lock them to destroy them. */
void rx_Finalize() {
    register struct rx_connection **conn_ptr, **conn_end;
    if (rxinit_status == 1) {
        return; /* Already shutdown. */
    rxi_DeleteCachedConnections();
    if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (conn_ptr = &rx_connHashTable[0],
             conn_end = &rx_connHashTable[rx_hashTableSize];
             conn_ptr < conn_end; conn_ptr++) {
            struct rx_connection *conn, *next;
            for (conn = *conn_ptr; conn; conn = next) {
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
#ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
#else /* RX_ENABLE_LOCKS */
                    rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
#ifdef RX_ENABLE_LOCKS
        while (rx_connCleanup_list) {
            struct rx_connection *conn;
            conn = rx_connCleanup_list;
            rx_connCleanup_list = rx_connCleanup_list->next;
            MUTEX_EXIT(&rx_connHashTable_lock);
            rxi_CleanupConnection(conn);
            MUTEX_ENTER(&rx_connHashTable_lock);
        MUTEX_EXIT(&rx_connHashTable_lock);
#endif /* RX_ENABLE_LOCKS */
/* if we wakeup packet waiter too often, can get in loop with two
   AllocSendPackets each waking each other up (from ReclaimPacket calls) */
rxi_PacketsUnWait() {
    if (!rx_waitingForPackets) {
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
        return; /* still over quota */
    rx_waitingForPackets = 0;
#ifdef RX_ENABLE_LOCKS
    CV_BROADCAST(&rx_waitingForPackets_cv);
    osi_rxWakeup(&rx_waitingForPackets);
/* ------------------Internal interfaces------------------------- */
/* Return this process's service structure for the
 * specified socket and service */
struct rx_service *rxi_FindService(socket, serviceId)
    register osi_socket socket;
    register u_short serviceId;
    register struct rx_service **sp;
    for (sp = &rx_services[0]; *sp; sp++) {
        if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
/* Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
struct rx_call *rxi_NewCall(conn, channel)
    register struct rx_connection *conn;
    register int channel;
    register struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
    register struct rx_call *cp;	/* Call pointer temp */
    register struct rx_call *nxp;	/* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
    /* Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
    MUTEX_ENTER(&rx_freeCallQueue_lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
     * EXCEPT that the TQ might not yet be cleared out.
     * Skip over those with in-use TQs.
    for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
        if (!(cp->flags & RX_CALL_TQ_BUSY)) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nFreeCallStructs--;
        MUTEX_EXIT(&rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Now, if TQ wasn't cleared earlier, do it now. */
        if (call->flags & RX_CALL_TQ_CLEARME) {
            rxi_ClearTransmitQueue(call, 0);
            queue_Init(&call->tq);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        /* Bind the call to its connection structure */
        rxi_ResetCall(call, 1);
        call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
        MUTEX_ENTER(&call->lock);
        CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
        CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
        CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
        MUTEX_ENTER(&rx_stats_mutex);
        rx_stats.nCallStructs++;
        MUTEX_EXIT(&rx_stats_mutex);
        /* Initialize once-only items */
        queue_Init(&call->tq);
        queue_Init(&call->rq);
        queue_Init(&call->iovq);
        /* Bind the call to its connection structure (prereq for reset) */
        rxi_ResetCall(call, 1);
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /* Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
    conn->call[channel] = call;
    /* if the channel's never been used (== 0), we should start at 1, otherwise
       the call number is valid from the last time this channel was used */
    if (*call->callNumber == 0) *call->callNumber = 1;
    MUTEX_EXIT(&call->lock);
/* A call has been inactive long enough that we can throw away
 * state, including the call structure, which is placed on the call
 * Call is locked upon entry.
2000 #ifdef RX_ENABLE_LOCKS
2001 void rxi_FreeCall(call, haveCTLock)
2002 int haveCTLock; /* Set if called from rxi_ReapConnections */
2003 #else /* RX_ENABLE_LOCKS */
2004 void rxi_FreeCall(call)
2005 #endif /* RX_ENABLE_LOCKS */
2006 register struct rx_call *call;
2008 register int channel = call->channel;
2009 register struct rx_connection *conn = call->conn;
2012 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2013 (*call->callNumber)++;
2014 rxi_ResetCall(call, 0);
2015 call->conn->call[channel] = (struct rx_call *) 0;
2017 MUTEX_ENTER(&rx_freeCallQueue_lock);
2018 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2019 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2020 /* A call may be free even though its transmit queue is still in use.
2021 * Since we search the call list from head to tail, put busy calls at
2022 * the head of the list, and idle calls at the tail.
2024 if (call->flags & RX_CALL_TQ_BUSY)
2025 queue_Prepend(&rx_freeCallQueue, call);
2026 else
2027 queue_Append(&rx_freeCallQueue, call);
2028 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2029 queue_Append(&rx_freeCallQueue, call);
2030 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2031 MUTEX_ENTER(&rx_stats_mutex);
2032 rx_stats.nFreeCallStructs++;
2033 MUTEX_EXIT(&rx_stats_mutex);
2035 MUTEX_EXIT(&rx_freeCallQueue_lock);
2037 /* Destroy the connection if it was previously slated for
2038 * destruction, i.e. the Rx client code previously called
2039 * rx_DestroyConnection (client connections), or
2040 * rxi_ReapConnections called the same routine (server
2041 * connections). Only do this, however, if there are no
2042 * outstanding calls. Note that for fine grain locking, there appears
2043 * to be a deadlock in that rxi_FreeCall has a call locked and
2044 * DestroyConnectionNoLock locks each call in the conn. But note a
2045 * few lines up where we have removed this call from the conn.
2046 * If someone else destroys a connection, they either have no
2047 * call lock held or are going through this section of code.
2049 if (conn->flags & RX_CONN_DESTROY_ME) {
2050 MUTEX_ENTER(&conn->conn_data_lock);
2051 conn->refCount--;
2052 MUTEX_EXIT(&conn->conn_data_lock);
2053 #ifdef RX_ENABLE_LOCKS
2054 if (haveCTLock)
2055 rxi_DestroyConnectionNoLock(conn);
2056 else
2057 rxi_DestroyConnection(conn);
2058 #else /* RX_ENABLE_LOCKS */
2059 rxi_DestroyConnection(conn);
2060 #endif /* RX_ENABLE_LOCKS */
2061 }
2062 }
2064 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2065 char *rxi_Alloc(size)
2066 register size_t size;
2067 {
2068 register char *p;
2070 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2071 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2072 * definitions. */
2074 int glockOwner = ISAFS_GLOCK();
2075 if (!glockOwner)
2076 AFS_GLOCK();
2077 #endif
2078 MUTEX_ENTER(&rx_stats_mutex);
2079 rxi_Alloccnt++; rxi_Allocsize += size;
2080 MUTEX_EXIT(&rx_stats_mutex);
2081 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2082 if (size > AFS_SMALLOCSIZ) {
2083 p = (char *) osi_AllocMediumSpace(size);
2084 } else
2085 p = (char *) osi_AllocSmall(size, 1);
2086 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2087 if (!glockOwner)
2088 AFS_GUNLOCK();
2089 #endif
2090 #else
2091 p = (char *) osi_Alloc(size);
2092 #endif
2093 if (!p) osi_Panic("rxi_Alloc error");
2094 bzero(p, size);
2095 return p;
2096 }
2098 void rxi_Free(addr, size)
2099 register char *addr;
2100 register size_t size;
2101 {
2102 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2103 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2104 * definitions. */
2106 int glockOwner = ISAFS_GLOCK();
2107 if (!glockOwner)
2108 AFS_GLOCK();
2109 #endif
2110 MUTEX_ENTER(&rx_stats_mutex);
2111 rxi_Alloccnt--; rxi_Allocsize -= size;
2112 MUTEX_EXIT(&rx_stats_mutex);
2113 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2114 if (size > AFS_SMALLOCSIZ)
2115 osi_FreeMediumSpace(addr);
2116 else
2117 osi_FreeSmall(addr);
2118 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2119 if (!glockOwner)
2120 AFS_GUNLOCK();
2121 #endif
2122 #else
2123 osi_Free(addr, size);
2124 #endif
2125 }
2127 /* Find the peer process represented by the supplied (host,port)
2128 * combination. If there is no appropriate active peer structure, a
2129 * new one will be allocated and initialized
2130 * The origPeer, if set, is a pointer to a peer structure on which the
2131 * refcount will be decremented. This is used to replace the peer
2132 * structure hanging off a connection structure */
2133 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2134 register afs_uint32 host;
2135 register u_short port;
2136 struct rx_peer *origPeer;
2137 int create;
2138 {
2139 register struct rx_peer *pp;
2140 int hashIndex;
2141 hashIndex = PEER_HASH(host, port);
2142 MUTEX_ENTER(&rx_peerHashTable_lock);
2143 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2144 if ((pp->host == host) && (pp->port == port)) break;
2145 }
2146 if (!pp) {
2147 if (create) {
2148 pp = rxi_AllocPeer(); /* This bzero's *pp */
2149 pp->host = host; /* set here or in InitPeerParams is zero */
2150 pp->port = port;
2151 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2152 queue_Init(&pp->congestionQueue);
2153 queue_Init(&pp->rpcStats);
2154 pp->next = rx_peerHashTable[hashIndex];
2155 rx_peerHashTable[hashIndex] = pp;
2156 rxi_InitPeerParams(pp);
2157 MUTEX_ENTER(&rx_stats_mutex);
2158 rx_stats.nPeerStructs++;
2159 MUTEX_EXIT(&rx_stats_mutex);
2160 }
2161 }
2162 if (pp && create) {
2163 pp->refCount++;
2164 }
2165 if (origPeer)
2166 origPeer->refCount--;
2167 MUTEX_EXIT(&rx_peerHashTable_lock);
2168 return pp;
2169 }
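/* [Editor's illustrative sketch.] The peer table above is a chained hash
 * keyed on (host, port). A minimal stand-alone analogue of the lookup,
 * with a hypothetical hash standing in for the real PEER_HASH macro: */
#if 0 /* example only */
struct example_peer {
    afs_uint32 host;
    u_short port;
    struct example_peer *next;
};
#define EXAMPLE_HASHSIZE 256
static struct example_peer *example_table[EXAMPLE_HASHSIZE];

static struct example_peer *example_LookupPeer(afs_uint32 host, u_short port)
{
    int ix = (int)((host ^ port) % EXAMPLE_HASHSIZE); /* hypothetical hash */
    struct example_peer *pp;
    for (pp = example_table[ix]; pp; pp = pp->next)
        if (pp->host == host && pp->port == port)
            break;
    return pp; /* NULL when absent; rxi_FindPeer would then allocate */
}
#endif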
2172 /* Find the connection at (host, port) started at epoch, and with the
2173 * given connection id. Creates the server connection if necessary.
2174 * The type specifies whether a client connection or a server
2175 * connection is desired. In both cases, (host, port) specify the
2176 * peer's (host, port) pair. Client connections are not made
2177 * automatically by this routine. The parameter socket gives the
2178 * socket descriptor on which the packet was received. This is used,
2179 * in the case of server connections, to check that *new* connections
2180 * come via a valid (port, serviceId). Finally, the securityIndex
2181 * parameter must match the existing index for the connection. If a
2182 * server connection is created, it will be created using the supplied
2183 * index, if the index is valid for this service */
2184 struct rx_connection *
2185 rxi_FindConnection(socket, host, port, serviceId, cid,
2186 epoch, type, securityIndex)
2187 osi_socket socket;
2188 register afs_int32 host;
2189 register u_short port;
2190 u_short serviceId;
2191 afs_uint32 cid;
2192 afs_uint32 epoch;
2193 int type;
2194 u_int securityIndex;
2195 {
2196 int hashindex, flag;
2197 register struct rx_connection *conn;
2198 struct rx_peer *peer;
2199 hashindex = CONN_HASH(host, port, cid, epoch, type);
2200 MUTEX_ENTER(&rx_connHashTable_lock);
2201 rxLastConn ? (conn = rxLastConn, flag = 0) :
2202 (conn = rx_connHashTable[hashindex], flag = 1);
2203 for ( ; conn; ) {
2204 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2205 && (epoch == conn->epoch)) {
2206 register struct rx_peer *pp = conn->peer;
2207 if (securityIndex != conn->securityIndex) {
2208 /* this isn't supposed to happen, but someone could forge a packet
2209 like this, and there seems to be some CM bug that makes this
2210 happen from time to time -- in which case, the fileserver
2211 asserts. */
2212 MUTEX_EXIT(&rx_connHashTable_lock);
2213 return (struct rx_connection *) 0;
2214 }
2215 /* epoch's high order bits mean route for security reasons only on
2216 * the cid, not the host and port fields.
2218 if (conn->epoch & 0x80000000) break;
2219 if (((type == RX_CLIENT_CONNECTION)
2220 || (pp->host == host)) && (pp->port == port))
2221 break;
2222 }
2223 if ( !flag ) {
2224 /* the connection rxLastConn that was used the last time is not the
2225 ** one we are looking for now. Hence, start searching in the hash */
2226 flag = 1;
2227 conn = rx_connHashTable[hashindex];
2228 }
2229 else
2230 conn = conn->next;
2231 }
2233 if (!conn) {
2234 struct rx_service *service;
2235 if (type == RX_CLIENT_CONNECTION) {
2236 MUTEX_EXIT(&rx_connHashTable_lock);
2237 return (struct rx_connection *) 0;
2238 }
2239 service = rxi_FindService(socket, serviceId);
2240 if (!service || (securityIndex >= service->nSecurityObjects)
2241 || (service->securityObjects[securityIndex] == 0)) {
2242 MUTEX_EXIT(&rx_connHashTable_lock);
2243 return (struct rx_connection *) 0;
2244 }
2245 conn = rxi_AllocConnection(); /* This bzero's the connection */
2246 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2247 MUTEX_DEFAULT, 0);
2248 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2249 MUTEX_DEFAULT, 0);
2250 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2251 conn->next = rx_connHashTable[hashindex];
2252 rx_connHashTable[hashindex] = conn;
2253 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2254 conn->type = RX_SERVER_CONNECTION;
2255 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2256 conn->epoch = epoch;
2257 conn->cid = cid & RX_CIDMASK;
2258 /* conn->serial = conn->lastSerial = 0; */
2259 /* conn->timeout = 0; */
2260 conn->ackRate = RX_FAST_ACK_RATE;
2261 conn->service = service;
2262 conn->serviceId = serviceId;
2263 conn->securityIndex = securityIndex;
2264 conn->securityObject = service->securityObjects[securityIndex];
2265 conn->nSpecific = 0;
2266 conn->specific = NULL;
2267 rx_SetConnDeadTime(conn, service->connDeadTime);
2268 /* Notify security object of the new connection */
2269 RXS_NewConnection(conn->securityObject, conn);
2270 /* XXXX Connection timeout? */
2271 if (service->newConnProc) (*service->newConnProc)(conn);
2272 MUTEX_ENTER(&rx_stats_mutex);
2273 rx_stats.nServerConns++;
2274 MUTEX_EXIT(&rx_stats_mutex);
2275 }
2276 else
2277 {
2278 /* Ensure that the peer structure is set up in such a way that
2279 ** replies in this connection go back to that remote interface
2280 ** from which the last packet was sent out. In case this packet's
2281 ** source IP address does not match the peer struct for this conn,
2282 ** then drop the refCount on conn->peer and get a new peer structure.
2283 ** We can check the host,port field in the peer structure without the
2284 ** rx_peerHashTable_lock because the peer structure has its refCount
2285 ** incremented and the only time the host,port in the peer struct gets
2286 ** updated is when the peer structure is created.
2287 */
2288 if (conn->peer->host == host )
2289 peer = conn->peer; /* no change to the peer structure */
2290 else
2291 peer = rxi_FindPeer(host, port, conn->peer, 1);
2292 }
2294 MUTEX_ENTER(&conn->conn_data_lock);
2295 conn->refCount++;
2297 MUTEX_EXIT(&conn->conn_data_lock);
2299 rxLastConn = conn; /* store this connection as the last conn used */
2300 MUTEX_EXIT(&rx_connHashTable_lock);
2301 return conn;
2302 }
2304 /* There are two packet tracing routines available for testing and monitoring
2305 * Rx. One is called just after every packet is received and the other is
2306 * called just before every packet is sent. Received packets have had their
2307 * headers decoded, and packets to be sent have not yet had their headers
2308 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2309 * containing the network address. Both can be modified. The return value, if
2310 * non-zero, indicates that the packet should be dropped. */
2312 int (*rx_justReceived)() = 0;
2313 int (*rx_almostSent)() = 0;
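/* [Editor's illustrative sketch.] A debugging hook can be installed by
 * assigning to rx_justReceived before traffic flows; a non-zero return
 * drops the packet, as described above. The filter below is hypothetical: */
#if 0 /* example only */
static int example_DropLoopback(np, addr)
struct rx_packet *np;
struct sockaddr_in *addr; {
    return (addr->sin_addr.s_addr == htonl(0x7f000001)); /* drop 127.0.0.1 */
}
/* ... rx_justReceived = example_DropLoopback; ... */
#endif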
2315 /* A packet has been received off the interface. Np is the packet, socket is
2316 * the socket number it was received from (useful in determining which service
2317 * this packet corresponds to), and (host, port) reflect the host,port of the
2318 * sender. This call returns the packet to the caller if it is finished with
2319 * it, rather than de-allocating it, just as a small performance hack */
2321 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2322 register struct rx_packet *np;
2323 osi_socket socket;
2324 afs_uint32 host;
2325 u_short port;
2326 int *tnop;
2327 struct rx_call **newcallp;
2328 {
2329 register struct rx_call *call;
2330 register struct rx_connection *conn;
2331 int channel;
2332 afs_uint32 currentCallNumber;
2333 int type;
2334 int skew;
2335 char *packetType;
2338 struct rx_packet *tnp;
2341 /* We don't print out the packet until now because (1) the time may not be
2342 * accurate enough until now in the lwp implementation (rx_Listener only gets
2343 * the time after the packet is read) and (2) from a protocol point of view,
2344 * this is the first time the packet has been seen */
2345 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2346 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2347 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2348 np->header.serial, packetType, host, port, np->header.serviceId,
2349 np->header.epoch, np->header.cid, np->header.callNumber,
2350 np->header.seq, np->header.flags, np));
2353 if (np->header.type == RX_PACKET_TYPE_VERSION) {
2354 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2355 }
2357 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2358 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2359 }
2361 /* If an input tracer function is defined, call it with the packet and
2362 * network address. Note this function may modify its arguments. */
2363 if (rx_justReceived) {
2364 struct sockaddr_in addr;
2365 int drop;
2366 addr.sin_family = AF_INET;
2367 addr.sin_port = port;
2368 addr.sin_addr.s_addr = host;
2369 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2370 addr.sin_len = sizeof(addr);
2371 #endif /* AFS_OSF_ENV */
2372 drop = (*rx_justReceived) (np, &addr);
2373 /* drop packet if return value is non-zero */
2374 if (drop) return np;
2375 port = addr.sin_port; /* in case fcn changed addr */
2376 host = addr.sin_addr.s_addr;
2377 }
2380 /* If packet was not sent by the client, then *we* must be the client */
2381 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2382 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
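/* [Editor's note.] RX_CLIENT_INITIATED is stamped by the side that
 * originated the call, so the receiver infers its own role by negation:
 * a packet without the flag came from a server, which makes the local
 * side the client for this connection lookup. */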
2384 /* Find the connection (or fabricate one, if we're the server & if
2385 * necessary) associated with this packet */
2386 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2387 np->header.cid, np->header.epoch, type,
2388 np->header.securityIndex);
2391 /* If no connection found or fabricated, just ignore the packet.
2392 * (An argument could be made for sending an abort packet for
2393 * the packet.) */
2394 if (!conn)
2395 return np;
2397 MUTEX_ENTER(&conn->conn_data_lock);
2398 if (conn->maxSerial < np->header.serial)
2399 conn->maxSerial = np->header.serial;
2400 MUTEX_EXIT(&conn->conn_data_lock);
2402 /* If the connection is in an error state, send an abort packet and ignore
2403 * the incoming packet */
2404 if (conn->error) {
2405 /* Don't respond to an abort packet--we don't want loops! */
2406 MUTEX_ENTER(&conn->conn_data_lock);
2407 if (np->header.type != RX_PACKET_TYPE_ABORT)
2408 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2409 conn->refCount--;
2410 MUTEX_EXIT(&conn->conn_data_lock);
2411 return np;
2412 }
2414 /* Check for connection-only requests (i.e. not call specific). */
2415 if (np->header.callNumber == 0) {
2416 switch (np->header.type) {
2417 case RX_PACKET_TYPE_ABORT:
2418 /* What if the supplied error is zero? */
2419 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2420 MUTEX_ENTER(&conn->conn_data_lock);
2421 conn->refCount--;
2422 MUTEX_EXIT(&conn->conn_data_lock);
2423 return np;
2424 case RX_PACKET_TYPE_CHALLENGE:
2425 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2426 MUTEX_ENTER(&conn->conn_data_lock);
2427 conn->refCount--;
2428 MUTEX_EXIT(&conn->conn_data_lock);
2429 return tnp;
2430 case RX_PACKET_TYPE_RESPONSE:
2431 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2432 MUTEX_ENTER(&conn->conn_data_lock);
2433 conn->refCount--;
2434 MUTEX_EXIT(&conn->conn_data_lock);
2435 return tnp;
2436 case RX_PACKET_TYPE_PARAMS:
2437 case RX_PACKET_TYPE_PARAMS+1:
2438 case RX_PACKET_TYPE_PARAMS+2:
2439 /* ignore these packet types for now */
2440 MUTEX_ENTER(&conn->conn_data_lock);
2441 conn->refCount--;
2442 MUTEX_EXIT(&conn->conn_data_lock);
2443 return np;
2444 default:
2447 /* Should not reach here, unless the peer is broken: send an
2448 * abort packet */
2449 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2450 MUTEX_ENTER(&conn->conn_data_lock);
2451 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2452 conn->refCount--;
2453 MUTEX_EXIT(&conn->conn_data_lock);
2454 return tnp;
2455 }
2456 }
2458 channel = np->header.cid & RX_CHANNELMASK;
2459 call = conn->call[channel];
2460 #ifdef RX_ENABLE_LOCKS
2461 if (call)
2462 MUTEX_ENTER(&call->lock);
2463 /* Test to see if call struct is still attached to conn. */
2464 if (call != conn->call[channel]) {
2465 if (call)
2466 MUTEX_EXIT(&call->lock);
2467 if (type == RX_SERVER_CONNECTION) {
2468 call = conn->call[channel];
2469 /* If we started with no call attached and there is one now,
2470 * another thread is also running this routine and has gotten
2471 * the connection channel. We should drop this packet in the tests
2472 * below. If there was a call on this connection and it's now
2473 * gone, then we'll be making a new call below.
2474 * If there was previously a call and it's now different then
2475 * the old call was freed and another thread running this routine
2476 * has created a call on this channel. One of these two threads
2477 * has a packet for the old call and the code below handles those
2478 * cases.
2479 */
2480 if (call)
2481 MUTEX_ENTER(&call->lock);
2482 }
2483 else {
2484 /* This packet can't be for this call. If the new call address is
2485 * 0 then no call is running on this channel. If there is a call
2486 * then, since this is a client connection we're getting data for
2487 * it must be for the previous call.
2488 */
2489 MUTEX_ENTER(&rx_stats_mutex);
2490 rx_stats.spuriousPacketsRead++;
2491 MUTEX_EXIT(&rx_stats_mutex);
2492 MUTEX_ENTER(&conn->conn_data_lock);
2493 conn->refCount--;
2494 MUTEX_EXIT(&conn->conn_data_lock);
2495 return np;
2496 }
2497 }
2498 #endif /* RX_ENABLE_LOCKS */
2499 currentCallNumber = conn->callNumber[channel];
2501 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2502 if (np->header.callNumber < currentCallNumber) {
2503 MUTEX_ENTER(&rx_stats_mutex);
2504 rx_stats.spuriousPacketsRead++;
2505 MUTEX_EXIT(&rx_stats_mutex);
2506 #ifdef RX_ENABLE_LOCKS
2507 if (call)
2508 MUTEX_EXIT(&call->lock);
2509 #endif
2510 MUTEX_ENTER(&conn->conn_data_lock);
2511 conn->refCount--;
2512 MUTEX_EXIT(&conn->conn_data_lock);
2513 return np;
2514 }
2515 if (!call) {
2516 call = rxi_NewCall(conn, channel);
2517 MUTEX_ENTER(&call->lock);
2518 *call->callNumber = np->header.callNumber;
2519 call->state = RX_STATE_PRECALL;
2520 clock_GetTime(&call->queueTime);
2521 hzero(call->bytesSent);
2522 hzero(call->bytesRcvd);
2523 rxi_KeepAliveOn(call);
2524 }
2525 else if (np->header.callNumber != currentCallNumber) {
2526 /* Wait until the transmit queue is idle before deciding
2527 * whether to reset the current call. Chances are that the
2528 * call will be in either DALLY or HOLD state once the TQ_BUSY
2529 * flag is cleared.
2530 */
2531 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2532 while ((call->state == RX_STATE_ACTIVE) &&
2533 (call->flags & RX_CALL_TQ_BUSY)) {
2534 call->flags |= RX_CALL_TQ_WAIT;
2535 #ifdef RX_ENABLE_LOCKS
2536 CV_WAIT(&call->cv_tq, &call->lock);
2537 #else /* RX_ENABLE_LOCKS */
2538 osi_rxSleep(&call->tq);
2539 #endif /* RX_ENABLE_LOCKS */
2540 }
2541 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2542 /* If the new call cannot be taken right now send a busy and set
2543 * the error condition in this call, so that it terminates as
2544 * quickly as possible */
2545 if (call->state == RX_STATE_ACTIVE) {
2546 struct rx_packet *tp;
2548 rxi_CallError(call, RX_CALL_DEAD);
2549 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2550 MUTEX_EXIT(&call->lock);
2551 MUTEX_ENTER(&conn->conn_data_lock);
2552 conn->refCount--;
2553 MUTEX_EXIT(&conn->conn_data_lock);
2554 return tp;
2555 }
2556 rxi_ResetCall(call, 0);
2557 *call->callNumber = np->header.callNumber;
2558 call->state = RX_STATE_PRECALL;
2559 clock_GetTime(&call->queueTime);
2560 hzero(call->bytesSent);
2561 hzero(call->bytesRcvd);
2562 /*
2563 * If the number of queued calls exceeds the overload
2564 * threshold then abort this call.
2565 */
2566 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2567 struct rx_packet *tp;
2569 rxi_CallError(call, rx_BusyError);
2570 tp = rxi_SendCallAbort(call, np, 1, 0);
2571 MUTEX_EXIT(&call->lock);
2572 MUTEX_ENTER(&conn->conn_data_lock);
2573 conn->refCount--;
2574 MUTEX_EXIT(&conn->conn_data_lock);
2575 return tp;
2576 }
2577 rxi_KeepAliveOn(call);
2578 }
2579 else {
2580 /* Continuing call; do nothing here. */
2581 }
2582 } else { /* we're the client */
2583 /* Ignore all incoming acknowledgements for calls in DALLY state */
2584 if ( call && (call->state == RX_STATE_DALLY)
2585 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2586 MUTEX_ENTER(&rx_stats_mutex);
2587 rx_stats.ignorePacketDally++;
2588 MUTEX_EXIT(&rx_stats_mutex);
2589 #ifdef RX_ENABLE_LOCKS
2591 MUTEX_EXIT(&call->lock);
2592 #endif
2594 MUTEX_ENTER(&conn->conn_data_lock);
2595 conn->refCount--;
2596 MUTEX_EXIT(&conn->conn_data_lock);
2597 return np;
2598 }
2600 /* Ignore anything that's not relevant to the current call. If there
2601 * isn't a current call, then no packet is relevant. */
2602 if (!call || (np->header.callNumber != currentCallNumber)) {
2603 MUTEX_ENTER(&rx_stats_mutex);
2604 rx_stats.spuriousPacketsRead++;
2605 MUTEX_EXIT(&rx_stats_mutex);
2606 #ifdef RX_ENABLE_LOCKS
2607 if (call)
2608 MUTEX_EXIT(&call->lock);
2609 #endif
2611 MUTEX_ENTER(&conn->conn_data_lock);
2612 conn->refCount--;
2613 MUTEX_EXIT(&conn->conn_data_lock);
2614 return np;
2615 }
2616 /* If the service security object index stamped in the packet does not
2617 * match the connection's security index, ignore the packet */
2618 if (np->header.securityIndex != conn->securityIndex) {
2619 #ifdef RX_ENABLE_LOCKS
2620 MUTEX_EXIT(&call->lock);
2621 #endif
2622 MUTEX_ENTER(&conn->conn_data_lock);
2623 conn->refCount--;
2624 MUTEX_EXIT(&conn->conn_data_lock);
2625 return np;
2626 }
2627 }
2628 /* If we're receiving the response, then all transmit packets are
2629 * implicitly acknowledged. Get rid of them. */
2630 if (np->header.type == RX_PACKET_TYPE_DATA) {
2631 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2632 /* XXX Hack. Because we must release the global rx lock when
2633 * sending packets (osi_NetSend) we drop all acks while we're
2634 * traversing the tq in rxi_Start sending packets out because
2635 * packets may move to the freePacketQueue as result of being here!
2636 * So we drop these packets until we're safely out of the
2637 * traversing. Really ugly!
2638 * For fine grain RX locking, we set the acked field in the
2639 * packets and let rxi_Start remove them from the transmit queue.
2641 if (call->flags & RX_CALL_TQ_BUSY) {
2642 #ifdef RX_ENABLE_LOCKS
2643 rxi_SetAcksInTransmitQueue(call);
2644 #else
2645 conn->refCount--;
2646 return np; /* xmitting; drop packet */
2647 #endif
2648 }
2649 else {
2650 rxi_ClearTransmitQueue(call, 0);
2651 }
2652 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2653 rxi_ClearTransmitQueue(call, 0);
2654 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2655 }
2656 if (np->header.type == RX_PACKET_TYPE_ACK) {
2657 /* now check to see if this is an ack packet acknowledging that the
2658 * server actually *lost* some hard-acked data. If this happens we
2659 * ignore this packet, as it may indicate that the server restarted in
2660 * the middle of a call. It is also possible that this is an old ack
2661 * packet. We don't abort the connection in this case, because this
2662 * *might* just be an old ack packet. The right way to detect a server
2663 * restart in the midst of a call is to notice that the server epoch
2664 * changed, anyway. */
2665 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2666 * XXX unacknowledged. I think that this is off-by-one, but
2667 * XXX I don't dare change it just yet, since it will
2668 * XXX interact badly with the server-restart detection
2669 * XXX code in receiveackpacket. */
2670 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2671 MUTEX_ENTER(&rx_stats_mutex);
2672 rx_stats.spuriousPacketsRead++;
2673 MUTEX_EXIT(&rx_stats_mutex);
2674 MUTEX_EXIT(&call->lock);
2675 MUTEX_ENTER(&conn->conn_data_lock);
2676 conn->refCount--;
2677 MUTEX_EXIT(&conn->conn_data_lock);
2678 return np;
2679 }
2681 } /* else not a data packet */
2684 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2685 /* Set remote user defined status from packet */
2686 call->remoteStatus = np->header.userStatus;
2688 /* Note the gap between the expected next packet and the actual
2689 * packet that arrived, when the new packet has a smaller serial number
2690 * than expected. Rioses frequently reorder packets all by themselves,
2691 * so this will be quite important with very large window sizes.
2692 * Skew is checked against 0 here to avoid any dependence on the type of
2693 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2694 * true.
2695 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2696 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2697 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2698 */
2699 MUTEX_ENTER(&conn->conn_data_lock);
2700 skew = conn->lastSerial - np->header.serial;
2701 conn->lastSerial = np->header.serial;
2702 MUTEX_EXIT(&conn->conn_data_lock);
2703 if (skew > 0) {
2704 register struct rx_peer *peer;
2705 peer = conn->peer;
2706 if (skew > peer->inPacketSkew) {
2707 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2708 peer->inPacketSkew = skew;
2709 }
2710 }
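/* [Editor's illustrative sketch.] Skew counts how late a packet is in
 * serial-number space: if serials 5,6,7 arrive as 7,5,6, then when 5
 * arrives lastSerial is 7 and skew = 7 - 5 = 2. The MTUXXX comment above
 * suggests smoothing this with a beta of 1/8, i.e. the same EWMA form
 * used for round trip times: */
#if 0 /* example only */
static void example_SmoothSkew(int *smoothed, int sample)
{
    *smoothed += (sample - *smoothed) >> 3; /* new = old + (sample-old)/8 */
}
#endif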
2712 /* Now do packet type-specific processing */
2713 switch (np->header.type) {
2714 case RX_PACKET_TYPE_DATA:
2715 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2716 tnop, newcallp);
2717 break;
2718 case RX_PACKET_TYPE_ACK:
2719 /* Respond immediately to ack packets requesting acknowledgement
2720 * (ping packets) */
2721 if (np->header.flags & RX_REQUEST_ACK) {
2722 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2723 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2724 }
2725 np = rxi_ReceiveAckPacket(call, np, 1);
2726 break;
2727 case RX_PACKET_TYPE_ABORT:
2728 /* An abort packet: reset the connection, passing the error up to
2729 * the user */
2730 /* What if error is zero? */
2731 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2732 break;
2733 case RX_PACKET_TYPE_BUSY:
2734 /* XXXX */
2735 break;
2736 case RX_PACKET_TYPE_ACKALL:
2737 /* All packets acknowledged, so we can drop all packets previously
2738 * readied for sending */
2739 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2740 /* XXX Hack. Because we can't release the global rx lock when
2741 * sending packets (osi_NetSend) we drop all ack pkts while we're
2742 * traversing the tq in rxi_Start sending packets out because
2743 * packets may move to the freePacketQueue as result of being
2744 * here! So we drop these packets until we're safely out of the
2745 * traversing. Really ugly!
2746 * For fine grain RX locking, we set the acked field in the packets
2747 * and let rxi_Start remove the packets from the transmit queue.
2748 */
2749 if (call->flags & RX_CALL_TQ_BUSY) {
2750 #ifdef RX_ENABLE_LOCKS
2751 rxi_SetAcksInTransmitQueue(call);
2752 break;
2753 #else /* RX_ENABLE_LOCKS */
2754 conn->refCount--;
2755 return np; /* xmitting; drop packet */
2756 #endif /* RX_ENABLE_LOCKS */
2757 }
2758 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2759 rxi_ClearTransmitQueue(call, 0);
2760 break;
2761 default:
2762 /* Should not reach here, unless the peer is broken: send an abort
2763 * packet */
2764 rxi_CallError(call, RX_PROTOCOL_ERROR);
2765 np = rxi_SendCallAbort(call, np, 1, 0);
2766 break;
2767 }
2768 /* Note when this last legitimate packet was received, for keep-alive
2769 * processing. Note, we delay getting the time until now in the hope that
2770 * the packet will be delivered to the user before any get time is required
2771 * (if not, then the time won't actually be re-evaluated here). */
2772 call->lastReceiveTime = clock_Sec();
2773 MUTEX_EXIT(&call->lock);
2774 MUTEX_ENTER(&conn->conn_data_lock);
2775 conn->refCount--;
2776 MUTEX_EXIT(&conn->conn_data_lock);
2777 return np;
2778 }
2780 /* return true if this is an "interesting" connection from the point of view
2781 of someone trying to debug the system */
2782 int rxi_IsConnInteresting(struct rx_connection *aconn)
2783 {
2784 register int i;
2785 register struct rx_call *tcall;
2787 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2788 return 1;
2789 for(i=0;i<RX_MAXCALLS;i++) {
2790 tcall = aconn->call[i];
2791 if (tcall) {
2792 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2793 return 1;
2794 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2795 return 1;
2796 }
2797 }
2798 return 0;
2799 }
2802 /* if this is one of the last few packets AND it wouldn't be used by the
2803 receiving call to immediately satisfy a read request, then drop it on
2804 the floor, since accepting it might prevent a lock-holding thread from
2805 making progress in its reading. If a call has been cleared while in
2806 the precall state then ignore all subsequent packets until the call
2807 is assigned to a thread. */
2809 static TooLow(ap, acall)
2810 struct rx_call *acall;
2811 struct rx_packet *ap; {
2812 int rc=0;
2813 MUTEX_ENTER(&rx_stats_mutex);
2814 if (((ap->header.seq != 1) &&
2815 (acall->flags & RX_CALL_CLEARED) &&
2816 (acall->state == RX_STATE_PRECALL)) ||
2817 ((rx_nFreePackets < rxi_dataQuota+2) &&
2818 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2819 && (acall->flags & RX_CALL_READER_WAIT)))) {
2820 rc = 1;
2821 }
2822 MUTEX_EXIT(&rx_stats_mutex);
2823 return rc;
2824 }
2827 /* try to attach call, if authentication is complete */
2828 static void TryAttach(acall, socket, tnop, newcallp)
2829 register struct rx_call *acall;
2830 register osi_socket socket;
2831 register int *tnop;
2832 register struct rx_call **newcallp; {
2833 register struct rx_connection *conn;
2834 conn = acall->conn;
2835 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2836 /* Don't attach until we have any req'd. authentication. */
2837 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2838 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2839 /* Note: this does not necessarily succeed; there
2840 may not be any proc available */
2841 }
2842 else {
2843 rxi_ChallengeOn(acall->conn);
2844 }
2845 }
2846 }
2848 /* A data packet has been received off the interface. This packet is
2849 * appropriate to the call (the call is in the right state, etc.). This
2850 * routine can return a packet to the caller, for re-use */
2852 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2853 port, tnop, newcallp)
2854 register struct rx_call *call;
2855 register struct rx_packet *np;
2856 int istack;
2857 osi_socket socket;
2858 afs_uint32 host;
2859 u_short port;
2860 int *tnop;
2861 struct rx_call **newcallp;
2862 {
2863 int ackNeeded = 0;
2864 int newPackets = 0;
2865 int didHardAck = 0;
2866 int haveLast = 0;
2867 afs_uint32 seq, serial, flags;
2868 int isFirst;
2869 struct rx_packet *tnp;
2870 struct clock when;
2871 MUTEX_ENTER(&rx_stats_mutex);
2872 rx_stats.dataPacketsRead++;
2873 MUTEX_EXIT(&rx_stats_mutex);
2874 #ifdef KERNEL
2876 /* If there are no packet buffers, drop this new packet, unless we can find
2877 * packet buffers from inactive calls */
2878 if (!call->error &&
2879 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2880 MUTEX_ENTER(&rx_freePktQ_lock);
2881 rxi_NeedMorePackets = TRUE;
2882 MUTEX_EXIT(&rx_freePktQ_lock);
2883 MUTEX_ENTER(&rx_stats_mutex);
2884 rx_stats.noPacketBuffersOnRead++;
2885 MUTEX_EXIT(&rx_stats_mutex);
2886 call->rprev = np->header.serial;
2887 rxi_calltrace(RX_TRACE_DROP, call);
2888 dpf (("packet %x dropped on receipt - quota problems", np));
2889 if (rxi_doreclaim)
2890 rxi_ClearReceiveQueue(call);
2891 clock_GetTime(&when);
2892 clock_Add(&when, &rx_softAckDelay);
2893 if (!call->delayedAckEvent ||
2894 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2895 rxevent_Cancel(call->delayedAckEvent, call,
2896 RX_CALL_REFCOUNT_DELAY);
2897 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2898 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2899 call, 0);
2900 }
2901 /* we've damaged this call already, might as well do it in. */
2902 return np;
2903 }
2904 #endif /* KERNEL */
2907 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2908 * packet is one of several packets transmitted as a single
2909 * datagram. Do not send any soft or hard acks until all packets
2910 * in a jumbogram have been processed. Send negative acks right away.
2912 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2913 /* tnp is non-null when there are more packets in the
2914 * current jumbo gram */
2915 if (tnp) {
2916 if (np)
2917 rxi_FreePacket(np);
2918 np = tnp;
2919 }
2921 seq = np->header.seq;
2922 serial = np->header.serial;
2923 flags = np->header.flags;
2925 /* If the call is in an error state, send an abort message */
2926 if (call->error)
2927 return rxi_SendCallAbort(call, np, istack, 0);
2929 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2930 * AFS 3.5 jumbogram. */
2931 if (flags & RX_JUMBO_PACKET) {
2932 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2933 } else {
2934 tnp = NULL;
2935 }
2937 if (np->header.spare != 0) {
2938 MUTEX_ENTER(&call->conn->conn_data_lock);
2939 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2940 MUTEX_EXIT(&call->conn->conn_data_lock);
2941 }
2943 /* The usual case is that this is the expected next packet */
2944 if (seq == call->rnext) {
2946 /* Check to make sure it is not a duplicate of one already queued */
2947 if (queue_IsNotEmpty(&call->rq)
2948 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2949 MUTEX_ENTER(&rx_stats_mutex);
2950 rx_stats.dupPacketsRead++;
2951 MUTEX_EXIT(&rx_stats_mutex);
2952 dpf (("packet %x dropped on receipt - duplicate", np));
2953 rxevent_Cancel(call->delayedAckEvent, call,
2954 RX_CALL_REFCOUNT_DELAY);
2955 np = rxi_SendAck(call, np, seq, serial,
2956 flags, RX_ACK_DUPLICATE, istack);
2957 call->rprev = seq;
2958 continue;
2959 }
2962 /* It's the next packet. Stick it on the receive queue
2963 * for this call. Set newPackets to make sure we wake
2964 * the reader once all packets have been processed */
2965 queue_Prepend(&call->rq, np);
2966 call->nSoftAcks++;
2967 np = NULL; /* We can't use this anymore */
2968 newPackets = 1;
2970 /* If an ack is requested then set a flag to make sure we
2971 * send an acknowledgement for this packet */
2972 if (flags & RX_REQUEST_ACK) {
2973 ackNeeded = 1;
2974 }
2976 /* Keep track of whether we have received the last packet */
2977 if (flags & RX_LAST_PACKET) {
2978 call->flags |= RX_CALL_HAVE_LAST;
2979 haveLast = 1;
2980 }
2982 /* Check whether we have all of the packets for this call */
2983 if (call->flags & RX_CALL_HAVE_LAST) {
2984 afs_uint32 tseq; /* temporary sequence number */
2985 struct rx_packet *tp; /* Temporary packet pointer */
2986 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
2988 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2989 if (tseq != tp->header.seq)
2990 break;
2991 if (tp->header.flags & RX_LAST_PACKET) {
2992 call->flags |= RX_CALL_RECEIVE_DONE;
2993 break;
2994 }
2995 tseq++;
2996 }
2997 }
2999 /* Provide asynchronous notification for those who want it
3000 * (e.g. multi rx) */
3001 if (call->arrivalProc) {
3002 (*call->arrivalProc)(call, call->arrivalProcHandle,
3003 call->arrivalProcArg);
3004 call->arrivalProc = (VOID (*)()) 0;
3005 }
3007 /* Update last packet received */
3008 call->rprev = seq;
3010 /* If there is no server process serving this call, grab
3011 * one, if available. We only need to do this once. If a
3012 * server thread is available, this thread becomes a server
3013 * thread and the server thread becomes a listener thread. */
3014 if (isFirst) {
3015 TryAttach(call, socket, tnop, newcallp);
3016 }
3017 }
3018 /* This is not the expected next packet. */
3019 else {
3020 /* Determine whether this is a new or old packet, and if it's
3021 * a new one, whether it fits into the current receive window.
3022 * Also figure out whether the packet was delivered in sequence.
3023 * We use the prev variable to determine whether the new packet
3024 * is the successor of its immediate predecessor in the
3025 * receive queue, and the missing flag to determine whether
3026 * any of this packet's predecessors are missing. */
3028 afs_uint32 prev; /* "Previous packet" sequence number */
3029 struct rx_packet *tp; /* Temporary packet pointer */
3030 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3031 int missing; /* Are any predecessors missing? */
3033 /* If the new packet's sequence number has been sent to the
3034 * application already, then this is a duplicate */
3035 if (seq < call->rnext) {
3036 MUTEX_ENTER(&rx_stats_mutex);
3037 rx_stats.dupPacketsRead++;
3038 MUTEX_EXIT(&rx_stats_mutex);
3039 rxevent_Cancel(call->delayedAckEvent, call,
3040 RX_CALL_REFCOUNT_DELAY);
3041 np = rxi_SendAck(call, np, seq, serial,
3042 flags, RX_ACK_DUPLICATE, istack);
3043 call->rprev = seq;
3044 continue;
3045 }
3048 /* If the sequence number is greater than what can be
3049 * accommodated by the current window, then send a negative
3050 * acknowledge and drop the packet */
3051 if ((call->rnext + call->rwind) <= seq) {
3052 rxevent_Cancel(call->delayedAckEvent, call,
3053 RX_CALL_REFCOUNT_DELAY);
3054 np = rxi_SendAck(call, np, seq, serial,
3055 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3056 ackNeeded = 0;
3057 call->rprev = seq;
3058 continue;
3059 }
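/* [Editor's illustrative sketch.] The receive window accepts sequence
 * numbers in [rnext, rnext + rwind); anything at or beyond rnext + rwind
 * draws the RX_ACK_EXCEEDS_WINDOW nack above: */
#if 0 /* example only */
static int example_InReceiveWindow(afs_uint32 seq, afs_uint32 rnext,
                                   afs_uint32 rwind)
{
    return (seq >= rnext) && (seq < rnext + rwind);
}
#endif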
3061 /* Look for the packet in the queue of old received packets */
3062 for (prev = call->rnext - 1, missing = 0,
3063 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3064 /*Check for duplicate packet */
3065 if (seq == tp->header.seq) {
3066 MUTEX_ENTER(&rx_stats_mutex);
3067 rx_stats.dupPacketsRead++;
3068 MUTEX_EXIT(&rx_stats_mutex);
3069 rxevent_Cancel(call->delayedAckEvent, call,
3070 RX_CALL_REFCOUNT_DELAY);
3071 np = rxi_SendAck(call, np, seq, serial,
3072 flags, RX_ACK_DUPLICATE, istack);
3073 ackNeeded = 0;
3074 call->rprev = seq;
3075 goto nextloop;
3076 }
3077 /* If we find a higher sequence packet, break out and
3078 * insert the new packet here. */
3079 if (seq < tp->header.seq) break;
3080 /* Check for missing packet */
3081 if (tp->header.seq != prev+1) {
3082 missing = 1;
3083 }
3085 prev = tp->header.seq;
3086 }
3088 /* Keep track of whether we have received the last packet. */
3089 if (flags & RX_LAST_PACKET) {
3090 call->flags |= RX_CALL_HAVE_LAST;
3091 haveLast = 1;
3092 }
3093 /* It's within the window: add it to the receive queue.
3094 * tp is left by the previous loop either pointing at the
3095 * packet before which to insert the new packet, or at the
3096 * queue head if the queue is empty or the packet should be
3097 * appended. */
3098 queue_InsertBefore(tp, np);
3099 call->nSoftAcks++;
3100 np = NULL;
3102 /* Check whether we have all of the packets for this call */
3103 if ((call->flags & RX_CALL_HAVE_LAST)
3104 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3105 afs_uint32 tseq; /* temporary sequence number */
3107 for (tseq = call->rnext,
3108 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3109 if (tseq != tp->header.seq)
3110 break;
3111 if (tp->header.flags & RX_LAST_PACKET) {
3112 call->flags |= RX_CALL_RECEIVE_DONE;
3113 break;
3114 }
3115 tseq++;
3116 }
3117 }
3119 /* We need to send an ack if the packet is out of sequence,
3120 * or if an ack was requested by the peer. */
3121 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3122 ackNeeded = 1;
3123 }
3125 /* Acknowledge the last packet for each call */
3126 if (flags & RX_LAST_PACKET) {
3127 haveLast = 1;
3128 }
3130 call->rprev = seq;
3131 }
3132 nextloop:;
3133 }
3135 if (newPackets) {
3136 /*
3137 * If the receiver is waiting for an iovec, fill the iovec
3138 * using the data from the receive queue */
3139 if (call->flags & RX_CALL_IOVEC_WAIT) {
3140 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3141 /* the call may have been aborted */
3142 if (call->error) {
3143 return np;
3144 }
3145 if (didHardAck) {
3146 ackNeeded = 0;
3147 }
3148 }
3150 /* Wakeup the reader if any */
3151 if ((call->flags & RX_CALL_READER_WAIT) &&
3152 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3153 (call->iovNext >= call->iovMax) ||
3154 (call->flags & RX_CALL_RECEIVE_DONE))) {
3155 call->flags &= ~RX_CALL_READER_WAIT;
3156 #ifdef RX_ENABLE_LOCKS
3157 CV_BROADCAST(&call->cv_rq);
3158 #else
3159 osi_rxWakeup(&call->rq);
3160 #endif
3161 }
3162 }
3164 /*
3165 * Send an ack when requested by the peer, or once every
3166 * rxi_SoftAckRate packets until the last packet has been
3167 * received. Always send a soft ack for the last packet in
3168 * the server's reply. */
3169 if (ackNeeded) {
3170 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3171 np = rxi_SendAck(call, np, seq, serial, flags,
3172 RX_ACK_REQUESTED, istack);
3173 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3174 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3175 np = rxi_SendAck(call, np, seq, serial, flags,
3176 RX_ACK_IDLE, istack);
3177 } else if (call->nSoftAcks) {
3178 clock_GetTime(&when);
3179 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3180 clock_Add(&when, &rx_lastAckDelay);
3181 } else {
3182 clock_Add(&when, &rx_softAckDelay);
3183 }
3184 if (!call->delayedAckEvent ||
3185 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3186 rxevent_Cancel(call->delayedAckEvent, call,
3187 RX_CALL_REFCOUNT_DELAY);
3188 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3189 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3190 call, 0);
3191 }
3192 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3193 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3194 np = rxi_SendAck(call, np, 0, 0, 0, RX_ACK_DELAY, istack);
3195 }
3196 return np;
3197 }
3199 #ifdef ADAPT_WINDOW
3200 static void rxi_ComputeRate();
3201 #endif
3203 /* The real smarts of the whole thing. */
3204 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3205 register struct rx_call *call;
3206 struct rx_packet *np;
3207 int istack;
3208 {
3209 struct rx_ackPacket *ap;
3210 int nAcks;
3211 register struct rx_packet *tp;
3212 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3213 register struct rx_connection *conn = call->conn;
3214 struct rx_peer *peer = conn->peer;
3215 afs_uint32 first;
3216 afs_uint32 serial;
3217 /* because there are CM's that are bogus, sending weird values for this. */
3218 afs_uint32 skew = 0;
3219 int needRxStart = 0;
3220 int nbytes;
3221 int missing;
3222 int acked;
3223 int nNacked = 0;
3224 int newAckCount = 0;
3225 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3226 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3228 MUTEX_ENTER(&rx_stats_mutex);
3229 rx_stats.ackPacketsRead++;
3230 MUTEX_EXIT(&rx_stats_mutex);
3231 ap = (struct rx_ackPacket *) rx_DataOf(np);
3232 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3233 if (nbytes < 0)
3234 return np; /* truncated ack packet */
3236 /* depends on ack packet struct */
3237 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
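/* [Editor's illustrative sketch.] ap->acks is a variable-length byte
 * array, one byte per packet starting at firstPacket, so nAcks is bounded
 * above by the bytes that actually arrived. Classifying entry i: */
#if 0 /* example only */
static int example_IsAcked(struct rx_ackPacket *ap, int i)
{
    return ap->acks[i] == RX_ACK_TYPE_ACK; /* otherwise RX_ACK_TYPE_NACK */
}
#endif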
3238 first = ntohl(ap->firstPacket);
3239 serial = ntohl(ap->serial);
3240 /* temporarily disabled -- needs to degrade over time
3241 skew = ntohs(ap->maxSkew); */
3243 /* Ignore ack packets received out of order */
3244 if (first < call->tfirst) {
3245 return np;
3246 }
3248 if (np->header.flags & RX_SLOW_START_OK) {
3249 call->flags |= RX_CALL_SLOW_START_OK;
3250 }
3255 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3256 ap->reason, ntohl(ap->previousPacket), np->header.seq, serial,
3257 skew, ntohl(ap->firstPacket));
3260 for (offset = 0; offset < nAcks; offset++)
3261 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3267 /* if a server connection has been re-created, it doesn't remember what
3268 serial # it was up to. An ack will tell us, since the serial field
3269 contains the largest serial received by the other side */
3270 MUTEX_ENTER(&conn->conn_data_lock);
3271 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3272 conn->serial = serial+1;
3273 }
3274 MUTEX_EXIT(&conn->conn_data_lock);
3276 /* Update the outgoing packet skew value to the latest value of
3277 * the peer's incoming packet skew value. The ack packet, of
3278 * course, could arrive out of order, but that won't affect things
3280 MUTEX_ENTER(&peer->peer_lock);
3281 peer->outPacketSkew = skew;
3283 /* Check for packets that no longer need to be transmitted, and
3284 * discard them. This only applies to packets positively
3285 * acknowledged as having been sent to the peer's upper level.
3286 * All other packets must be retained. So only packets with
3287 * sequence numbers < ap->firstPacket are candidates. */
3288 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3289 if (tp->header.seq >= first) break;
3290 call->tfirst = tp->header.seq + 1;
3291 if (tp->header.serial == serial) {
3292 /* Use RTT if not delayed by client. */
3293 if (ap->reason != RX_ACK_DELAY)
3294 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3295 #ifdef ADAPT_WINDOW
3296 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3297 #endif
3298 }
3299 else if (tp->firstSerial == serial) {
3300 /* Use RTT if not delayed by client. */
3301 if (ap->reason != RX_ACK_DELAY)
3302 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3303 #ifdef ADAPT_WINDOW
3304 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3305 #endif
3306 }
3307 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3308 /* XXX Hack. Because we have to release the global rx lock when sending
3309 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3310 * in rxi_Start sending packets out because packets may move to the
3311 * freePacketQueue as result of being here! So we drop these packets until
3312 * we're safely out of the traversing. Really ugly!
3313 * To make it even uglier, if we're using fine grain locking, we can
3314 * set the ack bits in the packets and have rxi_Start remove the packets
3315 * when it's done transmitting.
3316 */
3320 if (call->flags & RX_CALL_TQ_BUSY) {
3321 #ifdef RX_ENABLE_LOCKS
3322 tp->acked = 1;
3323 call->flags |= RX_CALL_TQ_SOME_ACKED;
3324 #else /* RX_ENABLE_LOCKS */
3325 break;
3326 #endif /* RX_ENABLE_LOCKS */
3327 } else
3328 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3329 {
3330 queue_Remove(tp);
3331 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3332 }
3333 }
3335 #ifdef ADAPT_WINDOW
3336 /* Give rate detector a chance to respond to ping requests */
3337 if (ap->reason == RX_ACK_PING_RESPONSE) {
3338 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3339 }
3340 #endif
3342 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3344 /* Now go through explicit acks/nacks and record the results in
3345 * the waiting packets. These are packets that can't be released
3346 * yet, even with a positive acknowledge. This positive
3347 * acknowledge only means the packet has been received by the
3348 * peer, not that it will be retained long enough to be sent to
3349 * the peer's upper level. In addition, reset the transmit timers
3350 * of any missing packets (those packets that must be missing
3351 * because this packet was out of sequence) */
3353 call->nSoftAcked = 0;
3354 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3355 /* Update round trip time if the ack was stimulated on receipt
3356 * of this packet */
3357 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3358 #ifdef RX_ENABLE_LOCKS
3359 if (tp->header.seq >= first) {
3360 #endif /* RX_ENABLE_LOCKS */
3361 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3362 if (tp->header.serial == serial) {
3363 /* Use RTT if not delayed by client. */
3364 if (ap->reason != RX_ACK_DELAY)
3365 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3366 #ifdef ADAPT_WINDOW
3367 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3368 #endif
3369 }
3370 else if ((tp->firstSerial == serial)) {
3371 /* Use RTT if not delayed by client. */
3372 if (ap->reason != RX_ACK_DELAY)
3373 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3374 #ifdef ADAPT_WINDOW
3375 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3376 #endif
3377 }
3378 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3379 #ifdef RX_ENABLE_LOCKS
3380 }
3381 #endif /* RX_ENABLE_LOCKS */
3382 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3384 /* Set the acknowledge flag per packet based on the
3385 * information in the ack packet. An acknowledged packet can
3386 * be downgraded when the server has discarded a packet it
3387 * soacked previously, or when an ack packet is received
3388 * out of sequence. */
3389 if (tp->header.seq < first) {
3390 /* Implicit ack information */
3391 if (!tp->acked) {
3392 newAckCount++;
3393 }
3394 tp->acked = 1;
3395 }
3396 else if (tp->header.seq < first + nAcks) {
3397 /* Explicit ack information: set it in the packet appropriately */
3398 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3399 if (!tp->acked) {
3400 newAckCount++;
3401 tp->acked = 1;
3402 }
3403 if (missing) {
3404 nNacked++;
3405 } else {
3406 call->nSoftAcked++;
3407 }
3408 } else {
3409 tp->acked = 0;
3410 missing = 1;
3411 }
3412 }
3413 else {
3414 tp->acked = 0;
3415 missing = 1;
3416 }
3418 /* If packet isn't yet acked, and it has been transmitted at least
3419 * once, reset retransmit time using latest timeout
3420 * ie, this should readjust the retransmit timer for all outstanding
3421 * packets... So we don't just retransmit when we should know better */
3423 if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3424 tp->retryTime = tp->timeSent;
3425 clock_Add(&tp->retryTime, &peer->timeout);
3426 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3427 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3428 }
3429 }
3431 /* If the window has been extended by this acknowledge packet,
3432 * then wakeup a sender waiting in alloc for window space, or try
3433 * sending packets now, if he's been sitting on packets due to
3434 * lack of window space */
3435 if (call->tnext < (call->tfirst + call->twind)) {
3436 #ifdef RX_ENABLE_LOCKS
3437 CV_SIGNAL(&call->cv_twind);
3438 #else
3439 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3440 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3441 osi_rxWakeup(&call->twind);
3442 }
3443 #endif
3444 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3445 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3446 }
3447 }
3449 /* if the ack packet has a receivelen field hanging off it,
3450 * update our state */
3451 if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3452 afs_uint32 tSize;
3454 /* If the ack packet has a "recommended" size that is less than
3455 * what I am using now, reduce my size to match */
3456 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3457 sizeof(afs_int32), &tSize);
3458 tSize = (afs_uint32) ntohl(tSize);
3459 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3461 /* Get the maximum packet size to send to this peer */
3462 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3463 &tSize);
3464 tSize = (afs_uint32)ntohl(tSize);
3465 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3466 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3468 /* sanity check - peer might have restarted with different params.
3469 * If peer says "send less", dammit, send less... Peer should never
3470 * be unable to accept packets of the size that prior AFS versions would
3471 * send without asking. */
3472 if (peer->maxMTU != tSize) {
3473 peer->maxMTU = tSize;
3474 peer->MTU = MIN(tSize, peer->MTU);
3475 call->MTU = MIN(call->MTU, tSize);
3476 }
3479 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3480 /* AFS 3.4a */
3481 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3482 sizeof(afs_int32), &tSize);
3483 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3484 if (tSize < call->twind) { /* smaller than our send */
3485 call->twind = tSize; /* window, we must send less... */
3486 call->ssthresh = MIN(call->twind, call->ssthresh);
3487 }
3489 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3490 * network MTU confused with the loopback MTU. Calculate the
3491 * maximum MTU here for use in the slow start code below.
3492 */
3493 maxMTU = peer->maxMTU;
3494 /* Did peer restart with older RX version? */
3495 if (peer->maxDgramPackets > 1) {
3496 peer->maxDgramPackets = 1;
3497 }
3498 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3499 /* AFS 3.5 */
3500 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3501 sizeof(afs_int32), &tSize);
3502 tSize = (afs_uint32) ntohl(tSize);
3503 /*
3504 * As of AFS 3.5 we set the send window to match the receive window.
3505 */
3506 if (tSize < call->twind) {
3507 call->twind = tSize;
3508 call->ssthresh = MIN(call->twind, call->ssthresh);
3509 } else if (tSize > call->twind) {
3510 call->twind = tSize;
3511 }
3513 /*
3514 * As of AFS 3.5, a jumbogram is more than one fixed size
3515 * packet transmitted in a single UDP datagram. If the remote
3516 * MTU is smaller than our local MTU then never send a datagram
3517 * larger than the natural MTU.
3518 */
3519 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3520 sizeof(afs_int32), &tSize);
3521 maxDgramPackets = (afs_uint32) ntohl(tSize);
3522 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3523 maxDgramPackets = MIN(maxDgramPackets,
3524 (int)(peer->ifDgramPackets));
3525 maxDgramPackets = MIN(maxDgramPackets, tSize);
3526 if (maxDgramPackets > 1) {
3527 peer->maxDgramPackets = maxDgramPackets;
3528 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3529 } else {
3530 peer->maxDgramPackets = 1;
3531 call->MTU = peer->natMTU;
3532 }
3533 } else if (peer->maxDgramPackets > 1) {
3534 /* Restarted with lower version of RX */
3535 peer->maxDgramPackets = 1;
3536 }
3537 } else if (peer->maxDgramPackets > 1 ||
3538 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3539 /* Restarted with lower version of RX */
3540 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3541 peer->natMTU = OLD_MAX_PACKET_SIZE;
3542 peer->MTU = OLD_MAX_PACKET_SIZE;
3543 peer->maxDgramPackets = 1;
3544 peer->nDgramPackets = 1;
3546 call->MTU = OLD_MAX_PACKET_SIZE;
3547 }
3550 /*
3551 * Calculate how many datagrams were successfully received after
3552 * the first missing packet and adjust the negative ack counter
3553 * accordingly.
3554 */
3557 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3558 if (call->nNacks < nNacked) {
3559 call->nNacks = nNacked;
3560 }
3568 if (call->flags & RX_CALL_FAST_RECOVER) {
3569 if (nNacked) {
3570 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3571 } else {
3572 call->flags &= ~RX_CALL_FAST_RECOVER;
3573 call->cwind = call->nextCwind;
3574 call->nextCwind = 0;
3575 }
3577 call->nCwindAcks = 0;
3578 }
3579 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3580 /* Three negative acks in a row trigger congestion recovery */
3581 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3582 MUTEX_EXIT(&peer->peer_lock);
3583 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3584 /* someone else is waiting to start recovery */
3585 return np;
3586 }
3587 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3588 while (call->flags & RX_CALL_TQ_BUSY) {
3589 call->flags |= RX_CALL_TQ_WAIT;
3590 #ifdef RX_ENABLE_LOCKS
3591 CV_WAIT(&call->cv_tq, &call->lock);
3592 #else /* RX_ENABLE_LOCKS */
3593 osi_rxSleep(&call->tq);
3594 #endif /* RX_ENABLE_LOCKS */
3595 }
3596 MUTEX_ENTER(&peer->peer_lock);
3597 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3598 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3599 call->flags |= RX_CALL_FAST_RECOVER;
3600 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3601 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3602 rx_maxSendWindow);
3603 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3604 call->nextCwind = call->ssthresh;
3605 call->nAcks = 0;
3606 call->nNacks = 0;
3607 peer->MTU = call->MTU;
3608 peer->cwind = call->nextCwind;
3609 peer->nDgramPackets = call->nDgramPackets;
3610 peer->congestSeq++;
3611 call->congestSeq = peer->congestSeq;
3612 /* Reset the resend times on the packets that were nacked
3613 * so we will retransmit as soon as the window permits*/
3614 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3615 if (acked) {
3616 if (!tp->acked) {
3617 clock_Zero(&tp->retryTime);
3618 }
3619 } else if (tp->acked) {
3620 acked = 1;
3621 }
3622 }
3623 } else {
3624 /* If cwind is smaller than ssthresh, then increase
3625 * the window one packet for each ack we receive (exponential
3626 * growth).
3627 * If cwind is greater than or equal to ssthresh then increase
3628 * the congestion window by one packet for each cwind acks we
3629 * receive (linear growth). */
3630 if (call->cwind < call->ssthresh) {
3631 call->cwind = MIN((int)call->ssthresh,
3632 (int)(call->cwind + newAckCount));
3633 call->nCwindAcks = 0;
3634 } else {
3635 call->nCwindAcks += newAckCount;
3636 if (call->nCwindAcks >= call->cwind) {
3637 call->nCwindAcks = 0;
3638 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3639 }
3640 }
3641 /*
3642 * If we have received several acknowledgements in a row then
3643 * it is time to increase the size of our datagrams
3644 */
3645 if ((int)call->nAcks > rx_nDgramThreshold) {
3646 if (peer->maxDgramPackets > 1) {
3647 if (call->nDgramPackets < peer->maxDgramPackets) {
3648 call->nDgramPackets++;
3649 }
3650 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3651 } else if (call->MTU < peer->maxMTU) {
3652 call->MTU += peer->natMTU;
3653 call->MTU = MIN(call->MTU, peer->maxMTU);
3654 }
3655 call->nAcks = 0;
3656 }
3657 }
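/* [Editor's illustrative sketch.] The growth rule above, isolated: below
 * ssthresh each new ack adds a packet to cwind (exponential per round
 * trip); at or above ssthresh, cwind grows by one only after cwind acks
 * (linear). Starting from cwind=4, ssthresh=16, a steady ack stream
 * doubles the window each round trip until 16, then creeps upward.
 * Recovery (three nacks) halves the effective window via ssthresh. */
#if 0 /* example only */
static void example_GrowWindow(int *cwind, int *nCwindAcks, int ssthresh,
                               int newAckCount, int maxSendWindow)
{
    if (*cwind < ssthresh) { /* slow start */
        *cwind = MIN(ssthresh, *cwind + newAckCount);
        *nCwindAcks = 0;
    } else { /* congestion avoidance */
        *nCwindAcks += newAckCount;
        if (*nCwindAcks >= *cwind) {
            *nCwindAcks = 0;
            *cwind = MIN(*cwind + 1, maxSendWindow);
        }
    }
}
#endif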
3659 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3661 /* Servers need to hold the call until all response packets have
3662 * been acknowledged. Soft acks are good enough since clients
3663 * are not allowed to clear their receive queues. */
3664 if (call->state == RX_STATE_HOLD &&
3665 call->tfirst + call->nSoftAcked >= call->tnext) {
3666 call->state = RX_STATE_DALLY;
3667 rxi_ClearTransmitQueue(call, 0);
3668 } else if (!queue_IsEmpty(&call->tq)) {
3669 rxi_Start(0, call, istack);
3670 }
3671 return np;
3672 }
3674 /* Received a response to a challenge packet */
3675 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3676 register struct rx_connection *conn;
3677 register struct rx_packet *np;
3678 int istack;
3679 {
3680 int error;
3681 int i;
3682 /* Ignore the packet if we're the client */
3683 if (conn->type == RX_CLIENT_CONNECTION) return np;
3685 /* If already authenticated, ignore the packet (it's probably a retry) */
3686 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3687 return np;
3689 /* Otherwise, have the security object evaluate the response packet */
3690 error = RXS_CheckResponse(conn->securityObject, conn, np);
3691 if (error) {
3692 /* If the response is invalid, reset the connection, sending
3693 * an abort to the peer */
3697 rxi_ConnectionError(conn, error);
3698 MUTEX_ENTER(&conn->conn_data_lock);
3699 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3700 MUTEX_EXIT(&conn->conn_data_lock);
3701 return np;
3702 }
3703 else {
3704 /* If the response is valid, any calls waiting to attach
3705 * servers can now do so */
3707 for (i=0; i<RX_MAXCALLS; i++) {
3708 struct rx_call *call = conn->call[i];
3709 if (call) {
3710 MUTEX_ENTER(&call->lock);
3711 if (call->state == RX_STATE_PRECALL)
3712 rxi_AttachServerProc(call, -1, NULL, NULL);
3713 MUTEX_EXIT(&call->lock);
3714 }
3715 }
3716 }
3717 return np;
3718 }
3720 /* A client has received an authentication challenge: the security
3721 * object is asked to cough up a respectable response packet to send
3722 * back to the server. The server is responsible for retrying the
3723 * challenge if it fails to get a response. */
3725 struct rx_packet *
3726 rxi_ReceiveChallengePacket(conn, np, istack)
3727 register struct rx_connection *conn;
3728 register struct rx_packet *np;
3729 int istack;
3730 {
3731 int error;
3733 /* Ignore the challenge if we're the server */
3734 if (conn->type == RX_SERVER_CONNECTION) return np;
3736 /* Ignore the challenge if the connection is otherwise idle; someone's
3737 * trying to use us as an oracle. */
3738 if (!rxi_HasActiveCalls(conn)) return np;
3740 /* Send the security object the challenge packet. It is expected to fill
3741 * in the response. */
3742 error = RXS_GetResponse(conn->securityObject, conn, np);
3744 /* If the security object is unable to return a valid response, reset the
3745 * connection and send an abort to the peer. Otherwise send the response
3746 * packet to the peer connection. */
3747 if (error) {
3748 rxi_ConnectionError(conn, error);
3749 MUTEX_ENTER(&conn->conn_data_lock);
3750 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3751 MUTEX_EXIT(&conn->conn_data_lock);
3752 }
3753 else {
3754 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3755 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3756 }
3757 return np;
3758 }
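/* [Editor's note.] Taken together with rxi_ReceiveResponsePacket above,
 * the handshake runs: the server issues a CHALLENGE (rxi_ChallengeOn),
 * the client answers via RXS_GetResponse with a RESPONSE packet, and the
 * server validates it with RXS_CheckResponse, after which calls parked in
 * PRECALL can finally be attached to server threads. The server retries
 * the challenge until it gets a response. */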
3761 /* Find an available server process to service the current request in
3762 * the given call structure. If one isn't available, queue up this
3763 * call so it eventually gets one */
3764 void
3765 rxi_AttachServerProc(call, socket, tnop, newcallp)
3766 register struct rx_call *call;
3767 register osi_socket socket;
3768 register int *tnop;
3769 register struct rx_call **newcallp;
3770 {
3771 register struct rx_serverQueueEntry *sq;
3772 register struct rx_service *service = call->conn->service;
3773 #ifdef RX_ENABLE_LOCKS
3774 register int haveQuota = 0;
3775 #endif /* RX_ENABLE_LOCKS */
3776 /* May already be attached */
3777 if (call->state == RX_STATE_ACTIVE) return;
3779 MUTEX_ENTER(&rx_serverPool_lock);
3780 #ifdef RX_ENABLE_LOCKS
3781 while(rxi_ServerThreadSelectingCall) {
3782 MUTEX_EXIT(&call->lock);
3783 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3784 MUTEX_EXIT(&rx_serverPool_lock);
3785 MUTEX_ENTER(&call->lock);
3786 MUTEX_ENTER(&rx_serverPool_lock);
3787 /* Call may have been attached */
3788 if (call->state == RX_STATE_ACTIVE) return;
3789 }
3791 haveQuota = QuotaOK(service);
3792 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3793 /* If there are no processes available to service this call,
3794 * put the call on the incoming call queue (unless it's
3795 * already on the queue).
3796 */
3797 if (haveQuota)
3798 ReturnToServerPool(service);
3799 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3800 call->flags |= RX_CALL_WAIT_PROC;
3801 MUTEX_ENTER(&rx_stats_mutex);
3802 rx_nWaiting++;
3803 MUTEX_EXIT(&rx_stats_mutex);
3804 rxi_calltrace(RX_CALL_ARRIVAL, call);
3805 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3806 queue_Append(&rx_incomingCallQueue, call);
3807 }
3808 }
3809 #else /* RX_ENABLE_LOCKS */
3810 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3811 /* If there are no processes available to service this call,
3812 * put the call on the incoming call queue (unless it's
3813 * already on the queue).
3814 */
3815 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3816 call->flags |= RX_CALL_WAIT_PROC;
3817 rx_nWaiting++;
3818 rxi_calltrace(RX_CALL_ARRIVAL, call);
3819 queue_Append(&rx_incomingCallQueue, call);
3820 }
3821 }
3822 #endif /* RX_ENABLE_LOCKS */
3823 else {
3824 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3826 /* If hot threads are enabled, and both newcallp and sq->socketp
3827 * are non-null, then this thread will process the call, and the
3828 * idle server thread will start listening on this thread's socket.
3829 */
3831 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3834 *sq->socketp = socket;
3835 clock_GetTime(&call->startTime);
3836 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3840 if (call->flags & RX_CALL_WAIT_PROC) {
3841 /* Conservative: I don't think this should happen */
3842 call->flags &= ~RX_CALL_WAIT_PROC;
3843 MUTEX_ENTER(&rx_stats_mutex);
3845 MUTEX_EXIT(&rx_stats_mutex);
3848 call->state = RX_STATE_ACTIVE;
3849 call->mode = RX_MODE_RECEIVING;
3850 if (call->flags & RX_CALL_CLEARED) {
3851 /* send an ack now to start the packet flow up again */
3852 call->flags &= ~RX_CALL_CLEARED;
3853 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3855 #ifdef RX_ENABLE_LOCKS
3858 service->nRequestsRunning++;
3859 if (service->nRequestsRunning <= service->minProcs)
3865 MUTEX_EXIT(&rx_serverPool_lock);
3868 /* Delay the sending of an acknowledge event for a short while, while
3869 * a new call is being prepared (in the case of a client) or a reply
3870 * is being prepared (in the case of a server). Rather than sending
3871 * an ack packet, an ACKALL packet is sent. */
3872 void rxi_AckAll(event, call, dummy)
3873 struct rxevent *event;
3874 register struct rx_call *call;
3877 #ifdef RX_ENABLE_LOCKS
3879 MUTEX_ENTER(&call->lock);
3880 call->delayedAckEvent = (struct rxevent *) 0;
3881 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3883 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3884 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3886 MUTEX_EXIT(&call->lock);
3887 #else /* RX_ENABLE_LOCKS */
3888 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3889 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3890 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3891 #endif /* RX_ENABLE_LOCKS */
3894 void rxi_SendDelayedAck(event, call, dummy)
3895 struct rxevent *event;
3896 register struct rx_call *call;
3899 #ifdef RX_ENABLE_LOCKS
3901 MUTEX_ENTER(&call->lock);
3902 if (event == call->delayedAckEvent)
3903 call->delayedAckEvent = (struct rxevent *) 0;
3904 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3906 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3908 MUTEX_EXIT(&call->lock);
3909 #else /* RX_ENABLE_LOCKS */
3910 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3911 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3912 #endif /* RX_ENABLE_LOCKS */
3916 #ifdef RX_ENABLE_LOCKS
3917 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3918 * clearing them out.
3919 */
3920 static void rxi_SetAcksInTransmitQueue(call)
3921 register struct rx_call *call;
3923 register struct rx_packet *p, *tp;
3926 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3933 call->flags |= RX_CALL_TQ_CLEARME;
3934 call->flags |= RX_CALL_TQ_SOME_ACKED;
3937 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3938 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3939 call->tfirst = call->tnext;
3940 call->nSoftAcked = 0;
3942 if (call->flags & RX_CALL_FAST_RECOVER) {
3943 call->flags &= ~RX_CALL_FAST_RECOVER;
3944 call->cwind = call->nextCwind;
3945 call->nextCwind = 0;
3948 CV_SIGNAL(&call->cv_twind);
3950 #endif /* RX_ENABLE_LOCKS */
3952 /* Clear out the transmit queue for the current call (all packets have
3953 * been received by peer) */
3954 void rxi_ClearTransmitQueue(call, force)
3955 register struct rx_call *call;
3958 register struct rx_packet *p, *tp;
3960 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3961 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3963 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3970 call->flags |= RX_CALL_TQ_CLEARME;
3971 call->flags |= RX_CALL_TQ_SOME_ACKED;
3974 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3975 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3981 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3982 call->flags &= ~RX_CALL_TQ_CLEARME;
3984 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3986 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3987 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3988 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3989 call->nSoftAcked = 0;
3991 if (call->flags & RX_CALL_FAST_RECOVER) {
3992 call->flags &= ~RX_CALL_FAST_RECOVER;
3993 call->cwind = call->nextCwind;
3996 #ifdef RX_ENABLE_LOCKS
3997 CV_SIGNAL(&call->cv_twind);
3999 osi_rxWakeup(&call->twind);
4003 void rxi_ClearReceiveQueue(call)
4004 register struct rx_call *call;
4006 register struct rx_packet *p, *tp;
4007 if (queue_IsNotEmpty(&call->rq)) {
4008 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4013 rx_packetReclaims++;
4015 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4017 if (call->state == RX_STATE_PRECALL) {
4018 call->flags |= RX_CALL_CLEARED;
4022 /* Send an abort packet for the specified call */
4023 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4024 register struct rx_call *call;
4025 struct rx_packet *packet;
4035 /* Clients should never delay abort messages */
4036 if (rx_IsClientConn(call->conn))
4039 if (call->abortCode != call->error) {
4040 call->abortCode = call->error;
4041 call->abortCount = 0;
4044 if (force || rxi_callAbortThreshhold == 0 ||
4045 call->abortCount < rxi_callAbortThreshhold) {
4046 if (call->delayedAbortEvent) {
4047 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4049 error = htonl(call->error);
4051 packet = rxi_SendSpecial(call, call->conn, packet,
4052 RX_PACKET_TYPE_ABORT, (char *)&error,
4053 sizeof(error), istack);
4054 } else if (!call->delayedAbortEvent) {
4055 clock_GetTime(&when);
4056 clock_Addmsec(&when, rxi_callAbortDelay);
4057 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4058 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4064 /* Send an abort packet for the specified connection. Packet is an
4065 * optional pointer to a packet that can be used to send the abort.
4066 * Once the number of abort messages reaches the threshold, an
4067 * event is scheduled to send the abort. Setting the force flag
4068 * overrides sending delayed abort messages.
4069 */
4070 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4071 * to send the abort packet.
4072 */
4073 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4074 register struct rx_connection *conn;
4075 struct rx_packet *packet;
4085 /* Clients should never delay abort messages */
4086 if (rx_IsClientConn(conn))
4089 if (force || rxi_connAbortThreshhold == 0 ||
4090 conn->abortCount < rxi_connAbortThreshhold) {
4091 if (conn->delayedAbortEvent) {
4092 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4094 error = htonl(conn->error);
4096 MUTEX_EXIT(&conn->conn_data_lock);
4097 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4098 RX_PACKET_TYPE_ABORT, (char *)&error,
4099 sizeof(error), istack);
4100 MUTEX_ENTER(&conn->conn_data_lock);
4101 } else if (!conn->delayedAbortEvent) {
4102 clock_GetTime(&when);
4103 clock_Addmsec(&when, rxi_connAbortDelay);
4104 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4110 /* Associate an error with all of the calls owned by a connection. Called
4111 * with error non-zero. This is only for really fatal things, like
4112 * bad authentication responses. The connection itself is set in
4113 * error at this point, so that future packets received will be
4114 * discarded. */
4115 void rxi_ConnectionError(conn, error)
4116 register struct rx_connection *conn;
4117 register afs_int32 error;
4121 if (conn->challengeEvent)
4122 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4123 for (i=0; i<RX_MAXCALLS; i++) {
4124 struct rx_call *call = conn->call[i];
4126 MUTEX_ENTER(&call->lock);
4127 rxi_CallError(call, error);
4128 MUTEX_EXIT(&call->lock);
4131 conn->error = error;
4132 MUTEX_ENTER(&rx_stats_mutex);
4133 rx_stats.fatalErrors++;
4134 MUTEX_EXIT(&rx_stats_mutex);
4138 void rxi_CallError(call, error)
4139 register struct rx_call *call;
4142 if (call->error) error = call->error;
4143 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4144 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4145 rxi_ResetCall(call, 0);
4148 rxi_ResetCall(call, 0);
4150 call->error = error;
4151 call->mode = RX_MODE_ERROR;
4154 /* Reset various fields in a call structure, and wakeup waiting
4155 * processes. Some fields aren't changed: state & mode are not
4156 * touched (these must be set by the caller), and bufptr, nLeft, and
4157 * nFree are not reset, since these fields are manipulated by
4158 * unprotected macros, and may only be reset by non-interrupting code.
4159 */
4160 #ifdef ADAPT_WINDOW
4161 /* this code requires that call->conn be set properly as a pre-condition. */
4162 #endif /* ADAPT_WINDOW */
4164 void rxi_ResetCall(call, newcall)
4165 register struct rx_call *call;
4166 register int newcall;
4169 register struct rx_peer *peer;
4170 struct rx_packet *packet;
4172 /* Notify anyone who is waiting for asynchronous packet arrival */
4173 if (call->arrivalProc) {
4174 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4175 call->arrivalProc = (VOID (*)()) 0;
4178 if (call->delayedAbortEvent) {
4179 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4180 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4182 rxi_SendCallAbort(call, packet, 0, 1);
4183 rxi_FreePacket(packet);
4187 /*
4188 * Update the peer with the congestion information in this call
4189 * so other calls on this connection can pick up where this call
4190 * left off. If the congestion sequence numbers don't match then
4191 * another call experienced a retransmission.
4192 */
4193 peer = call->conn->peer;
4194 MUTEX_ENTER(&peer->peer_lock);
4196 if (call->congestSeq == peer->congestSeq) {
4197 peer->cwind = MAX(peer->cwind, call->cwind);
4198 peer->MTU = MAX(peer->MTU, call->MTU);
4199 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4202 call->abortCode = 0;
4203 call->abortCount = 0;
4205 if (peer->maxDgramPackets > 1) {
4206 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4208 call->MTU = peer->MTU;
4210 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4211 call->ssthresh = rx_maxSendWindow;
4212 call->nDgramPackets = peer->nDgramPackets;
4213 call->congestSeq = peer->congestSeq;
4214 MUTEX_EXIT(&peer->peer_lock);
4216 flags = call->flags;
4217 rxi_ClearReceiveQueue(call);
4218 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4219 if (call->flags & RX_CALL_TQ_BUSY) {
4220 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4221 call->flags |= (flags & RX_CALL_TQ_WAIT);
4223 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4225 rxi_ClearTransmitQueue(call, 0);
4226 queue_Init(&call->tq);
4229 queue_Init(&call->rq);
4231 call->rwind = rx_initReceiveWindow;
4232 call->twind = rx_initSendWindow;
4233 call->nSoftAcked = 0;
4234 call->nextCwind = 0;
4237 call->nCwindAcks = 0;
4238 call->nSoftAcks = 0;
4239 call->nHardAcks = 0;
4241 call->tfirst = call->rnext = call->tnext = 1;
4243 call->lastAcked = 0;
4244 call->localStatus = call->remoteStatus = 0;
4246 if (flags & RX_CALL_READER_WAIT) {
4247 #ifdef RX_ENABLE_LOCKS
4248 CV_BROADCAST(&call->cv_rq);
4250 osi_rxWakeup(&call->rq);
4253 if (flags & RX_CALL_WAIT_PACKETS) {
4254 MUTEX_ENTER(&rx_freePktQ_lock);
4255 rxi_PacketsUnWait(); /* XXX */
4256 MUTEX_EXIT(&rx_freePktQ_lock);
4259 #ifdef RX_ENABLE_LOCKS
4260 CV_SIGNAL(&call->cv_twind);
4262 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4263 osi_rxWakeup(&call->twind);
4266 #ifdef RX_ENABLE_LOCKS
4267 /* The following ensures that we don't mess with any queue while some
4268 * other thread might also be doing so. The call_queue_lock field is
4269 * only modified under the call lock. If the call is in the process
4270 * of being removed from a queue, the call is not locked until
4271 * the queue lock is dropped and only then is the call_queue_lock field
4272 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4273 * Note that any other routine which removes a call from a queue has to
4274 * obtain the queue lock before examining the queue and removing the call.
4275 */
4276 if (call->call_queue_lock) {
4277 MUTEX_ENTER(call->call_queue_lock);
4278 if (queue_IsOnQueue(call)) {
4280 if (flags & RX_CALL_WAIT_PROC) {
4281 MUTEX_ENTER(&rx_stats_mutex);
4283 MUTEX_EXIT(&rx_stats_mutex);
4286 MUTEX_EXIT(call->call_queue_lock);
4287 CLEAR_CALL_QUEUE_LOCK(call);
4289 #else /* RX_ENABLE_LOCKS */
4290 if (queue_IsOnQueue(call)) {
4292 if (flags & RX_CALL_WAIT_PROC)
4295 #endif /* RX_ENABLE_LOCKS */
4297 rxi_KeepAliveOff(call);
4298 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4301 /* Send an acknowledge for the indicated packet (seq,serial) of the
4302 * indicated call, for the indicated reason (reason). This
4303 * acknowledge will specifically acknowledge receiving the packet, and
4304 * will also specify which other packets for this call have been
4305 * received. This routine returns to the caller the packet that was
4306 * used. The caller is responsible for freeing it or re-using it.
4307 * This acknowledgement also returns the highest sequence number
4308 * actually read out by the higher level to the sender; the sender
4309 * promises to keep around packets that have not been read by the
4310 * higher level yet (unless, of course, the sender decides to abort
4311 * the call altogether). Any of p, seq, serial, pflags, or reason may
4312 * be set to zero without ill effect. That is, if they are zero, they
4313 * will not convey any information.
4314 * NOW there is a trailer field, after the ack where it will safely be
4315 * ignored by mundanes, which indicates the maximum size packet this
4316 * host can swallow. */
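/* As an illustration, the packet built below carries the rx_ackPacket
 * body (through acks[nAcks]) followed by a four-word trailer, each word
 * in network byte order:
 *
 *     afs_int32 maxMTU;           (AFS 3.3: adjusted maximum MTU)
 *     afs_int32 ifMTU;            (AFS 3.3: interface MTU)
 *     afs_int32 rwind;            (AFS 3.4: receive window)
 *     afs_int32 ifDgramPackets;   (AFS 3.5: datagram packet count)
 */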
4317 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4318 register struct rx_call *call;
4319 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4320 int seq; /* Sequence number of the packet we are acking */
4321 int serial; /* Serial number of the packet */
4322 int pflags; /* Flags field from packet header */
4323 int reason; /* Reason an acknowledge was prompted */
4326 struct rx_ackPacket *ap;
4327 register struct rx_packet *rqp;
4328 register struct rx_packet *nxp; /* For queue_Scan */
4329 register struct rx_packet *p;
4333 /*
4334 * Open the receive window once a thread starts reading packets
4335 */
4336 if (call->rnext > 1) {
4337 call->rwind = rx_maxReceiveWindow;
4340 call->nHardAcks = 0;
4341 call->nSoftAcks = 0;
4342 if (call->rnext > call->lastAcked)
4343 call->lastAcked = call->rnext;
4347 rx_computelen(p, p->length); /* reset length, you never know */
4348 } /* where that's been... */
4350 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4351 /* We won't send the ack, but don't panic. */
4352 return optionalPacket;
4355 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4357 if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4358 if (!optionalPacket) rxi_FreePacket(p);
4359 return optionalPacket;
4361 templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32);
4362 if (rx_Contiguous(p)<templ) {
4363 if (!optionalPacket) rxi_FreePacket(p);
4364 return optionalPacket;
4366 } /* MTUXXX failing to send an ack is very serious. We should */
4367 /* try as hard as possible to send even a partial ack; it's */
4368 /* better than nothing. */
4370 ap = (struct rx_ackPacket *) rx_DataOf(p);
4371 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4372 ap->reason = reason;
4374 /* The skew computation used to be bogus, I think it's better now. */
4375 /* We should start paying attention to skew. XXX */
4376 ap->serial = htonl(call->conn->maxSerial);
4377 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
4379 ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4380 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4382 /* No fear of running out of room in the ack packet here because there can only be at most
4383 * one window full of unacknowledged packets. The window size must be constrained
4384 * to be less than the maximum ack size, of course. Also, an ack should always
4385 * fit into a single packet -- it should not ever be fragmented. */
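/* Worked example of the loop below: if rnext is 5 and the receive queue
 * holds packets 6 and 8, then firstPacket is 5 and the acks array comes
 * out NACK, ACK, NACK, ACK (nAcks = 4): packet 5 is missing, 6 is
 * queued, 7 is missing, and 8 is queued. */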
4386 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4387 if (!rqp || !call->rq.next
4388 || (rqp->header.seq > (call->rnext + call->rwind))) {
4389 if (!optionalPacket) rxi_FreePacket(p);
4390 rxi_CallError(call, RX_CALL_DEAD);
4391 return optionalPacket;
4394 while (rqp->header.seq > call->rnext + offset)
4395 ap->acks[offset++] = RX_ACK_TYPE_NACK;
4396 ap->acks[offset++] = RX_ACK_TYPE_ACK;
4398 if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4399 if (!optionalPacket) rxi_FreePacket(p);
4400 rxi_CallError(call, RX_CALL_DEAD);
4401 return optionalPacket;
4406 p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4408 /* these are new for AFS 3.3 */
4409 templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4410 templ = htonl(templ);
4411 rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4412 templ = htonl(call->conn->peer->ifMTU);
4413 rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4415 /* new for AFS 3.4 */
4416 templ = htonl(call->rwind);
4417 rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4419 /* new for AFS 3.5 */
4420 templ = htonl(call->conn->peer->ifDgramPackets);
4421 rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4423 p->header.serviceId = call->conn->serviceId;
4424 p->header.cid = (call->conn->cid | call->channel);
4425 p->header.callNumber = *call->callNumber;
4426 p->header.seq = seq;
4427 p->header.securityIndex = call->conn->securityIndex;
4428 p->header.epoch = call->conn->epoch;
4429 p->header.type = RX_PACKET_TYPE_ACK;
4430 p->header.flags = RX_SLOW_START_OK;
4431 if (reason == RX_ACK_PING) {
4432 p->header.flags |= RX_REQUEST_ACK;
4434 clock_GetTime(&call->pingRequestTime);
4437 if (call->conn->type == RX_CLIENT_CONNECTION)
4438 p->header.flags |= RX_CLIENT_INITIATED;
4442 fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
4443 ap->reason, ntohl(ap->previousPacket), p->header.seq,
4444 ntohl(ap->firstPacket));
4446 for (offset = 0; offset < ap->nAcks; offset++)
4447 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
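/* Send exactly p->length bytes: find the iovec in which the ack data
 * ends, temporarily shorten that vector so nothing past the ack goes
 * out on the wire, and restore its length once the send completes. */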
4454 register int i, nbytes = p->length;
4456 for (i=1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
4457 if (nbytes <= p->wirevec[i].iov_len) {
4458 register int savelen, saven;
4460 savelen = p->wirevec[i].iov_len;
4462 p->wirevec[i].iov_len = nbytes;
4464 rxi_Send(call, p, istack);
4465 p->wirevec[i].iov_len = savelen;
4469 else nbytes -= p->wirevec[i].iov_len;
4472 MUTEX_ENTER(&rx_stats_mutex);
4473 rx_stats.ackPacketsSent++;
4474 MUTEX_EXIT(&rx_stats_mutex);
4475 if (!optionalPacket) rxi_FreePacket(p);
4476 return optionalPacket; /* Return packet for re-use by caller */
4479 /* Send all of the packets in the list in a single datagram */
4480 static void rxi_SendList(call, list, len, istack, moreFlag, now, retryTime, resending)
4481 struct rx_call *call;
4482 struct rx_packet **list;
4487 struct clock *retryTime;
4493 struct rx_connection *conn = call->conn;
4494 struct rx_peer *peer = conn->peer;
4496 MUTEX_ENTER(&peer->peer_lock);
4498 if (resending) peer->reSends += len;
4499 MUTEX_ENTER(&rx_stats_mutex);
4500 rx_stats.dataPacketsSent += len;
4501 MUTEX_EXIT(&rx_stats_mutex);
4502 MUTEX_EXIT(&peer->peer_lock);
4504 if (list[len-1]->header.flags & RX_LAST_PACKET) {
4508 /* Set the packet flags and schedule the resend events */
4509 /* Only request an ack for the last packet in the list */
4510 for (i = 0 ; i < len ; i++) {
4511 list[i]->retryTime = *retryTime;
4512 if (list[i]->header.serial) {
4513 /* Exponentially back off retry times */
4514 if (list[i]->backoff < MAXBACKOFF) {
4515 /* so it can't stay == 0 */
4516 list[i]->backoff = (list[i]->backoff << 1) +1;
4518 else list[i]->backoff++;
4519 clock_Addmsec(&(list[i]->retryTime),
4520 ((afs_uint32) list[i]->backoff) << 8);
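/* Worked example of the update above: backoff climbs 1, 3, 7, 15, ...
 * across retransmissions, so successive resends push retryTime out by
 * (backoff << 8) ms: roughly 0.26s, 0.77s, 1.8s, 3.8s, ... until
 * MAXBACKOFF caps the doubling. */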
4523 /* Wait a little extra for the ack on the last packet */
4524 if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
4525 clock_Addmsec(&(list[i]->retryTime), 400);
4528 /* Record the time sent */
4529 list[i]->timeSent = *now;
4531 /* Ask for an ack on retransmitted packets, on every other packet
4532 * if the peer doesn't support slow start. Ask for an ack on every
4533 * packet until the congestion window reaches the ack rate. */
4534 if (list[i]->header.serial) {
4536 MUTEX_ENTER(&rx_stats_mutex);
4537 rx_stats.dataPacketsReSent++;
4538 MUTEX_EXIT(&rx_stats_mutex);
4540 /* improved RTO calculation- not Karn */
4541 list[i]->firstSent = *now;
4543 && (call->cwind <= (u_short)(conn->ackRate+1)
4544 || (!(call->flags & RX_CALL_SLOW_START_OK)
4545 && (list[i]->header.seq & 1)))) {
4550 MUTEX_ENTER(&peer->peer_lock);
4552 if (resending) peer->reSends++;
4553 MUTEX_ENTER(&rx_stats_mutex);
4554 rx_stats.dataPacketsSent++;
4555 MUTEX_EXIT(&rx_stats_mutex);
4556 MUTEX_EXIT(&peer->peer_lock);
4558 /* Tag this packet as not being the last in this group,
4559 * for the receiver's benefit */
4560 if (i < len-1 || moreFlag) {
4561 list[i]->header.flags |= RX_MORE_PACKETS;
4564 /* Install the new retransmit time for the packet, and
4565 * record the time sent */
4566 list[i]->timeSent = *now;
4570 list[len-1]->header.flags |= RX_REQUEST_ACK;
4573 /* Since we're about to send a data packet to the peer, it's
4574 * safe to nuke any scheduled end-of-packets ack */
4575 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4577 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
4578 MUTEX_EXIT(&call->lock);
4580 rxi_SendPacketList(conn, list, len, istack);
4582 rxi_SendPacket(conn, list[0], istack);
4584 MUTEX_ENTER(&call->lock);
4585 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
4587 /* Update last send time for this call (for keep-alive
4588 * processing), and for the connection (so that we can discover
4589 * idle connections) */
4590 conn->lastSendTime = call->lastSendTime = clock_Sec();
4593 /* When sending packets we need to follow these rules:
4594 * 1. Never send more than maxDgramPackets in a jumbogram.
4595 * 2. Never send a packet with more than two iovecs in a jumbogram.
4596 * 3. Never send a retransmitted packet in a jumbogram.
4597 * 4. Never send more than cwind/4 packets in a jumbogram
4598 * We always keep the last list we should have sent so we
4599 * can set the RX_MORE_PACKETS flags correctly.
4600 */
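/* Worked example of the rules above: with peer->maxDgramPackets = 4 and
 * six full-size packets that have never been retransmitted, the first
 * four leave as one jumbogram and the last two as a second; a packet
 * that already carries a serial number (a retransmission) or whose
 * length differs from RX_JUMBOBUFFERSIZE terminates the current list
 * and travels in its own datagram. */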
4601 static void rxi_SendXmitList(call, list, len, istack, now, retryTime, resending)
4602 struct rx_call *call;
4603 struct rx_packet **list;
4607 struct clock *retryTime;
4610 int i, cnt, lastCnt = 0;
4611 struct rx_packet **listP, **lastP = 0;
4612 struct rx_peer *peer = call->conn->peer;
4613 int morePackets = 0;
4615 for (cnt = 0, listP = &list[0], i = 0 ; i < len ; i++) {
4616 /* Does the current packet force us to flush the current list? */
4618 && (list[i]->header.serial
4620 || list[i]->length > RX_JUMBOBUFFERSIZE)) {
4622 rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
4623 /* If the call enters an error state stop sending, or if
4624 * we entered congestion recovery mode, stop sending */
4625 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4633 /* Add the current packet to the list if it hasn't been acked.
4634 * Otherwise adjust the list pointer to skip the current packet. */
4635 if (!list[i]->acked) {
4637 /* Do we need to flush the list? */
4638 if (cnt >= (int)peer->maxDgramPackets
4639 || cnt >= (int)call->nDgramPackets
4640 || cnt >= (int)call->cwind
4641 || list[i]->header.serial
4642 || list[i]->length != RX_JUMBOBUFFERSIZE) {
4644 rxi_SendList(call, lastP, lastCnt, istack, 1,
4645 now, retryTime, resending);
4646 /* If the call enters an error state stop sending, or if
4647 * we entered congestion recovery mode, stop sending */
4648 if (call->error || (call->flags&RX_CALL_FAST_RECOVER_WAIT))
4658 osi_Panic("rxi_SendList error");
4664 /* Send the whole list when the call is in receive mode, when
4665 * the call is in eof mode, when we are in fast recovery mode,
4666 * and when we have the last packet */
4667 if ((list[len-1]->header.flags & RX_LAST_PACKET)
4668 || call->mode == RX_MODE_RECEIVING
4669 || call->mode == RX_MODE_EOF
4670 || (call->flags & RX_CALL_FAST_RECOVER)) {
4671 /* Check for the case where the current list contains
4672 * an acked packet. Since we always send retransmissions
4673 * in a separate packet, we only need to check the first
4674 * packet in the list */
4675 if (cnt > 0 && !listP[0]->acked) {
4679 rxi_SendList(call, lastP, lastCnt, istack, morePackets,
4680 now, retryTime, resending);
4681 /* If the call enters an error state stop sending, or if
4682 * we entered congestion recovery mode, stop sending */
4683 if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
4687 rxi_SendList(call, listP, cnt, istack, 0, now, retryTime, resending);
4689 } else if (lastCnt > 0) {
4690 rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime, resending);
4694 #ifdef RX_ENABLE_LOCKS
4695 /* Call rxi_Start, below, but with the call lock held. */
4696 void rxi_StartUnlocked(event, call, istack)
4697 struct rxevent *event;
4698 register struct rx_call *call;
4701 MUTEX_ENTER(&call->lock);
4702 rxi_Start(event, call, istack);
4703 MUTEX_EXIT(&call->lock);
4705 #endif /* RX_ENABLE_LOCKS */
4707 /* This routine is called when new packets are readied for
4708 * transmission and when retransmission may be necessary, or when the
4709 * transmission window or burst count are favourable. This should be
4710 * better optimized for new packets, the usual case, now that we've
4711 * got rid of queues of send packets. XXXXXXXXXXX */
4712 void rxi_Start(event, call, istack)
4713 struct rxevent *event;
4714 register struct rx_call *call;
4717 struct rx_packet *p;
4718 register struct rx_packet *nxp; /* Next pointer for queue_Scan */
4719 struct rx_peer *peer = call->conn->peer;
4720 struct clock now, retryTime;
4724 struct rx_packet **xmitList;
4727 /* If rxi_Start is being called as a result of a resend event,
4728 * then make sure that the event pointer is removed from the call
4729 * structure, since there is no longer a per-call retransmission
4730 * event pending. */
4731 if (event && event == call->resendEvent) {
4732 CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
4733 call->resendEvent = NULL;
4735 if (queue_IsEmpty(&call->tq)) {
4739 /* Timeouts trigger congestion recovery */
4740 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4741 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4742 /* someone else is waiting to start recovery */
4745 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
4746 while (call->flags & RX_CALL_TQ_BUSY) {
4747 call->flags |= RX_CALL_TQ_WAIT;
4748 #ifdef RX_ENABLE_LOCKS
4749 CV_WAIT(&call->cv_tq, &call->lock);
4750 #else /* RX_ENABLE_LOCKS */
4751 osi_rxSleep(&call->tq);
4752 #endif /* RX_ENABLE_LOCKS */
4754 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4755 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
4756 call->flags |= RX_CALL_FAST_RECOVER;
4757 if (peer->maxDgramPackets > 1) {
4758 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
4760 call->MTU = MIN(peer->natMTU, peer->maxMTU);
4762 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
4763 call->nDgramPackets = 1;
4765 call->nextCwind = 1;
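/* Worked example of the recovery entry above: with cwind 16 and twind
 * 32, ssthresh becomes MAX(4, MIN(16, 32)) >> 1 = 8; the window then
 * restarts from nextCwind = 1 and jumbograms are disabled
 * (nDgramPackets = 1) until the congestion state improves. */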
4768 MUTEX_ENTER(&peer->peer_lock);
4769 peer->MTU = call->MTU;
4770 peer->cwind = call->cwind;
4771 peer->nDgramPackets = 1;
4773 call->congestSeq = peer->congestSeq;
4774 MUTEX_EXIT(&peer->peer_lock);
4775 /* Clear retry times on packets. Otherwise, it's possible for
4776 * some packets in the queue to force resends at rates faster
4777 * than recovery rates.
4778 */
4779 for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
4781 clock_Zero(&p->retryTime);
4786 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4787 MUTEX_ENTER(&rx_stats_mutex);
4788 rx_tq_debug.rxi_start_in_error ++;
4789 MUTEX_EXIT(&rx_stats_mutex);
4794 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
4795 /* Get clock to compute the re-transmit time for any packets
4796 * in this burst. Note, if we back off, it's reasonable to
4797 * back off all of the packets in the same manner, even if
4798 * some of them have been retransmitted more times than more
4799 * recent additions */
4800 clock_GetTime(&now);
4801 retryTime = now; /* initialize before use */
4802 MUTEX_ENTER(&peer->peer_lock);
4803 clock_Add(&retryTime, &peer->timeout);
4804 MUTEX_EXIT(&peer->peer_lock);
4806 /* Send (or resend) any packets that need it, subject to
4807 * window restrictions and congestion burst control
4808 * restrictions. Ask for an ack on the last packet sent in
4809 * this burst. For now, we're relying upon the window being
4810 * considerably bigger than the largest number of packets that
4811 * are typically sent at once by one initial call to
4812 * rxi_Start. This is probably bogus (perhaps we should ask
4813 * for an ack when we're half way through the current
4814 * window?). Also, for non file transfer applications, this
4815 * may end up asking for an ack for every packet. Bogus. XXXX
4818 * But check whether we're here recursively, and let the other guy
4819 * do the work. */
4821 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4822 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4823 call->flags |= RX_CALL_TQ_BUSY;
4825 call->flags &= ~RX_CALL_NEED_START;
4826 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4828 maxXmitPackets = MIN(call->twind, call->cwind);
4829 xmitList = (struct rx_packet **)
4830 osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
4831 if (xmitList == NULL)
4832 osi_Panic("rxi_Start, failed to allocate xmit list");
4833 for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
4834 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4835 /* We shouldn't be sending packets if a thread is waiting
4836 * to initiate congestion recovery */
4839 if ((nXmitPackets) && (call->flags & RX_CALL_FAST_RECOVER)) {
4840 /* Only send one packet during fast recovery */
4843 if ((p->header.flags == RX_FREE_PACKET) ||
4844 (!queue_IsEnd(&call->tq, nxp)
4845 && (nxp->header.flags == RX_FREE_PACKET)) ||
4846 (p == (struct rx_packet *)&rx_freePacketQueue) ||
4847 (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
4848 osi_Panic("rxi_Start: xmit queue clobbered");
4851 MUTEX_ENTER(&rx_stats_mutex);
4852 rx_stats.ignoreAckedPacket++;
4853 MUTEX_EXIT(&rx_stats_mutex);
4854 continue; /* Ignore this packet if it has been acknowledged */
4857 /* Turn off all flags except these ones, which are the same
4858 * on each transmission */
4859 p->header.flags &= RX_PRESET_FLAGS;
4861 if (p->header.seq >= call->tfirst +
4862 MIN((int)call->twind, (int)(call->nSoftAcked+call->cwind))) {
4863 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /* Wait for transmit window */
4864 /* Note: if we're waiting for more window space, we can
4865 * still send retransmits; hence we don't return here, but
4866 * break out to schedule a retransmit event */
4867 dpf(("call %d waiting for window", *(call->callNumber)));
4871 /* Transmit the packet if it needs to be sent. */
4872 if (!clock_Lt(&now, &p->retryTime)) {
4873 if (nXmitPackets == maxXmitPackets) {
4874 osi_Panic("rxi_Start: xmit list overflowed");
4876 xmitList[nXmitPackets++] = p;
4880 /* xmitList now holds pointers to all of the packets that are
4881 * ready to send. Now we loop to send the packets */
4882 if (nXmitPackets > 0) {
4883 rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
4884 &now, &retryTime, resending);
4886 osi_Free(xmitList, maxXmitPackets * sizeof(struct rx_packet *));
4888 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4889 /*
4890 * TQ references no longer protected by this flag; they must remain
4891 * protected by the global lock.
4892 */
4893 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
4894 call->flags &= ~RX_CALL_TQ_BUSY;
4895 if (call->flags & RX_CALL_TQ_WAIT) {
4896 call->flags &= ~RX_CALL_TQ_WAIT;
4897 #ifdef RX_ENABLE_LOCKS
4898 CV_BROADCAST(&call->cv_tq);
4899 #else /* RX_ENABLE_LOCKS */
4900 osi_rxWakeup(&call->tq);
4901 #endif /* RX_ENABLE_LOCKS */
4906 /* We went into the error state while sending packets. Now is
4907 * the time to reset the call. This will also inform the using
4908 * process that the call is in an error state.
4909 */
4910 MUTEX_ENTER(&rx_stats_mutex);
4911 rx_tq_debug.rxi_start_aborted ++;
4912 MUTEX_EXIT(&rx_stats_mutex);
4913 call->flags &= ~RX_CALL_TQ_BUSY;
4914 if (call->flags & RX_CALL_TQ_WAIT) {
4915 call->flags &= ~RX_CALL_TQ_WAIT;
4916 #ifdef RX_ENABLE_LOCKS
4917 CV_BROADCAST(&call->cv_tq);
4918 #else /* RX_ENABLE_LOCKS */
4919 osi_rxWakeup(&call->tq);
4920 #endif /* RX_ENABLE_LOCKS */
4922 rxi_CallError(call, call->error);
4925 #ifdef RX_ENABLE_LOCKS
4926 if (call->flags & RX_CALL_TQ_SOME_ACKED) {
4927 register int missing;
4928 call->flags &= ~RX_CALL_TQ_SOME_ACKED;
4929 /* Some packets have received acks. If they all have, we can clear
4930 * the transmit queue.
4931 */
4932 for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4933 if (p->header.seq < call->tfirst && p->acked) {
4941 call->flags |= RX_CALL_TQ_CLEARME;
4943 #endif /* RX_ENABLE_LOCKS */
4944 /* Don't bother doing retransmits if the TQ is cleared. */
4945 if (call->flags & RX_CALL_TQ_CLEARME) {
4946 rxi_ClearTransmitQueue(call, 1);
4948 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4951 /* Always post a resend event, if there is anything in the
4952 * queue, and resend is possible. There should be at least
4953 * one unacknowledged packet in the queue ... otherwise none
4954 * of these packets should be on the queue in the first place.
4955 */
4956 if (call->resendEvent) {
4957 /* Cancel the existing event and post a new one */
4958 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4961 /* The retry time is the retry time on the first unacknowledged
4962 * packet inside the current window */
4963 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
4964 /* Don't set timers for packets outside the window */
4965 if (p->header.seq >= call->tfirst + call->twind) {
4969 if (!p->acked && !clock_IsZero(&p->retryTime)) {
4971 retryTime = p->retryTime;
4976 /* Post a new event to re-run rxi_Start when retries may be needed */
4977 if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
4978 #ifdef RX_ENABLE_LOCKS
4979 CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
4980 call->resendEvent = rxevent_Post(&retryTime,
4982 (char *)call, istack);
4983 #else /* RX_ENABLE_LOCKS */
4984 call->resendEvent = rxevent_Post(&retryTime, rxi_Start,
4985 (char *)call, (void*)(long)istack);
4986 #endif /* RX_ENABLE_LOCKS */
4989 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4990 } while (call->flags & RX_CALL_NEED_START);
4991 /*
4992 * TQ references no longer protected by this flag; they must remain
4993 * protected by the global lock.
4994 */
4995 call->flags &= ~RX_CALL_TQ_BUSY;
4996 if (call->flags & RX_CALL_TQ_WAIT) {
4997 call->flags &= ~RX_CALL_TQ_WAIT;
4998 #ifdef RX_ENABLE_LOCKS
4999 CV_BROADCAST(&call->cv_tq);
5000 #else /* RX_ENABLE_LOCKS */
5001 osi_rxWakeup(&call->tq);
5002 #endif /* RX_ENABLE_LOCKS */
5005 call->flags |= RX_CALL_NEED_START;
5007 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
5009 if (call->resendEvent) {
5010 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5015 /* Also adjusts the keep alive parameters for the call, to reflect
5016 * that we have just sent a packet (so keep alives aren't sent
5017 * immediately). */
5018 void rxi_Send(call, p, istack)
5019 register struct rx_call *call;
5020 register struct rx_packet *p;
5023 register struct rx_connection *conn = call->conn;
5025 /* Stamp each packet with the user supplied status */
5026 p->header.userStatus = call->localStatus;
5028 /* Allow the security object controlling this call's security to
5029 * make any last-minute changes to the packet */
5030 RXS_SendPacket(conn->securityObject, call, p);
5032 /* Since we're about to send SOME sort of packet to the peer, it's
5033 * safe to nuke any scheduled end-of-packets ack */
5034 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
5036 /* Actually send the packet, filling in more connection-specific fields */
5037 CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
5038 MUTEX_EXIT(&call->lock);
5039 rxi_SendPacket(conn, p, istack);
5040 MUTEX_ENTER(&call->lock);
5041 CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
5043 /* Update last send time for this call (for keep-alive
5044 * processing), and for the connection (so that we can discover
5045 * idle connections) */
5046 conn->lastSendTime = call->lastSendTime = clock_Sec();
5050 /* Check if a call needs to be destroyed. Called by keep-alive code to ensure
5051 * that things are fine. Also called periodically to guarantee that nothing
5052 * falls through the cracks (e.g. (error + dally) connections have keepalive
5053 * turned off). Returns 0 if conn is well, -1 otherwise. If otherwise, call
5054 * rxi_KeepAliveOff. */
5056 #ifdef RX_ENABLE_LOCKS
5057 int rxi_CheckCall(call, haveCTLock)
5058 int haveCTLock; /* Set if calling from rxi_ReapConnections */
5059 #else /* RX_ENABLE_LOCKS */
5060 int rxi_CheckCall(call)
5061 #endif /* RX_ENABLE_LOCKS */
5062 register struct rx_call *call;
5064 register struct rx_connection *conn = call->conn;
5065 register struct rx_service *tservice;
5067 afs_uint32 deadTime;
5069 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
5070 if (call->flags & RX_CALL_TQ_BUSY) {
5071 /* Call is active and will be reset by rxi_Start if it's
5072 * in an error state.
5073 */
5077 /* dead time + RTT + 8*MDEV, rounded up to next second. */
5078 deadTime = (((afs_uint32)conn->secondsUntilDead << 10) +
5079 ((afs_uint32)conn->peer->rtt >> 3) +
5080 ((afs_uint32)conn->peer->rtt_dev << 1) + 1023) >> 10;
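/* Worked example of the computation above: with secondsUntilDead = 12,
 * an srtt of 100 ms (rtt = 800, scaled by 8) and an mdev of 20 ms
 * (rtt_dev = 80, scaled by 4), deadTime = (12288 + 100 + 160 + 1023)
 * >> 10 = 13 seconds, i.e. the dead time plus RTT + 8*MDEV, rounded up. */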
5082 /* These are computed to the second (+- 1 second). But that's
5083 * good enough for these values, which should be a significant
5084 * number of seconds. */
5085 if (now > (call->lastReceiveTime + deadTime)) {
5086 if (call->state == RX_STATE_ACTIVE) {
5087 rxi_CallError(call, RX_CALL_DEAD);
5091 #ifdef RX_ENABLE_LOCKS
5092 /* Cancel pending events */
5093 rxevent_Cancel(call->delayedAckEvent, call,
5094 RX_CALL_REFCOUNT_DELAY);
5095 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
5096 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
5097 if (call->refCount == 0) {
5098 rxi_FreeCall(call, haveCTLock);
5102 #else /* RX_ENABLE_LOCKS */
5105 #endif /* RX_ENABLE_LOCKS */
5107 /* Non-active calls are destroyed if they are not responding
5108 * to pings; active calls are simply flagged in error, so the
5109 * attached process can die reasonably gracefully. */
5111 /* see if we have a non-activity timeout */
5112 tservice = conn->service;
5113 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
5114 && tservice->idleDeadTime
5115 && ((call->startWait + tservice->idleDeadTime) < now)) {
5116 if (call->state == RX_STATE_ACTIVE) {
5117 rxi_CallError(call, RX_CALL_TIMEOUT);
5121 /* see if we have a hard timeout */
5122 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime.sec))) {
5123 if (call->state == RX_STATE_ACTIVE)
5124 rxi_CallError(call, RX_CALL_TIMEOUT);
5131 /* When a call is in progress, this routine is called occasionally to
5132 * make sure that some traffic has arrived from (or been sent to) the peer.
5133 * If nothing has arrived in a reasonable amount of time, the call is
5134 * declared dead; if nothing has been sent for a while, we send a
5135 * keep-alive packet (if we're actually trying to keep the call alive)
5136 */
5137 void rxi_KeepAliveEvent(event, call, dummy)
5138 struct rxevent *event;
5139 register struct rx_call *call;
5141 struct rx_connection *conn;
5144 MUTEX_ENTER(&call->lock);
5145 CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
5146 if (event == call->keepAliveEvent)
5147 call->keepAliveEvent = (struct rxevent *) 0;
5150 #ifdef RX_ENABLE_LOCKS
5151 if(rxi_CheckCall(call, 0)) {
5152 MUTEX_EXIT(&call->lock);
5155 #else /* RX_ENABLE_LOCKS */
5156 if (rxi_CheckCall(call)) return;
5157 #endif /* RX_ENABLE_LOCKS */
5159 /* Don't try to keep alive dallying calls */
5160 if (call->state == RX_STATE_DALLY) {
5161 MUTEX_EXIT(&call->lock);
5166 if ((now - call->lastSendTime) > conn->secondsUntilPing) {
5167 /* Don't try to send keepalives if there is unacknowledged data */
5168 /* the rexmit code should be good enough, this little hack
5169 * doesn't quite work XXX */
5170 (void) rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
5172 rxi_ScheduleKeepAliveEvent(call);
5173 MUTEX_EXIT(&call->lock);
5177 void rxi_ScheduleKeepAliveEvent(call)
5178 register struct rx_call *call;
5180 if (!call->keepAliveEvent) {
5182 clock_GetTime(&when);
5183 when.sec += call->conn->secondsUntilPing;
5184 CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
5185 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
5189 /* N.B. rxi_KeepAliveOff is defined earlier as a macro */
5190 void rxi_KeepAliveOn(call)
5191 register struct rx_call *call;
5193 /* Pretend last packet received was received now--i.e. if another
5194 * packet isn't received within the keep alive time, then the call
5195 * will die; Initialize last send time to the current time--even
5196 * if a packet hasn't been sent yet. This will guarantee that a
5197 * keep-alive is sent within the ping time */
5198 call->lastReceiveTime = call->lastSendTime = clock_Sec();
5199 rxi_ScheduleKeepAliveEvent(call);
5202 /* This routine is called to send connection abort messages
5203 * that have been delayed to throttle looping clients. */
5204 void rxi_SendDelayedConnAbort(event, conn, dummy)
5205 struct rxevent *event;
5206 register struct rx_connection *conn;
5210 struct rx_packet *packet;
5212 MUTEX_ENTER(&conn->conn_data_lock);
5213 conn->delayedAbortEvent = (struct rxevent *) 0;
5214 error = htonl(conn->error);
5216 MUTEX_EXIT(&conn->conn_data_lock);
5217 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5219 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
5220 RX_PACKET_TYPE_ABORT, (char *)&error,
5222 rxi_FreePacket(packet);
5226 /* This routine is called to send call abort messages
5227 * that have been delayed to throttle looping clients. */
5228 void rxi_SendDelayedCallAbort(event, call, dummy)
5229 struct rxevent *event;
5230 register struct rx_call *call;
5234 struct rx_packet *packet;
5236 MUTEX_ENTER(&call->lock);
5237 call->delayedAbortEvent = (struct rxevent *) 0;
5238 error = htonl(call->error);
5240 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5242 packet = rxi_SendSpecial(call, call->conn, packet,
5243 RX_PACKET_TYPE_ABORT, (char *)&error,
5245 rxi_FreePacket(packet);
5247 MUTEX_EXIT(&call->lock);
5250 /* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
5251 * seconds) to ask the client to authenticate itself. The routine
5252 * issues a challenge to the client, which is obtained from the
5253 * security object associated with the connection */
5254 void rxi_ChallengeEvent(event, conn, dummy)
5255 struct rxevent *event;
5256 register struct rx_connection *conn;
5259 conn->challengeEvent = (struct rxevent *) 0;
5260 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
5261 register struct rx_packet *packet;
5263 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
5265 /* If there's no packet available, do this later. */
5266 RXS_GetChallenge(conn->securityObject, conn, packet);
5267 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
5268 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1, 0);
5269 rxi_FreePacket(packet);
5271 clock_GetTime(&when);
5272 when.sec += RX_CHALLENGE_TIMEOUT;
5273 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
5277 /* Call this routine to start requesting the client to authenticate
5278 * itself. This will continue until authentication is established,
5279 * the call times out, or an invalid response is returned. The
5280 * security object associated with the connection is asked to create
5281 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
5282 * defined earlier. */
5283 void rxi_ChallengeOn(conn)
5284 register struct rx_connection *conn;
5286 if (!conn->challengeEvent) {
5287 RXS_CreateChallenge(conn->securityObject, conn);
5288 rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
5293 /* Compute round trip time of the packet provided, in *rttp.
5294 */
5296 /* rxi_ComputeRoundTripTime is called with peer locked. */
5297 void rxi_ComputeRoundTripTime(p, sentp, peer)
5298 register struct clock *sentp; /* may be null */
5299 register struct rx_peer *peer; /* may be null */
5300 register struct rx_packet *p;
5302 struct clock thisRtt, *rttp = &thisRtt;
5304 #if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5305 /* making year 2038 bugs to get this running now - stroucki */
5306 struct timeval temptime;
5308 register int rtt_timeout;
5309 static char id[]="@(#)adaptive RTO";
5311 #if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
5312 /* yet again. This was the worst Heisenbug of the port - stroucki */
5313 clock_GetTime(&temptime);
5314 rttp->sec=(afs_int32)temptime.tv_sec;
5315 rttp->usec=(afs_int32)temptime.tv_usec;
5317 clock_GetTime(rttp);
5319 if (clock_Lt(rttp, sentp)) {
5321 return; /* somebody set the clock back, don't count this time. */
5323 clock_Sub(rttp, sentp);
5324 MUTEX_ENTER(&rx_stats_mutex);
5325 if (clock_Lt(rttp, &rx_stats.minRtt)) rx_stats.minRtt = *rttp;
5326 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
5327 if (rttp->sec > 60) {
5328 MUTEX_EXIT(&rx_stats_mutex);
5329 return; /* somebody set the clock ahead */
5331 rx_stats.maxRtt = *rttp;
5333 clock_Add(&rx_stats.totalRtt, rttp);
5334 rx_stats.nRttSamples++;
5335 MUTEX_EXIT(&rx_stats_mutex);
5337 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
5339 /* Apply VanJacobson round-trip estimations */
5343 /*
5344 * srtt (peer->rtt) is in units of one-eighth-milliseconds.
5345 * srtt is stored as fixed point with 3 bits after the binary
5346 * point (i.e., scaled by 8). The following magic is
5347 * equivalent to the smoothing algorithm in rfc793 with an
5348 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
5349 * srtt*8 = srtt*8 + rtt - srtt
5350 * srtt = srtt + rtt/8 - srtt/8
5353 delta = MSEC(rttp) - (peer->rtt >> 3);
5354 peer->rtt += delta;
5356 /*
5357 * We accumulate a smoothed rtt variance (actually, a smoothed
5358 * mean difference), then set the retransmit timer to smoothed
5359 * rtt + 4 times the smoothed variance (was 2x in van's original
5360 * paper, but 4x works better for me, and apparently for him as
5361 * well).
5362 * rttvar is stored as
5363 * fixed point with 2 bits after the binary point (scaled by
5364 * 4). The following is equivalent to rfc793 smoothing with
5365 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
5366 * replaces rfc793's wired-in beta.
5367 * dev*4 = dev*4 + (|actual - expected| - dev)
5368 */
5372 if (delta < 0) delta = -delta;
5373 delta -= (peer->rtt_dev >> 2);
5374 peer->rtt_dev += delta;
5377 /* I don't have a stored RTT so I start with this value. Since I'm
5378 * probably just starting a call, and will be pushing more data down
5379 * this call, I expect congestion to increase rapidly. So I fudge a
5380 * little, and I set deviance to half the rtt. In practice,
5381 * deviance tends to approach something a little less than
5382 * half the smoothed rtt. */
5383 peer->rtt = (MSEC(rttp) << 3) + 8;
5384 peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
5386 /* the timeout is RTT + 4*MDEV + 0.35 sec. This is because one end or
5387 * the other of these connections is usually in a user process, and can
5388 * be switched and/or swapped out. So on fast, reliable networks, the
5389 * timeout would otherwise be too short.
5390 */
5391 rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
5392 clock_Zero(&(peer->timeout));
5393 clock_Addmsec(&(peer->timeout), rtt_timeout);
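/* A minimal standalone sketch (kept out of compilation with #if 0) of
 * the fixed-point smoothing and timeout arithmetic above; the starting
 * values are invented for illustration. */
#if 0
#include <stdio.h>

int main(void)
{
    int rtt = 640;      /* srtt of 80 ms, scaled by 8 as in peer->rtt */
    int rtt_dev = 40;   /* mdev of 10 ms, scaled by 4 as in peer->rtt_dev */
    int sample = 100;   /* new round-trip measurement, in ms */
    int delta, timeout;

    delta = sample - (rtt >> 3);        /* 100 - 80 = 20 */
    rtt += delta;                       /* 660, i.e. srtt = 82.5 ms */
    if (delta < 0)
        delta = -delta;
    delta -= (rtt_dev >> 2);            /* 20 - 10 = 10 */
    rtt_dev += delta;                   /* 50, i.e. mdev = 12.5 ms */

    /* RTT + 4*MDEV + 0.35 sec, all in milliseconds */
    timeout = (rtt >> 3) + rtt_dev + 350;
    printf("srtt %d ms, timeout %d ms\n", rtt >> 3, timeout); /* 82, 482 */
    return 0;
}
#endif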
5395 dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
5396 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
5397 (peer->timeout.sec),(peer->timeout.usec)) );
5401 /* Find all server connections that have not been active for a long time, and
5402 * toss them. */
5403 void rxi_ReapConnections()
5406 clock_GetTime(&now);
5408 /* Find server connection structures that haven't been used for
5409 * greater than rx_idleConnectionTime */
5410 { struct rx_connection **conn_ptr, **conn_end;
5411 int i, havecalls = 0;
5412 MUTEX_ENTER(&rx_connHashTable_lock);
5413 for (conn_ptr = &rx_connHashTable[0],
5414 conn_end = &rx_connHashTable[rx_hashTableSize];
5415 conn_ptr < conn_end; conn_ptr++) {
5416 struct rx_connection *conn, *next;
5417 struct rx_call *call;
5421 for (conn = *conn_ptr; conn; conn = next) {
5422 /* XXX -- Shouldn't the connection be locked? */
5425 for(i=0;i<RX_MAXCALLS;i++) {
5426 call = conn->call[i];
5429 MUTEX_ENTER(&call->lock);
5430 #ifdef RX_ENABLE_LOCKS
5431 result = rxi_CheckCall(call, 1);
5432 #else /* RX_ENABLE_LOCKS */
5433 result = rxi_CheckCall(call);
5434 #endif /* RX_ENABLE_LOCKS */
5435 MUTEX_EXIT(&call->lock);
5437 /* If CheckCall freed the call, it might
5438 * have destroyed the connection as well,
5439 * which screws up the linked lists.
5440 */
5445 if (conn->type == RX_SERVER_CONNECTION) {
5446 /* This only actually destroys the connection if
5447 * there are no outstanding calls */
5448 MUTEX_ENTER(&conn->conn_data_lock);
5449 if (!havecalls && !conn->refCount &&
5450 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
5451 conn->refCount++; /* it will be decr in rx_DestroyConn */
5452 MUTEX_EXIT(&conn->conn_data_lock);
5453 #ifdef RX_ENABLE_LOCKS
5454 rxi_DestroyConnectionNoLock(conn);
5455 #else /* RX_ENABLE_LOCKS */
5456 rxi_DestroyConnection(conn);
5457 #endif /* RX_ENABLE_LOCKS */
5459 #ifdef RX_ENABLE_LOCKS
5461 MUTEX_EXIT(&conn->conn_data_lock);
5463 #endif /* RX_ENABLE_LOCKS */
5467 #ifdef RX_ENABLE_LOCKS
5468 while (rx_connCleanup_list) {
5469 struct rx_connection *conn;
5470 conn = rx_connCleanup_list;
5471 rx_connCleanup_list = rx_connCleanup_list->next;
5472 MUTEX_EXIT(&rx_connHashTable_lock);
5473 rxi_CleanupConnection(conn);
5474 MUTEX_ENTER(&rx_connHashTable_lock);
5476 MUTEX_EXIT(&rx_connHashTable_lock);
5477 #endif /* RX_ENABLE_LOCKS */
5480 /* Find any peer structures that haven't been used (haven't had an
5481 * associated connection) for greater than rx_idlePeerTime */
5482 { struct rx_peer **peer_ptr, **peer_end;
5484 MUTEX_ENTER(&rx_rpc_stats);
5485 MUTEX_ENTER(&rx_peerHashTable_lock);
5486 for (peer_ptr = &rx_peerHashTable[0],
5487 peer_end = &rx_peerHashTable[rx_hashTableSize];
5488 peer_ptr < peer_end; peer_ptr++) {
5489 struct rx_peer *peer, *next, *prev;
5490 for (prev = peer = *peer_ptr; peer; peer = next) {
5492 code = MUTEX_TRYENTER(&peer->peer_lock);
5493 if ((code) && (peer->refCount == 0)
5494 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
5495 rx_interface_stat_p rpc_stat, nrpc_stat;
5497 MUTEX_EXIT(&peer->peer_lock);
5498 MUTEX_DESTROY(&peer->peer_lock);
5499 for(queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
5500 rx_interface_stat)) {
5501 unsigned int num_funcs;
5502 if (!rpc_stat) break;
5503 queue_Remove(&rpc_stat->queue_header);
5504 queue_Remove(&rpc_stat->all_peers);
5505 num_funcs = rpc_stat->stats[0].func_total;
5506 space = sizeof(rx_interface_stat_t) +
5507 rpc_stat->stats[0].func_total *
5508 sizeof(rx_function_entry_v1_t);
5510 rxi_Free(rpc_stat, space);
5511 rxi_rpc_peer_stat_cnt -= num_funcs;
5514 MUTEX_ENTER(&rx_stats_mutex);
5515 rx_stats.nPeerStructs--;
5516 MUTEX_EXIT(&rx_stats_mutex);
5517 if (prev == *peer_ptr) {
5526 MUTEX_EXIT(&peer->peer_lock);
5532 MUTEX_EXIT(&rx_peerHashTable_lock);
5533 MUTEX_EXIT(&rx_rpc_stats);
5536 /* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
5537 rxi_AllocSendPacket, if it hits, will be handled at the next conn
5538 GC, just below. Really, we shouldn't have to keep moving packets from
5539 one place to another, but instead ought to always know if we can
5540 afford to hold onto a packet in its particular use. */
5541 MUTEX_ENTER(&rx_freePktQ_lock);
5542 if (rx_waitingForPackets) {
5543 rx_waitingForPackets = 0;
5544 #ifdef RX_ENABLE_LOCKS
5545 CV_BROADCAST(&rx_waitingForPackets_cv);
5547 osi_rxWakeup(&rx_waitingForPackets);
5550 MUTEX_EXIT(&rx_freePktQ_lock);
5552 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
5553 rxevent_Post(&now, rxi_ReapConnections, 0, 0);
5557 /* rxs_Release - This isn't strictly necessary but, since the macro name from
5558 * rx.h is sort of strange this is better. This is called with a security
5559 * object before it is discarded. Each connection using a security object has
5560 * its own refcount to the object so it won't actually be freed until the last
5561 * connection is destroyed.
5563 * This is the only rxs module call. A hold could also be written but no one
5564 * needs it. */
5566 int rxs_Release (aobj)
5567 struct rx_securityClass *aobj;
5569 return RXS_Close (aobj);
5573 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
5574 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
5575 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
5576 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
5578 /* Adjust our estimate of the transmission rate to this peer, given
5579 * that the packet p was just acked. We can adjust peer->timeout and
5580 * call->twind. Pragmatically, this is called
5581 * only with packets of maximal length.
5582 * Called with peer and call locked.
5583 */
5585 static void rxi_ComputeRate(peer, call, p, ackp, ackReason)
5586 register struct rx_peer *peer;
5587 register struct rx_call *call;
5588 struct rx_packet *p, *ackp;
5591 afs_int32 xferSize, xferMs;
5592 register afs_int32 minTime;
5595 /* Count down packets */
5596 if (peer->rateFlag > 0) peer->rateFlag--;
5597 /* Do nothing until we're enabled */
5598 if (peer->rateFlag != 0) return;
5599 if (!call->conn) return;
5601 /* Count only when the ack seems legitimate */
5602 switch (ackReason) {
5603 case RX_ACK_REQUESTED:
5604 xferSize = p->length + RX_HEADER_SIZE +
5605 call->conn->securityMaxTrailerSize;
5609 case RX_ACK_PING_RESPONSE:
5610 if (p) /* want the response to ping-request, not data send */
5612 clock_GetTime(&newTO);
5613 if (clock_Gt(&newTO, &call->pingRequestTime)) {
5614 clock_Sub(&newTO, &call->pingRequestTime);
5615 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
5619 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
5626 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)",
5627 ntohl(peer->host), ntohs(peer->port),
5628 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
5629 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5632 /* Track only packets that are big enough. */
5633 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
5637 /* absorb RTT data (in milliseconds) for these big packets */
5638 if (peer->smRtt == 0) {
5639 peer->smRtt = xferMs;
5641 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
5642 if (!peer->smRtt) peer->smRtt = 1;
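/* Worked example of the filter above: with smRtt = 64 ms and an 80 ms
 * sample, the new smRtt is ((64 * 15) + 80 + 4) >> 4 = 65 ms, so each
 * measurement moves the smoothed value roughly 1/16 of the way toward
 * the sample. */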
5645 if (peer->countDown) {
5649 peer->countDown = 10; /* recalculate only every so often */
5651 /* In practice, we can measure only the RTT for full packets,
5652 * because of the way Rx acks the data that it receives. (If it's
5653 * smaller than a full packet, it often gets implicitly acked
5654 * either by the call response (from a server) or by the next call
5655 * (from a client), and either case confuses transmission times
5656 * with processing times.) Therefore, replace the above
5657 * more-sophisticated processing with a simpler version, where the
5658 * smoothed RTT is kept for full-size packets, and the time to
5659 * transmit a windowful of full-size packets is simply RTT *
5660 * windowSize. Again, we take two steps:
5661 - ensure the timeout is large enough for a single packet's RTT;
5662 - ensure that the window is small enough to fit in the desired timeout.*/
5664 /* First, the timeout check. */
5665 minTime = peer->smRtt;
5666 /* Get a reasonable estimate for a timeout period */
5668 newTO.sec = minTime / 1000;
5669 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
5671 /* Increase the timeout period so that we can always do at least
5672 * one packet exchange */
5673 if (clock_Gt(&newTO, &peer->timeout)) {
5675 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)",
5676 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5677 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
5680 peer->timeout = newTO;
5683 /* Now, get an estimate for the transmit window size. */
5684 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
5685 /* Now, convert to the number of full packets that could fit in a
5686 * reasonable fraction of that interval */
5687 minTime /= (peer->smRtt << 1);
5688 xferSize = minTime; /* (make a copy) */
5690 /* Now clamp the size to reasonable bounds. */
5691 if (minTime <= 1) minTime = 1;
5692 else if (minTime > rx_Window) minTime = rx_Window;
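/* Worked example of the estimate above: a 1.2 second timeout gives
 * minTime = 1200 ms; with smRtt = 100 ms that is 1200 / (100 << 1) = 6,
 * i.e. about six full packets per window, before clamping to the range
 * [1, rx_Window]. */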
5693 /* if (minTime != peer->maxWindow) {
5694 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
5695 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
5696 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
5698 peer->maxWindow = minTime;
5699 elide... call->twind = minTime;
5703 /* Cut back on the peer timeout if it had earlier grown unreasonably.
5704 * Discern this by calculating the timeout necessary for rx_Window
5706 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
5707 /* calculate estimate for transmission interval in milliseconds */
5708 minTime = rx_Window * peer->smRtt;
5709 if (minTime < 1000) {
5710 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
5711 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
5712 peer->timeout.usec, peer->smRtt,
5715 newTO.sec = 0; /* cut back on timeout by half a second */
5716 newTO.usec = 500000;
5717 clock_Sub(&peer->timeout, &newTO);
5722 } /* end of rxi_ComputeRate */
5723 #endif /* ADAPT_WINDOW */
/* Don't call this debugging routine directly; use dpf */
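/*
 * Usage sketch (editorial): callers wrap the argument list in a second
 * set of parentheses, since dpf passes it through to rxi_DebugPrint
 * wholesale, e.g.
 *
 *     dpf(("resending packet %d on call %lx", (int) seq, (long) call));
 *
 * seq and call here are hypothetical locals; when RXDEBUG is not
 * defined, dpf is expected to compile away to nothing.
 */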
rxi_DebugPrint(format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
               a11, a12, a13, a14, a15)

    clock_GetTime(&now);
    fprintf(rx_Log, " %u.%.3u:", now.sec, now.usec / 1000);
    fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
            a11, a12, a13, a14, a15);
/*
 * This function is used to process the rx_stats structure that is local
 * to a process as well as an rx_stats structure received from a remote
 * process (via rxdebug).  Therefore, it needs to do minimal version
 * checking.
 */
void rx_PrintTheseStats(file, s, size, freePackets, version)
    int size;                   /* some idea of version control */
    afs_int32 freePackets;

    if (size != sizeof(struct rx_stats)) {
        fprintf(file,
                "Unexpected size of stats structure: was %d, expected %d\n",
                size, sizeof(struct rx_stats));

    "rx stats: free packets %d, "

    if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
        fprintf(file,
                "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
                s->receivePktAllocFailures,
                s->receiveCbufPktAllocFailures,
                s->sendPktAllocFailures,
                s->sendCbufPktAllocFailures,
                s->specialPktAllocFailures);
    } else {
        fprintf(file,
                "alloc-failures(rcv %d,send %d,ack %d)\n",
                s->receivePktAllocFailures,
                s->sendPktAllocFailures,
                s->specialPktAllocFailures);
    }

    "bogusReads %d (last from host %x), "
    s->bogusPacketOnRead,
    s->noPacketBuffersOnRead,

    fprintf(file, "   packets read: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
    fprintf(file, "\n");

    "   other read counters: data %d, "
    s->spuriousPacketsRead,
    s->ignorePacketDally);

    fprintf(file, "   packets sent: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++) {
    fprintf(file, "\n");

    "   other send counters: ack %d, "
    "data %d (not resends), "
    "acked&ignored %d\n",
    s->dataPacketsReSent,
    s->dataPacketsPushed,
    s->ignoreAckedPacket);

    "   \t(these should be small) sendFailed %d, "

    if (s->nRttSamples) {
        "   Average rtt is %0.3f, with %d samples\n",
        clock_Float(&s->totalRtt) / s->nRttSamples,

        "   Minimum rtt is %0.3f, maximum is %0.3f\n",
        clock_Float(&s->minRtt),
        clock_Float(&s->maxRtt));

    "   %d server connections, "
    "%d client connections, "
    "%d free call structs\n",
    s->nFreeCallStructs);

#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
    "   %d clock updates\n",

/* for backward compatibility */
void rx_PrintStats(file)

    MUTEX_ENTER(&rx_stats_mutex);
    rx_PrintTheseStats(file, &rx_stats, sizeof(rx_stats), rx_nFreePackets,
                       RX_DEBUGI_VERSION);
    MUTEX_EXIT(&rx_stats_mutex);
void rx_PrintPeerStats(file, peer)
    struct rx_peer *peer;

    "burst wait %u.%d.\n",
    peer->burstWait.sec,
    peer->burstWait.usec);

    "retry time %u.%06d, "

    "max in packet skew %d, "
    "max out packet skew %d\n",
    peer->outPacketSkew);
#ifdef AFS_PTHREAD_ENV
/*
 * This mutex protects the following static variables:
 */

#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
#else
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
static int MakeDebugCall(
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,

    static afs_int32 counter = 100;
    struct rx_header theader;
    register afs_int32 code;
    struct sockaddr_in taddr, faddr;

    endTime = time(0) + 20;     /* try for 20 seconds */

    tp = &tbuffer[sizeof(struct rx_header)];
    taddr.sin_family = AF_INET;
    taddr.sin_port = remotePort;
    taddr.sin_addr.s_addr = remoteAddr;

    memset(&theader, 0, sizeof(theader));
    theader.epoch = htonl(999);
    theader.callNumber = htonl(counter);
    theader.type = type;
    theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
    theader.serviceId = 0;

    bcopy(&theader, tbuffer, sizeof(theader));
    bcopy(inputData, tp, inputLength);
    code = sendto(socket, tbuffer, inputLength + sizeof(struct rx_header), 0,
                  (struct sockaddr *) &taddr, sizeof(struct sockaddr_in));

    /* see if there's a packet available */
    FD_ZERO(&imask);
    FD_SET(socket, &imask);
    code = select(socket + 1, &imask, 0, 0, &tv);

    /* now receive a packet */
    faddrLen = sizeof(struct sockaddr_in);
    code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
                    (struct sockaddr *) &faddr, &faddrLen);

    bcopy(tbuffer, &theader, sizeof(struct rx_header));
    if (counter == ntohl(theader.callNumber)) break;

    /* see if we've timed out */
    if (endTime < time(0)) return -1;

    code -= sizeof(struct rx_header);
    if (code > outputLength) code = outputLength;
    bcopy(tp, outputData, code);
afs_int32 rx_GetServerDebug(
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    struct rx_debugStats *stat,
    afs_uint32 *supportedValues

    struct rx_debugIn in;

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETSTATS);

    rc = MakeDebugCall(socket,
                       RX_PACKET_TYPE_DEBUG,

    /*
     * If the call was successful, fixup the version and indicate
     * what contents of the stat structure are valid.
     * Also do net to host conversion of fields here.
     */

    if (stat->version >= RX_DEBUGI_VERSION_W_SECSTATS) {
        *supportedValues |= RX_SERVER_DEBUG_SEC_STATS;
    }
    if (stat->version >= RX_DEBUGI_VERSION_W_GETALLCONN) {
        *supportedValues |= RX_SERVER_DEBUG_ALL_CONN;
    }
    if (stat->version >= RX_DEBUGI_VERSION_W_RXSTATS) {
        *supportedValues |= RX_SERVER_DEBUG_RX_STATS;
    }
    if (stat->version >= RX_DEBUGI_VERSION_W_WAITERS) {
        *supportedValues |= RX_SERVER_DEBUG_WAITER_CNT;
    }
    if (stat->version >= RX_DEBUGI_VERSION_W_IDLETHREADS) {
        *supportedValues |= RX_SERVER_DEBUG_IDLE_THREADS;
    }
    if (stat->version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
        *supportedValues |= RX_SERVER_DEBUG_NEW_PACKETS;
    }
    if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
        *supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
    }

    stat->nFreePackets = ntohl(stat->nFreePackets);
    stat->packetReclaims = ntohl(stat->packetReclaims);
    stat->callsExecuted = ntohl(stat->callsExecuted);
    stat->nWaiting = ntohl(stat->nWaiting);
    stat->idleThreads = ntohl(stat->idleThreads);
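/*
 * Usage sketch (editorial, in the style of the rxdebug utility; socket
 * creation and the host/port values are assumed, not shown):
 *
 *     struct rx_debugStats tstats;
 *     afs_uint32 supported;
 *     afs_int32 rc = rx_GetServerDebug(sock, host, htons(port),
 *                                      &tstats, &supported);
 *     if (rc >= 0 && (supported & RX_SERVER_DEBUG_IDLE_THREADS))
 *         printf("%d idle threads\n", tstats.idleThreads);
 */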
afs_int32 rx_GetServerStats(
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    struct rx_stats *stat,
    afs_uint32 *supportedValues

    struct rx_debugIn in;
    afs_int32 *lp = (afs_int32 *) stat;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_RXSTATS);
    memset(stat, 0, sizeof(*stat));

    rc = MakeDebugCall(socket,
                       RX_PACKET_TYPE_DEBUG,

    /*
     * Do net to host conversion here
     */
    for (i = 0; i < sizeof(*stat) / sizeof(afs_int32); i++, lp++) {
        *lp = ntohl(*lp);
    }
afs_int32 rx_GetServerVersion(
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    size_t version_length,

    return MakeDebugCall(socket,
                         RX_PACKET_TYPE_VERSION,
afs_int32 rx_GetServerConnections(
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    afs_int32 *nextConnection,
    afs_uint32 debugSupportedValues,
    struct rx_debugConn *conn,
    afs_uint32 *supportedValues

    struct rx_debugIn in;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    if (allConnections) {
        in.type = htonl(RX_DEBUGI_GETALLCONN);
    } else {
        in.type = htonl(RX_DEBUGI_GETCONN);
    }
    in.index = htonl(*nextConnection);
    memset(conn, 0, sizeof(*conn));

    rc = MakeDebugCall(socket,
                       RX_PACKET_TYPE_DEBUG,

    *nextConnection += 1;

    /*
     * Convert old connection format to new structure.
     */

    if (debugSupportedValues & RX_SERVER_DEBUG_OLD_CONN) {
        struct rx_debugConn_vL *vL = (struct rx_debugConn_vL *) conn;
#define MOVEvL(a) (conn->a = vL->a)

        /* any old or unrecognized version... */
        for (i = 0; i < RX_MAXCALLS; i++) {
            MOVEvL(callState[i]);
            MOVEvL(callMode[i]);
            MOVEvL(callFlags[i]);
            MOVEvL(callOther[i]);
        }
        if (debugSupportedValues & RX_SERVER_DEBUG_SEC_STATS) {
            MOVEvL(secStats.type);
            MOVEvL(secStats.level);
            MOVEvL(secStats.flags);
            MOVEvL(secStats.expires);
            MOVEvL(secStats.packetsReceived);
            MOVEvL(secStats.packetsSent);
            MOVEvL(secStats.bytesReceived);
            MOVEvL(secStats.bytesSent);
        }
    }

    /*
     * Do net to host conversion here.
     * I don't convert host or port since we are most likely
     * going to want these in NBO (network byte order).
     */
    conn->cid = ntohl(conn->cid);
    conn->serial = ntohl(conn->serial);
    for (i = 0; i < RX_MAXCALLS; i++) {
        conn->callNumber[i] = ntohl(conn->callNumber[i]);
    }
    conn->error = ntohl(conn->error);
    conn->secStats.flags = ntohl(conn->secStats.flags);
    conn->secStats.expires = ntohl(conn->secStats.expires);
    conn->secStats.packetsReceived = ntohl(conn->secStats.packetsReceived);
    conn->secStats.packetsSent = ntohl(conn->secStats.packetsSent);
    conn->secStats.bytesReceived = ntohl(conn->secStats.bytesReceived);
    conn->secStats.bytesSent = ntohl(conn->secStats.bytesSent);
    conn->epoch = ntohl(conn->epoch);
    conn->natMTU = ntohl(conn->natMTU);
afs_int32 rx_GetServerPeers(
    afs_uint32 remoteAddr,
    afs_uint16 remotePort,
    afs_int32 *nextPeer,
    afs_uint32 debugSupportedValues,
    struct rx_debugPeer *peer,
    afs_uint32 *supportedValues

    struct rx_debugIn in;

    /*
     * supportedValues is currently unused, but added to allow future
     * versioning of this function.
     */

    *supportedValues = 0;
    in.type = htonl(RX_DEBUGI_GETPEER);
    in.index = htonl(*nextPeer);
    memset(peer, 0, sizeof(*peer));

    rc = MakeDebugCall(socket,
                       RX_PACKET_TYPE_DEBUG,

    /*
     * Do net to host conversion here.
     * I don't convert host or port since we are most likely
     * going to want these in NBO.
     */
    peer->ifMTU = ntohs(peer->ifMTU);
    peer->idleWhen = ntohl(peer->idleWhen);
    peer->refCount = ntohs(peer->refCount);
    peer->burstWait.sec = ntohl(peer->burstWait.sec);
    peer->burstWait.usec = ntohl(peer->burstWait.usec);
    peer->rtt = ntohl(peer->rtt);
    peer->rtt_dev = ntohl(peer->rtt_dev);
    peer->timeout.sec = ntohl(peer->timeout.sec);
    peer->timeout.usec = ntohl(peer->timeout.usec);
    peer->nSent = ntohl(peer->nSent);
    peer->reSends = ntohl(peer->reSends);
    peer->inPacketSkew = ntohl(peer->inPacketSkew);
    peer->outPacketSkew = ntohl(peer->outPacketSkew);
    peer->rateFlag = ntohl(peer->rateFlag);
    peer->natMTU = ntohs(peer->natMTU);
    peer->maxMTU = ntohs(peer->maxMTU);
    peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
    peer->ifDgramPackets = ntohs(peer->ifDgramPackets);
    peer->MTU = ntohs(peer->MTU);
    peer->cwind = ntohs(peer->cwind);
    peer->nDgramPackets = ntohs(peer->nDgramPackets);
    peer->congestSeq = ntohs(peer->congestSeq);
    peer->bytesSent.high = ntohl(peer->bytesSent.high);
    peer->bytesSent.low = ntohl(peer->bytesSent.low);
    peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
    peer->bytesReceived.low = ntohl(peer->bytesReceived.low);

#endif /* RXDEBUG */
void shutdown_rx(void)

    struct rx_serverQueueEntry *np;
    register struct rx_call *call;
    register struct rx_serverQueueEntry *sq;

    if (rxinit_status == 1) {
        return;                 /* Already shutdown. */

#ifndef AFS_PTHREAD_ENV
    FD_ZERO(&rx_selectMask);
#endif /* AFS_PTHREAD_ENV */
    rxi_dataQuota = RX_MAX_QUOTA;
#ifndef AFS_PTHREAD_ENV
#endif /* AFS_PTHREAD_ENV */

#ifndef AFS_PTHREAD_ENV
#ifndef AFS_USE_GETTIMEOFDAY
#endif /* AFS_USE_GETTIMEOFDAY */
#endif /* AFS_PTHREAD_ENV */

    while (!queue_IsEmpty(&rx_freeCallQueue)) {
        call = queue_First(&rx_freeCallQueue, rx_call);
        rxi_Free(call, sizeof(struct rx_call));

    while (!queue_IsEmpty(&rx_idleServerQueue)) {
        sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);

    {
        struct rx_peer **peer_ptr, **peer_end;
        for (peer_ptr = &rx_peerHashTable[0],
             peer_end = &rx_peerHashTable[rx_hashTableSize];
             peer_ptr < peer_end; peer_ptr++) {
            struct rx_peer *peer, *next;
            for (peer = *peer_ptr; peer; peer = next) {
                rx_interface_stat_p rpc_stat, nrpc_stat;
                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    MUTEX_ENTER(&rx_rpc_stats);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                    MUTEX_EXIT(&rx_rpc_stats);

                MUTEX_ENTER(&rx_stats_mutex);
                rx_stats.nPeerStructs--;
                MUTEX_EXIT(&rx_stats_mutex);

    for (i = 0; i < RX_MAX_SERVICES; i++) {
        rxi_Free(rx_services[i], sizeof(*rx_services[i]));

    for (i = 0; i < rx_hashTableSize; i++) {
        register struct rx_connection *tc, *ntc;
        MUTEX_ENTER(&rx_connHashTable_lock);
        for (tc = rx_connHashTable[i]; tc; tc = ntc) {
            for (j = 0; j < RX_MAXCALLS; j++) {
                rxi_Free(tc->call[j], sizeof(*tc->call[j]));
            rxi_Free(tc, sizeof(*tc));
        MUTEX_EXIT(&rx_connHashTable_lock);

    MUTEX_ENTER(&freeSQEList_lock);

    while ((np = rx_FreeSQEList) != NULL) {
        rx_FreeSQEList = *(struct rx_serverQueueEntry **) np;
        MUTEX_DESTROY(&np->lock);
        rxi_Free(np, sizeof(*np));

    MUTEX_EXIT(&freeSQEList_lock);
    MUTEX_DESTROY(&freeSQEList_lock);
    MUTEX_DESTROY(&rx_freeCallQueue_lock);
    MUTEX_DESTROY(&rx_connHashTable_lock);
    MUTEX_DESTROY(&rx_peerHashTable_lock);
    MUTEX_DESTROY(&rx_serverPool_lock);

    osi_Free(rx_connHashTable, rx_hashTableSize * sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize * sizeof(struct rx_peer *));

    UNPIN(rx_connHashTable, rx_hashTableSize * sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize * sizeof(struct rx_peer *));

    rxi_FreeAllPackets();

    MUTEX_ENTER(&rx_stats_mutex);
    rxi_dataQuota = RX_MAX_QUOTA;
    rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
    MUTEX_EXIT(&rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg)

    if (!MUTEX_ISMINE(lockaddr))
        osi_Panic("Lock not held: %s", msg);
#endif /* RX_ENABLE_LOCKS */
/*
 * Routines to implement connection specific data.
 */

int rx_KeyCreate(rx_destructor_t rtn)

    MUTEX_ENTER(&rxi_keyCreate_lock);
    key = rxi_keyCreate_counter++;
    rxi_keyCreate_destructor = (rx_destructor_t *)
        realloc((void *) rxi_keyCreate_destructor,
                (key + 1) * sizeof(rx_destructor_t));
    rxi_keyCreate_destructor[key] = rtn;
    MUTEX_EXIT(&rxi_keyCreate_lock);

void rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)

    MUTEX_ENTER(&conn->conn_data_lock);
    if (!conn->specific) {
        conn->specific = (void **) malloc((key + 1) * sizeof(void *));
        for (i = 0; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key + 1;
        conn->specific[key] = ptr;
    } else if (key >= conn->nSpecific) {
        conn->specific = (void **)
            realloc(conn->specific, (key + 1) * sizeof(void *));
        for (i = conn->nSpecific; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key + 1;
        conn->specific[key] = ptr;
    } else {
        if (conn->specific[key] && rxi_keyCreate_destructor[key])
            (*rxi_keyCreate_destructor[key])(conn->specific[key]);
        conn->specific[key] = ptr;
    }
    MUTEX_EXIT(&conn->conn_data_lock);

void *rx_GetSpecific(struct rx_connection *conn, int key)

    MUTEX_ENTER(&conn->conn_data_lock);
    if (key >= conn->nSpecific)
        ptr = NULL;
    else
        ptr = conn->specific[key];
    MUTEX_EXIT(&conn->conn_data_lock);
    return ptr;

#endif /* !KERNEL */
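/*
 * Usage sketch (editorial): a client of this interface registers a
 * destructor once, then hangs private state off each connection; the
 * names freeMyState and struct my_state are hypothetical.
 *
 *     static int my_key;
 *     my_key = rx_KeyCreate((rx_destructor_t) freeMyState);
 *
 *     rx_SetSpecific(conn, my_key, malloc(sizeof(struct my_state)));
 *     ... later ...
 *     struct my_state *s = rx_GetSpecific(conn, my_key);
 *
 * The registered destructor is the hook through which a replaced value
 * is reclaimed, as the else branch of rx_SetSpecific above shows.
 */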
/*
 * processStats is a queue used to store the statistics for the local
 * process.  Its contents are similar to the contents of the rpcStats
 * queue on a rx_peer structure, but the actual data stored within
 * this queue contains totals across the lifetime of the process (assuming
 * the stats have not been reset) - unlike the per peer structures
 * which can come and go based upon the peer lifetime.
 */

static struct rx_queue processStats = {&processStats, &processStats};

/*
 * peerStats is a queue used to store the statistics for all peer structs.
 * Its contents are the union of all the peer rpcStats queues.
 */

static struct rx_queue peerStats = {&peerStats, &peerStats};
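/*
 * Note (editorial): pointing a queue's prev and next fields at the queue
 * head itself is the rx_queue representation of an empty circular queue,
 * so e.g. queue_IsEmpty(&processStats) holds until the first
 * queue_Prepend below.
 */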
/*
 * rxi_monitor_processStats is used to turn process wide stat collection
 * on and off
 */

static int rxi_monitor_processStats = 0;

/*
 * rxi_monitor_peerStats is used to turn per peer stat collection on and off
 */

static int rxi_monitor_peerStats = 0;
/*
 * rxi_AddRpcStat - given all of the information for a particular rpc
 * call, create (if needed) and update the stat totals for the rpc.
 *
 * IN stats - the queue of stats that will be updated with the new value
 *
 * IN rxInterface - a unique number that identifies the rpc interface
 *
 * IN currentFunc - the index of the function being invoked
 *
 * IN totalFunc - the total number of functions in this interface
 *
 * IN queueTime - the amount of time this function waited for a thread
 *
 * IN execTime - the amount of time this function invocation took to execute
 *
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
 *
 * IN isServer - if true, this invocation was made to a server
 *
 * IN remoteHost - the ip address of the remote host
 *
 * IN remotePort - the port of the remote host
 *
 * IN addToPeerList - if != 0, add newly created stat to the global peer list
 *
 * INOUT counter - if a new stats structure is allocated, the counter will
 * be updated with the new number of allocated stat structures
 */
static int rxi_AddRpcStat(
    struct rx_queue *stats,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,
    afs_uint32 remoteHost,
    afs_uint32 remotePort,
    unsigned int *counter)

    rx_interface_stat_p rpc_stat, nrpc_stat;

    /*
     * See if there's already a structure for this interface
     */

    for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        if ((rpc_stat->stats[0].interfaceId == rxInterface) &&
            (rpc_stat->stats[0].remote_is_server == isServer)) break;
    }

    /*
     * Didn't find a match so allocate a new structure and add it to the
     * queue.
     */

    if ((rpc_stat == NULL) ||
        (rpc_stat->stats[0].interfaceId != rxInterface) ||
        (rpc_stat->stats[0].remote_is_server != isServer)) {

        space = sizeof(rx_interface_stat_t) + totalFunc *
                sizeof(rx_function_entry_v1_t);

        rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
        if (rpc_stat == NULL) {

        *counter += totalFunc;
        for (i = 0; i < totalFunc; i++) {
            rpc_stat->stats[i].remote_peer = remoteHost;
            rpc_stat->stats[i].remote_port = remotePort;
            rpc_stat->stats[i].remote_is_server = isServer;
            rpc_stat->stats[i].interfaceId = rxInterface;
            rpc_stat->stats[i].func_total = totalFunc;
            rpc_stat->stats[i].func_index = i;
            hzero(rpc_stat->stats[i].invocations);
            hzero(rpc_stat->stats[i].bytes_sent);
            hzero(rpc_stat->stats[i].bytes_rcvd);
            rpc_stat->stats[i].queue_time_sum.sec = 0;
            rpc_stat->stats[i].queue_time_sum.usec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].queue_time_min.sec = 9999999;
            rpc_stat->stats[i].queue_time_min.usec = 9999999;
            rpc_stat->stats[i].queue_time_max.sec = 0;
            rpc_stat->stats[i].queue_time_max.usec = 0;
            rpc_stat->stats[i].execution_time_sum.sec = 0;
            rpc_stat->stats[i].execution_time_sum.usec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
            rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            rpc_stat->stats[i].execution_time_min.sec = 9999999;
            rpc_stat->stats[i].execution_time_min.usec = 9999999;
            rpc_stat->stats[i].execution_time_max.sec = 0;
            rpc_stat->stats[i].execution_time_max.usec = 0;
        }
        queue_Prepend(stats, rpc_stat);
        if (addToPeerList) {
            queue_Prepend(&peerStats, &rpc_stat->all_peers);
        }
    }

    /*
     * Increment the stats for this function
     */

    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
    hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
    hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
    clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
    if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
        rpc_stat->stats[currentFunc].queue_time_min = *queueTime;
    }
    if (clock_Gt(queueTime, &rpc_stat->stats[currentFunc].queue_time_max)) {
        rpc_stat->stats[currentFunc].queue_time_max = *queueTime;
    }
    clock_Add(&rpc_stat->stats[currentFunc].execution_time_sum, execTime);
    clock_AddSq(&rpc_stat->stats[currentFunc].execution_time_sum_sqr, execTime);
    if (clock_Lt(execTime, &rpc_stat->stats[currentFunc].execution_time_min)) {
        rpc_stat->stats[currentFunc].execution_time_min = *execTime;
    }
    if (clock_Gt(execTime, &rpc_stat->stats[currentFunc].execution_time_max)) {
        rpc_stat->stats[currentFunc].execution_time_max = *execTime;
    }
/*
 * rx_IncrementTimeAndCount - increment the times and count for a particular
 * rpc function.
 *
 * IN peer - the peer who invoked the rpc
 *
 * IN rxInterface - a unique number that identifies the rpc interface
 *
 * IN currentFunc - the index of the function being invoked
 *
 * IN totalFunc - the total number of functions in this interface
 *
 * IN queueTime - the amount of time this function waited for a thread
 *
 * IN execTime - the amount of time this function invocation took to execute
 *
 * IN bytesSent - the number of bytes sent by this invocation
 *
 * IN bytesRcvd - the number of bytes received by this invocation
 *
 * IN isServer - if true, this invocation was made to a server
 */
void rx_IncrementTimeAndCount(
    struct rx_peer *peer,
    afs_uint32 rxInterface,
    afs_uint32 currentFunc,
    afs_uint32 totalFunc,
    struct clock *queueTime,
    struct clock *execTime,
    afs_hyper_t *bytesSent,
    afs_hyper_t *bytesRcvd,

    MUTEX_ENTER(&rx_rpc_stats);
    MUTEX_ENTER(&peer->peer_lock);

    if (rxi_monitor_peerStats) {
        rxi_AddRpcStat(&peer->rpcStats,
                       &rxi_rpc_peer_stat_cnt);
    }

    if (rxi_monitor_processStats) {
        rxi_AddRpcStat(&processStats,
                       &rxi_rpc_process_stat_cnt);
    }

    MUTEX_EXIT(&peer->peer_lock);
    MUTEX_EXIT(&rx_rpc_stats);
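/*
 * Usage sketch (editorial): the rxgen-generated stubs are the normal
 * callers.  A hand-rolled equivalent, with hypothetical interface id,
 * opcode and counters, would look roughly like:
 *
 *     struct clock qtime, xtime;     -- queue wait / execution time
 *     afs_hyper_t sent, rcvd;        -- bytes moved by this call
 *     rx_IncrementTimeAndCount(rx_PeerOf(rx_ConnectionOf(call)),
 *                              MY_INTERFACE_ID, opcode, MY_OP_COUNT,
 *                              &qtime, &xtime, &sent, &rcvd, 1);
 */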
/*
 * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
 *
 * IN callerVersion - the rpc stat version of the caller.
 *
 * IN count - the number of entries to marshall.
 *
 * IN stats - pointer to stats to be marshalled.
 *
 * OUT ptr - Where to store the marshalled data.
 */

void rx_MarshallProcessRPCStats(
    afs_uint32 callerVersion,
    rx_function_entry_v1_t *stats,

    /*
     * We only support the first version
     */
    for (ptr = *ptrP, i = 0; i < count; i++, stats++) {
        *(ptr++) = stats->remote_peer;
        *(ptr++) = stats->remote_port;
        *(ptr++) = stats->remote_is_server;
        *(ptr++) = stats->interfaceId;
        *(ptr++) = stats->func_total;
        *(ptr++) = stats->func_index;
        *(ptr++) = hgethi(stats->invocations);
        *(ptr++) = hgetlo(stats->invocations);
        *(ptr++) = hgethi(stats->bytes_sent);
        *(ptr++) = hgetlo(stats->bytes_sent);
        *(ptr++) = hgethi(stats->bytes_rcvd);
        *(ptr++) = hgetlo(stats->bytes_rcvd);
        *(ptr++) = stats->queue_time_sum.sec;
        *(ptr++) = stats->queue_time_sum.usec;
        *(ptr++) = stats->queue_time_sum_sqr.sec;
        *(ptr++) = stats->queue_time_sum_sqr.usec;
        *(ptr++) = stats->queue_time_min.sec;
        *(ptr++) = stats->queue_time_min.usec;
        *(ptr++) = stats->queue_time_max.sec;
        *(ptr++) = stats->queue_time_max.usec;
        *(ptr++) = stats->execution_time_sum.sec;
        *(ptr++) = stats->execution_time_sum.usec;
        *(ptr++) = stats->execution_time_sum_sqr.sec;
        *(ptr++) = stats->execution_time_sum_sqr.usec;
        *(ptr++) = stats->execution_time_min.sec;
        *(ptr++) = stats->execution_time_min.usec;
        *(ptr++) = stats->execution_time_max.sec;
        *(ptr++) = stats->execution_time_max.usec;
    }
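/*
 * Size note (editorial): with the layout above, each
 * rx_function_entry_v1_t marshalls into 28 32-bit words: 6 identifying
 * fields, 3 hyper counters split into high/low halves, and 8 clock
 * values (4 queue-time, 4 execution-time) of 2 words each.
 */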
/*
 * rx_RetrieveProcessRPCStats - retrieve all of the rpc statistics for
 * this process
 *
 * IN callerVersion - the rpc stat version of the caller
 *
 * OUT myVersion - the rpc stat version of this function
 *
 * OUT clock_sec - local time seconds
 *
 * OUT clock_usec - local time microseconds
 *
 * OUT allocSize - the number of bytes allocated to contain stats
 *
 * OUT statCount - the number of stats retrieved from this process.
 *
 * OUT stats - the actual stats retrieved from this process.
 *
 * If successful, *stats will be non-NULL.
 */
int rx_RetrieveProcessRPCStats(
    afs_uint32 callerVersion,
    afs_uint32 *myVersion,
    afs_uint32 *clock_sec,
    afs_uint32 *clock_usec,
    afs_uint32 *statCount,

    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_processStats) {
        MUTEX_EXIT(&rx_rpc_stats);

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_process_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_process_stat_cnt;

    /*
     * This can't happen yet, but in the future version changes
     * can be handled by adding additional code here
     */

    if (space > (size_t) 0) {
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        register struct rx_peer *pp;

        rx_interface_stat_p rpc_stat, nrpc_stat;

        for (queue_Scan(&processStats, rpc_stat, nrpc_stat,
                        rx_interface_stat)) {
            /*
             * Copy the data based upon the caller version
             */
            rx_MarshallProcessRPCStats(callerVersion,
                                       rpc_stat->stats[0].func_total,
                                       rpc_stat->stats, &ptr);

    MUTEX_EXIT(&rx_rpc_stats);
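/*
 * Usage sketch (editorial): a monitoring caller retrieves a snapshot,
 * walks it, and releases it; error handling is elided and the local
 * names are hypothetical.
 *
 *     afs_uint32 vers, sec, usec, alloc, count, *raw;
 *     rx_RetrieveProcessRPCStats(RX_STATS_RETRIEVAL_VERSION, &vers,
 *                                &sec, &usec, &alloc, &count, &raw);
 *     if (raw != NULL) {
 *         ... unmarshall count v1 entries from raw ...
 *         rx_FreeRPCStats(raw, alloc);
 *     }
 */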
/*
 * rx_RetrievePeerRPCStats - retrieve all of the rpc statistics for the peers
 *
 * IN callerVersion - the rpc stat version of the caller
 *
 * OUT myVersion - the rpc stat version of this function
 *
 * OUT clock_sec - local time seconds
 *
 * OUT clock_usec - local time microseconds
 *
 * OUT allocSize - the number of bytes allocated to contain stats
 *
 * OUT statCount - the number of stats retrieved from the individual
 * peer structures.
 *
 * OUT stats - the actual stats retrieved from the individual peer structures.
 *
 * If successful, *stats will be non-NULL.
 */
int rx_RetrievePeerRPCStats(
    afs_uint32 callerVersion,
    afs_uint32 *myVersion,
    afs_uint32 *clock_sec,
    afs_uint32 *clock_usec,
    afs_uint32 *statCount,

    *myVersion = RX_STATS_RETRIEVAL_VERSION;

    /*
     * Check to see if stats are enabled
     */

    MUTEX_ENTER(&rx_rpc_stats);
    if (!rxi_monitor_peerStats) {
        MUTEX_EXIT(&rx_rpc_stats);

    clock_GetTime(&now);
    *clock_sec = now.sec;
    *clock_usec = now.usec;

    /*
     * Allocate the space based upon the caller version
     *
     * If the client is at an older version than we are,
     * we return the statistic data in the older data format, but
     * we still return our version number so the client knows we
     * are maintaining more data than it can retrieve.
     */

    if (callerVersion >= RX_STATS_RETRIEVAL_FIRST_EDITION) {
        space = rxi_rpc_peer_stat_cnt * sizeof(rx_function_entry_v1_t);
        *statCount = rxi_rpc_peer_stat_cnt;

    /*
     * This can't happen yet, but in the future version changes
     * can be handled by adding additional code here
     */

    if (space > (size_t) 0) {
        ptr = *stats = (afs_uint32 *) rxi_Alloc(space);

        rx_interface_stat_p rpc_stat, nrpc_stat;

        for (queue_Scan(&peerStats, rpc_stat, nrpc_stat,
                        rx_interface_stat)) {
            /*
             * We have to fix the offset of rpc_stat since we are
             * keeping this structure on two rx_queues.  The rx_queue
             * package assumes that the rx_queue member is the first
             * member of the structure.  That is, rx_queue assumes that
             * any one item is only on one queue at a time.  We are
             * breaking that assumption and so we have to do a little
             * math to fix our pointers.
             */

            fix_offset = (char *) rpc_stat;
            fix_offset -= offsetof(rx_interface_stat_t, all_peers);
            rpc_stat = (rx_interface_stat_p) fix_offset;
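            /*
             * Concretely (editorial note): rpc_stat arrived from the
             * scan pointing at the embedded all_peers member, so
             * stepping back offsetof(rx_interface_stat_t, all_peers)
             * bytes recovers the address of the enclosing
             * rx_interface_stat_t, whose first member is the
             * queue_header that the queue package expects.
             */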
            /*
             * Copy the data based upon the caller version
             */
            rx_MarshallProcessRPCStats(callerVersion,
                                       rpc_stat->stats[0].func_total,
                                       rpc_stat->stats, &ptr);

    MUTEX_EXIT(&rx_rpc_stats);
/*
 * rx_FreeRPCStats - free memory allocated by
 * rx_RetrieveProcessRPCStats and rx_RetrievePeerRPCStats
 *
 * IN stats - stats previously returned by rx_RetrieveProcessRPCStats or
 * rx_RetrievePeerRPCStats
 *
 * IN allocSize - the number of bytes in stats.
 */

void rx_FreeRPCStats(

    rxi_Free(stats, allocSize);

/*
 * rx_queryProcessRPCStats - see if process rpc stat collection is
 * currently enabled.
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */
int rx_queryProcessRPCStats()

    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_processStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;

/*
 * rx_queryPeerRPCStats - see if peer stat collection is currently enabled.
 *
 * Returns 0 if stats are not enabled, != 0 otherwise
 */

int rx_queryPeerRPCStats()

    MUTEX_ENTER(&rx_rpc_stats);
    rc = rxi_monitor_peerStats;
    MUTEX_EXIT(&rx_rpc_stats);
    return rc;
/*
 * rx_enableProcessRPCStats - begin rpc stat collection for entire process
 */

void rx_enableProcessRPCStats()

    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_processStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);

/*
 * rx_enablePeerRPCStats - begin rpc stat collection per peer structure
 */

void rx_enablePeerRPCStats()

    MUTEX_ENTER(&rx_rpc_stats);
    rx_enable_stats = 1;
    rxi_monitor_peerStats = 1;
    MUTEX_EXIT(&rx_rpc_stats);
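/*
 * Typical sequence (editorial sketch): a server turns collection on at
 * startup, and an administrative thread checks and harvests it later.
 *
 *     rx_enableProcessRPCStats();
 *     rx_enablePeerRPCStats();
 *     ...
 *     if (rx_queryPeerRPCStats())
 *         ... retrieve with rx_RetrievePeerRPCStats, then
 *             release with rx_FreeRPCStats ...
 */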
/*
 * rx_disableProcessRPCStats - stop rpc stat collection for entire process
 */

void rx_disableProcessRPCStats()

    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off process statistics and if peer stats is also off, turn
     * off statistics collection altogether.
     */

    rxi_monitor_processStats = 0;
    if (rxi_monitor_peerStats == 0) {
        rx_enable_stats = 0;
    }

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0;
        if (!rpc_stat) break;
        queue_Remove(rpc_stat);
        num_funcs = rpc_stat->stats[0].func_total;
        space = sizeof(rx_interface_stat_t) +
                rpc_stat->stats[0].func_total *
                sizeof(rx_function_entry_v1_t);

        rxi_Free(rpc_stat, space);
        rxi_rpc_process_stat_cnt -= num_funcs;
    }
    MUTEX_EXIT(&rx_rpc_stats);
/*
 * rx_disablePeerRPCStats - stop rpc stat collection for peers
 */

void rx_disablePeerRPCStats()

    struct rx_peer **peer_ptr, **peer_end;

    MUTEX_ENTER(&rx_rpc_stats);

    /*
     * Turn off peer statistics and if process stats is also off, turn
     * off statistics collection altogether.
     */

    rxi_monitor_peerStats = 0;
    if (rxi_monitor_processStats == 0) {
        rx_enable_stats = 0;
    }

    MUTEX_ENTER(&rx_peerHashTable_lock);
    for (peer_ptr = &rx_peerHashTable[0],
         peer_end = &rx_peerHashTable[rx_hashTableSize];
         peer_ptr < peer_end; peer_ptr++) {
        struct rx_peer *peer, *next, *prev;
        for (prev = peer = *peer_ptr; peer; peer = next) {
            code = MUTEX_TRYENTER(&peer->peer_lock);

                rx_interface_stat_p rpc_stat, nrpc_stat;

                for (queue_Scan(&peer->rpcStats, rpc_stat, nrpc_stat,
                                rx_interface_stat)) {
                    unsigned int num_funcs = 0;
                    if (!rpc_stat) break;
                    queue_Remove(&rpc_stat->queue_header);
                    queue_Remove(&rpc_stat->all_peers);
                    num_funcs = rpc_stat->stats[0].func_total;
                    space = sizeof(rx_interface_stat_t) +
                            rpc_stat->stats[0].func_total *
                            sizeof(rx_function_entry_v1_t);

                    rxi_Free(rpc_stat, space);
                    rxi_rpc_peer_stat_cnt -= num_funcs;
                }
                MUTEX_EXIT(&peer->peer_lock);
                if (prev == *peer_ptr) {

    MUTEX_EXIT(&rx_peerHashTable_lock);
    MUTEX_EXIT(&rx_rpc_stats);
/*
 * rx_clearProcessRPCStats - clear the contents of the rpc stats according
 * to the flags specified.
 *
 * IN clearFlag - flag indicating which stats to clear
 */

void rx_clearProcessRPCStats(
    afs_uint32 clearFlag)

    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;
        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
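/*
 * Example (editorial): the flag bits tested above OR together, so a
 * caller can reset just the traffic counters while keeping the min/max
 * timing watermarks:
 *
 *     rx_clearProcessRPCStats(AFS_RX_STATS_CLEAR_INVOCATIONS |
 *                             AFS_RX_STATS_CLEAR_BYTES_SENT |
 *                             AFS_RX_STATS_CLEAR_BYTES_RCVD);
 */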
/*
 * rx_clearPeerRPCStats - clear the contents of the rpc stats according
 * to the flags specified.
 *
 * IN clearFlag - flag indicating which stats to clear
 */

void rx_clearPeerRPCStats(
    afs_uint32 clearFlag)

    rx_interface_stat_p rpc_stat, nrpc_stat;

    MUTEX_ENTER(&rx_rpc_stats);

    for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
        unsigned int num_funcs = 0, i;

        /*
         * We have to fix the offset of rpc_stat since we are
         * keeping this structure on two rx_queues.  The rx_queue
         * package assumes that the rx_queue member is the first
         * member of the structure.  That is, rx_queue assumes that
         * any one item is only on one queue at a time.  We are
         * breaking that assumption and so we have to do a little
         * math to fix our pointers.
         */

        fix_offset = (char *) rpc_stat;
        fix_offset -= offsetof(rx_interface_stat_t, all_peers);
        rpc_stat = (rx_interface_stat_p) fix_offset;

        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
                hzero(rpc_stat->stats[i].invocations);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
                hzero(rpc_stat->stats[i].bytes_sent);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
                hzero(rpc_stat->stats[i].bytes_rcvd);
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
                rpc_stat->stats[i].queue_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SQUARE) {
                rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MIN) {
                rpc_stat->stats[i].queue_time_min.sec = 9999999;
                rpc_stat->stats[i].queue_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_MAX) {
                rpc_stat->stats[i].queue_time_max.sec = 0;
                rpc_stat->stats[i].queue_time_max.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SUM) {
                rpc_stat->stats[i].execution_time_sum.sec = 0;
                rpc_stat->stats[i].execution_time_sum.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_SQUARE) {
                rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
                rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MIN) {
                rpc_stat->stats[i].execution_time_min.sec = 9999999;
                rpc_stat->stats[i].execution_time_min.usec = 9999999;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_EXEC_TIME_MAX) {
                rpc_stat->stats[i].execution_time_max.sec = 0;
                rpc_stat->stats[i].execution_time_max.usec = 0;
            }
        }
    }

    MUTEX_EXIT(&rx_rpc_stats);
/*
 * rxi_rxstat_userok points to a routine that returns 1 if the caller
 * is authorized to enable/disable/clear RX statistics.
 */
static int (*rxi_rxstat_userok)(struct rx_call *call) = NULL;

void rx_SetRxStatUserOk(
    int (*proc)(struct rx_call *call))

    rxi_rxstat_userok = proc;

int rx_RxStatUserOk(
    struct rx_call *call)

    if (!rxi_rxstat_userok)
        return 0;
    return rxi_rxstat_userok(call);
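/*
 * Usage sketch (editorial): a server that restricts these operations to
 * superusers might install a check like the one below; statsUserOk and
 * confDir are hypothetical, and the afsconf_SuperUser call is only an
 * assumption about the caller's environment.
 *
 *     static int statsUserOk(struct rx_call *call)
 *     {
 *         return afsconf_SuperUser(confDir, call, NULL);
 *     }
 *
 *     rx_SetRxStatUserOk(statsUserOk);
 */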