2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX: Extended Remote Procedure Call */
12 #include <afsconfig.h>
14 #include "../afs/param.h"
16 #include <afs/param.h>
22 #include "../afs/sysincludes.h"
23 #include "../afs/afsincludes.h"
25 #include "../h/types.h"
26 #include "../h/time.h"
27 #include "../h/stat.h"
29 #include <net/net_globals.h>
30 #endif /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
32 #include "../h/socket.h"
34 #include "../netinet/in.h"
35 #include "../afs/afs_args.h"
36 #include "../afs/afs_osi.h"
37 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
38 #include "../h/systm.h"
41 #undef RXDEBUG /* turn off debugging */
43 #if defined(AFS_SGI_ENV)
44 #include "../sys/debug.h"
46 #include "../afsint/afsint.h"
53 #endif /* AFS_ALPHA_ENV */
55 #include "../afs/sysincludes.h"
56 #include "../afs/afsincludes.h"
58 #include "../afs/lock.h"
59 #include "../rx/rx_kmutex.h"
60 #include "../rx/rx_kernel.h"
61 #include "../rx/rx_clock.h"
62 #include "../rx/rx_queue.h"
64 #include "../rx/rx_globals.h"
65 #include "../rx/rx_trace.h"
66 #define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS 211 /* Stop AFS process */
68 #define AFSOP_STOP_BKG 212 /* Stop BKG process */
69 #include "../afsint/afsint.h"
70 extern afs_int32 afs_termState;
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "../afsint/rxgen_consts.h"
77 # include <sys/types.h>
84 # include <sys/socket.h>
85 # include <sys/file.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include "rx_internal.h"
105 # include <afs/rxgen_consts.h>
108 int (*registerProgram)() = 0;
109 int (*swapNameProgram)() = 0;
111 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
113 afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
114 afs_int32 rxi_start_in_error;
116 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
119 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
120 * currently allocated within rx. This number is used to allocate the
121 * memory required to return the statistics when queried.
124 static unsigned int rxi_rpc_peer_stat_cnt;
127 * rxi_rpc_process_stat_cnt counts the total number of local process stat
128 * structures currently allocated within rx. The number is used to allocate
129 * the memory required to return the statistics when queried.
132 static unsigned int rxi_rpc_process_stat_cnt;
134 #if !defined(offsetof)
135 #include <stddef.h> /* for definition of offsetof() */
138 #ifdef AFS_PTHREAD_ENV
142 * Use procedural initialization of mutexes/condition variables
146 extern pthread_mutex_t rxkad_stats_mutex;
147 extern pthread_mutex_t des_init_mutex;
148 extern pthread_mutex_t des_random_mutex;
149 extern pthread_mutex_t rx_clock_mutex;
150 extern pthread_mutex_t rxi_connCacheMutex;
151 extern pthread_mutex_t rx_event_mutex;
152 extern pthread_mutex_t osi_malloc_mutex;
153 extern pthread_mutex_t event_handler_mutex;
154 extern pthread_mutex_t listener_mutex;
155 extern pthread_mutex_t rx_if_init_mutex;
156 extern pthread_mutex_t rx_if_mutex;
157 extern pthread_mutex_t rxkad_client_uid_mutex;
158 extern pthread_mutex_t rxkad_random_mutex;
160 extern pthread_cond_t rx_event_handler_cond;
161 extern pthread_cond_t rx_listener_cond;
163 static pthread_mutex_t epoch_mutex;
164 static pthread_mutex_t rx_init_mutex;
165 static pthread_mutex_t rx_debug_mutex;
167 static void rxi_InitPthread(void) {
168 assert(pthread_mutex_init(&rx_clock_mutex,
169 (const pthread_mutexattr_t*)0)==0);
170 assert(pthread_mutex_init(&rxi_connCacheMutex,
171 (const pthread_mutexattr_t*)0)==0);
172 assert(pthread_mutex_init(&rx_init_mutex,
173 (const pthread_mutexattr_t*)0)==0);
174 assert(pthread_mutex_init(&epoch_mutex,
175 (const pthread_mutexattr_t*)0)==0);
176 assert(pthread_mutex_init(&rx_event_mutex,
177 (const pthread_mutexattr_t*)0)==0);
178 assert(pthread_mutex_init(&des_init_mutex,
179 (const pthread_mutexattr_t*)0)==0);
180 assert(pthread_mutex_init(&des_random_mutex,
181 (const pthread_mutexattr_t*)0)==0);
182 assert(pthread_mutex_init(&osi_malloc_mutex,
183 (const pthread_mutexattr_t*)0)==0);
184 assert(pthread_mutex_init(&event_handler_mutex,
185 (const pthread_mutexattr_t*)0)==0);
186 assert(pthread_mutex_init(&listener_mutex,
187 (const pthread_mutexattr_t*)0)==0);
188 assert(pthread_mutex_init(&rx_if_init_mutex,
189 (const pthread_mutexattr_t*)0)==0);
190 assert(pthread_mutex_init(&rx_if_mutex,
191 (const pthread_mutexattr_t*)0)==0);
192 assert(pthread_mutex_init(&rxkad_client_uid_mutex,
193 (const pthread_mutexattr_t*)0)==0);
194 assert(pthread_mutex_init(&rxkad_random_mutex,
195 (const pthread_mutexattr_t*)0)==0);
196 assert(pthread_mutex_init(&rxkad_stats_mutex,
197 (const pthread_mutexattr_t*)0)==0);
198 assert(pthread_mutex_init(&rx_debug_mutex,
199 (const pthread_mutexattr_t*)0)==0);
201 assert(pthread_cond_init(&rx_event_handler_cond,
202 (const pthread_condattr_t*)0)==0);
203 assert(pthread_cond_init(&rx_listener_cond,
204 (const pthread_condattr_t*)0)==0);
205 assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
208 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
209 #define INIT_PTHREAD_LOCKS \
210 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
212 * The rx_stats_mutex protects the following global variables:
217 * rxi_lowConnRefCount
218 * rxi_lowPeerRefCount
227 #define INIT_PTHREAD_LOCKS
231 /* Variables for handling the minProcs implementation. availProcs gives the
232 * number of threads available in the pool at this moment (not counting dudes
233 * executing right now). totalMin gives the total number of procs required
234 * for handling all minProcs requests. minDeficit is a dynamic variable
235 * tracking the # of procs required to satisfy all of the remaining minProcs
237 * For fine grain locking to work, the quota check and the reservation of
238 * a server thread has to come while rxi_availProcs and rxi_minDeficit
239 * are locked. To this end, the code has been modified under #ifdef
240 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
241 * same time. A new function, ReturnToServerPool(), returns the allocation.
243 * A call can be on several queues (but only one at a time). When
244 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
245 * that no one else is touching the queue. To this end, we store the address
246 * of the queue lock in the call structure (under the call lock) when we
247 * put the call on a queue, and we clear the call_queue_lock when the
248 * call is removed from a queue (once the call lock has been obtained).
249 * This allows rxi_ResetCall to safely synchronize with others wishing
250 * to manipulate the queue.
253 #ifdef RX_ENABLE_LOCKS
254 static int rxi_ServerThreadSelectingCall;
255 static afs_kmutex_t rx_rpc_stats;
256 void rxi_StartUnlocked();
259 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are
260 ** pretty good that the next packet coming in is from the same connection
261 ** as the last packet, since we send multiple packets in a transmit window.
263 struct rx_connection *rxLastConn = 0;
265 #ifdef RX_ENABLE_LOCKS
266 /* The locking hierarchy for rx fine grain locking is composed of five
268 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
269 * call->lock - locks call data fields.
270 * Most any other lock - these are all independent of each other.....
272 * rx_freeCallQueue_lock
274 * rx_connHashTable_lock
277 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
280 * peer_lock - locks peer data fields.
281 * conn_data_lock - ensures that no more than one thread is updating a conn data
282 * field at the same time.
283 * Do we need a lock to protect the peer field in the conn structure?
284 * conn->peer was previously a constant for all intents and purposes and so has no
285 * lock protecting this field. The multihomed client delta introduced
286 * an RX code change: change the peer field in the connection structure
287 * to that remote interface from which the last packet for this
288 * connection was sent out. This may become an issue if further changes
291 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
292 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
294 /* rxdb_fileID is used to identify the lock location, along with line#. */
295 static int rxdb_fileID = RXDB_FILE_RX;
296 #endif /* RX_LOCKS_DB */
297 static void rxi_SetAcksInTransmitQueue();
298 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
299 #else /* RX_ENABLE_LOCKS */
300 #define SET_CALL_QUEUE_LOCK(C, L)
301 #define CLEAR_CALL_QUEUE_LOCK(C)
302 #endif /* RX_ENABLE_LOCKS */
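/* Illustrative sketch (added; not part of the original source) of the
 * call_queue_lock protocol described above, assuming RX_ENABLE_LOCKS:
 * whoever enqueues a call records which queue lock covers it, and whoever
 * dequeues it clears that pointer, so rxi_ResetCall can tell whether another
 * thread might still be touching the queue.  "example_queue" and
 * "example_queue_lock" are hypothetical; both are assumed to have been
 * initialized with queue_Init and MUTEX_INIT. */
#if 0 /* example only -- not built */
static struct rx_queue example_queue;
static afs_kmutex_t example_queue_lock;

/* Enqueue "call"; the caller already holds call->lock, as required above. */
static void example_EnqueueCall(struct rx_call *call)
{
    MUTEX_ENTER(&example_queue_lock);
    queue_Append(&example_queue, call);
    SET_CALL_QUEUE_LOCK(call, &example_queue_lock);
    MUTEX_EXIT(&example_queue_lock);
}

/* Remove "call" from the queue; again the caller holds call->lock. */
static void example_DequeueCall(struct rx_call *call)
{
    MUTEX_ENTER(&example_queue_lock);
    queue_Remove(call);
    CLEAR_CALL_QUEUE_LOCK(call);
    MUTEX_EXIT(&example_queue_lock);
}
#endif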
303 static void rxi_DestroyConnectionNoLock();
304 struct rx_serverQueueEntry *rx_waitForPacket = 0;
306 /* ------------Exported Interfaces------------- */
308 /* This function allows rxkad to set the epoch to a suitably random number
309 * which rx_NewConnection will use in the future. The principal purpose is to
310 * get rxnull connections to use the same epoch as the rxkad connections do, at
311 * least once the first rxkad connection is established. This is important now
312 * that the host/port addresses aren't used in FindConnection: the uniqueness
313 * of epoch/cid matters and the start time won't do. */
315 #ifdef AFS_PTHREAD_ENV
317 * This mutex protects the following global variables:
321 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
322 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
326 #endif /* AFS_PTHREAD_ENV */
328 void rx_SetEpoch (epoch)
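/* Illustrative sketch (added; not in the original): one way a security layer
 * could install a suitably random epoch, per the comment above.  Uses plain
 * gettimeofday() and assumes a user-space build with <sys/time.h> available;
 * rxkad derives its value differently. */
#if 0 /* example only -- not built */
static void example_RandomizeEpoch(void)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    /* Set the high bit, as rx_Init does below, to mark a "randomized" epoch. */
    rx_SetEpoch((tv.tv_sec ^ tv.tv_usec) | 0x80000000);
}
#endif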
336 /* Initialize rx. A port number may be mentioned, in which case this
337 * becomes the default port number for any service installed later.
338 * If 0 is provided for the port number, a random port will be chosen
339 * by the kernel. Whether this will ever overlap anything in
340 * /etc/services is anybody's guess... Returns 0 on success, -1 on
342 static int rxinit_status = 1;
343 #ifdef AFS_PTHREAD_ENV
345 * This mutex protects the following global variables:
349 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
350 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
353 #define UNLOCK_RX_INIT
356 int rx_Init(u_int port)
363 char *htable, *ptable;
366 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
367 __djgpp_set_quiet_socket(1);
374 if (rxinit_status == 0) {
375 tmp_status = rxinit_status;
377 return tmp_status; /* Already started; return previous error code. */
381 if (afs_winsockInit()<0)
387 * Initialize anything necessary to provide a non-preemptive threading
390 rxi_InitializeThreadSupport();
393 /* Allocate and initialize a socket for client and perhaps server
396 rx_socket = rxi_GetUDPSocket((u_short)port);
397 if (rx_socket == OSI_NULLSOCKET) {
403 #ifdef RX_ENABLE_LOCKS
406 #endif /* RX_LOCKS_DB */
407 MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
408 MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
409 MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
410 MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
411 MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
413 CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
414 MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
415 MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
416 MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
418 MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
420 CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
421 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
423 rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
424 #endif /* KERNEL && AFS_HPUX110_ENV */
425 #else /* RX_ENABLE_LOCKS */
426 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
427 mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
428 #endif /* AFS_GLOBAL_SUNLOCK */
429 #endif /* RX_ENABLE_LOCKS */
432 rx_connDeadTime = 12;
433 rx_tranquil = 0; /* reset flag */
434 memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
436 osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
437 PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
438 memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
439 ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
440 PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
441 memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
443 /* Malloc up a bunch of packets & buffers */
445 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
446 queue_Init(&rx_freePacketQueue);
447 rxi_NeedMorePackets = FALSE;
448 rxi_MorePackets(rx_nPackets);
456 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
457 tv.tv_sec = clock_now.sec;
458 tv.tv_usec = clock_now.usec;
459 srand((unsigned int) tv.tv_usec);
466 #if defined(KERNEL) && !defined(UKERNEL)
467 /* Really, this should never happen in a real kernel */
470 struct sockaddr_in addr;
471 int addrlen = sizeof(addr);
472 if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
476 rx_port = addr.sin_port;
479 rx_stats.minRtt.sec = 9999999;
481 rx_SetEpoch (tv.tv_sec | 0x80000000);
483 rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
484 * will provide a more random value. */
486 MUTEX_ENTER(&rx_stats_mutex);
487 rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
488 MUTEX_EXIT(&rx_stats_mutex);
489 /* *Slightly* random start time for the cid. This is just to help
490 * out with the hashing function at the peer */
491 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
492 rx_connHashTable = (struct rx_connection **) htable;
493 rx_peerHashTable = (struct rx_peer **) ptable;
495 rx_lastAckDelay.sec = 0;
496 rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
497 rx_hardAckDelay.sec = 0;
498 rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
499 rx_softAckDelay.sec = 0;
500 rx_softAckDelay.usec = 100000; /* 100 milliseconds */
502 rxevent_Init(20, rxi_ReScheduleEvents);
504 /* Initialize various global queues */
505 queue_Init(&rx_idleServerQueue);
506 queue_Init(&rx_incomingCallQueue);
507 queue_Init(&rx_freeCallQueue);
509 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
510 /* Initialize our list of usable IP addresses. */
514 /* Start listener process (exact function is dependent on the
515 * implementation environment--kernel or user space) */
520 tmp_status = rxinit_status = 0;
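/* Minimal client-side usage sketch (added for illustration; not part of the
 * original file): initialize the package and let the kernel pick the UDP
 * port, as described in the comment before rx_Init. */
#if 0 /* example only -- not built */
static int example_ClientInit(void)
{
    if (rx_Init(0) != 0)        /* 0 => kernel chooses an ephemeral port */
        return -1;              /* rx_Init returns 0 on success */
    return 0;
}
#endif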
525 /* called with unincremented nRequestsRunning to see if it is OK to start
526 * a new thread in this service. Could be "no" for two reasons: over the
527 * max quota, or would prevent others from reaching their min quota.
529 #ifdef RX_ENABLE_LOCKS
530 /* This version of QuotaOK reserves quota if it's ok while the
531 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
533 static int QuotaOK(aservice)
534 register struct rx_service *aservice;
536 /* check if over max quota */
537 if (aservice->nRequestsRunning >= aservice->maxProcs) {
541 /* under min quota, we're OK */
542 /* otherwise, can use only if there are enough to allow everyone
543 * to go to their min quota after this guy starts.
545 MUTEX_ENTER(&rx_stats_mutex);
546 if ((aservice->nRequestsRunning < aservice->minProcs) ||
547 (rxi_availProcs > rxi_minDeficit)) {
548 aservice->nRequestsRunning++;
549 /* just started call in minProcs pool, need fewer to maintain
551 if (aservice->nRequestsRunning <= aservice->minProcs)
554 MUTEX_EXIT(&rx_stats_mutex);
557 MUTEX_EXIT(&rx_stats_mutex);
561 static void ReturnToServerPool(aservice)
562 register struct rx_service *aservice;
564 aservice->nRequestsRunning--;
565 MUTEX_ENTER(&rx_stats_mutex);
566 if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
568 MUTEX_EXIT(&rx_stats_mutex);
571 #else /* RX_ENABLE_LOCKS */
572 static int QuotaOK(aservice)
573 register struct rx_service *aservice; {
575 /* under min quota, we're OK */
576 if (aservice->nRequestsRunning < aservice->minProcs) return 1;
578 /* check if over max quota */
579 if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
581 /* otherwise, can use only if there are enough to allow everyone
582 * to go to their min quota after this guy starts.
584 if (rxi_availProcs > rxi_minDeficit) rc = 1;
587 #endif /* RX_ENABLE_LOCKS */
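/* Worked example (added for clarity; not in the original): suppose a service
 * has minProcs=2 and maxProcs=5.  With 5 requests already running, QuotaOK
 * fails (over max quota).  With 1 running it succeeds unconditionally (still
 * under min quota).  With 3 running it succeeds only when
 * rxi_availProcs > rxi_minDeficit, i.e. when granting this thread still
 * leaves enough idle threads for every other service to reach its minProcs. */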
590 /* Called by rx_StartServer to start up lwp's to service calls.
591 NExistingProcs gives the number of procs already existing, and which
592 therefore needn't be created. */
593 void rxi_StartServerProcs(nExistingProcs)
596 register struct rx_service *service;
601 /* For each service, reserve N processes, where N is the "minimum"
602 number of processes that MUST be able to execute a request in parallel,
603 at any time, for that service. Also compute the maximum difference
604 between any service's maximum number of processes that can run
605 (i.e. the maximum number that ever will be run, and a guarantee
606 that this number will run if other services aren't running), and its
607 minimum number. The result is the extra number of processes that
608 we need in order to provide the latter guarantee */
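/* Worked example (added for clarity; not in the original): with two services,
 * one minProcs=2/maxProcs=5 and one minProcs=1/maxProcs=3, nProcs becomes
 * (2 + 1) + max(5-2, 3-1) = 6; nExistingProcs is then subtracted before the
 * loop below creates the remainder. */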
609 for (i=0; i<RX_MAX_SERVICES; i++) {
611 service = rx_services[i];
612 if (service == (struct rx_service *) 0) break;
613 nProcs += service->minProcs;
614 diff = service->maxProcs - service->minProcs;
615 if (diff > maxdiff) maxdiff = diff;
617 nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
618 nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
619 for (i = 0; i<nProcs; i++) {
620 rxi_StartServerProc(rx_ServerProc, rx_stackSize);
625 /* This routine must be called if any services are exported. If the
626 * donateMe flag is set, the calling process is donated to the server
628 void rx_StartServer(donateMe)
630 register struct rx_service *service;
631 register int i, nProcs=0;
637 /* Start server processes, if necessary (exact function is dependent
638 * on the implementation environment--kernel or user space). DonateMe
639 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
640 * case, one less new proc will be created by rx_StartServerProcs.
642 rxi_StartServerProcs(donateMe);
644 /* count up the # of threads in minProcs, and set the min deficit to
645 * be that value, too.
647 for (i=0; i<RX_MAX_SERVICES; i++) {
648 service = rx_services[i];
649 if (service == (struct rx_service *) 0) break;
650 MUTEX_ENTER(&rx_stats_mutex);
651 rxi_totalMin += service->minProcs;
652 /* below works even if a thread is running, since minDeficit would
653 * still have been decremented and later re-incremented.
655 rxi_minDeficit += service->minProcs;
656 MUTEX_EXIT(&rx_stats_mutex);
659 /* Turn on reaping of idle server connections */
660 rxi_ReapConnections();
669 #ifdef AFS_PTHREAD_ENV
671 pid = (pid_t) pthread_self();
672 #else /* AFS_PTHREAD_ENV */
674 LWP_CurrentProcess(&pid);
675 #endif /* AFS_PTHREAD_ENV */
677 sprintf(name,"srv_%d", ++nProcs);
679 (*registerProgram)(pid, name);
681 #endif /* AFS_NT40_ENV */
682 rx_ServerProc(); /* Never returns */
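/* Minimal server setup sketch (added for illustration; not part of the
 * original file).  "ExampleRequestProc", the port 8000 and service id 4711
 * are hypothetical; the rxnull security class is assumed to be available. */
#if 0 /* example only -- not built */
static afs_int32 ExampleRequestProc(struct rx_call *call)
{
    /* read the request with rx_Read, send the reply with rx_Write ... */
    return 0;
}

static void example_ServerMain(void)
{
    struct rx_securityClass *secObjs[1];
    struct rx_service *svc;

    if (rx_Init(htons(8000)) != 0)      /* bind the shared UDP port */
        return;
    secObjs[0] = rxnull_NewServerSecurityObject();
    svc = rx_NewService(0 /* reuse the rx_Init port */, 4711, "example",
                        secObjs, 1, ExampleRequestProc);
    if (svc == NULL)
        return;
    rx_StartServer(1);                  /* donate this thread; never returns */
}
#endif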
687 /* Create a new client connection to the specified service, using the
688 * specified security object to implement the security model for this
690 struct rx_connection *
691 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
692 register afs_uint32 shost; /* Server host */
693 u_short sport; /* Server port */
694 u_short sservice; /* Server service id */
695 register struct rx_securityClass *securityObject;
696 int serviceSecurityIndex;
700 register struct rx_connection *conn;
705 dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
706 shost, sport, sservice, securityObject, serviceSecurityIndex));
708 /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
709 * the case of kmem_alloc? */
710 conn = rxi_AllocConnection();
711 #ifdef RX_ENABLE_LOCKS
712 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
713 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
714 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
718 MUTEX_ENTER(&rx_connHashTable_lock);
719 cid = (rx_nextCid += RX_MAXCALLS);
720 conn->type = RX_CLIENT_CONNECTION;
722 conn->epoch = rx_epoch;
723 conn->peer = rxi_FindPeer(shost, sport, 0, 1);
724 conn->serviceId = sservice;
725 conn->securityObject = securityObject;
726 /* This doesn't work in all compilers with void (they're buggy), so fake it
728 conn->securityData = (VOID *) 0;
729 conn->securityIndex = serviceSecurityIndex;
730 rx_SetConnDeadTime(conn, rx_connDeadTime);
731 conn->ackRate = RX_FAST_ACK_RATE;
733 conn->specific = NULL;
734 conn->challengeEvent = (struct rxevent *)0;
735 conn->delayedAbortEvent = (struct rxevent *)0;
736 conn->abortCount = 0;
739 RXS_NewConnection(securityObject, conn);
740 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
742 conn->refCount++; /* no lock required since only this thread knows... */
743 conn->next = rx_connHashTable[hashindex];
744 rx_connHashTable[hashindex] = conn;
745 MUTEX_ENTER(&rx_stats_mutex);
746 rx_stats.nClientConns++;
747 MUTEX_EXIT(&rx_stats_mutex);
749 MUTEX_EXIT(&rx_connHashTable_lock);
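/* Client-side usage sketch (added for illustration; not in the original):
 * create an unauthenticated connection to the hypothetical service above.
 * serverAddr is the peer's IP address in network order; port 8000, service
 * id 4711 and security index 0 (rxnull) are assumptions. */
#if 0 /* example only -- not built */
static struct rx_connection *example_Connect(afs_uint32 serverAddr)
{
    struct rx_securityClass *nullSec = rxnull_NewClientSecurityObject();

    return rx_NewConnection(serverAddr, htons(8000), 4711, nullSec, 0);
}
#endif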
755 void rx_SetConnDeadTime(conn, seconds)
756 register struct rx_connection *conn;
757 register int seconds;
759 /* The idea is to set the dead time to a value that allows several
760 * keepalives to be dropped without timing out the connection. */
761 conn->secondsUntilDead = MAX(seconds, 6);
762 conn->secondsUntilPing = conn->secondsUntilDead/6;
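/* Worked example (added for clarity; not in the original): with the default
 * rx_connDeadTime of 12 set in rx_Init, secondsUntilDead = 12 and
 * secondsUntilPing = 2, so roughly six keepalives can be lost before the
 * connection is considered dead. */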
765 int rxi_lowPeerRefCount = 0;
766 int rxi_lowConnRefCount = 0;
769 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
770 * NOTE: must not be called with rx_connHashTable_lock held.
772 void rxi_CleanupConnection(conn)
773 struct rx_connection *conn;
777 /* Notify the service exporter, if requested, that this connection
778 * is being destroyed */
779 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
780 (*conn->service->destroyConnProc)(conn);
782 /* Notify the security module that this connection is being destroyed */
783 RXS_DestroyConnection(conn->securityObject, conn);
785 /* If this is the last connection using the rx_peer struct, set its
786 * idle time to now. rxi_ReapConnections will reap it if it's still
787 * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
789 MUTEX_ENTER(&rx_peerHashTable_lock);
790 if (--conn->peer->refCount <= 0) {
791 conn->peer->idleWhen = clock_Sec();
792 if (conn->peer->refCount < 0) {
793 conn->peer->refCount = 0;
794 MUTEX_ENTER(&rx_stats_mutex);
795 rxi_lowPeerRefCount ++;
796 MUTEX_EXIT(&rx_stats_mutex);
799 MUTEX_EXIT(&rx_peerHashTable_lock);
801 MUTEX_ENTER(&rx_stats_mutex);
802 if (conn->type == RX_SERVER_CONNECTION)
803 rx_stats.nServerConns--;
805 rx_stats.nClientConns--;
806 MUTEX_EXIT(&rx_stats_mutex);
809 if (conn->specific) {
810 for (i = 0 ; i < conn->nSpecific ; i++) {
811 if (conn->specific[i] && rxi_keyCreate_destructor[i])
812 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
813 conn->specific[i] = NULL;
815 free(conn->specific);
817 conn->specific = NULL;
821 MUTEX_DESTROY(&conn->conn_call_lock);
822 MUTEX_DESTROY(&conn->conn_data_lock);
823 CV_DESTROY(&conn->conn_call_cv);
825 rxi_FreeConnection(conn);
828 /* Destroy the specified connection */
829 void rxi_DestroyConnection(conn)
830 register struct rx_connection *conn;
832 MUTEX_ENTER(&rx_connHashTable_lock);
833 rxi_DestroyConnectionNoLock(conn);
834 /* conn should be at the head of the cleanup list */
835 if (conn == rx_connCleanup_list) {
836 rx_connCleanup_list = rx_connCleanup_list->next;
837 MUTEX_EXIT(&rx_connHashTable_lock);
838 rxi_CleanupConnection(conn);
840 #ifdef RX_ENABLE_LOCKS
842 MUTEX_EXIT(&rx_connHashTable_lock);
844 #endif /* RX_ENABLE_LOCKS */
847 static void rxi_DestroyConnectionNoLock(conn)
848 register struct rx_connection *conn;
850 register struct rx_connection **conn_ptr;
851 register int havecalls = 0;
852 struct rx_packet *packet;
859 MUTEX_ENTER(&conn->conn_data_lock);
860 if (conn->refCount > 0)
863 MUTEX_ENTER(&rx_stats_mutex);
864 rxi_lowConnRefCount++;
865 MUTEX_EXIT(&rx_stats_mutex);
868 if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
869 /* Busy; wait till the last guy before proceeding */
870 MUTEX_EXIT(&conn->conn_data_lock);
875 /* If the client previously called rx_NewCall, but it is still
876 * waiting, treat this as a running call, and wait to destroy the
877 * connection later when the call completes. */
878 if ((conn->type == RX_CLIENT_CONNECTION) &&
879 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
880 conn->flags |= RX_CONN_DESTROY_ME;
881 MUTEX_EXIT(&conn->conn_data_lock);
885 MUTEX_EXIT(&conn->conn_data_lock);
887 /* Check for extant references to this connection */
888 for (i = 0; i<RX_MAXCALLS; i++) {
889 register struct rx_call *call = conn->call[i];
892 if (conn->type == RX_CLIENT_CONNECTION) {
893 MUTEX_ENTER(&call->lock);
894 if (call->delayedAckEvent) {
895 /* Push the final acknowledgment out now--there
896 * won't be a subsequent call to acknowledge the
897 * last reply packets */
898 rxevent_Cancel(call->delayedAckEvent, call,
899 RX_CALL_REFCOUNT_DELAY);
900 rxi_AckAll((struct rxevent *)0, call, 0);
902 MUTEX_EXIT(&call->lock);
906 #ifdef RX_ENABLE_LOCKS
908 if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
909 MUTEX_EXIT(&conn->conn_data_lock);
912 /* Someone is accessing a packet right now. */
916 #endif /* RX_ENABLE_LOCKS */
919 /* Don't destroy the connection if there are any call
920 * structures still in use */
921 MUTEX_ENTER(&conn->conn_data_lock);
922 conn->flags |= RX_CONN_DESTROY_ME;
923 MUTEX_EXIT(&conn->conn_data_lock);
928 if (conn->delayedAbortEvent) {
929 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
930 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
932 MUTEX_ENTER(&conn->conn_data_lock);
933 rxi_SendConnectionAbort(conn, packet, 0, 1);
934 MUTEX_EXIT(&conn->conn_data_lock);
935 rxi_FreePacket(packet);
939 /* Remove from connection hash table before proceeding */
940 conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
941 conn->epoch, conn->type) ];
942 for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
943 if (*conn_ptr == conn) {
944 *conn_ptr = conn->next;
948 /* if the conn that we are destroying was the last connection, then we
949 * clear rxLastConn as well */
950 if ( rxLastConn == conn )
953 /* Make sure the connection is completely reset before deleting it. */
954 /* get rid of pending events that could zap us later */
955 if (conn->challengeEvent) {
956 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
959 /* Add the connection to the list of destroyed connections that
960 * need to be cleaned up. This is necessary to avoid deadlocks
961 * in the routines we call to inform others that this connection is
962 * being destroyed. */
963 conn->next = rx_connCleanup_list;
964 rx_connCleanup_list = conn;
967 /* Externally available version */
968 void rx_DestroyConnection(conn)
969 register struct rx_connection *conn;
975 rxi_DestroyConnection (conn);
980 /* Start a new rx remote procedure call, on the specified connection.
981 * If wait is set to 1, wait for a free call channel; otherwise return
982 * 0. Maxtime gives the maximum number of seconds this call may take,
983 * after rx_MakeCall returns. After this time interval, a call to any
984 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
985 * For fine grain locking, we hold the conn_call_lock in order to
986 * ensure that we don't get signalled after we find a call in an active
987 * state and before we go to sleep.
989 struct rx_call *rx_NewCall(conn)
990 register struct rx_connection *conn;
993 register struct rx_call *call;
994 struct clock queueTime;
998 dpf (("rx_MakeCall(conn %x)\n", conn));
1001 clock_GetTime(&queueTime);
1003 MUTEX_ENTER(&conn->conn_call_lock);
1005 for (i=0; i<RX_MAXCALLS; i++) {
1006 call = conn->call[i];
1008 MUTEX_ENTER(&call->lock);
1009 if (call->state == RX_STATE_DALLY) {
1010 rxi_ResetCall(call, 0);
1011 (*call->callNumber)++;
1014 MUTEX_EXIT(&call->lock);
1017 call = rxi_NewCall(conn, i);
1018 MUTEX_ENTER(&call->lock);
1022 if (i < RX_MAXCALLS) {
1025 MUTEX_ENTER(&conn->conn_data_lock);
1026 conn->flags |= RX_CONN_MAKECALL_WAITING;
1027 MUTEX_EXIT(&conn->conn_data_lock);
1028 #ifdef RX_ENABLE_LOCKS
1029 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1035 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1037 /* Client is initially in send mode */
1038 call->state = RX_STATE_ACTIVE;
1039 call->mode = RX_MODE_SENDING;
1041 /* remember start time for call in case we have hard dead time limit */
1042 call->queueTime = queueTime;
1043 clock_GetTime(&call->startTime);
1044 hzero(call->bytesSent);
1045 hzero(call->bytesRcvd);
1047 /* Turn on busy protocol. */
1048 rxi_KeepAliveOn(call);
1050 MUTEX_EXIT(&call->lock);
1051 MUTEX_EXIT(&conn->conn_call_lock);
1055 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1056 /* Now, if TQ wasn't cleared earlier, do it now. */
1058 MUTEX_ENTER(&call->lock);
1059 while (call->flags & RX_CALL_TQ_BUSY) {
1060 call->flags |= RX_CALL_TQ_WAIT;
1061 #ifdef RX_ENABLE_LOCKS
1062 CV_WAIT(&call->cv_tq, &call->lock);
1063 #else /* RX_ENABLE_LOCKS */
1064 osi_rxSleep(&call->tq);
1065 #endif /* RX_ENABLE_LOCKS */
1067 if (call->flags & RX_CALL_TQ_CLEARME) {
1068 rxi_ClearTransmitQueue(call, 0);
1069 queue_Init(&call->tq);
1071 MUTEX_EXIT(&call->lock);
1073 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
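/* Typical client call sequence (illustrative sketch, added; not part of the
 * original file): start a call on an existing connection, marshal a request
 * with rx_Write, read the reply with rx_Read, and finish with rx_EndCall.
 * The one-word opcode and fixed-size reply buffer are assumptions. */
#if 0 /* example only -- not built */
static afs_int32 example_SimpleRPC(struct rx_connection *conn)
{
    struct rx_call *call = rx_NewCall(conn);
    afs_int32 opcode = htonl(1);
    char reply[128];
    afs_int32 code = 0;

    if (rx_Write(call, (char *)&opcode, sizeof(opcode)) != sizeof(opcode))
        code = RX_PROTOCOL_ERROR;
    else
        (void) rx_Read(call, reply, sizeof(reply)); /* reply may be shorter */
    return rx_EndCall(call, code);      /* returns the final call error */
}
#endif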
1079 rxi_HasActiveCalls(aconn)
1080 register struct rx_connection *aconn; {
1082 register struct rx_call *tcall;
1086 for(i=0; i<RX_MAXCALLS; i++) {
1087 if ((tcall = aconn->call[i])) {
1088 if ((tcall->state == RX_STATE_ACTIVE)
1089 || (tcall->state == RX_STATE_PRECALL)) {
1100 rxi_GetCallNumberVector(aconn, aint32s)
1101 register struct rx_connection *aconn;
1102 register afs_int32 *aint32s; {
1104 register struct rx_call *tcall;
1108 for(i=0; i<RX_MAXCALLS; i++) {
1109 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1110 aint32s[i] = aconn->callNumber[i]+1;
1112 aint32s[i] = aconn->callNumber[i];
1119 rxi_SetCallNumberVector(aconn, aint32s)
1120 register struct rx_connection *aconn;
1121 register afs_int32 *aint32s; {
1123 register struct rx_call *tcall;
1127 for(i=0; i<RX_MAXCALLS; i++) {
1128 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1129 aconn->callNumber[i] = aint32s[i] - 1;
1131 aconn->callNumber[i] = aint32s[i];
1137 /* Advertise a new service. A service is named locally by a UDP port
1138 * number plus a 16-bit service id. Returns (struct rx_service *) 0
1141 rx_NewService(port, serviceId, serviceName, securityObjects,
1142 nSecurityObjects, serviceProc)
1145 char *serviceName; /* Name for identification purposes (e.g. the
1146 * service name might be used for probing for
1148 struct rx_securityClass **securityObjects;
1149 int nSecurityObjects;
1150 afs_int32 (*serviceProc)();
1152 osi_socket socket = OSI_NULLSOCKET;
1153 register struct rx_service *tservice;
1159 if (serviceId == 0) {
1160 (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
1166 (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1173 tservice = rxi_AllocService();
1176 for (i = 0; i<RX_MAX_SERVICES; i++) {
1177 register struct rx_service *service = rx_services[i];
1179 if (port == service->servicePort) {
1180 if (service->serviceId == serviceId) {
1181 /* The identical service has already been
1182 * installed; if the caller was intending to
1183 * change the security classes used by this
1184 * service, he/she loses. */
1185 (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1188 rxi_FreeService(tservice);
1191 /* Different service, same port: re-use the socket
1192 * which is bound to the same port */
1193 socket = service->socket;
1196 if (socket == OSI_NULLSOCKET) {
1197 /* If we don't already have a socket (from another
1198 * service on same port) get a new one */
1199 socket = rxi_GetUDPSocket(port);
1200 if (socket == OSI_NULLSOCKET) {
1203 rxi_FreeService(tservice);
1208 service->socket = socket;
1209 service->servicePort = port;
1210 service->serviceId = serviceId;
1211 service->serviceName = serviceName;
1212 service->nSecurityObjects = nSecurityObjects;
1213 service->securityObjects = securityObjects;
1214 service->minProcs = 0;
1215 service->maxProcs = 1;
1216 service->idleDeadTime = 60;
1217 service->connDeadTime = rx_connDeadTime;
1218 service->executeRequestProc = serviceProc;
1219 rx_services[i] = service; /* not visible until now */
1227 rxi_FreeService(tservice);
1228 (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
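/* Illustration (added; not in the original): two services registered on the
 * same UDP port share one socket, as the loop above arranges, and each can
 * have its thread quotas tuned before rx_StartServer runs.  The service ids,
 * limits and the rx_SetMinProcs/rx_SetMaxProcs macros from rx.h are assumed;
 * ExampleRequestProc refers to the earlier server sketch. */
#if 0 /* example only -- not built */
static void example_TwoServices(struct rx_securityClass **secObjs, int nSec)
{
    struct rx_service *a =
        rx_NewService(htons(8000), 1, "svcA", secObjs, nSec, ExampleRequestProc);
    struct rx_service *b =
        rx_NewService(htons(8000), 2, "svcB", secObjs, nSec, ExampleRequestProc);

    if (a && b) {
        rx_SetMinProcs(a, 2);   /* keep two threads available for svcA */
        rx_SetMaxProcs(a, 4);
        rx_SetMinProcs(b, 1);
        rx_SetMaxProcs(b, 2);
    }
}
#endif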
1232 /* Generic request processing loop. This routine should be called
1233 * by the implementation dependent rx_ServerProc. If socketp is
1234 * non-null, it will be set to the file descriptor that this thread
1235 * is now listening on. If socketp is null, this routine will never
1237 void rxi_ServerProc(threadID, newcall, socketp)
1239 struct rx_call *newcall;
1240 osi_socket *socketp;
1242 register struct rx_call *call;
1243 register afs_int32 code;
1244 register struct rx_service *tservice = NULL;
1251 call = rx_GetCall(threadID, tservice, socketp);
1252 if (socketp && *socketp != OSI_NULLSOCKET) {
1253 /* We are now a listener thread */
1258 /* if the server is restarting (typically a smooth shutdown) then do not
1259 * allow any new calls.
1262 if ( rx_tranquil && (call != NULL) ) {
1267 MUTEX_ENTER(&call->lock);
1269 rxi_CallError(call, RX_RESTARTING);
1270 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1272 MUTEX_EXIT(&call->lock);
1278 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1279 #ifdef RX_ENABLE_LOCKS
1281 #endif /* RX_ENABLE_LOCKS */
1282 afs_termState = AFSOP_STOP_AFS;
1283 afs_osi_Wakeup(&afs_termState);
1284 #ifdef RX_ENABLE_LOCKS
1286 #endif /* RX_ENABLE_LOCKS */
1291 tservice = call->conn->service;
1293 if (tservice->beforeProc) (*tservice->beforeProc)(call);
1295 code = call->conn->service->executeRequestProc(call);
1297 if (tservice->afterProc) (*tservice->afterProc)(call, code);
1299 rx_EndCall(call, code);
1300 MUTEX_ENTER(&rx_stats_mutex);
1302 MUTEX_EXIT(&rx_stats_mutex);
1307 void rx_WakeupServerProcs()
1309 struct rx_serverQueueEntry *np, *tqp;
1314 MUTEX_ENTER(&rx_serverPool_lock);
1316 #ifdef RX_ENABLE_LOCKS
1317 if (rx_waitForPacket)
1318 CV_BROADCAST(&rx_waitForPacket->cv);
1319 #else /* RX_ENABLE_LOCKS */
1320 if (rx_waitForPacket)
1321 osi_rxWakeup(rx_waitForPacket);
1322 #endif /* RX_ENABLE_LOCKS */
1323 MUTEX_ENTER(&freeSQEList_lock);
1324 for (np = rx_FreeSQEList; np; np = tqp) {
1325 tqp = *(struct rx_serverQueueEntry **)np;
1326 #ifdef RX_ENABLE_LOCKS
1327 CV_BROADCAST(&np->cv);
1328 #else /* RX_ENABLE_LOCKS */
1330 #endif /* RX_ENABLE_LOCKS */
1332 MUTEX_EXIT(&freeSQEList_lock);
1333 for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1334 #ifdef RX_ENABLE_LOCKS
1335 CV_BROADCAST(&np->cv);
1336 #else /* RX_ENABLE_LOCKS */
1338 #endif /* RX_ENABLE_LOCKS */
1340 MUTEX_EXIT(&rx_serverPool_lock);
1346 * One thing that seems to happen is that all the server threads get
1347 * tied up on some empty or slow call, and then a whole bunch of calls
1348 * arrive at once, using up the packet pool, so now there are more
1349 * empty calls. The most critical resources here are server threads
1350 * and the free packet pool. The "doreclaim" code seems to help in
1351 * general. I think that eventually we arrive in this state: there
1352 * are lots of pending calls which do have all their packets present,
1353 * so they won't be reclaimed, are multi-packet calls, so they won't
1354 * be scheduled until later, and thus are tying up most of the free
1355 * packet pool for a very long time.
1357 * 1. schedule multi-packet calls if all the packets are present.
1358 * Probably CPU-bound operation, useful to return packets to pool.
1359 * What to do if there is a full window, but the last packet isn't here?
1360 * 3. preserve one thread which *only* runs "best" calls, otherwise
1361 * it sleeps and waits for that type of call.
1362 * 4. Don't necessarily reserve a whole window for each thread. In fact,
1363 * the current dataquota business is badly broken. The quota isn't adjusted
1364 * to reflect how many packets are presently queued for a running call.
1365 * So, when we schedule a queued call with a full window of packets queued
1366 * up for it, that *should* free up a window full of packets for other 2d-class
1367 * calls to be able to use from the packet pool. But it doesn't.
1369 * NB. Most of the time, this code doesn't run -- since idle server threads
1370 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1371 * as a new call arrives.
1373 /* Sleep until a call arrives. Returns a pointer to the call, ready
1374 * for an rx_Read. */
1375 #ifdef RX_ENABLE_LOCKS
1377 rx_GetCall(tno, cur_service, socketp)
1379 struct rx_service *cur_service;
1380 osi_socket *socketp;
1382 struct rx_serverQueueEntry *sq;
1383 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1384 struct rx_service *service = NULL;
1387 MUTEX_ENTER(&freeSQEList_lock);
1389 if ((sq = rx_FreeSQEList)) {
1390 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1391 MUTEX_EXIT(&freeSQEList_lock);
1392 } else { /* otherwise allocate a new one and return that */
1393 MUTEX_EXIT(&freeSQEList_lock);
1394 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1395 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1396 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1399 MUTEX_ENTER(&rx_serverPool_lock);
1400 if (cur_service != NULL) {
1401 ReturnToServerPool(cur_service);
1404 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1405 register struct rx_call *tcall, *ncall;
1406 choice2 = (struct rx_call *) 0;
1407 /* Scan for eligible incoming calls. A call is not eligible
1408 * if the maximum number of calls for its service type are
1409 * already executing */
1410 /* One thread will process calls FCFS (to prevent starvation),
1411 * while the other threads may run ahead looking for calls which
1412 * have all their input data available immediately. This helps
1413 * keep threads from blocking, waiting for data from the client. */
1414 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1415 service = tcall->conn->service;
1416 if (!QuotaOK(service)) {
1419 if (!tno || !tcall->queue_item_header.next ) {
1420 /* If we're thread 0, then we'll just use
1421 * this call. If we haven't been able to find an optimal
1422 * choice, and we're at the end of the list, then use a
1423 * 2d choice if one has been identified. Otherwise... */
1424 call = (choice2 ? choice2 : tcall);
1425 service = call->conn->service;
1426 } else if (!queue_IsEmpty(&tcall->rq)) {
1427 struct rx_packet *rp;
1428 rp = queue_First(&tcall->rq, rx_packet);
1429 if (rp->header.seq == 1) {
1430 if (!meltdown_1pkt ||
1431 (rp->header.flags & RX_LAST_PACKET)) {
1433 } else if (rxi_2dchoice && !choice2 &&
1434 !(tcall->flags & RX_CALL_CLEARED) &&
1435 (tcall->rprev > rxi_HardAckRate)) {
1437 } else rxi_md2cnt++;
1443 ReturnToServerPool(service);
1450 rxi_ServerThreadSelectingCall = 1;
1451 MUTEX_EXIT(&rx_serverPool_lock);
1452 MUTEX_ENTER(&call->lock);
1453 MUTEX_ENTER(&rx_serverPool_lock);
1455 if (queue_IsEmpty(&call->rq) ||
1456 queue_First(&call->rq, rx_packet)->header.seq != 1)
1457 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1459 CLEAR_CALL_QUEUE_LOCK(call);
1461 MUTEX_EXIT(&call->lock);
1462 ReturnToServerPool(service);
1463 rxi_ServerThreadSelectingCall = 0;
1464 CV_SIGNAL(&rx_serverPool_cv);
1465 call = (struct rx_call*)0;
1468 call->flags &= (~RX_CALL_WAIT_PROC);
1469 MUTEX_ENTER(&rx_stats_mutex);
1471 MUTEX_EXIT(&rx_stats_mutex);
1472 rxi_ServerThreadSelectingCall = 0;
1473 CV_SIGNAL(&rx_serverPool_cv);
1474 MUTEX_EXIT(&rx_serverPool_lock);
1478 /* If there are no eligible incoming calls, add this process
1479 * to the idle server queue, to wait for one */
1483 *socketp = OSI_NULLSOCKET;
1485 sq->socketp = socketp;
1486 queue_Append(&rx_idleServerQueue, sq);
1487 #ifndef AFS_AIX41_ENV
1488 rx_waitForPacket = sq;
1489 #endif /* AFS_AIX41_ENV */
1491 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1493 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1494 MUTEX_EXIT(&rx_serverPool_lock);
1495 return (struct rx_call *)0;
1498 } while (!(call = sq->newcall) &&
1499 !(socketp && *socketp != OSI_NULLSOCKET));
1500 MUTEX_EXIT(&rx_serverPool_lock);
1502 MUTEX_ENTER(&call->lock);
1508 MUTEX_ENTER(&freeSQEList_lock);
1509 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1510 rx_FreeSQEList = sq;
1511 MUTEX_EXIT(&freeSQEList_lock);
1514 clock_GetTime(&call->startTime);
1515 call->state = RX_STATE_ACTIVE;
1516 call->mode = RX_MODE_RECEIVING;
1518 rxi_calltrace(RX_CALL_START, call);
1519 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1520 call->conn->service->servicePort,
1521 call->conn->service->serviceId, call));
1523 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1524 MUTEX_EXIT(&call->lock);
1526 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1531 #else /* RX_ENABLE_LOCKS */
1533 rx_GetCall(tno, cur_service, socketp)
1535 struct rx_service *cur_service;
1536 osi_socket *socketp;
1538 struct rx_serverQueueEntry *sq;
1539 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1540 struct rx_service *service = NULL;
1545 MUTEX_ENTER(&freeSQEList_lock);
1547 if ((sq = rx_FreeSQEList)) {
1548 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1549 MUTEX_EXIT(&freeSQEList_lock);
1550 } else { /* otherwise allocate a new one and return that */
1551 MUTEX_EXIT(&freeSQEList_lock);
1552 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1553 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1554 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1556 MUTEX_ENTER(&sq->lock);
1558 if (cur_service != NULL) {
1559 cur_service->nRequestsRunning--;
1560 if (cur_service->nRequestsRunning < cur_service->minProcs)
1564 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1565 register struct rx_call *tcall, *ncall;
1566 /* Scan for eligible incoming calls. A call is not eligible
1567 * if the maximum number of calls for its service type are
1568 * already executing */
1569 /* One thread will process calls FCFS (to prevent starvation),
1570 * while the other threads may run ahead looking for calls which
1571 * have all their input data available immediately. This helps
1572 * keep threads from blocking, waiting for data from the client. */
1573 choice2 = (struct rx_call *) 0;
1574 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1575 service = tcall->conn->service;
1576 if (QuotaOK(service)) {
1577 if (!tno || !tcall->queue_item_header.next ) {
1578 /* If we're thread 0, then we'll just use
1579 * this call. If we haven't been able to find an optimal
1580 * choice, and we're at the end of the list, then use a
1581 * 2d choice if one has been identified. Otherwise... */
1582 call = (choice2 ? choice2 : tcall);
1583 service = call->conn->service;
1584 } else if (!queue_IsEmpty(&tcall->rq)) {
1585 struct rx_packet *rp;
1586 rp = queue_First(&tcall->rq, rx_packet);
1587 if (rp->header.seq == 1
1588 && (!meltdown_1pkt ||
1589 (rp->header.flags & RX_LAST_PACKET))) {
1591 } else if (rxi_2dchoice && !choice2 &&
1592 !(tcall->flags & RX_CALL_CLEARED) &&
1593 (tcall->rprev > rxi_HardAckRate)) {
1595 } else rxi_md2cnt++;
1605 /* we can't schedule a call if there's no data!!! */
1606 /* send an ack if there's no data, if we're missing the
1607 * first packet, or we're missing something between first
1608 * and last -- there's a "hole" in the incoming data. */
1609 if (queue_IsEmpty(&call->rq) ||
1610 queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1611 call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1612 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1614 call->flags &= (~RX_CALL_WAIT_PROC);
1615 service->nRequestsRunning++;
1616 /* just started call in minProcs pool, need fewer to maintain
1618 if (service->nRequestsRunning <= service->minProcs)
1622 /* MUTEX_EXIT(&call->lock); */
1625 /* If there are no eligible incoming calls, add this process
1626 * to the idle server queue, to wait for one */
1629 *socketp = OSI_NULLSOCKET;
1631 sq->socketp = socketp;
1632 queue_Append(&rx_idleServerQueue, sq);
1636 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1639 return (struct rx_call *)0;
1642 } while (!(call = sq->newcall) &&
1643 !(socketp && *socketp != OSI_NULLSOCKET));
1645 MUTEX_EXIT(&sq->lock);
1647 MUTEX_ENTER(&freeSQEList_lock);
1648 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1649 rx_FreeSQEList = sq;
1650 MUTEX_EXIT(&freeSQEList_lock);
1653 clock_GetTime(&call->startTime);
1654 call->state = RX_STATE_ACTIVE;
1655 call->mode = RX_MODE_RECEIVING;
1657 rxi_calltrace(RX_CALL_START, call);
1658 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1659 call->conn->service->servicePort,
1660 call->conn->service->serviceId, call));
1662 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1670 #endif /* RX_ENABLE_LOCKS */
1674 /* Establish a procedure to be called when a packet arrives for a
1675 * call. This routine will be called at most once after each call,
1676 * and will also be called if there is an error condition on the call or
1677 * the call is complete. Used by multi rx to build a selection
1678 * function which determines which of several calls is likely to be a
1679 * good one to read from.
1680 * NOTE: the way this is currently implemented it is probably only a
1681 * good idea to (1) use it immediately after a newcall (clients only)
1682 * and (2) only use it once. Other uses currently void your warranty
1684 void rx_SetArrivalProc(call, proc, handle, arg)
1685 register struct rx_call *call;
1686 register VOID (*proc)();
1687 register VOID *handle;
1690 call->arrivalProc = proc;
1691 call->arrivalProcHandle = handle;
1692 call->arrivalProcArg = arg;
1695 /* Call is finished (possibly prematurely). Return rc to the peer, if
1696 * appropriate, and return the final error code from the conversation
1699 afs_int32 rx_EndCall(call, rc)
1700 register struct rx_call *call;
1703 register struct rx_connection *conn = call->conn;
1704 register struct rx_service *service;
1705 register struct rx_packet *tp; /* Temporary packet pointer */
1706 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1710 dpf(("rx_EndCall(call %x)\n", call));
1714 MUTEX_ENTER(&call->lock);
1716 if (rc == 0 && call->error == 0) {
1717 call->abortCode = 0;
1718 call->abortCount = 0;
1721 call->arrivalProc = (VOID (*)()) 0;
1722 if (rc && call->error == 0) {
1723 rxi_CallError(call, rc);
1724 /* Send an abort message to the peer if this error code has
1725 * only just been set. If it was set previously, assume the
1726 * peer has already been sent the error code or will request it
1728 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1730 if (conn->type == RX_SERVER_CONNECTION) {
1731 /* Make sure reply or at least dummy reply is sent */
1732 if (call->mode == RX_MODE_RECEIVING) {
1733 rxi_WriteProc(call, 0, 0);
1735 if (call->mode == RX_MODE_SENDING) {
1736 rxi_FlushWrite(call);
1738 service = conn->service;
1739 rxi_calltrace(RX_CALL_END, call);
1740 /* Call goes to hold state until reply packets are acknowledged */
1741 if (call->tfirst + call->nSoftAcked < call->tnext) {
1742 call->state = RX_STATE_HOLD;
1744 call->state = RX_STATE_DALLY;
1745 rxi_ClearTransmitQueue(call, 0);
1746 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1747 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1750 else { /* Client connection */
1752 /* Make sure server receives input packets, in the case where
1753 * no reply arguments are expected */
1754 if ((call->mode == RX_MODE_SENDING)
1755 || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1756 (void) rxi_ReadProc(call, &dummy, 1);
1758 /* We need to release the call lock since it's lower than the
1759 * conn_call_lock and we don't want to hold the conn_call_lock
1760 * over the rx_ReadProc call. The conn_call_lock needs to be held
1761 * here for the case where rx_NewCall is perusing the calls on
1762 * the connection structure. We don't want to signal until
1763 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1764 * have checked this call, found it active and by the time it
1765 * goes to sleep, will have missed the signal.
1767 MUTEX_EXIT(&call->lock);
1768 MUTEX_ENTER(&conn->conn_call_lock);
1769 MUTEX_ENTER(&call->lock);
1770 MUTEX_ENTER(&conn->conn_data_lock);
1771 conn->flags |= RX_CONN_BUSY;
1772 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1773 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1774 MUTEX_EXIT(&conn->conn_data_lock);
1775 #ifdef RX_ENABLE_LOCKS
1776 CV_BROADCAST(&conn->conn_call_cv);
1781 #ifdef RX_ENABLE_LOCKS
1783 MUTEX_EXIT(&conn->conn_data_lock);
1785 #endif /* RX_ENABLE_LOCKS */
1786 call->state = RX_STATE_DALLY;
1788 error = call->error;
1790 /* currentPacket, nLeft, and nFree must be zeroed here, because
1791 * ResetCall cannot: ResetCall may be called at splnet(), in the
1792 * kernel version, and may interrupt the macros rx_Read or
1793 * rx_Write, which run at normal priority for efficiency. */
1794 if (call->currentPacket) {
1795 rxi_FreePacket(call->currentPacket);
1796 call->currentPacket = (struct rx_packet *) 0;
1797 call->nLeft = call->nFree = call->curlen = 0;
1800 call->nLeft = call->nFree = call->curlen = 0;
1802 /* Free any packets from the last call to ReadvProc/WritevProc */
1803 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1808 CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1809 MUTEX_EXIT(&call->lock);
1810 if (conn->type == RX_CLIENT_CONNECTION) {
1811 MUTEX_EXIT(&conn->conn_call_lock);
1812 conn->flags &= ~RX_CONN_BUSY;
1817 * Map errors to the local host's errno.h format.
1819 error = ntoh_syserr_conv(error);
1823 #if !defined(KERNEL)
1825 /* Call this routine when shutting down a server or client (especially
1826 * clients). This will allow Rx to gracefully garbage collect server
1827 * connections, and reduce the number of retries that a server might
1828 * make to a dead client.
1829 * This is not quite right, since some calls may still be ongoing and
1830 * we can't lock them to destroy them. */
1831 void rx_Finalize() {
1832 register struct rx_connection **conn_ptr, **conn_end;
1836 if (rxinit_status == 1) {
1838 return; /* Already shutdown. */
1840 rxi_DeleteCachedConnections();
1841 if (rx_connHashTable) {
1842 MUTEX_ENTER(&rx_connHashTable_lock);
1843 for (conn_ptr = &rx_connHashTable[0],
1844 conn_end = &rx_connHashTable[rx_hashTableSize];
1845 conn_ptr < conn_end; conn_ptr++) {
1846 struct rx_connection *conn, *next;
1847 for (conn = *conn_ptr; conn; conn = next) {
1849 if (conn->type == RX_CLIENT_CONNECTION) {
1850 /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1852 /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1853 #ifdef RX_ENABLE_LOCKS
1854 rxi_DestroyConnectionNoLock(conn);
1855 #else /* RX_ENABLE_LOCKS */
1856 rxi_DestroyConnection(conn);
1857 #endif /* RX_ENABLE_LOCKS */
1861 #ifdef RX_ENABLE_LOCKS
1862 while (rx_connCleanup_list) {
1863 struct rx_connection *conn;
1864 conn = rx_connCleanup_list;
1865 rx_connCleanup_list = rx_connCleanup_list->next;
1866 MUTEX_EXIT(&rx_connHashTable_lock);
1867 rxi_CleanupConnection(conn);
1868 MUTEX_ENTER(&rx_connHashTable_lock);
1870 MUTEX_EXIT(&rx_connHashTable_lock);
1871 #endif /* RX_ENABLE_LOCKS */
1880 /* if we wake up the packet waiter too often, we can get into a loop with two
1881 AllocSendPackets each waking the other up (from ReclaimPacket calls) */
1883 rxi_PacketsUnWait() {
1885 if (!rx_waitingForPackets) {
1889 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1890 return; /* still over quota */
1893 rx_waitingForPackets = 0;
1894 #ifdef RX_ENABLE_LOCKS
1895 CV_BROADCAST(&rx_waitingForPackets_cv);
1897 osi_rxWakeup(&rx_waitingForPackets);
1903 /* ------------------Internal interfaces------------------------- */
1905 /* Return this process's service structure for the
1906 * specified socket and service */
1907 struct rx_service *rxi_FindService(socket, serviceId)
1908 register osi_socket socket;
1909 register u_short serviceId;
1911 register struct rx_service **sp;
1912 for (sp = &rx_services[0]; *sp; sp++) {
1913 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1919 /* Allocate a call structure, for the indicated channel of the
1920 * supplied connection. The mode and state of the call must be set by
1922 struct rx_call *rxi_NewCall(conn, channel)
1923 register struct rx_connection *conn;
1924 register int channel;
1926 register struct rx_call *call;
1927 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1928 register struct rx_call *cp; /* Call pointer temp */
1929 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1930 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1932 /* Grab an existing call structure, or allocate a new one.
1933 * Existing call structures are assumed to have been left reset by
1935 MUTEX_ENTER(&rx_freeCallQueue_lock);
1937 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1939 * EXCEPT that the TQ might not yet be cleared out.
1940 * Skip over those with in-use TQs.
1943 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1944 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1950 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1951 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1952 call = queue_First(&rx_freeCallQueue, rx_call);
1953 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1955 MUTEX_ENTER(&rx_stats_mutex);
1956 rx_stats.nFreeCallStructs--;
1957 MUTEX_EXIT(&rx_stats_mutex);
1958 MUTEX_EXIT(&rx_freeCallQueue_lock);
1959 MUTEX_ENTER(&call->lock);
1960 CLEAR_CALL_QUEUE_LOCK(call);
1961 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1962 /* Now, if TQ wasn't cleared earlier, do it now. */
1963 if (call->flags & RX_CALL_TQ_CLEARME) {
1964 rxi_ClearTransmitQueue(call, 0);
1965 queue_Init(&call->tq);
1967 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1968 /* Bind the call to its connection structure */
1970 rxi_ResetCall(call, 1);
1973 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1975 MUTEX_EXIT(&rx_freeCallQueue_lock);
1976 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1977 MUTEX_ENTER(&call->lock);
1978 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1979 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1980 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1982 MUTEX_ENTER(&rx_stats_mutex);
1983 rx_stats.nCallStructs++;
1984 MUTEX_EXIT(&rx_stats_mutex);
1985 /* Initialize once-only items */
1986 queue_Init(&call->tq);
1987 queue_Init(&call->rq);
1988 queue_Init(&call->iovq);
1989 /* Bind the call to its connection structure (prereq for reset) */
1991 rxi_ResetCall(call, 1);
1993 call->channel = channel;
1994 call->callNumber = &conn->callNumber[channel];
1995 /* Note that the next expected call number is retained (in
1996 * conn->callNumber[i]), even if we reallocate the call structure
1998 conn->call[channel] = call;
1999 /* if the channel's never been used (== 0), we should start at 1, otherwise
2000 the call number is valid from the last time this channel was used */
2001 if (*call->callNumber == 0) *call->callNumber = 1;
2003 MUTEX_EXIT(&call->lock);
2007 /* A call has been inactive long enough that we can throw away
2008 * state, including the call structure, which is placed on the call
2010 * Call is locked upon entry.
2012 #ifdef RX_ENABLE_LOCKS
2013 void rxi_FreeCall(call, haveCTLock)
2014 int haveCTLock; /* Set if called from rxi_ReapConnections */
2015 #else /* RX_ENABLE_LOCKS */
2016 void rxi_FreeCall(call)
2017 #endif /* RX_ENABLE_LOCKS */
2018 register struct rx_call *call;
2020 register int channel = call->channel;
2021 register struct rx_connection *conn = call->conn;
2024 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2025 (*call->callNumber)++;
2026 rxi_ResetCall(call, 0);
2027 call->conn->call[channel] = (struct rx_call *) 0;
2029 MUTEX_ENTER(&rx_freeCallQueue_lock);
2030 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2031 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2032 /* A call may be free even though its transmit queue is still in use.
2033 * Since we search the call list from head to tail, put busy calls at
2034 * the head of the list, and idle calls at the tail.
2036 if (call->flags & RX_CALL_TQ_BUSY)
2037 queue_Prepend(&rx_freeCallQueue, call);
2039 queue_Append(&rx_freeCallQueue, call);
2040 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2041 queue_Append(&rx_freeCallQueue, call);
2042 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2043 MUTEX_ENTER(&rx_stats_mutex);
2044 rx_stats.nFreeCallStructs++;
2045 MUTEX_EXIT(&rx_stats_mutex);
2047 MUTEX_EXIT(&rx_freeCallQueue_lock);
2049 /* Destroy the connection if it was previously slated for
2050 * destruction, i.e. the Rx client code previously called
2051 * rx_DestroyConnection (client connections), or
2052 * rxi_ReapConnections called the same routine (server
2053 * connections). Only do this, however, if there are no
2054 * outstanding calls. Note that for fine grain locking, there appears
2055 * to be a deadlock in that rxi_FreeCall has a call locked and
2056 * DestroyConnectionNoLock locks each call in the conn. But note a
2057 * few lines up where we have removed this call from the conn.
2058 * If someone else destroys a connection, they either have no
2059 * call lock held or are going through this section of code.
2061 if (conn->flags & RX_CONN_DESTROY_ME) {
2062 MUTEX_ENTER(&conn->conn_data_lock);
2064 MUTEX_EXIT(&conn->conn_data_lock);
2065 #ifdef RX_ENABLE_LOCKS
2067 rxi_DestroyConnectionNoLock(conn);
2069 rxi_DestroyConnection(conn);
2070 #else /* RX_ENABLE_LOCKS */
2071 rxi_DestroyConnection(conn);
2072 #endif /* RX_ENABLE_LOCKS */
2076 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2077 char *rxi_Alloc(size)
2078 register size_t size;
2082 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2083 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2086 int glockOwner = ISAFS_GLOCK();
2090 MUTEX_ENTER(&rx_stats_mutex);
2091 rxi_Alloccnt++; rxi_Allocsize += size;
2092 MUTEX_EXIT(&rx_stats_mutex);
2093 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2094 if (size > AFS_SMALLOCSIZ) {
2095 p = (char *) osi_AllocMediumSpace(size);
2097 p = (char *) osi_AllocSmall(size, 1);
2098 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2103 p = (char *) osi_Alloc(size);
2105 if (!p) osi_Panic("rxi_Alloc error");
2110 void rxi_Free(addr, size)
2112 register size_t size;
2114 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2115 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2118 int glockOwner = ISAFS_GLOCK();
2122 MUTEX_ENTER(&rx_stats_mutex);
2123 rxi_Alloccnt--; rxi_Allocsize -= size;
2124 MUTEX_EXIT(&rx_stats_mutex);
2125 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2126 if (size > AFS_SMALLOCSIZ)
2127 osi_FreeMediumSpace(addr);
2129 osi_FreeSmall(addr);
2130 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2135 osi_Free(addr, size);
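/* Illustrative sketch (not part of rx, compiled out): rxi_Alloc and
 * rxi_Free are a size-tracking pair, so the caller must pass rxi_Free the
 * same size it gave rxi_Alloc; otherwise the rxi_Allocsize accounting and
 * the kernel back ends that free by size get out of step. */
#if 0
	{
	    char *buf = rxi_Alloc(512);
	    /* ... use the 512-byte scratch buffer ... */
	    rxi_Free(buf, 512);
	}
#endif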
2139 /* Find the peer process represented by the supplied (host,port)
2140 * combination. If there is no appropriate active peer structure, a
2141 * new one will be allocated and initialized
2142 * The origPeer, if set, is a pointer to a peer structure on which the
2143 * refcount will be decremented. This is used to replace the peer
2144 * structure hanging off a connection structure */
2145 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2146 register afs_uint32 host;
2147 register u_short port;
2148 struct rx_peer *origPeer;
2151 register struct rx_peer *pp;
2153 hashIndex = PEER_HASH(host, port);
2154 MUTEX_ENTER(&rx_peerHashTable_lock);
2155 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2156 if ((pp->host == host) && (pp->port == port)) break;
2160 pp = rxi_AllocPeer(); /* This bzero's *pp */
2161 pp->host = host; /* set here or in InitPeerParams is zero */
2163 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2164 queue_Init(&pp->congestionQueue);
2165 queue_Init(&pp->rpcStats);
2166 pp->next = rx_peerHashTable[hashIndex];
2167 rx_peerHashTable[hashIndex] = pp;
2168 rxi_InitPeerParams(pp);
2169 MUTEX_ENTER(&rx_stats_mutex);
2170 rx_stats.nPeerStructs++;
2171 MUTEX_EXIT(&rx_stats_mutex);
2178 origPeer->refCount--;
2179 MUTEX_EXIT(&rx_peerHashTable_lock);
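/* Illustrative sketch (not part of rx, compiled out): typical rxi_FindPeer
 * usage, mirroring rxi_FindConnection below.  With create set, a missing
 * peer is allocated; passing an origPeer causes that peer's refCount to be
 * decremented, which is how a connection swaps the peer it hangs off of. */
#if 0
	/* look up (or create) the peer for a brand new server connection */
	peer = rxi_FindPeer(host, port, (struct rx_peer *) 0, 1);

	/* switch an existing connection over to the sender's interface */
	peer = rxi_FindPeer(host, port, conn->peer, 1);
#endif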
2184 /* Find the connection at (host, port) started at epoch, and with the
2185 * given connection id. Creates the server connection if necessary.
2186 * The type specifies whether a client connection or a server
2187 * connection is desired. In both cases, (host, port) specify the
2188 * peer's (host, port) pair. Client connections are not made
2189 * automatically by this routine. The parameter socket gives the
2190 * socket descriptor on which the packet was received. This is used,
2191 * in the case of server connections, to check that *new* connections
2192 * come via a valid (port, serviceId). Finally, the securityIndex
2193 * parameter must match the existing index for the connection. If a
2194 * server connection is created, it will be created using the supplied
2195 * index, if the index is valid for this service */
2196 struct rx_connection *
2197 rxi_FindConnection(socket, host, port, serviceId, cid,
2198 epoch, type, securityIndex)
2200 register afs_int32 host;
2201 register u_short port;
2206 u_int securityIndex;
2208 int hashindex, flag;
2209 register struct rx_connection *conn;
2210 struct rx_peer *peer;
2211 hashindex = CONN_HASH(host, port, cid, epoch, type);
2212 MUTEX_ENTER(&rx_connHashTable_lock);
2213 rxLastConn ? (conn = rxLastConn, flag = 0) :
2214 (conn = rx_connHashTable[hashindex], flag = 1);
2216 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2217 && (epoch == conn->epoch)) {
2218 register struct rx_peer *pp = conn->peer;
2219 if (securityIndex != conn->securityIndex) {
2220 /* this isn't supposed to happen, but someone could forge a packet
2221 like this, and there seems to be some CM bug that makes this
2222 happen from time to time -- in which case, the fileserver
2224 MUTEX_EXIT(&rx_connHashTable_lock);
2225 return (struct rx_connection *) 0;
2227 /* epoch's high order bits mean route for security reasons only on
2228 * the cid, not the host and port fields.
2230 if (conn->epoch & 0x80000000) break;
2231 if (((type == RX_CLIENT_CONNECTION)
2232 || (pp->host == host)) && (pp->port == port))
2237 /* the connection rxLastConn that was used the last time is not the
2238 ** one we are looking for now. Hence, start searching in the hash */
2240 conn = rx_connHashTable[hashindex];
2246 struct rx_service *service;
2247 if (type == RX_CLIENT_CONNECTION) {
2248 MUTEX_EXIT(&rx_connHashTable_lock);
2249 return (struct rx_connection *) 0;
2251 service = rxi_FindService(socket, serviceId);
2252 if (!service || (securityIndex >= service->nSecurityObjects)
2253 || (service->securityObjects[securityIndex] == 0)) {
2254 MUTEX_EXIT(&rx_connHashTable_lock);
2255 return (struct rx_connection *) 0;
2257 conn = rxi_AllocConnection(); /* This bzero's the connection */
2258 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2260 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2262 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2263 conn->next = rx_connHashTable[hashindex];
2264 rx_connHashTable[hashindex] = conn;
2265 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2266 conn->type = RX_SERVER_CONNECTION;
2267 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2268 conn->epoch = epoch;
2269 conn->cid = cid & RX_CIDMASK;
2270 /* conn->serial = conn->lastSerial = 0; */
2271 /* conn->timeout = 0; */
2272 conn->ackRate = RX_FAST_ACK_RATE;
2273 conn->service = service;
2274 conn->serviceId = serviceId;
2275 conn->securityIndex = securityIndex;
2276 conn->securityObject = service->securityObjects[securityIndex];
2277 conn->nSpecific = 0;
2278 conn->specific = NULL;
2279 rx_SetConnDeadTime(conn, service->connDeadTime);
2280 /* Notify security object of the new connection */
2281 RXS_NewConnection(conn->securityObject, conn);
2282 /* XXXX Connection timeout? */
2283 if (service->newConnProc) (*service->newConnProc)(conn);
2284 MUTEX_ENTER(&rx_stats_mutex);
2285 rx_stats.nServerConns++;
2286 MUTEX_EXIT(&rx_stats_mutex);
2290 /* Ensure that the peer structure is set up in such a way that
2291 ** replies in this connection go back to that remote interface
2292 ** from which the last packet was sent out. In case this packet's
2293 ** source IP address does not match the peer struct for this conn,
2294 ** then drop the refCount on conn->peer and get a new peer structure.
2295 ** We can check the host,port field in the peer structure without the
2296 ** rx_peerHashTable_lock because the peer structure has its refCount
2297 ** incremented and the only time the host,port in the peer struct gets
2298 ** updated is when the peer structure is created.
2300 if (conn->peer->host == host )
2301 peer = conn->peer; /* no change to the peer structure */
2303 peer = rxi_FindPeer(host, port, conn->peer, 1);
2306 MUTEX_ENTER(&conn->conn_data_lock);
2309 MUTEX_EXIT(&conn->conn_data_lock);
2311 rxLastConn = conn; /* store this connection as the last conn used */
2312 MUTEX_EXIT(&rx_connHashTable_lock);
2316 /* There are two packet tracing routines available for testing and monitoring
2317 * Rx. One is called just after every packet is received and the other is
2318 * called just before every packet is sent. Received packets have had their
2319 * headers decoded, and packets to be sent have not yet had their headers
2320 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2321 * containing the network address. Both can be modified. The return value, if
2322 * non-zero, indicates that the packet should be dropped. */
2324 int (*rx_justReceived)() = 0;
2325 int (*rx_almostSent)() = 0;
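/* Illustrative sketch (not part of rx, compiled out): a minimal receive-side
 * tracer for the hook above.  The function name is made up for illustration.
 * It is handed the decoded packet and the sender's address (both may be
 * modified) and returns non-zero to have rx drop the packet. */
#if 0
static int exampleJustReceived(np, addr)
    struct rx_packet *np;
    struct sockaddr_in *addr; {
    /* keep every packet; return 1 instead to have it dropped */
    return 0;
}
/* installed by assigning it to the hook: rx_justReceived = exampleJustReceived; */
#endif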
2327 /* A packet has been received off the interface. Np is the packet, socket is
2328 * the socket number it was received from (useful in determining which service
2329 * this packet corresponds to), and (host, port) reflect the host,port of the
2330 * sender. This call returns the packet to the caller if it is finished with
2331 * it, rather than de-allocating it, just as a small performance hack */
2333 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2334 register struct rx_packet *np;
2339 struct rx_call **newcallp;
2341 register struct rx_call *call;
2342 register struct rx_connection *conn;
2344 afs_uint32 currentCallNumber;
2350 struct rx_packet *tnp;
2353 /* We don't print out the packet until now because (1) the time may not be
2354 * accurate enough until now in the lwp implementation (rx_Listener only gets
2355 * the time after the packet is read) and (2) from a protocol point of view,
2356 * this is the first time the packet has been seen */
2357 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2358 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2359 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2360 np->header.serial, packetType, host, port, np->header.serviceId,
2361 np->header.epoch, np->header.cid, np->header.callNumber,
2362 np->header.seq, np->header.flags, np));
2365 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2366 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2369 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2370 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2373 /* If an input tracer function is defined, call it with the packet and
2374 * network address. Note this function may modify its arguments. */
2375 if (rx_justReceived) {
2376 struct sockaddr_in addr;
2378 addr.sin_family = AF_INET;
2379 addr.sin_port = port;
2380 addr.sin_addr.s_addr = host;
2381 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2382 addr.sin_len = sizeof(addr);
2383 #endif /* AFS_OSF_ENV */
2384 drop = (*rx_justReceived) (np, &addr);
2385 /* drop packet if return value is non-zero */
2386 if (drop) return np;
2387 port = addr.sin_port; /* in case fcn changed addr */
2388 host = addr.sin_addr.s_addr;
2392 /* If packet was not sent by the client, then *we* must be the client */
2393 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2394 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2396 /* Find the connection (or fabricate one, if we're the server & if
2397 * necessary) associated with this packet */
2398 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2399 np->header.cid, np->header.epoch, type,
2400 np->header.securityIndex);
2403 /* If no connection found or fabricated, just ignore the packet.
2404 * (An argument could be made for sending an abort packet for
2409 MUTEX_ENTER(&conn->conn_data_lock);
2410 if (conn->maxSerial < np->header.serial)
2411 conn->maxSerial = np->header.serial;
2412 MUTEX_EXIT(&conn->conn_data_lock);
2414 /* If the connection is in an error state, send an abort packet and ignore
2415 * the incoming packet */
2417 /* Don't respond to an abort packet--we don't want loops! */
2418 MUTEX_ENTER(&conn->conn_data_lock);
2419 if (np->header.type != RX_PACKET_TYPE_ABORT)
2420 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2422 MUTEX_EXIT(&conn->conn_data_lock);
2426 /* Check for connection-only requests (i.e. not call specific). */
2427 if (np->header.callNumber == 0) {
2428 switch (np->header.type) {
2429 case RX_PACKET_TYPE_ABORT:
2430 /* What if the supplied error is zero? */
2431 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2432 MUTEX_ENTER(&conn->conn_data_lock);
2434 MUTEX_EXIT(&conn->conn_data_lock);
2436 case RX_PACKET_TYPE_CHALLENGE:
2437 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2438 MUTEX_ENTER(&conn->conn_data_lock);
2440 MUTEX_EXIT(&conn->conn_data_lock);
2442 case RX_PACKET_TYPE_RESPONSE:
2443 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2444 MUTEX_ENTER(&conn->conn_data_lock);
2446 MUTEX_EXIT(&conn->conn_data_lock);
2448 case RX_PACKET_TYPE_PARAMS:
2449 case RX_PACKET_TYPE_PARAMS+1:
2450 case RX_PACKET_TYPE_PARAMS+2:
2451 /* ignore these packet types for now */
2452 MUTEX_ENTER(&conn->conn_data_lock);
2454 MUTEX_EXIT(&conn->conn_data_lock);
2459 /* Should not reach here, unless the peer is broken: send an
2461 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2462 MUTEX_ENTER(&conn->conn_data_lock);
2463 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2465 MUTEX_EXIT(&conn->conn_data_lock);
2470 channel = np->header.cid & RX_CHANNELMASK;
2471 call = conn->call[channel];
2472 #ifdef RX_ENABLE_LOCKS
2474 MUTEX_ENTER(&call->lock);
2475 /* Test to see if call struct is still attached to conn. */
2476 if (call != conn->call[channel]) {
2478 MUTEX_EXIT(&call->lock);
2479 if (type == RX_SERVER_CONNECTION) {
2480 call = conn->call[channel];
2481 /* If we started with no call attached and there is one now,
2482 * another thread is also running this routine and has gotten
2483 * the connection channel. We should drop this packet in the tests
2484 * below. If there was a call on this connection and it's now
2485 * gone, then we'll be making a new call below.
2486 * If there was previously a call and it's now different then
2487 * the old call was freed and another thread running this routine
2488 * has created a call on this channel. One of these two threads
2489 * has a packet for the old call and the code below handles those
2493 MUTEX_ENTER(&call->lock);
2496 /* This packet can't be for this call. If the new call address is
2497 * 0 then no call is running on this channel. If there is a call
2498 * then, since this is a client connection we're getting data for,
2499 * it must be for the previous call.
2501 MUTEX_ENTER(&rx_stats_mutex);
2502 rx_stats.spuriousPacketsRead++;
2503 MUTEX_EXIT(&rx_stats_mutex);
2504 MUTEX_ENTER(&conn->conn_data_lock);
2506 MUTEX_EXIT(&conn->conn_data_lock);
2511 currentCallNumber = conn->callNumber[channel];
2513 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2514 if (np->header.callNumber < currentCallNumber) {
2515 MUTEX_ENTER(&rx_stats_mutex);
2516 rx_stats.spuriousPacketsRead++;
2517 MUTEX_EXIT(&rx_stats_mutex);
2518 #ifdef RX_ENABLE_LOCKS
2520 MUTEX_EXIT(&call->lock);
2522 MUTEX_ENTER(&conn->conn_data_lock);
2524 MUTEX_EXIT(&conn->conn_data_lock);
2528 call = rxi_NewCall(conn, channel);
2529 MUTEX_ENTER(&call->lock);
2530 *call->callNumber = np->header.callNumber;
2531 call->state = RX_STATE_PRECALL;
2532 clock_GetTime(&call->queueTime);
2533 hzero(call->bytesSent);
2534 hzero(call->bytesRcvd);
2535 rxi_KeepAliveOn(call);
2537 else if (np->header.callNumber != currentCallNumber) {
2538 /* Wait until the transmit queue is idle before deciding
2539 * whether to reset the current call. Chances are that the
2540 * call will be in either DALLY or HOLD state once the TQ_BUSY
2543 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2544 while ((call->state == RX_STATE_ACTIVE) &&
2545 (call->flags & RX_CALL_TQ_BUSY)) {
2546 call->flags |= RX_CALL_TQ_WAIT;
2547 #ifdef RX_ENABLE_LOCKS
2548 CV_WAIT(&call->cv_tq, &call->lock);
2549 #else /* RX_ENABLE_LOCKS */
2550 osi_rxSleep(&call->tq);
2551 #endif /* RX_ENABLE_LOCKS */
2553 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2554 /* If the new call cannot be taken right now send a busy and set
2555 * the error condition in this call, so that it terminates as
2556 * quickly as possible */
2557 if (call->state == RX_STATE_ACTIVE) {
2558 struct rx_packet *tp;
2560 rxi_CallError(call, RX_CALL_DEAD);
2561 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2562 MUTEX_EXIT(&call->lock);
2563 MUTEX_ENTER(&conn->conn_data_lock);
2565 MUTEX_EXIT(&conn->conn_data_lock);
2568 rxi_ResetCall(call, 0);
2569 *call->callNumber = np->header.callNumber;
2570 call->state = RX_STATE_PRECALL;
2571 clock_GetTime(&call->queueTime);
2572 hzero(call->bytesSent);
2573 hzero(call->bytesRcvd);
2575 * If the number of queued calls exceeds the overload
2576 * threshold then abort this call.
2578 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2579 struct rx_packet *tp;
2581 rxi_CallError(call, rx_BusyError);
2582 tp = rxi_SendCallAbort(call, np, 1, 0);
2583 MUTEX_EXIT(&call->lock);
2584 MUTEX_ENTER(&conn->conn_data_lock);
2586 MUTEX_EXIT(&conn->conn_data_lock);
2589 rxi_KeepAliveOn(call);
2592 /* Continuing call; do nothing here. */
2594 } else { /* we're the client */
2595 /* Ignore all incoming acknowledgements for calls in DALLY state */
2596 if ( call && (call->state == RX_STATE_DALLY)
2597 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2598 MUTEX_ENTER(&rx_stats_mutex);
2599 rx_stats.ignorePacketDally++;
2600 MUTEX_EXIT(&rx_stats_mutex);
2601 #ifdef RX_ENABLE_LOCKS
2603 MUTEX_EXIT(&call->lock);
2606 MUTEX_ENTER(&conn->conn_data_lock);
2608 MUTEX_EXIT(&conn->conn_data_lock);
2612 /* Ignore anything that's not relevant to the current call. If there
2613 * isn't a current call, then no packet is relevant. */
2614 if (!call || (np->header.callNumber != currentCallNumber)) {
2615 MUTEX_ENTER(&rx_stats_mutex);
2616 rx_stats.spuriousPacketsRead++;
2617 MUTEX_EXIT(&rx_stats_mutex);
2618 #ifdef RX_ENABLE_LOCKS
2620 MUTEX_EXIT(&call->lock);
2623 MUTEX_ENTER(&conn->conn_data_lock);
2625 MUTEX_EXIT(&conn->conn_data_lock);
2628 /* If the service security object index stamped in the packet does not
2629 * match the connection's security index, ignore the packet */
2630 if (np->header.securityIndex != conn->securityIndex) {
2631 #ifdef RX_ENABLE_LOCKS
2632 MUTEX_EXIT(&call->lock);
2634 MUTEX_ENTER(&conn->conn_data_lock);
2636 MUTEX_EXIT(&conn->conn_data_lock);
2640 /* If we're receiving the response, then all transmit packets are
2641 * implicitly acknowledged. Get rid of them. */
2642 if (np->header.type == RX_PACKET_TYPE_DATA) {
2643 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2644 /* XXX Hack. Because we must release the global rx lock when
2645 * sending packets (osi_NetSend) we drop all acks while we're
2646 * traversing the tq in rxi_Start sending packets out because
2647 * packets may move to the freePacketQueue as result of being here!
2648 * So we drop these packets until we're safely out of the
2649 * traversing. Really ugly!
2650 * For fine grain RX locking, we set the acked field in the
2651 * packets and let rxi_Start remove them from the transmit queue.
2653 if (call->flags & RX_CALL_TQ_BUSY) {
2654 #ifdef RX_ENABLE_LOCKS
2655 rxi_SetAcksInTransmitQueue(call);
2658 return np; /* xmitting; drop packet */
2662 rxi_ClearTransmitQueue(call, 0);
2664 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2665 rxi_ClearTransmitQueue(call, 0);
2666 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2668 if (np->header.type == RX_PACKET_TYPE_ACK) {
2669 /* now check to see if this is an ack packet acknowledging that the
2670 * server actually *lost* some hard-acked data. If this happens we
2671 * ignore this packet, as it may indicate that the server restarted in
2672 * the middle of a call. It is also possible that this is an old ack
2673 * packet. We don't abort the connection in this case, because this
2674 * *might* just be an old ack packet. The right way to detect a server
2675 * restart in the midst of a call is to notice that the server epoch
2677 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2678 * XXX unacknowledged. I think that this is off-by-one, but
2679 * XXX I don't dare change it just yet, since it will
2680 * XXX interact badly with the server-restart detection
2681 * XXX code in receiveackpacket. */
2682 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2683 MUTEX_ENTER(&rx_stats_mutex);
2684 rx_stats.spuriousPacketsRead++;
2685 MUTEX_EXIT(&rx_stats_mutex);
2686 MUTEX_EXIT(&call->lock);
2687 MUTEX_ENTER(&conn->conn_data_lock);
2689 MUTEX_EXIT(&conn->conn_data_lock);
2693 } /* else not a data packet */
2696 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2697 /* Set remote user defined status from packet */
2698 call->remoteStatus = np->header.userStatus;
2700 /* Note the gap between the expected next packet and the actual
2701 * packet that arrived, when the new packet has a smaller serial number
2702 * than expected. Rioses frequently reorder packets all by themselves,
2703 * so this will be quite important with very large window sizes.
2704 * Skew is checked against 0 here to avoid any dependence on the type of
2705 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2707 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2708 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2709 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2711 MUTEX_ENTER(&conn->conn_data_lock);
2712 skew = conn->lastSerial - np->header.serial;
2713 conn->lastSerial = np->header.serial;
2714 MUTEX_EXIT(&conn->conn_data_lock);
2716 register struct rx_peer *peer;
2718 if (skew > peer->inPacketSkew) {
2719 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2720 peer->inPacketSkew = skew;
2724 /* Now do packet type-specific processing */
2725 switch (np->header.type) {
2726 case RX_PACKET_TYPE_DATA:
2727 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2730 case RX_PACKET_TYPE_ACK:
2731 /* Respond immediately to ack packets requesting acknowledgement
2733 if (np->header.flags & RX_REQUEST_ACK) {
2734 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2735 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2737 np = rxi_ReceiveAckPacket(call, np, 1);
2739 case RX_PACKET_TYPE_ABORT:
2740 /* An abort packet: reset the connection, passing the error up to
2742 /* What if error is zero? */
2743 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2745 case RX_PACKET_TYPE_BUSY:
2748 case RX_PACKET_TYPE_ACKALL:
2749 /* All packets acknowledged, so we can drop all packets previously
2750 * readied for sending */
2751 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2752 /* XXX Hack. Because we can't release the global rx lock when
2753 * sending packets (osi_NetSend) we drop all ack pkts while we're
2754 * traversing the tq in rxi_Start sending packets out because
2755 * packets may move to the freePacketQueue as result of being
2756 * here! So we drop these packets until we're safely out of the
2757 * traversing. Really ugly!
2758 * For fine grain RX locking, we set the acked field in the packets
2759 * and let rxi_Start remove the packets from the transmit queue.
2761 if (call->flags & RX_CALL_TQ_BUSY) {
2762 #ifdef RX_ENABLE_LOCKS
2763 rxi_SetAcksInTransmitQueue(call);
2765 #else /* RX_ENABLE_LOCKS */
2767 return np; /* xmitting; drop packet */
2768 #endif /* RX_ENABLE_LOCKS */
2770 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2771 rxi_ClearTransmitQueue(call, 0);
2774 /* Should not reach here, unless the peer is broken: send an abort
2776 rxi_CallError(call, RX_PROTOCOL_ERROR);
2777 np = rxi_SendCallAbort(call, np, 1, 0);
2780 /* Note when this last legitimate packet was received, for keep-alive
2781 * processing. Note, we delay getting the time until now in the hope that
2782 * the packet will be delivered to the user before any get time is required
2783 * (if not, then the time won't actually be re-evaluated here). */
2784 call->lastReceiveTime = clock_Sec();
2785 MUTEX_EXIT(&call->lock);
2786 MUTEX_ENTER(&conn->conn_data_lock);
2788 MUTEX_EXIT(&conn->conn_data_lock);
2792 /* return true if this is an "interesting" connection from the point of view
2793 of someone trying to debug the system */
2794 int rxi_IsConnInteresting(struct rx_connection *aconn)
2797 register struct rx_call *tcall;
2799 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2801 for(i=0;i<RX_MAXCALLS;i++) {
2802 tcall = aconn->call[i];
2804 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2806 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2814 /* if this is one of the last few packets AND it wouldn't be used by the
2815 receiving call to immediately satisfy a read request, then drop it on
2816 the floor, since accepting it might prevent a lock-holding thread from
2817 making progress in its reading. If a call has been cleared while in
2818 the precall state then ignore all subsequent packets until the call
2819 is assigned to a thread. */
2821 static TooLow(ap, acall)
2822 struct rx_call *acall;
2823 struct rx_packet *ap; {
2825 MUTEX_ENTER(&rx_stats_mutex);
2826 if (((ap->header.seq != 1) &&
2827 (acall->flags & RX_CALL_CLEARED) &&
2828 (acall->state == RX_STATE_PRECALL)) ||
2829 ((rx_nFreePackets < rxi_dataQuota+2) &&
2830 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2831 && (acall->flags & RX_CALL_READER_WAIT)))) {
2834 MUTEX_EXIT(&rx_stats_mutex);
2839 /* try to attach call, if authentication is complete */
2840 static void TryAttach(acall, socket, tnop, newcallp)
2841 register struct rx_call *acall;
2842 register osi_socket socket;
2844 register struct rx_call **newcallp; {
2845 register struct rx_connection *conn;
2847 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2848 /* Don't attach until we have any req'd. authentication. */
2849 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2850 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2851 /* Note: this does not necessarily succeed; there
2852 may not be any proc available */
2855 rxi_ChallengeOn(acall->conn);
2860 /* A data packet has been received off the interface. This packet is
2861 * appropriate to the call (the call is in the right state, etc.). This
2862 * routine can return a packet to the caller, for re-use */
2864 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2865 port, tnop, newcallp)
2866 register struct rx_call *call;
2867 register struct rx_packet *np;
2873 struct rx_call **newcallp;
2879 afs_uint32 seq, serial, flags;
2881 struct rx_packet *tnp;
2883 MUTEX_ENTER(&rx_stats_mutex);
2884 rx_stats.dataPacketsRead++;
2885 MUTEX_EXIT(&rx_stats_mutex);
2888 /* If there are no packet buffers, drop this new packet, unless we can find
2889 * packet buffers from inactive calls */
2891 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2892 MUTEX_ENTER(&rx_freePktQ_lock);
2893 rxi_NeedMorePackets = TRUE;
2894 MUTEX_EXIT(&rx_freePktQ_lock);
2895 MUTEX_ENTER(&rx_stats_mutex);
2896 rx_stats.noPacketBuffersOnRead++;
2897 MUTEX_EXIT(&rx_stats_mutex);
2898 call->rprev = np->header.serial;
2899 rxi_calltrace(RX_TRACE_DROP, call);
2900 dpf (("packet %x dropped on receipt - quota problems", np));
2902 rxi_ClearReceiveQueue(call);
2903 clock_GetTime(&when);
2904 clock_Add(&when, &rx_softAckDelay);
2905 if (!call->delayedAckEvent ||
2906 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2907 rxevent_Cancel(call->delayedAckEvent, call,
2908 RX_CALL_REFCOUNT_DELAY);
2909 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2910 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2913 /* we've damaged this call already, might as well do it in. */
2919 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2920 * packet is one of several packets transmitted as a single
2921 * datagram. Do not send any soft or hard acks until all packets
2922 * in a jumbogram have been processed. Send negative acks right away.
2924 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2925 /* tnp is non-null when there are more packets in the
2926 * current jumbo gram */
2933 seq = np->header.seq;
2934 serial = np->header.serial;
2935 flags = np->header.flags;
2937 /* If the call is in an error state, send an abort message */
2939 return rxi_SendCallAbort(call, np, istack, 0);
2941 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2942 * AFS 3.5 jumbogram. */
2943 if (flags & RX_JUMBO_PACKET) {
2944 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2949 if (np->header.spare != 0) {
2950 MUTEX_ENTER(&call->conn->conn_data_lock);
2951 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2952 MUTEX_EXIT(&call->conn->conn_data_lock);
2955 /* The usual case is that this is the expected next packet */
2956 if (seq == call->rnext) {
2958 /* Check to make sure it is not a duplicate of one already queued */
2959 if (queue_IsNotEmpty(&call->rq)
2960 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2961 MUTEX_ENTER(&rx_stats_mutex);
2962 rx_stats.dupPacketsRead++;
2963 MUTEX_EXIT(&rx_stats_mutex);
2964 dpf (("packet %x dropped on receipt - duplicate", np));
2965 rxevent_Cancel(call->delayedAckEvent, call,
2966 RX_CALL_REFCOUNT_DELAY);
2967 np = rxi_SendAck(call, np, seq, serial,
2968 flags, RX_ACK_DUPLICATE, istack);
2974 /* It's the next packet. Stick it on the receive queue
2975 * for this call. Set newPackets to make sure we wake
2976 * the reader once all packets have been processed */
2977 queue_Prepend(&call->rq, np);
2979 np = NULL; /* We can't use this anymore */
2982 /* If an ack is requested then set a flag to make sure we
2983 * send an acknowledgement for this packet */
2984 if (flags & RX_REQUEST_ACK) {
2988 /* Keep track of whether we have received the last packet */
2989 if (flags & RX_LAST_PACKET) {
2990 call->flags |= RX_CALL_HAVE_LAST;
2994 /* Check whether we have all of the packets for this call */
2995 if (call->flags & RX_CALL_HAVE_LAST) {
2996 afs_uint32 tseq; /* temporary sequence number */
2997 struct rx_packet *tp; /* Temporary packet pointer */
2998 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3000 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3001 if (tseq != tp->header.seq)
3003 if (tp->header.flags & RX_LAST_PACKET) {
3004 call->flags |= RX_CALL_RECEIVE_DONE;
3011 /* Provide asynchronous notification for those who want it
3012 * (e.g. multi rx) */
3013 if (call->arrivalProc) {
3014 (*call->arrivalProc)(call, call->arrivalProcHandle,
3015 call->arrivalProcArg);
3016 call->arrivalProc = (VOID (*)()) 0;
3019 /* Update last packet received */
3022 /* If there is no server process serving this call, grab
3023 * one, if available. We only need to do this once. If a
3024 * server thread is available, this thread becomes a server
3025 * thread and the server thread becomes a listener thread. */
3027 TryAttach(call, socket, tnop, newcallp);
3030 /* This is not the expected next packet. */
3032 /* Determine whether this is a new or old packet, and if it's
3033 * a new one, whether it fits into the current receive window.
3034 * Also figure out whether the packet was delivered in sequence.
3035 * We use the prev variable to determine whether the new packet
3036 * is the successor of its immediate predecessor in the
3037 * receive queue, and the missing flag to determine whether
3038 * any of this packet's predecessors are missing. */
3040 afs_uint32 prev; /* "Previous packet" sequence number */
3041 struct rx_packet *tp; /* Temporary packet pointer */
3042 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3043 int missing; /* Are any predecessors missing? */
3045 /* If the new packet's sequence number has been sent to the
3046 * application already, then this is a duplicate */
3047 if (seq < call->rnext) {
3048 MUTEX_ENTER(&rx_stats_mutex);
3049 rx_stats.dupPacketsRead++;
3050 MUTEX_EXIT(&rx_stats_mutex);
3051 rxevent_Cancel(call->delayedAckEvent, call,
3052 RX_CALL_REFCOUNT_DELAY);
3053 np = rxi_SendAck(call, np, seq, serial,
3054 flags, RX_ACK_DUPLICATE, istack);
3060 /* If the sequence number is greater than what can be
3061 * accommodated by the current window, then send a negative
3062 * acknowledge and drop the packet */
3063 if ((call->rnext + call->rwind) <= seq) {
3064 rxevent_Cancel(call->delayedAckEvent, call,
3065 RX_CALL_REFCOUNT_DELAY);
3066 np = rxi_SendAck(call, np, seq, serial,
3067 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3073 /* Look for the packet in the queue of old received packets */
3074 for (prev = call->rnext - 1, missing = 0,
3075 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3076 /* Check for duplicate packet */
3077 if (seq == tp->header.seq) {
3078 MUTEX_ENTER(&rx_stats_mutex);
3079 rx_stats.dupPacketsRead++;
3080 MUTEX_EXIT(&rx_stats_mutex);
3081 rxevent_Cancel(call->delayedAckEvent, call,
3082 RX_CALL_REFCOUNT_DELAY);
3083 np = rxi_SendAck(call, np, seq, serial,
3084 flags, RX_ACK_DUPLICATE, istack);
3089 /* If we find a higher sequence packet, break out and
3090 * insert the new packet here. */
3091 if (seq < tp->header.seq) break;
3092 /* Check for missing packet */
3093 if (tp->header.seq != prev+1) {
3097 prev = tp->header.seq;
3100 /* Keep track of whether we have received the last packet. */
3101 if (flags & RX_LAST_PACKET) {
3102 call->flags |= RX_CALL_HAVE_LAST;
3105 /* It's within the window: add it to the receive queue.
3106 * tp is left by the previous loop either pointing at the
3107 * packet before which to insert the new packet, or at the
3108 * queue head if the queue is empty or the packet should be
3110 queue_InsertBefore(tp, np);
3114 /* Check whether we have all of the packets for this call */
3115 if ((call->flags & RX_CALL_HAVE_LAST)
3116 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3117 afs_uint32 tseq; /* temporary sequence number */
3119 for (tseq = call->rnext,
3120 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3121 if (tseq != tp->header.seq)
3123 if (tp->header.flags & RX_LAST_PACKET) {
3124 call->flags |= RX_CALL_RECEIVE_DONE;
3131 /* We need to send an ack if the packet is out of sequence,
3132 * or if an ack was requested by the peer. */
3133 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3137 /* Acknowledge the last packet for each call */
3138 if (flags & RX_LAST_PACKET) {
3149 * If the receiver is waiting for an iovec, fill the iovec
3150 * using the data from the receive queue */
3151 if (call->flags & RX_CALL_IOVEC_WAIT) {
3152 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3153 /* the call may have been aborted */
3162 /* Wakeup the reader if any */
3163 if ((call->flags & RX_CALL_READER_WAIT) &&
3164 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3165 (call->iovNext >= call->iovMax) ||
3166 (call->flags & RX_CALL_RECEIVE_DONE))) {
3167 call->flags &= ~RX_CALL_READER_WAIT;
3168 #ifdef RX_ENABLE_LOCKS
3169 CV_BROADCAST(&call->cv_rq);
3171 osi_rxWakeup(&call->rq);
3177 * Send an ack when requested by the peer, or once every
3178 * rxi_SoftAckRate packets until the last packet has been
3179 * received. Always send a soft ack for the last packet in
3180 * the server's reply. */
3182 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3183 np = rxi_SendAck(call, np, seq, serial, flags,
3184 RX_ACK_REQUESTED, istack);
3185 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3186 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3187 np = rxi_SendAck(call, np, seq, serial, flags,
3188 RX_ACK_IDLE, istack);
3189 } else if (call->nSoftAcks) {
3190 clock_GetTime(&when);
3191 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3192 clock_Add(&when, &rx_lastAckDelay);
3194 clock_Add(&when, &rx_softAckDelay);
3196 if (!call->delayedAckEvent ||
3197 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3198 rxevent_Cancel(call->delayedAckEvent, call,
3199 RX_CALL_REFCOUNT_DELAY);
3200 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3201 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3204 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3205 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3212 static void rxi_ComputeRate();
3215 /* The real smarts of the whole thing. */
3216 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3217 register struct rx_call *call;
3218 struct rx_packet *np;
3221 struct rx_ackPacket *ap;
3223 register struct rx_packet *tp;
3224 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3225 register struct rx_connection *conn = call->conn;
3226 struct rx_peer *peer = conn->peer;
3229 /* because there are CM's that are bogus, sending weird values for this. */
3230 afs_uint32 skew = 0;
3235 int newAckCount = 0;
3236 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3237 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3239 MUTEX_ENTER(&rx_stats_mutex);
3240 rx_stats.ackPacketsRead++;
3241 MUTEX_EXIT(&rx_stats_mutex);
3242 ap = (struct rx_ackPacket *) rx_DataOf(np);
3243 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3245 return np; /* truncated ack packet */
3247 /* depends on ack packet struct */
3248 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3249 first = ntohl(ap->firstPacket);
3250 serial = ntohl(ap->serial);
3251 /* temporarily disabled -- needs to degrade over time
3252 skew = ntohs(ap->maxSkew); */
3254 /* Ignore ack packets received out of order */
3255 if (first < call->tfirst) {
3259 if (np->header.flags & RX_SLOW_START_OK) {
3260 call->flags |= RX_CALL_SLOW_START_OK;
3266 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3267 ap->reason, ntohl(ap->previousPacket),
3268 (unsigned int) np->header.seq, (unsigned int) serial,
3269 (unsigned int) skew, ntohl(ap->firstPacket));
3272 for (offset = 0; offset < nAcks; offset++)
3273 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3279 /* if a server connection has been re-created, it doesn't remember what
3280 serial # it was up to. An ack will tell us, since the serial field
3281 contains the largest serial received by the other side */
3282 MUTEX_ENTER(&conn->conn_data_lock);
3283 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3284 conn->serial = serial+1;
3286 MUTEX_EXIT(&conn->conn_data_lock);
3288 /* Update the outgoing packet skew value to the latest value of
3289 * the peer's incoming packet skew value. The ack packet, of
3290 * course, could arrive out of order, but that won't affect things
3292 MUTEX_ENTER(&peer->peer_lock);
3293 peer->outPacketSkew = skew;
3295 /* Check for packets that no longer need to be transmitted, and
3296 * discard them. This only applies to packets positively
3297 * acknowledged as having been sent to the peer's upper level.
3298 * All other packets must be retained. So only packets with
3299 * sequence numbers < ap->firstPacket are candidates. */
3300 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3301 if (tp->header.seq >= first) break;
3302 call->tfirst = tp->header.seq + 1;
3303 if (tp->header.serial == serial) {
3304 /* Use RTT if not delayed by client. */
3305 if (ap->reason != RX_ACK_DELAY)
3306 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3308 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3311 else if (tp->firstSerial == serial) {
3312 /* Use RTT if not delayed by client. */
3313 if (ap->reason != RX_ACK_DELAY)
3314 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3316 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3319 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3320 /* XXX Hack. Because we have to release the global rx lock when sending
3321 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3322 * in rxi_Start sending packets out because packets may move to the
3323 * freePacketQueue as result of being here! So we drop these packets until
3324 * we're safely out of the traversing. Really ugly!
3325 * To make it even uglier, if we're using fine grain locking, we can
3326 * set the ack bits in the packets and have rxi_Start remove the packets
3327 * when it's done transmitting.
3329 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3332 if (call->flags & RX_CALL_TQ_BUSY) {
3333 #ifdef RX_ENABLE_LOCKS
3334 tp->flags |= RX_PKTFLAG_ACKED;
3335 call->flags |= RX_CALL_TQ_SOME_ACKED;
3336 #else /* RX_ENABLE_LOCKS */
3338 #endif /* RX_ENABLE_LOCKS */
3340 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3343 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3348 /* Give rate detector a chance to respond to ping requests */
3349 if (ap->reason == RX_ACK_PING_RESPONSE) {
3350 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3354 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3356 /* Now go through explicit acks/nacks and record the results in
3357 * the waiting packets. These are packets that can't be released
3358 * yet, even with a positive acknowledge. This positive
3359 * acknowledge only means the packet has been received by the
3360 * peer, not that it will be retained long enough to be sent to
3361 * the peer's upper level. In addition, reset the transmit timers
3362 * of any missing packets (those packets that must be missing
3363 * because this packet was out of sequence) */
3365 call->nSoftAcked = 0;
3366 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3367 /* Update round trip time if the ack was stimulated on receipt
3369 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3370 #ifdef RX_ENABLE_LOCKS
3371 if (tp->header.seq >= first) {
3372 #endif /* RX_ENABLE_LOCKS */
3373 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3374 if (tp->header.serial == serial) {
3375 /* Use RTT if not delayed by client. */
3376 if (ap->reason != RX_ACK_DELAY)
3377 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3379 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3382 else if ((tp->firstSerial == serial)) {
3383 /* Use RTT if not delayed by client. */
3384 if (ap->reason != RX_ACK_DELAY)
3385 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3387 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3390 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3391 #ifdef RX_ENABLE_LOCKS
3393 #endif /* RX_ENABLE_LOCKS */
3394 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3396 /* Set the acknowledge flag per packet based on the
3397 * information in the ack packet. An acknowledged packet can
3398 * be downgraded when the server has discarded a packet it
3399 * soacked previously, or when an ack packet is received
3400 * out of sequence. */
3401 if (tp->header.seq < first) {
3402 /* Implicit ack information */
3403 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3406 tp->flags |= RX_PKTFLAG_ACKED;
3408 else if (tp->header.seq < first + nAcks) {
3409 /* Explicit ack information: set it in the packet appropriately */
3410 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3411 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3413 tp->flags |= RX_PKTFLAG_ACKED;
3421 tp->flags &= ~RX_PKTFLAG_ACKED;
3426 tp->flags &= ~RX_PKTFLAG_ACKED;
3430 /* If packet isn't yet acked, and it has been transmitted at least
3431 * once, reset retransmit time using latest timeout
3432 * ie, this should readjust the retransmit timer for all outstanding
3433 * packets... So we don't just retransmit when we should know better*/
3435 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3436 tp->retryTime = tp->timeSent;
3437 clock_Add(&tp->retryTime, &peer->timeout);
3438 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3439 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3443 /* If the window has been extended by this acknowledge packet,
3444 * then wakeup a sender waiting in alloc for window space, or try
3445 * sending packets now, if he's been sitting on packets due to
3446 * lack of window space */
3447 if (call->tnext < (call->tfirst + call->twind)) {
3448 #ifdef RX_ENABLE_LOCKS
3449 CV_SIGNAL(&call->cv_twind);
3451 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3452 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3453 osi_rxWakeup(&call->twind);
3456 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3457 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3461 /* if the ack packet has a receivelen field hanging off it,
3462 * update our state */
3463 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3466 /* If the ack packet has a "recommended" size that is less than
3467 * what I am using now, reduce my size to match */
3468 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3469 sizeof(afs_int32), &tSize);
3470 tSize = (afs_uint32) ntohl(tSize);
3471 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3473 /* Get the maximum packet size to send to this peer */
3474 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3476 tSize = (afs_uint32)ntohl(tSize);
3477 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3478 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3480 /* sanity check - peer might have restarted with different params.
3481 * If peer says "send less", dammit, send less... Peer should never
3482 * be unable to accept packets of the size that prior AFS versions would
3483 * send without asking. */
3484 if (peer->maxMTU != tSize) {
3485 peer->maxMTU = tSize;
3486 peer->MTU = MIN(tSize, peer->MTU);
3487 call->MTU = MIN(call->MTU, tSize);
3491 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3493 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3494 sizeof(afs_int32), &tSize);
3495 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3496 if (tSize < call->twind) { /* smaller than our send */
3497 call->twind = tSize; /* window, we must send less... */
3498 call->ssthresh = MIN(call->twind, call->ssthresh);
3501 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3502 * network MTU confused with the loopback MTU. Calculate the
3503 * maximum MTU here for use in the slow start code below.
3505 maxMTU = peer->maxMTU;
3506 /* Did peer restart with older RX version? */
3507 if (peer->maxDgramPackets > 1) {
3508 peer->maxDgramPackets = 1;
3510 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3512 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3513 sizeof(afs_int32), &tSize);
3514 tSize = (afs_uint32) ntohl(tSize);
3516 * As of AFS 3.5 we set the send window to match the receive window.
3518 if (tSize < call->twind) {
3519 call->twind = tSize;
3520 call->ssthresh = MIN(call->twind, call->ssthresh);
3521 } else if (tSize > call->twind) {
3522 call->twind = tSize;
3526 * As of AFS 3.5, a jumbogram is more than one fixed size
3527 * packet transmitted in a single UDP datagram. If the remote
3528 * MTU is smaller than our local MTU then never send a datagram
3529 * larger than the natural MTU.
3531 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3532 sizeof(afs_int32), &tSize);
3533 maxDgramPackets = (afs_uint32) ntohl(tSize);
3534 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3535 maxDgramPackets = MIN(maxDgramPackets,
3536 (int)(peer->ifDgramPackets));
3537 maxDgramPackets = MIN(maxDgramPackets, tSize);
3538 if (maxDgramPackets > 1) {
3539 peer->maxDgramPackets = maxDgramPackets;
3540 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3542 peer->maxDgramPackets = 1;
3543 call->MTU = peer->natMTU;
3545 } else if (peer->maxDgramPackets > 1) {
3546 /* Restarted with lower version of RX */
3547 peer->maxDgramPackets = 1;
3549 } else if (peer->maxDgramPackets > 1 ||
3550 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3551 /* Restarted with lower version of RX */
3552 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3553 peer->natMTU = OLD_MAX_PACKET_SIZE;
3554 peer->MTU = OLD_MAX_PACKET_SIZE;
3555 peer->maxDgramPackets = 1;
3556 peer->nDgramPackets = 1;
3558 call->MTU = OLD_MAX_PACKET_SIZE;
3563 * Calculate how many datagrams were successfully received after
3564 * the first missing packet and adjust the negative ack counter
3569 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3570 if (call->nNacks < nNacked) {
3571 call->nNacks = nNacked;
3580 if (call->flags & RX_CALL_FAST_RECOVER) {
3582 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3584 call->flags &= ~RX_CALL_FAST_RECOVER;
3585 call->cwind = call->nextCwind;
3586 call->nextCwind = 0;
3589 call->nCwindAcks = 0;
3591 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3592 /* Three negative acks in a row trigger congestion recovery */
3593 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3594 MUTEX_EXIT(&peer->peer_lock);
3595 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3596 /* someone else is waiting to start recovery */
3599 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3600 while (call->flags & RX_CALL_TQ_BUSY) {
3601 call->flags |= RX_CALL_TQ_WAIT;
3602 #ifdef RX_ENABLE_LOCKS
3603 CV_WAIT(&call->cv_tq, &call->lock);
3604 #else /* RX_ENABLE_LOCKS */
3605 osi_rxSleep(&call->tq);
3606 #endif /* RX_ENABLE_LOCKS */
3608 MUTEX_ENTER(&peer->peer_lock);
3609 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3610 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3611 call->flags |= RX_CALL_FAST_RECOVER;
3612 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3613 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3615 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3616 call->nextCwind = call->ssthresh;
3619 peer->MTU = call->MTU;
3620 peer->cwind = call->nextCwind;
3621 peer->nDgramPackets = call->nDgramPackets;
3623 call->congestSeq = peer->congestSeq;
3624 /* Reset the resend times on the packets that were nacked
3625 * so we will retransmit as soon as the window permits*/
3626 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3628 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3629 clock_Zero(&tp->retryTime);
3631 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3636 /* If cwind is smaller than ssthresh, then increase
3637 * the window one packet for each ack we receive (exponential
3639 * If cwind is greater than or equal to ssthresh then increase
3640 * the congestion window by one packet for each cwind acks we
3641 * receive (linear growth). */
3642 if (call->cwind < call->ssthresh) {
3643 call->cwind = MIN((int)call->ssthresh,
3644 (int)(call->cwind + newAckCount));
3645 call->nCwindAcks = 0;
3647 call->nCwindAcks += newAckCount;
3648 if (call->nCwindAcks >= call->cwind) {
3649 call->nCwindAcks = 0;
3650 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3654 * If we have received several acknowledgements in a row then
3655 * it is time to increase the size of our datagrams
3657 if ((int)call->nAcks > rx_nDgramThreshold) {
3658 if (peer->maxDgramPackets > 1) {
3659 if (call->nDgramPackets < peer->maxDgramPackets) {
3660 call->nDgramPackets++;
3662 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3663 } else if (call->MTU < peer->maxMTU) {
3664 call->MTU += peer->natMTU;
3665 call->MTU = MIN(call->MTU, peer->maxMTU);
3671 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3673 /* Servers need to hold the call until all response packets have
3674 * been acknowledged. Soft acks are good enough since clients
3675 * are not allowed to clear their receive queues. */
3676 if (call->state == RX_STATE_HOLD &&
3677 call->tfirst + call->nSoftAcked >= call->tnext) {
3678 call->state = RX_STATE_DALLY;
3679 rxi_ClearTransmitQueue(call, 0);
3680 } else if (!queue_IsEmpty(&call->tq)) {