2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX: Extended Remote Procedure Call */
12 #include <afsconfig.h>
14 #include "afs/param.h"
16 #include <afs/param.h>
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
29 #include <net/net_globals.h>
30 #endif /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #undef RXDEBUG /* turn off debugging */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
53 #endif /* AFS_ALPHA_ENV */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
64 #include "rx_globals.h"
66 #define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS 211 /* Stop AFS process */
68 #define AFSOP_STOP_BKG 212 /* Stop BKG process */
70 extern afs_int32 afs_termState;
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
77 # include <sys/types.h>
84 # include <sys/socket.h>
85 # include <sys/file.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
107 int (*registerProgram)() = 0;
108 int (*swapNameProgram)() = 0;
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
116 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
118 afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119 afs_int32 rxi_start_in_error;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
124 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125 * currently allocated within rx. This number is used to allocate the
126 * memory required to return the statistics when queried.
129 static unsigned int rxi_rpc_peer_stat_cnt;
132 * rxi_rpc_process_stat_cnt counts the total number of local process stat
133 * structures currently allocated within rx. The number is used to allocate
134 * the memory required to return the statistics when queried.
137 static unsigned int rxi_rpc_process_stat_cnt;
139 #if !defined(offsetof)
140 #include <stddef.h> /* for definition of offsetof() */
143 #ifdef AFS_PTHREAD_ENV
147 * Use procedural initialization of mutexes/condition variables
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
172 static void rxi_InitPthread(void) {
173 assert(pthread_mutex_init(&rx_clock_mutex,
174 (const pthread_mutexattr_t*)0)==0);
175 assert(pthread_mutex_init(&rxi_connCacheMutex,
176 (const pthread_mutexattr_t*)0)==0);
177 assert(pthread_mutex_init(&rx_init_mutex,
178 (const pthread_mutexattr_t*)0)==0);
179 assert(pthread_mutex_init(&epoch_mutex,
180 (const pthread_mutexattr_t*)0)==0);
181 assert(pthread_mutex_init(&rx_event_mutex,
182 (const pthread_mutexattr_t*)0)==0);
183 assert(pthread_mutex_init(&des_init_mutex,
184 (const pthread_mutexattr_t*)0)==0);
185 assert(pthread_mutex_init(&des_random_mutex,
186 (const pthread_mutexattr_t*)0)==0);
187 assert(pthread_mutex_init(&osi_malloc_mutex,
188 (const pthread_mutexattr_t*)0)==0);
189 assert(pthread_mutex_init(&event_handler_mutex,
190 (const pthread_mutexattr_t*)0)==0);
191 assert(pthread_mutex_init(&listener_mutex,
192 (const pthread_mutexattr_t*)0)==0);
193 assert(pthread_mutex_init(&rx_if_init_mutex,
194 (const pthread_mutexattr_t*)0)==0);
195 assert(pthread_mutex_init(&rx_if_mutex,
196 (const pthread_mutexattr_t*)0)==0);
197 assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198 (const pthread_mutexattr_t*)0)==0);
199 assert(pthread_mutex_init(&rxkad_random_mutex,
200 (const pthread_mutexattr_t*)0)==0);
201 assert(pthread_mutex_init(&rxkad_stats_mutex,
202 (const pthread_mutexattr_t*)0)==0);
203 assert(pthread_mutex_init(&rx_debug_mutex,
204 (const pthread_mutexattr_t*)0)==0);
206 assert(pthread_cond_init(&rx_event_handler_cond,
207 (const pthread_condattr_t*)0)==0);
208 assert(pthread_cond_init(&rx_listener_cond,
209 (const pthread_condattr_t*)0)==0);
210 assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
213 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
214 #define INIT_PTHREAD_LOCKS \
215 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
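/*
 * Note: pthread_once() guarantees that rxi_InitPthread runs exactly once,
 * even when several threads enter Rx concurrently, so the mutexes and
 * condition variables above are initialized before their first use.
 */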
217 * The rx_stats_mutex mutex protects the following global variables:
222 * rxi_lowConnRefCount
223 * rxi_lowPeerRefCount
232 #define INIT_PTHREAD_LOCKS
236 /* Variables for handling the minProcs implementation. availProcs gives the
237 * number of threads available in the pool at this moment (not counting dudes
238 * executing right now). totalMin gives the total number of procs required
239 * for handling all minProcs requests. minDeficit is a dynamic variable
240 * tracking the # of procs required to satisfy all of the remaining minProcs
242 * For fine grain locking to work, the quota check and the reservation of
243 * a server thread has to come while rxi_availProcs and rxi_minDeficit
244 * are locked. To this end, the code has been modified under #ifdef
245 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246 * same time. A new function, ReturnToServerPool() returns the allocation.
248 * A call can be on several queues (but only one at a time). When
249 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250 * that no one else is touching the queue. To this end, we store the address
251 * of the queue lock in the call structure (under the call lock) when we
252 * put the call on a queue, and we clear the call_queue_lock when the
253 * call is removed from a queue (once the call lock has been obtained).
254 * This allows rxi_ResetCall to safely synchronize with others wishing
255 * to manipulate the queue.
258 #ifdef RX_ENABLE_LOCKS
259 static afs_kmutex_t rx_rpc_stats;
260 void rxi_StartUnlocked();
263 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are
264 ** pretty good that the next packet coming in is from the same connection
265 ** as the last packet, since we're sending multiple packets in a transmit window.
267 struct rx_connection *rxLastConn = 0;
269 #ifdef RX_ENABLE_LOCKS
270 /* The locking hierarchy for rx fine grain locking is composed of these
273 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
274 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
275 * call->lock - locks call data fields.
276 * These are independent of each other:
277 * rx_freeCallQueue_lock
282 * serverQueueEntry->lock
284 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
285 * peer->lock - locks peer data fields.
286 * conn_data_lock - ensures that no more than one thread is updating a conn data
287 * field at the same time.
295 * Do we need a lock to protect the peer field in the conn structure?
296 * conn->peer was previously a constant for all intents and purposes and so had no
297 * lock protecting this field. The multihomed client delta introduced
298 * an RX code change: the peer field in the connection structure now points
299 * to the remote interface from which the last packet for this
300 * connection was sent out. This may become an issue if further changes
303 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
304 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
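/*
 * SET_CALL_QUEUE_LOCK records which queue lock currently guards the call's
 * queue linkage, and CLEAR_CALL_QUEUE_LOCK forgets it when the call leaves
 * the queue. This is what lets rxi_ResetCall (see the comment above) take
 * the correct lock before unlinking the call from whatever queue it is on.
 */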
306 /* rxdb_fileID is used to identify the lock location, along with line#. */
307 static int rxdb_fileID = RXDB_FILE_RX;
308 #endif /* RX_LOCKS_DB */
309 #else /* RX_ENABLE_LOCKS */
310 #define SET_CALL_QUEUE_LOCK(C, L)
311 #define CLEAR_CALL_QUEUE_LOCK(C)
312 #endif /* RX_ENABLE_LOCKS */
313 struct rx_serverQueueEntry *rx_waitForPacket = 0;
315 /* ------------Exported Interfaces------------- */
317 /* This function allows rxkad to set the epoch to a suitably random number
318 * which rx_NewConnection will use in the future. The principal purpose is to
319 * get rxnull connections to use the same epoch as the rxkad connections do, at
320 * least once the first rxkad connection is established. This is important now
321 * that the host/port addresses aren't used in FindConnection: the uniqueness
322 * of epoch/cid matters and the start time won't do. */
324 #ifdef AFS_PTHREAD_ENV
326 * This mutex protects the following global variables:
330 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
331 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
335 #endif /* AFS_PTHREAD_ENV */
337 void rx_SetEpoch (afs_uint32 epoch)
344 /* Initialize rx. A port number may be mentioned, in which case this
345 * becomes the default port number for any service installed later.
346 * If 0 is provided for the port number, a random port will be chosen
347 * by the kernel. Whether this will ever overlap anything in
348 * /etc/services is anybody's guess... Returns 0 on success, -1 on
350 static int rxinit_status = 1;
351 #ifdef AFS_PTHREAD_ENV
353 * This mutex protects the following global variables:
357 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
358 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
361 #define UNLOCK_RX_INIT
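/*
 * LOCK_RX_INIT/UNLOCK_RX_INIT serialize access to rxinit_status, so that
 * concurrent callers of rx_Init see a consistent initialization state.
 */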
364 int rx_Init(u_int port)
371 char *htable, *ptable;
374 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
375 __djgpp_set_quiet_socket(1);
382 if (rxinit_status == 0) {
383 tmp_status = rxinit_status;
385 return tmp_status; /* Already started; return previous error code. */
389 if (afs_winsockInit()<0)
395 * Initialize anything necessary to provide a non-preemptive threading
398 rxi_InitializeThreadSupport();
401 MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
403 /* Allocate and initialize a socket for client and perhaps server
406 rx_socket = rxi_GetUDPSocket((u_short)port);
407 if (rx_socket == OSI_NULLSOCKET) {
413 #ifdef RX_ENABLE_LOCKS
416 #endif /* RX_LOCKS_DB */
417 MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
418 MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
419 MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
420 MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
422 CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
423 MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
424 MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
425 MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
427 MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
429 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
431 rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
432 #endif /* KERNEL && AFS_HPUX110_ENV */
433 #else /* RX_ENABLE_LOCKS */
434 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
435 mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
436 #endif /* AFS_GLOBAL_SUNLOCK */
437 #endif /* RX_ENABLE_LOCKS */
440 rx_connDeadTime = 12;
441 rx_tranquil = 0; /* reset flag */
442 memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
444 osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
445 PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
446 memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
447 ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
448 PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
449 memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
451 /* Malloc up a bunch of packets & buffers */
453 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
454 queue_Init(&rx_freePacketQueue);
455 rxi_NeedMorePackets = FALSE;
456 rxi_MorePackets(rx_nPackets);
464 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
465 tv.tv_sec = clock_now.sec;
466 tv.tv_usec = clock_now.usec;
467 srand((unsigned int) tv.tv_usec);
474 #if defined(KERNEL) && !defined(UKERNEL)
475 /* Really, this should never happen in a real kernel */
478 struct sockaddr_in addr;
479 int addrlen = sizeof(addr);
480 if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
484 rx_port = addr.sin_port;
487 rx_stats.minRtt.sec = 9999999;
489 rx_SetEpoch (tv.tv_sec | 0x80000000);
491 rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
492 * will provide a more random value. */
494 MUTEX_ENTER(&rx_stats_mutex);
495 rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
496 MUTEX_EXIT(&rx_stats_mutex);
497 /* *Slightly* random start time for the cid. This is just to help
498 * out with the hashing function at the peer */
499 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
500 rx_connHashTable = (struct rx_connection **) htable;
501 rx_peerHashTable = (struct rx_peer **) ptable;
503 rx_lastAckDelay.sec = 0;
504 rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
505 rx_hardAckDelay.sec = 0;
506 rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
507 rx_softAckDelay.sec = 0;
508 rx_softAckDelay.usec = 100000; /* 100 milliseconds */
510 rxevent_Init(20, rxi_ReScheduleEvents);
512 /* Initialize various global queues */
513 queue_Init(&rx_idleServerQueue);
514 queue_Init(&rx_incomingCallQueue);
515 queue_Init(&rx_freeCallQueue);
517 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
518 /* Initialize our list of usable IP addresses. */
522 /* Start listener process (exact function is dependent on the
523 * implementation environment--kernel or user space) */
528 tmp_status = rxinit_status = 0;
533 /* called with unincremented nRequestsRunning to see if it is OK to start
534 * a new thread in this service. Could be "no" for two reasons: over the
535 * max quota, or would prevent others from reaching their min quota.
537 #ifdef RX_ENABLE_LOCKS
538 /* This version of QuotaOK reserves quota if it's OK while the
539 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
541 static int QuotaOK(register struct rx_service *aservice)
543 /* check if over max quota */
544 if (aservice->nRequestsRunning >= aservice->maxProcs) {
548 /* under min quota, we're OK */
549 /* otherwise, can use only if there are enough to allow everyone
550 * to go to their min quota after this guy starts.
552 MUTEX_ENTER(&rx_stats_mutex);
553 if ((aservice->nRequestsRunning < aservice->minProcs) ||
554 (rxi_availProcs > rxi_minDeficit)) {
555 aservice->nRequestsRunning++;
556 /* just started call in minProcs pool, need fewer to maintain
558 if (aservice->nRequestsRunning <= aservice->minProcs)
561 MUTEX_EXIT(&rx_stats_mutex);
564 MUTEX_EXIT(&rx_stats_mutex);
569 static void ReturnToServerPool(register struct rx_service *aservice)
571 aservice->nRequestsRunning--;
572 MUTEX_ENTER(&rx_stats_mutex);
573 if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
575 MUTEX_EXIT(&rx_stats_mutex);
578 #else /* RX_ENABLE_LOCKS */
579 static int QuotaOK(register struct rx_service *aservice)
582 /* under min quota, we're OK */
583 if (aservice->nRequestsRunning < aservice->minProcs) return 1;
585 /* check if over max quota */
586 if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
588 /* otherwise, can use only if there are enough to allow everyone
589 * to go to their min quota after this guy starts.
591 if (rxi_availProcs > rxi_minDeficit) rc = 1;
594 #endif /* RX_ENABLE_LOCKS */
597 /* Called by rx_StartServer to start up lwp's to service calls.
598 NExistingProcs gives the number of procs already existing, and which
599 therefore needn't be created. */
600 void rxi_StartServerProcs(int nExistingProcs)
602 register struct rx_service *service;
607 /* For each service, reserve N processes, where N is the "minimum"
608 number of processes that MUST be able to execute a request in parallel,
609 at any time, for that service. Also compute the maximum difference
610 between any service's maximum number of processes that can run
611 (i.e. the maximum number that ever will be run, and a guarantee
612 that this number will run if other services aren't running), and its
613 minimum number. The result is the extra number of processes that
614 we need in order to provide the latter guarantee */
615 for (i=0; i<RX_MAX_SERVICES; i++) {
617 service = rx_services[i];
618 if (service == (struct rx_service *) 0) break;
619 nProcs += service->minProcs;
620 diff = service->maxProcs - service->minProcs;
621 if (diff > maxdiff) maxdiff = diff;
623 nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
624 nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
625 for (i = 0; i<nProcs; i++) {
626 rxi_StartServerProc(rx_ServerProc, rx_stackSize);
631 /* This routine must be called if any services are exported. If the
632 * donateMe flag is set, the calling process is donated to the server
634 void rx_StartServer(int donateMe)
636 register struct rx_service *service;
637 register int i, nProcs=0;
643 /* Start server processes, if necessary (exact function is dependent
644 * on the implementation environment--kernel or user space). DonateMe
645 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
646 * case, one less new proc will be created by rxi_StartServerProcs.
648 rxi_StartServerProcs(donateMe);
650 /* count up the # of threads in minProcs, and set the min deficit to
651 * be that value, too.
653 for (i=0; i<RX_MAX_SERVICES; i++) {
654 service = rx_services[i];
655 if (service == (struct rx_service *) 0) break;
656 MUTEX_ENTER(&rx_stats_mutex);
657 rxi_totalMin += service->minProcs;
658 /* below works even if a thread is running, since minDeficit would
659 * still have been decremented and later re-incremented.
661 rxi_minDeficit += service->minProcs;
662 MUTEX_EXIT(&rx_stats_mutex);
665 /* Turn on reaping of idle server connections */
666 rxi_ReapConnections();
675 #ifdef AFS_PTHREAD_ENV
677 pid = (pid_t) pthread_self();
678 #else /* AFS_PTHREAD_ENV */
680 LWP_CurrentProcess(&pid);
681 #endif /* AFS_PTHREAD_ENV */
683 sprintf(name,"srv_%d", ++nProcs);
685 (*registerProgram)(pid, name);
687 #endif /* AFS_NT40_ENV */
688 rx_ServerProc(); /* Never returns */
693 /* Create a new client connection to the specified service, using the
694 * specified security object to implement the security model for this
696 struct rx_connection *rx_NewConnection(register afs_uint32 shost,
697 u_short sport, u_short sservice,
698 register struct rx_securityClass *securityObject, int serviceSecurityIndex)
702 register struct rx_connection *conn;
707 dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
708 shost, sport, sservice, securityObject, serviceSecurityIndex));
710 /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
711 * the case of kmem_alloc? */
712 conn = rxi_AllocConnection();
713 #ifdef RX_ENABLE_LOCKS
714 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
715 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
716 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
720 MUTEX_ENTER(&rx_connHashTable_lock);
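/* Connection ids advance in steps of RX_MAXCALLS: each connection owns
 * RX_MAXCALLS call channels, and the low-order bits of a packet's cid
 * select the channel within that connection. */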
721 cid = (rx_nextCid += RX_MAXCALLS);
722 conn->type = RX_CLIENT_CONNECTION;
724 conn->epoch = rx_epoch;
725 conn->peer = rxi_FindPeer(shost, sport, 0, 1);
726 conn->serviceId = sservice;
727 conn->securityObject = securityObject;
728 /* This doesn't work in all compilers with void (they're buggy), so fake it
730 conn->securityData = (VOID *) 0;
731 conn->securityIndex = serviceSecurityIndex;
732 rx_SetConnDeadTime(conn, rx_connDeadTime);
733 conn->ackRate = RX_FAST_ACK_RATE;
735 conn->specific = NULL;
736 conn->challengeEvent = NULL;
737 conn->delayedAbortEvent = NULL;
738 conn->abortCount = 0;
741 RXS_NewConnection(securityObject, conn);
742 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
744 conn->refCount++; /* no lock required since only this thread knows... */
745 conn->next = rx_connHashTable[hashindex];
746 rx_connHashTable[hashindex] = conn;
747 MUTEX_ENTER(&rx_stats_mutex);
748 rx_stats.nClientConns++;
749 MUTEX_EXIT(&rx_stats_mutex);
751 MUTEX_EXIT(&rx_connHashTable_lock);
757 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
759 /* The idea is to set the dead time to a value that allows several
760 * keepalives to be dropped without timing out the connection. */
761 conn->secondsUntilDead = MAX(seconds, 6);
762 conn->secondsUntilPing = conn->secondsUntilDead/6;
765 int rxi_lowPeerRefCount = 0;
766 int rxi_lowConnRefCount = 0;
769 * Clean up a connection that was destroyed in rxi_DestroyConnectionNoLock.
770 * NOTE: must not be called with rx_connHashTable_lock held.
772 void rxi_CleanupConnection(struct rx_connection *conn)
774 /* Notify the service exporter, if requested, that this connection
775 * is being destroyed */
776 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
777 (*conn->service->destroyConnProc)(conn);
779 /* Notify the security module that this connection is being destroyed */
780 RXS_DestroyConnection(conn->securityObject, conn);
782 /* If this is the last connection using the rx_peer struct, set its
783 * idle time to now. rxi_ReapConnections will reap it if it's still
784 * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
786 MUTEX_ENTER(&rx_peerHashTable_lock);
787 if (--conn->peer->refCount <= 0) {
788 conn->peer->idleWhen = clock_Sec();
789 if (conn->peer->refCount < 0) {
790 conn->peer->refCount = 0;
791 MUTEX_ENTER(&rx_stats_mutex);
792 rxi_lowPeerRefCount ++;
793 MUTEX_EXIT(&rx_stats_mutex);
796 MUTEX_EXIT(&rx_peerHashTable_lock);
798 MUTEX_ENTER(&rx_stats_mutex);
799 if (conn->type == RX_SERVER_CONNECTION)
800 rx_stats.nServerConns--;
802 rx_stats.nClientConns--;
803 MUTEX_EXIT(&rx_stats_mutex);
806 if (conn->specific) {
808 for (i = 0 ; i < conn->nSpecific ; i++) {
809 if (conn->specific[i] && rxi_keyCreate_destructor[i])
810 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
811 conn->specific[i] = NULL;
813 free(conn->specific);
815 conn->specific = NULL;
819 MUTEX_DESTROY(&conn->conn_call_lock);
820 MUTEX_DESTROY(&conn->conn_data_lock);
821 CV_DESTROY(&conn->conn_call_cv);
823 rxi_FreeConnection(conn);
826 /* Destroy the specified connection */
827 void rxi_DestroyConnection(register struct rx_connection *conn)
829 MUTEX_ENTER(&rx_connHashTable_lock);
830 rxi_DestroyConnectionNoLock(conn);
831 /* conn should be at the head of the cleanup list */
832 if (conn == rx_connCleanup_list) {
833 rx_connCleanup_list = rx_connCleanup_list->next;
834 MUTEX_EXIT(&rx_connHashTable_lock);
835 rxi_CleanupConnection(conn);
837 #ifdef RX_ENABLE_LOCKS
839 MUTEX_EXIT(&rx_connHashTable_lock);
841 #endif /* RX_ENABLE_LOCKS */
844 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
846 register struct rx_connection **conn_ptr;
847 register int havecalls = 0;
848 struct rx_packet *packet;
855 MUTEX_ENTER(&conn->conn_data_lock);
856 if (conn->refCount > 0)
859 MUTEX_ENTER(&rx_stats_mutex);
860 rxi_lowConnRefCount++;
861 MUTEX_EXIT(&rx_stats_mutex);
864 if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
865 /* Busy; wait till the last guy before proceeding */
866 MUTEX_EXIT(&conn->conn_data_lock);
871 /* If the client previously called rx_NewCall, but it is still
872 * waiting, treat this as a running call, and wait to destroy the
873 * connection later when the call completes. */
874 if ((conn->type == RX_CLIENT_CONNECTION) &&
875 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
876 conn->flags |= RX_CONN_DESTROY_ME;
877 MUTEX_EXIT(&conn->conn_data_lock);
881 MUTEX_EXIT(&conn->conn_data_lock);
883 /* Check for extant references to this connection */
884 for (i = 0; i<RX_MAXCALLS; i++) {
885 register struct rx_call *call = conn->call[i];
888 if (conn->type == RX_CLIENT_CONNECTION) {
889 MUTEX_ENTER(&call->lock);
890 if (call->delayedAckEvent) {
891 /* Push the final acknowledgment out now--there
892 * won't be a subsequent call to acknowledge the
893 * last reply packets */
894 rxevent_Cancel(call->delayedAckEvent, call,
895 RX_CALL_REFCOUNT_DELAY);
896 if (call->state == RX_STATE_PRECALL ||
897 call->state == RX_STATE_ACTIVE) {
898 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
900 rxi_AckAll(NULL, call, 0);
903 MUTEX_EXIT(&call->lock);
907 #ifdef RX_ENABLE_LOCKS
909 if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
910 MUTEX_EXIT(&conn->conn_data_lock);
913 /* Someone is accessing a packet right now. */
917 #endif /* RX_ENABLE_LOCKS */
920 /* Don't destroy the connection if there are any call
921 * structures still in use */
922 MUTEX_ENTER(&conn->conn_data_lock);
923 conn->flags |= RX_CONN_DESTROY_ME;
924 MUTEX_EXIT(&conn->conn_data_lock);
929 if (conn->delayedAbortEvent) {
930 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
931 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
933 MUTEX_ENTER(&conn->conn_data_lock);
934 rxi_SendConnectionAbort(conn, packet, 0, 1);
935 MUTEX_EXIT(&conn->conn_data_lock);
936 rxi_FreePacket(packet);
940 /* Remove from connection hash table before proceeding */
941 conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
942 conn->epoch, conn->type) ];
943 for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
944 if (*conn_ptr == conn) {
945 *conn_ptr = conn->next;
949 /* if the conn that we are destroying was the last connection, then we
950 * clear rxLastConn as well */
951 if ( rxLastConn == conn )
954 /* Make sure the connection is completely reset before deleting it. */
955 /* get rid of pending events that could zap us later */
956 if (conn->challengeEvent)
957 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
958 if (conn->checkReachEvent)
959 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
961 /* Add the connection to the list of destroyed connections that
962 * need to be cleaned up. This is necessary to avoid deadlocks
963 * in the routines we call to inform others that this connection is
964 * being destroyed. */
965 conn->next = rx_connCleanup_list;
966 rx_connCleanup_list = conn;
969 /* Externally available version */
970 void rx_DestroyConnection(register struct rx_connection *conn)
976 rxi_DestroyConnection (conn);
981 /* Start a new rx remote procedure call, on the specified connection.
982 * If wait is set to 1, wait for a free call channel; otherwise return
983 * 0. Maxtime gives the maximum number of seconds this call may take,
984 * after rx_MakeCall returns. After this time interval, a call to any
985 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
986 * For fine grain locking, we hold the conn_call_lock in order
987 * to ensure that we don't get signalled after we find a call in an active
988 * state and before we go to sleep.
990 struct rx_call *rx_NewCall(register struct rx_connection *conn)
993 register struct rx_call *call;
994 struct clock queueTime;
998 dpf (("rx_MakeCall(conn %x)\n", conn));
1001 clock_GetTime(&queueTime);
1003 MUTEX_ENTER(&conn->conn_call_lock);
1006 * Check if there are others waiting for a new call.
1007 * If so, let them go first to avoid starving them.
1008 * This is a fairly simple scheme, and might not be
1009 * a complete solution for large numbers of waiters.
1011 if (conn->makeCallWaiters) {
1012 #ifdef RX_ENABLE_LOCKS
1013 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1020 for (i=0; i<RX_MAXCALLS; i++) {
1021 call = conn->call[i];
1023 MUTEX_ENTER(&call->lock);
1024 if (call->state == RX_STATE_DALLY) {
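/* A dallying channel can be reused immediately: reset the call state
 * and advance the call number so the peer treats this as a new call. */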
1025 rxi_ResetCall(call, 0);
1026 (*call->callNumber)++;
1029 MUTEX_EXIT(&call->lock);
1032 call = rxi_NewCall(conn, i);
1036 if (i < RX_MAXCALLS) {
1039 MUTEX_ENTER(&conn->conn_data_lock);
1040 conn->flags |= RX_CONN_MAKECALL_WAITING;
1041 MUTEX_EXIT(&conn->conn_data_lock);
1043 conn->makeCallWaiters++;
1044 #ifdef RX_ENABLE_LOCKS
1045 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1049 conn->makeCallWaiters--;
1052 * Wake up anyone else who might be giving us a chance to
1053 * run (see code above that avoids resource starvation).
1055 #ifdef RX_ENABLE_LOCKS
1056 CV_BROADCAST(&conn->conn_call_cv);
1061 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1063 /* Client is initially in send mode */
1064 call->state = RX_STATE_ACTIVE;
1065 call->mode = RX_MODE_SENDING;
1067 /* remember start time for call in case we have hard dead time limit */
1068 call->queueTime = queueTime;
1069 clock_GetTime(&call->startTime);
1070 hzero(call->bytesSent);
1071 hzero(call->bytesRcvd);
1073 /* Turn on busy protocol. */
1074 rxi_KeepAliveOn(call);
1076 MUTEX_EXIT(&call->lock);
1077 MUTEX_EXIT(&conn->conn_call_lock);
1081 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1082 /* Now, if TQ wasn't cleared earlier, do it now. */
1084 MUTEX_ENTER(&call->lock);
1085 while (call->flags & RX_CALL_TQ_BUSY) {
1086 call->flags |= RX_CALL_TQ_WAIT;
1087 #ifdef RX_ENABLE_LOCKS
1088 CV_WAIT(&call->cv_tq, &call->lock);
1089 #else /* RX_ENABLE_LOCKS */
1090 osi_rxSleep(&call->tq);
1091 #endif /* RX_ENABLE_LOCKS */
1093 if (call->flags & RX_CALL_TQ_CLEARME) {
1094 rxi_ClearTransmitQueue(call, 0);
1095 queue_Init(&call->tq);
1097 MUTEX_EXIT(&call->lock);
1099 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1104 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1107 register struct rx_call *tcall;
1111 for(i=0; i<RX_MAXCALLS; i++) {
1112 if ((tcall = aconn->call[i])) {
1113 if ((tcall->state == RX_STATE_ACTIVE)
1114 || (tcall->state == RX_STATE_PRECALL)) {
1124 int rxi_GetCallNumberVector(register struct rx_connection *aconn,
1125 register afs_int32 *aint32s)
1128 register struct rx_call *tcall;
1132 for(i=0; i<RX_MAXCALLS; i++) {
1133 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1134 aint32s[i] = aconn->callNumber[i]+1;
1136 aint32s[i] = aconn->callNumber[i];
1142 int rxi_SetCallNumberVector(register struct rx_connection *aconn,
1143 register afs_int32 *aint32s)
1146 register struct rx_call *tcall;
1150 for(i=0; i<RX_MAXCALLS; i++) {
1151 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1152 aconn->callNumber[i] = aint32s[i] - 1;
1154 aconn->callNumber[i] = aint32s[i];
1160 /* Advertise a new service. A service is named locally by a UDP port
1161 * number plus a 16-bit service id. Returns (struct rx_service *) 0
1164 char *serviceName; Name for identification purposes (e.g. the
1165 service name might be used for probing for
1167 struct rx_service *rx_NewService(u_short port, u_short serviceId,
1169 struct rx_securityClass **securityObjects,
1170 int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1172 osi_socket socket = OSI_NULLSOCKET;
1173 register struct rx_service *tservice;
1179 if (serviceId == 0) {
1180 (osi_Msg "rx_NewService: service id for service %s is not non-zero.\n",
1186 (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1193 tservice = rxi_AllocService();
1196 for (i = 0; i<RX_MAX_SERVICES; i++) {
1197 register struct rx_service *service = rx_services[i];
1199 if (port == service->servicePort) {
1200 if (service->serviceId == serviceId) {
1201 /* The identical service has already been
1202 * installed; if the caller was intending to
1203 * change the security classes used by this
1204 * service, he/she loses. */
1205 (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1208 rxi_FreeService(tservice);
1211 /* Different service, same port: re-use the socket
1212 * which is bound to the same port */
1213 socket = service->socket;
1216 if (socket == OSI_NULLSOCKET) {
1217 /* If we don't already have a socket (from another
1218 * service on same port) get a new one */
1219 socket = rxi_GetUDPSocket(port);
1220 if (socket == OSI_NULLSOCKET) {
1223 rxi_FreeService(tservice);
1228 service->socket = socket;
1229 service->servicePort = port;
1230 service->serviceId = serviceId;
1231 service->serviceName = serviceName;
1232 service->nSecurityObjects = nSecurityObjects;
1233 service->securityObjects = securityObjects;
1234 service->minProcs = 0;
1235 service->maxProcs = 1;
1236 service->idleDeadTime = 60;
1237 service->connDeadTime = rx_connDeadTime;
1238 service->executeRequestProc = serviceProc;
1239 service->checkReach = 0;
1240 rx_services[i] = service; /* not visible until now */
1248 rxi_FreeService(tservice);
1249 (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1253 /* Generic request processing loop. This routine should be called
1254 * by the implementation dependent rx_ServerProc. If socketp is
1255 * non-null, it will be set to the file descriptor that this thread
1256 * is now listening on. If socketp is null, this routine will never
1258 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1260 register struct rx_call *call;
1261 register afs_int32 code;
1262 register struct rx_service *tservice = NULL;
1269 call = rx_GetCall(threadID, tservice, socketp);
1270 if (socketp && *socketp != OSI_NULLSOCKET) {
1271 /* We are now a listener thread */
1276 /* if the server is restarting (typically a smooth shutdown) then do not
1277 * allow any new calls.
1280 if ( rx_tranquil && (call != NULL) ) {
1285 MUTEX_ENTER(&call->lock);
1287 rxi_CallError(call, RX_RESTARTING);
1288 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1290 MUTEX_EXIT(&call->lock);
1296 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1297 #ifdef RX_ENABLE_LOCKS
1299 #endif /* RX_ENABLE_LOCKS */
1300 afs_termState = AFSOP_STOP_AFS;
1301 afs_osi_Wakeup(&afs_termState);
1302 #ifdef RX_ENABLE_LOCKS
1304 #endif /* RX_ENABLE_LOCKS */
1309 tservice = call->conn->service;
1311 if (tservice->beforeProc) (*tservice->beforeProc)(call);
1313 code = call->conn->service->executeRequestProc(call);
1315 if (tservice->afterProc) (*tservice->afterProc)(call, code);
1317 rx_EndCall(call, code);
1318 MUTEX_ENTER(&rx_stats_mutex);
1320 MUTEX_EXIT(&rx_stats_mutex);
1325 void rx_WakeupServerProcs(void)
1327 struct rx_serverQueueEntry *np, *tqp;
1332 MUTEX_ENTER(&rx_serverPool_lock);
1334 #ifdef RX_ENABLE_LOCKS
1335 if (rx_waitForPacket)
1336 CV_BROADCAST(&rx_waitForPacket->cv);
1337 #else /* RX_ENABLE_LOCKS */
1338 if (rx_waitForPacket)
1339 osi_rxWakeup(rx_waitForPacket);
1340 #endif /* RX_ENABLE_LOCKS */
1341 MUTEX_ENTER(&freeSQEList_lock);
1342 for (np = rx_FreeSQEList; np; np = tqp) {
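/* The free list is threaded through the entries themselves: the first
 * word of each entry holds the pointer to the next free entry. */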
1343 tqp = *(struct rx_serverQueueEntry **)np;
1344 #ifdef RX_ENABLE_LOCKS
1345 CV_BROADCAST(&np->cv);
1346 #else /* RX_ENABLE_LOCKS */
1348 #endif /* RX_ENABLE_LOCKS */
1350 MUTEX_EXIT(&freeSQEList_lock);
1351 for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1352 #ifdef RX_ENABLE_LOCKS
1353 CV_BROADCAST(&np->cv);
1354 #else /* RX_ENABLE_LOCKS */
1356 #endif /* RX_ENABLE_LOCKS */
1358 MUTEX_EXIT(&rx_serverPool_lock);
1364 * One thing that seems to happen is that all the server threads get
1365 * tied up on some empty or slow call, and then a whole bunch of calls
1366 * arrive at once, using up the packet pool, so now there are more
1367 * empty calls. The most critical resources here are server threads
1368 * and the free packet pool. The "doreclaim" code seems to help in
1369 * general. I think that eventually we arrive in this state: there
1370 * are lots of pending calls which do have all their packets present,
1371 * so they won't be reclaimed; they are multi-packet calls, so they won't
1372 * be scheduled until later, and thus they tie up most of the free
1373 * packet pool for a very long time.
1375 * 1. schedule multi-packet calls if all the packets are present.
1376 * Probably CPU-bound operation, useful to return packets to pool.
1377 * What should be done if there is a full window, but the last packet isn't here?
1378 * 3. preserve one thread which *only* runs "best" calls, otherwise
1379 * it sleeps and waits for that type of call.
1380 * 4. Don't necessarily reserve a whole window for each thread. In fact,
1381 * the current dataquota business is badly broken. The quota isn't adjusted
1382 * to reflect how many packets are presently queued for a running call.
1383 * So, when we schedule a queued call with a full window of packets queued
1384 * up for it, that *should* free up a window full of packets for other 2d-class
1385 * calls to be able to use from the packet pool. But it doesn't.
1387 * NB. Most of the time, this code doesn't run -- since idle server threads
1388 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1389 * as a new call arrives.
1391 /* Sleep until a call arrives. Returns a pointer to the call, ready
1392 * for an rx_Read. */
1393 #ifdef RX_ENABLE_LOCKS
1394 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1396 struct rx_serverQueueEntry *sq;
1397 register struct rx_call *call = (struct rx_call *) 0;
1398 struct rx_service *service = NULL;
1401 MUTEX_ENTER(&freeSQEList_lock);
1403 if ((sq = rx_FreeSQEList)) {
1404 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1405 MUTEX_EXIT(&freeSQEList_lock);
1406 } else { /* otherwise allocate a new one and return that */
1407 MUTEX_EXIT(&freeSQEList_lock);
1408 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1409 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1410 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1413 MUTEX_ENTER(&rx_serverPool_lock);
1414 if (cur_service != NULL) {
1415 ReturnToServerPool(cur_service);
1418 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1419 register struct rx_call *tcall, *ncall, *choice2 = NULL;
1421 /* Scan for eligible incoming calls. A call is not eligible
1422 * if the maximum number of calls for its service type are
1423 * already executing */
1424 /* One thread will process calls FCFS (to prevent starvation),
1425 * while the other threads may run ahead looking for calls which
1426 * have all their input data available immediately. This helps
1427 * keep threads from blocking, waiting for data from the client. */
1428 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1429 service = tcall->conn->service;
1430 if (!QuotaOK(service)) {
1433 if (tno==rxi_fcfs_thread_num || !tcall->queue_item_header.next ) {
1434 /* If we're the fcfs thread, then we'll just use
1435 * this call. If we haven't been able to find an optimal
1436 * choice, and we're at the end of the list, then use a
1437 * 2d choice if one has been identified. Otherwise... */
1438 call = (choice2 ? choice2 : tcall);
1439 service = call->conn->service;
1440 } else if (!queue_IsEmpty(&tcall->rq)) {
1441 struct rx_packet *rp;
1442 rp = queue_First(&tcall->rq, rx_packet);
1443 if (rp->header.seq == 1) {
1444 if (!meltdown_1pkt ||
1445 (rp->header.flags & RX_LAST_PACKET)) {
1447 } else if (rxi_2dchoice && !choice2 &&
1448 !(tcall->flags & RX_CALL_CLEARED) &&
1449 (tcall->rprev > rxi_HardAckRate)) {
1451 } else rxi_md2cnt++;
1457 ReturnToServerPool(service);
1464 MUTEX_EXIT(&rx_serverPool_lock);
1465 MUTEX_ENTER(&call->lock);
1467 if (call->state != RX_STATE_PRECALL || call->error) {
1468 MUTEX_EXIT(&call->lock);
1469 MUTEX_ENTER(&rx_serverPool_lock);
1470 ReturnToServerPool(service);
1475 if (queue_IsEmpty(&call->rq) ||
1476 queue_First(&call->rq, rx_packet)->header.seq != 1)
1477 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1479 CLEAR_CALL_QUEUE_LOCK(call);
1480 call->flags &= ~RX_CALL_WAIT_PROC;
1481 MUTEX_ENTER(&rx_stats_mutex);
1483 MUTEX_EXIT(&rx_stats_mutex);
1487 /* If there are no eligible incoming calls, add this process
1488 * to the idle server queue, to wait for one */
1492 *socketp = OSI_NULLSOCKET;
1494 sq->socketp = socketp;
1495 queue_Append(&rx_idleServerQueue, sq);
1496 #ifndef AFS_AIX41_ENV
1497 rx_waitForPacket = sq;
1498 #endif /* AFS_AIX41_ENV */
1500 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1502 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1503 MUTEX_EXIT(&rx_serverPool_lock);
1504 return (struct rx_call *)0;
1507 } while (!(call = sq->newcall) &&
1508 !(socketp && *socketp != OSI_NULLSOCKET));
1509 MUTEX_EXIT(&rx_serverPool_lock);
1511 MUTEX_ENTER(&call->lock);
1517 MUTEX_ENTER(&freeSQEList_lock);
1518 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1519 rx_FreeSQEList = sq;
1520 MUTEX_EXIT(&freeSQEList_lock);
1523 clock_GetTime(&call->startTime);
1524 call->state = RX_STATE_ACTIVE;
1525 call->mode = RX_MODE_RECEIVING;
1527 rxi_calltrace(RX_CALL_START, call);
1528 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1529 call->conn->service->servicePort,
1530 call->conn->service->serviceId, call));
1532 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1533 MUTEX_EXIT(&call->lock);
1535 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1540 #else /* RX_ENABLE_LOCKS */
1541 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1543 struct rx_serverQueueEntry *sq;
1544 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1545 struct rx_service *service = NULL;
1550 MUTEX_ENTER(&freeSQEList_lock);
1552 if ((sq = rx_FreeSQEList)) {
1553 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1554 MUTEX_EXIT(&freeSQEList_lock);
1555 } else { /* otherwise allocate a new one and return that */
1556 MUTEX_EXIT(&freeSQEList_lock);
1557 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1558 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1559 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1561 MUTEX_ENTER(&sq->lock);
1563 if (cur_service != NULL) {
1564 cur_service->nRequestsRunning--;
1565 if (cur_service->nRequestsRunning < cur_service->minProcs)
1569 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1570 register struct rx_call *tcall, *ncall;
1571 /* Scan for eligible incoming calls. A call is not eligible
1572 * if the maximum number of calls for its service type are
1573 * already executing */
1574 /* One thread will process calls FCFS (to prevent starvation),
1575 * while the other threads may run ahead looking for calls which
1576 * have all their input data available immediately. This helps
1577 * keep threads from blocking, waiting for data from the client. */
1578 choice2 = (struct rx_call *) 0;
1579 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1580 service = tcall->conn->service;
1581 if (QuotaOK(service)) {
1582 if (tno==rxi_fcfs_thread_num || !tcall->queue_item_header.next ) {
1583 /* If we're the fcfs thread, then we'll just use
1584 * this call. If we haven't been able to find an optimal
1585 * choice, and we're at the end of the list, then use a
1586 * 2d choice if one has been identified. Otherwise... */
1587 call = (choice2 ? choice2 : tcall);
1588 service = call->conn->service;
1589 } else if (!queue_IsEmpty(&tcall->rq)) {
1590 struct rx_packet *rp;
1591 rp = queue_First(&tcall->rq, rx_packet);
1592 if (rp->header.seq == 1
1593 && (!meltdown_1pkt ||
1594 (rp->header.flags & RX_LAST_PACKET))) {
1596 } else if (rxi_2dchoice && !choice2 &&
1597 !(tcall->flags & RX_CALL_CLEARED) &&
1598 (tcall->rprev > rxi_HardAckRate)) {
1600 } else rxi_md2cnt++;
1610 /* we can't schedule a call if there's no data!!! */
1611 /* send an ack if there's no data, if we're missing the
1612 * first packet, or we're missing something between first
1613 * and last -- there's a "hole" in the incoming data. */
1614 if (queue_IsEmpty(&call->rq) ||
1615 queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1616 call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1617 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1619 call->flags &= (~RX_CALL_WAIT_PROC);
1620 service->nRequestsRunning++;
1621 /* just started call in minProcs pool, need fewer to maintain
1623 if (service->nRequestsRunning <= service->minProcs)
1627 /* MUTEX_EXIT(&call->lock); */
1630 /* If there are no eligible incoming calls, add this process
1631 * to the idle server queue, to wait for one */
1634 *socketp = OSI_NULLSOCKET;
1636 sq->socketp = socketp;
1637 queue_Append(&rx_idleServerQueue, sq);
1641 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1644 return (struct rx_call *)0;
1647 } while (!(call = sq->newcall) &&
1648 !(socketp && *socketp != OSI_NULLSOCKET));
1650 MUTEX_EXIT(&sq->lock);
1652 MUTEX_ENTER(&freeSQEList_lock);
1653 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1654 rx_FreeSQEList = sq;
1655 MUTEX_EXIT(&freeSQEList_lock);
1658 clock_GetTime(&call->startTime);
1659 call->state = RX_STATE_ACTIVE;
1660 call->mode = RX_MODE_RECEIVING;
1662 rxi_calltrace(RX_CALL_START, call);
1663 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1664 call->conn->service->servicePort,
1665 call->conn->service->serviceId, call));
1667 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1675 #endif /* RX_ENABLE_LOCKS */
1679 /* Establish a procedure to be called when a packet arrives for a
1680 * call. This routine will be called at most once after each call,
1681 * and will also be called if there is an error condition on the call or
1682 * the call is complete. Used by multi rx to build a selection
1683 * function which determines which of several calls is likely to be a
1684 * good one to read from.
1685 * NOTE: the way this is currently implemented it is probably only a
1686 * good idea to (1) use it immediately after a newcall (clients only)
1687 * and (2) only use it once. Other uses currently void your warranty
1689 void rx_SetArrivalProc(register struct rx_call *call,
1690 register VOID (*proc)(register struct rx_call *call,
1691 register struct multi_handle *mh, register int index),
1692 register VOID *handle, register VOID *arg)
1694 call->arrivalProc = proc;
1695 call->arrivalProcHandle = handle;
1696 call->arrivalProcArg = arg;
1699 /* Call is finished (possibly prematurely). Return rc to the peer, if
1700 * appropriate, and return the final error code from the conversation
1703 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1705 register struct rx_connection *conn = call->conn;
1706 register struct rx_service *service;
1707 register struct rx_packet *tp; /* Temporary packet pointer */
1708 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1712 dpf(("rx_EndCall(call %x)\n", call));
1716 MUTEX_ENTER(&call->lock);
1718 if (rc == 0 && call->error == 0) {
1719 call->abortCode = 0;
1720 call->abortCount = 0;
1723 call->arrivalProc = (VOID (*)()) 0;
1724 if (rc && call->error == 0) {
1725 rxi_CallError(call, rc);
1726 /* Send an abort message to the peer if this error code has
1727 * only just been set. If it was set previously, assume the
1728 * peer has already been sent the error code or will request it
1730 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1732 if (conn->type == RX_SERVER_CONNECTION) {
1733 /* Make sure reply or at least dummy reply is sent */
1734 if (call->mode == RX_MODE_RECEIVING) {
1735 rxi_WriteProc(call, 0, 0);
1737 if (call->mode == RX_MODE_SENDING) {
1738 rxi_FlushWrite(call);
1740 service = conn->service;
1741 rxi_calltrace(RX_CALL_END, call);
1742 /* Call goes to hold state until reply packets are acknowledged */
1743 if (call->tfirst + call->nSoftAcked < call->tnext) {
1744 call->state = RX_STATE_HOLD;
1746 call->state = RX_STATE_DALLY;
1747 rxi_ClearTransmitQueue(call, 0);
1748 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1749 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1752 else { /* Client connection */
1754 /* Make sure server receives input packets, in the case where
1755 * no reply arguments are expected */
1756 if ((call->mode == RX_MODE_SENDING)
1757 || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1758 (void) rxi_ReadProc(call, &dummy, 1);
1761 /* If we had an outstanding delayed ack, be nice to the server
1762 * and force-send it now.
1764 if (call->delayedAckEvent) {
1765 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1766 call->delayedAckEvent = NULL;
1767 rxi_SendDelayedAck(NULL, call, NULL);
1770 /* We need to release the call lock since it's lower than the
1771 * conn_call_lock and we don't want to hold the conn_call_lock
1772 * over the rx_ReadProc call. The conn_call_lock needs to be held
1773 * here for the case where rx_NewCall is perusing the calls on
1774 * the connection structure. We don't want to signal until
1775 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1776 * have checked this call, found it active and by the time it
1777 * goes to sleep, will have missed the signal.
1779 MUTEX_EXIT(&call->lock);
1780 MUTEX_ENTER(&conn->conn_call_lock);
1781 MUTEX_ENTER(&call->lock);
1782 MUTEX_ENTER(&conn->conn_data_lock);
1783 conn->flags |= RX_CONN_BUSY;
1784 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1785 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1786 MUTEX_EXIT(&conn->conn_data_lock);
1787 #ifdef RX_ENABLE_LOCKS
1788 CV_BROADCAST(&conn->conn_call_cv);
1793 #ifdef RX_ENABLE_LOCKS
1795 MUTEX_EXIT(&conn->conn_data_lock);
1797 #endif /* RX_ENABLE_LOCKS */
1798 call->state = RX_STATE_DALLY;
1800 error = call->error;
1802 /* currentPacket, nLeft, and NFree must be zeroed here, because
1803 * ResetCall cannot: ResetCall may be called at splnet(), in the
1804 * kernel version, and may interrupt the macros rx_Read or
1805 * rx_Write, which run at normal priority for efficiency. */
1806 if (call->currentPacket) {
1807 rxi_FreePacket(call->currentPacket);
1808 call->currentPacket = (struct rx_packet *) 0;
1809 call->nLeft = call->nFree = call->curlen = 0;
1812 call->nLeft = call->nFree = call->curlen = 0;
1814 /* Free any packets from the last call to ReadvProc/WritevProc */
1815 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1820 CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1821 MUTEX_EXIT(&call->lock);
1822 if (conn->type == RX_CLIENT_CONNECTION) {
1823 MUTEX_EXIT(&conn->conn_call_lock);
1824 conn->flags &= ~RX_CONN_BUSY;
1829 * Map errors to the local host's errno.h format.
1831 error = ntoh_syserr_conv(error);
1835 #if !defined(KERNEL)
1837 /* Call this routine when shutting down a server or client (especially
1838 * clients). This will allow Rx to gracefully garbage collect server
1839 * connections, and reduce the number of retries that a server might
1840 * make to a dead client.
1841 * This is not quite right, since some calls may still be ongoing and
1842 * we can't lock them to destroy them. */
1843 void rx_Finalize(void)
1845 register struct rx_connection **conn_ptr, **conn_end;
1849 if (rxinit_status == 1) {
1851 return; /* Already shutdown. */
1853 rxi_DeleteCachedConnections();
1854 if (rx_connHashTable) {
1855 MUTEX_ENTER(&rx_connHashTable_lock);
1856 for (conn_ptr = &rx_connHashTable[0],
1857 conn_end = &rx_connHashTable[rx_hashTableSize];
1858 conn_ptr < conn_end; conn_ptr++) {
1859 struct rx_connection *conn, *next;
1860 for (conn = *conn_ptr; conn; conn = next) {
1862 if (conn->type == RX_CLIENT_CONNECTION) {
1863 /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1865 /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1866 #ifdef RX_ENABLE_LOCKS
1867 rxi_DestroyConnectionNoLock(conn);
1868 #else /* RX_ENABLE_LOCKS */
1869 rxi_DestroyConnection(conn);
1870 #endif /* RX_ENABLE_LOCKS */
1874 #ifdef RX_ENABLE_LOCKS
1875 while (rx_connCleanup_list) {
1876 struct rx_connection *conn;
1877 conn = rx_connCleanup_list;
1878 rx_connCleanup_list = rx_connCleanup_list->next;
1879 MUTEX_EXIT(&rx_connHashTable_lock);
1880 rxi_CleanupConnection(conn);
1881 MUTEX_ENTER(&rx_connHashTable_lock);
1883 MUTEX_EXIT(&rx_connHashTable_lock);
1884 #endif /* RX_ENABLE_LOCKS */
1893 /* if we wake up the packet waiter too often, we can get into a loop with two
1894 AllocSendPackets each waking the other up (from ReclaimPacket calls) */
1895 void rxi_PacketsUnWait(void)
1897 if (!rx_waitingForPackets) {
1901 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1902 return; /* still over quota */
1905 rx_waitingForPackets = 0;
1906 #ifdef RX_ENABLE_LOCKS
1907 CV_BROADCAST(&rx_waitingForPackets_cv);
1909 osi_rxWakeup(&rx_waitingForPackets);
1915 /* ------------------Internal interfaces------------------------- */
1917 /* Return this process's service structure for the
1918 * specified socket and service */
1919 struct rx_service *rxi_FindService(register osi_socket socket,
1920 register u_short serviceId)
1922 register struct rx_service **sp;
1923 for (sp = &rx_services[0]; *sp; sp++) {
1924 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1930 /* Allocate a call structure, for the indicated channel of the
1931 * supplied connection. The mode and state of the call must be set by
1932 * the caller. Returns the call with mutex locked. */
1933 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1934 register int channel)
1936 register struct rx_call *call;
1937 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1938 register struct rx_call *cp; /* Call pointer temp */
1939 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1940 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1942 /* Grab an existing call structure, or allocate a new one.
1943 * Existing call structures are assumed to have been left reset by
1945 MUTEX_ENTER(&rx_freeCallQueue_lock);
1947 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1949 * EXCEPT that the TQ might not yet be cleared out.
1950 * Skip over those with in-use TQs.
1953 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1954 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1960 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1961 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1962 call = queue_First(&rx_freeCallQueue, rx_call);
1963 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1965 MUTEX_ENTER(&rx_stats_mutex);
1966 rx_stats.nFreeCallStructs--;
1967 MUTEX_EXIT(&rx_stats_mutex);
1968 MUTEX_EXIT(&rx_freeCallQueue_lock);
1969 MUTEX_ENTER(&call->lock);
1970 CLEAR_CALL_QUEUE_LOCK(call);
1971 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1972 /* Now, if TQ wasn't cleared earlier, do it now. */
1973 if (call->flags & RX_CALL_TQ_CLEARME) {
1974 rxi_ClearTransmitQueue(call, 0);
1975 queue_Init(&call->tq);
1977 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1978 /* Bind the call to its connection structure */
1980 rxi_ResetCall(call, 1);
1983 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1985 MUTEX_EXIT(&rx_freeCallQueue_lock);
1986 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1987 MUTEX_ENTER(&call->lock);
1988 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1989 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1990 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1992 MUTEX_ENTER(&rx_stats_mutex);
1993 rx_stats.nCallStructs++;
1994 MUTEX_EXIT(&rx_stats_mutex);
1995 /* Initialize once-only items */
1996 queue_Init(&call->tq);
1997 queue_Init(&call->rq);
1998 queue_Init(&call->iovq);
1999 /* Bind the call to its connection structure (prereq for reset) */
2001 rxi_ResetCall(call, 1);
2003 call->channel = channel;
2004 call->callNumber = &conn->callNumber[channel];
2005 /* Note that the next expected call number is retained (in
2006 * conn->callNumber[i]), even if we reallocate the call structure
2008 conn->call[channel] = call;
2009 /* if the channel's never been used (== 0), we should start at 1, otherwise
2010 the call number is valid from the last time this channel was used */
2011 if (*call->callNumber == 0) *call->callNumber = 1;
2016 /* A call has been inactive long enough that we can throw away
2017 * state, including the call structure, which is placed on the call
2019 * Call is locked upon entry.
2020 * haveCTLock set if called from rxi_ReapConnections
2022 #ifdef RX_ENABLE_LOCKS
2023 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2024 #else /* RX_ENABLE_LOCKS */
2025 void rxi_FreeCall(register struct rx_call *call)
2026 #endif /* RX_ENABLE_LOCKS */
2028 register int channel = call->channel;
2029 register struct rx_connection *conn = call->conn;
2032 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2033 (*call->callNumber)++;
2034 rxi_ResetCall(call, 0);
2035 call->conn->call[channel] = (struct rx_call *) 0;
2037 MUTEX_ENTER(&rx_freeCallQueue_lock);
2038 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2039 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2040 /* A call may be free even though its transmit queue is still in use.
2041 * Since we search the call list from head to tail, put busy calls at
2042 * the head of the list, and idle calls at the tail.
2044 if (call->flags & RX_CALL_TQ_BUSY)
2045 queue_Prepend(&rx_freeCallQueue, call);
2047 queue_Append(&rx_freeCallQueue, call);
2048 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2049 queue_Append(&rx_freeCallQueue, call);
2050 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2051 MUTEX_ENTER(&rx_stats_mutex);
2052 rx_stats.nFreeCallStructs++;
2053 MUTEX_EXIT(&rx_stats_mutex);
2055 MUTEX_EXIT(&rx_freeCallQueue_lock);
2057 /* Destroy the connection if it was previously slated for
2058 * destruction, i.e. the Rx client code previously called
2059 * rx_DestroyConnection (client connections), or
2060 * rxi_ReapConnections called the same routine (server
2061 * connections). Only do this, however, if there are no
2062 * outstanding calls. Note that for fine grain locking, there appears
2063 * to be a deadlock in that rxi_FreeCall has a call locked and
2064 * DestroyConnectionNoLock locks each call in the conn. But note a
2065 * few lines up where we have removed this call from the conn.
2066 * If someone else destroys a connection, they either have no
2067 * call lock held or are going through this section of code.
2069 if (conn->flags & RX_CONN_DESTROY_ME) {
2070 MUTEX_ENTER(&conn->conn_data_lock);
2072 MUTEX_EXIT(&conn->conn_data_lock);
2073 #ifdef RX_ENABLE_LOCKS
2075 rxi_DestroyConnectionNoLock(conn);
2077 rxi_DestroyConnection(conn);
2078 #else /* RX_ENABLE_LOCKS */
2079 rxi_DestroyConnection(conn);
2080 #endif /* RX_ENABLE_LOCKS */
2084 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2085 char *rxi_Alloc(register size_t size)
2089 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2090 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2093 int glockOwner = ISAFS_GLOCK();
2097 MUTEX_ENTER(&rx_stats_mutex);
2098 rxi_Alloccnt++; rxi_Allocsize += size;
2099 MUTEX_EXIT(&rx_stats_mutex);
2100 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2101 if (size > AFS_SMALLOCSIZ) {
2102 p = (char *) osi_AllocMediumSpace(size);
2104 p = (char *) osi_AllocSmall(size, 1);
2105 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2110 p = (char *) osi_Alloc(size);
2112 if (!p) osi_Panic("rxi_Alloc error");
2117 void rxi_Free(void *addr, register size_t size)
2119 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2120 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2123 int glockOwner = ISAFS_GLOCK();
2127 MUTEX_ENTER(&rx_stats_mutex);
2128 rxi_Alloccnt--; rxi_Allocsize -= size;
2129 MUTEX_EXIT(&rx_stats_mutex);
2130 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2131 if (size > AFS_SMALLOCSIZ)
2132 osi_FreeMediumSpace(addr);
2134 osi_FreeSmall(addr);
2135 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2140 osi_Free(addr, size);
2144 /* Find the peer process represented by the supplied (host,port)
2145 * combination. If there is no appropriate active peer structure, a
2146 * new one will be allocated and initialized
2147 * The origPeer, if set, is a pointer to a peer structure on which the
2148 * refcount will be decremented. This is used to replace the peer
2149 * structure hanging off a connection structure */
2150 struct rx_peer *rxi_FindPeer(register afs_uint32 host,
2151 register u_short port, struct rx_peer *origPeer, int create)
2153 register struct rx_peer *pp;
2155 hashIndex = PEER_HASH(host, port);
2156 MUTEX_ENTER(&rx_peerHashTable_lock);
2157 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2158 if ((pp->host == host) && (pp->port == port)) break;
2162 pp = rxi_AllocPeer(); /* This bzero's *pp */
2163 pp->host = host; /* set here or in InitPeerParams is zero */
2165 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2166 queue_Init(&pp->congestionQueue);
2167 queue_Init(&pp->rpcStats);
2168 pp->next = rx_peerHashTable[hashIndex];
2169 rx_peerHashTable[hashIndex] = pp;
2170 rxi_InitPeerParams(pp);
2171 MUTEX_ENTER(&rx_stats_mutex);
2172 rx_stats.nPeerStructs++;
2173 MUTEX_EXIT(&rx_stats_mutex);
2180 origPeer->refCount--;
2181 MUTEX_EXIT(&rx_peerHashTable_lock);
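/*
 * A condensed sketch of the lookup-or-create pattern above (all names are
 * from this file; the shape of the sketch itself is illustrative).  The
 * entire operation, including the origPeer refcount hand-off, happens
 * under rx_peerHashTable_lock, so a peer cannot be inserted twice and the
 * old and new refcounts change together:
 *
 *     MUTEX_ENTER(&rx_peerHashTable_lock);
 *     for (pp = rx_peerHashTable[PEER_HASH(host, port)]; pp; pp = pp->next)
 *         if (pp->host == host && pp->port == port)
 *             break;
 *     if (!pp && create)
 *         pp = ... allocate, initialize, and link a new peer ...
 *     if (origPeer)
 *         origPeer->refCount--;
 *     MUTEX_EXIT(&rx_peerHashTable_lock);
 */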
2186 /* Find the connection at (host, port) started at epoch, and with the
2187 * given connection id. Creates the server connection if necessary.
2188 * The type specifies whether a client connection or a server
2189 * connection is desired. In both cases, (host, port) specify the
2190 * peer's (host, pair) pair. Client connections are not made
2191 * automatically by this routine. The parameter socket gives the
2192 * socket descriptor on which the packet was received. This is used,
2193 * in the case of server connections, to check that *new* connections
2194 * come via a valid (port, serviceId). Finally, the securityIndex
2195 * parameter must match the existing index for the connection. If a
2196 * server connection is created, it will be created using the supplied
2197 * index, if the index is valid for this service */
2198 struct rx_connection *rxi_FindConnection(osi_socket socket,
2199 register afs_int32 host, register u_short port, u_short serviceId,
2200 afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2202 int hashindex, flag;
2203 register struct rx_connection *conn;
2204 hashindex = CONN_HASH(host, port, cid, epoch, type);
2205 MUTEX_ENTER(&rx_connHashTable_lock);
2206 rxLastConn ? (conn = rxLastConn, flag = 0) :
2207 (conn = rx_connHashTable[hashindex], flag = 1);
2209 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2210 && (epoch == conn->epoch)) {
2211 register struct rx_peer *pp = conn->peer;
2212 if (securityIndex != conn->securityIndex) {
2213 /* this isn't supposed to happen, but someone could forge a packet
2214 like this, and there seems to be some CM bug that makes this
2215 happen from time to time -- in which case, the fileserver
2217 MUTEX_EXIT(&rx_connHashTable_lock);
2218 return (struct rx_connection *) 0;
2220 if (pp->host == host && pp->port == port)
2222 if (type == RX_CLIENT_CONNECTION && pp->port == port)
2224 if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2229 /* the connection rxLastConn that was used the last time is not the
2230 ** one we are looking for now. Hence, start searching in the hash */
2232 conn = rx_connHashTable[hashindex];
2238 struct rx_service *service;
2239 if (type == RX_CLIENT_CONNECTION) {
2240 MUTEX_EXIT(&rx_connHashTable_lock);
2241 return (struct rx_connection *) 0;
2243 service = rxi_FindService(socket, serviceId);
2244 if (!service || (securityIndex >= service->nSecurityObjects)
2245 || (service->securityObjects[securityIndex] == 0)) {
2246 MUTEX_EXIT(&rx_connHashTable_lock);
2247 return (struct rx_connection *) 0;
2249 conn = rxi_AllocConnection(); /* This bzero's the connection */
2250 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2252 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2254 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2255 conn->next = rx_connHashTable[hashindex];
2256 rx_connHashTable[hashindex] = conn;
2257 conn->peer = rxi_FindPeer(host, port, 0, 1);
2258 conn->type = RX_SERVER_CONNECTION;
2259 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2260 conn->epoch = epoch;
2261 conn->cid = cid & RX_CIDMASK;
2262 /* conn->serial = conn->lastSerial = 0; */
2263 /* conn->timeout = 0; */
2264 conn->ackRate = RX_FAST_ACK_RATE;
2265 conn->service = service;
2266 conn->serviceId = serviceId;
2267 conn->securityIndex = securityIndex;
2268 conn->securityObject = service->securityObjects[securityIndex];
2269 conn->nSpecific = 0;
2270 conn->specific = NULL;
2271 rx_SetConnDeadTime(conn, service->connDeadTime);
2272 rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2273 /* Notify security object of the new connection */
2274 RXS_NewConnection(conn->securityObject, conn);
2275 /* XXXX Connection timeout? */
2276 if (service->newConnProc) (*service->newConnProc)(conn);
2277 MUTEX_ENTER(&rx_stats_mutex);
2278 rx_stats.nServerConns++;
2279 MUTEX_EXIT(&rx_stats_mutex);
2282 MUTEX_ENTER(&conn->conn_data_lock);
2284 MUTEX_EXIT(&conn->conn_data_lock);
2286 rxLastConn = conn; /* store this connection as the last conn used */
2287 MUTEX_EXIT(&rx_connHashTable_lock);
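/*
 * For reference (derived from the lookup code above, not a separate rx
 * API): a connection is identified by the tuple
 *
 *     (epoch, cid & RX_CIDMASK, type, peer host/port, securityIndex)
 *
 * while the low bits of the cid (cid & RX_CHANNELMASK) select one of the
 * RX_MAXCALLS call channels within that connection.  rxLastConn acts as a
 * one-entry cache that is consulted before the hash chain is walked.
 */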
2291 /* There are two packet tracing routines available for testing and monitoring
2292 * Rx. One is called just after every packet is received and the other is
2293 * called just before every packet is sent. Received packets have had their
2294 * headers decoded, and packets to be sent have not yet had their headers
2295 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2296 * containing the network address. Both can be modified. The return value, if
2297 * non-zero, indicates that the packet should be dropped. */
2299 int (*rx_justReceived)() = 0;
2300 int (*rx_almostSent)() = 0;
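/*
 * A minimal usage sketch (the tracer below is hypothetical; only the two
 * hook pointers above are part of rx).  A debugging build might install a
 * tracer that logs every inbound packet and never drops anything.  The
 * hook may modify both the packet and the sockaddr; a non-zero return
 * tells rx to drop the packet.
 *
 *     static int
 *     example_trace_rx(struct rx_packet *p, struct sockaddr_in *addr)
 *     {
 *         dpf(("traced packet %x from host %x port %d",
 *              p, addr->sin_addr.s_addr, ntohs(addr->sin_port)));
 *         return 0;    -- keep the packet
 *     }
 *
 *     ... during initialization ...
 *     rx_justReceived = example_trace_rx;
 */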
2302 /* A packet has been received off the interface. Np is the packet, socket is
2303 * the socket number it was received from (useful in determining which service
2304 * this packet corresponds to), and (host, port) reflect the host,port of the
2305 * sender. This call returns the packet to the caller if it is finished with
2306 * it, rather than de-allocating it, just as a small performance hack */
2308 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
2309 osi_socket socket, afs_uint32 host, u_short port,
2310 int *tnop, struct rx_call **newcallp)
2312 register struct rx_call *call;
2313 register struct rx_connection *conn;
2315 afs_uint32 currentCallNumber;
2321 struct rx_packet *tnp;
2324 /* We don't print out the packet until now because (1) the time may not be
2325 * accurate enough until now in the lwp implementation (rx_Listener only gets
2326 * the time after the packet is read) and (2) from a protocol point of view,
2327 * this is the first time the packet has been seen */
2328 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2329 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2330 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2331 np->header.serial, packetType, host, port, np->header.serviceId,
2332 np->header.epoch, np->header.cid, np->header.callNumber,
2333 np->header.seq, np->header.flags, np));
2336 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2337 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2340 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2341 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2344 /* If an input tracer function is defined, call it with the packet and
2345 * network address. Note this function may modify its arguments. */
2346 if (rx_justReceived) {
2347 struct sockaddr_in addr;
2349 addr.sin_family = AF_INET;
2350 addr.sin_port = port;
2351 addr.sin_addr.s_addr = host;
2352 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2353 addr.sin_len = sizeof(addr);
2354 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2355 drop = (*rx_justReceived) (np, &addr);
2356 /* drop packet if return value is non-zero */
2357 if (drop) return np;
2358 port = addr.sin_port; /* in case fcn changed addr */
2359 host = addr.sin_addr.s_addr;
2363 /* If packet was not sent by the client, then *we* must be the client */
2364 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2365 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2367 /* Find the connection (or fabricate one, if we're the server & if
2368 * necessary) associated with this packet */
2369 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2370 np->header.cid, np->header.epoch, type,
2371 np->header.securityIndex);
2374 /* If no connection found or fabricated, just ignore the packet.
2375 * (An argument could be made for sending an abort packet for
2380 MUTEX_ENTER(&conn->conn_data_lock);
2381 if (conn->maxSerial < np->header.serial)
2382 conn->maxSerial = np->header.serial;
2383 MUTEX_EXIT(&conn->conn_data_lock);
2385 /* If the connection is in an error state, send an abort packet and ignore
2386 * the incoming packet */
2388 /* Don't respond to an abort packet--we don't want loops! */
2389 MUTEX_ENTER(&conn->conn_data_lock);
2390 if (np->header.type != RX_PACKET_TYPE_ABORT)
2391 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2393 MUTEX_EXIT(&conn->conn_data_lock);
2397 /* Check for connection-only requests (i.e. not call specific). */
2398 if (np->header.callNumber == 0) {
2399 switch (np->header.type) {
2400 case RX_PACKET_TYPE_ABORT:
2401 /* What if the supplied error is zero? */
2402 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2403 MUTEX_ENTER(&conn->conn_data_lock);
2405 MUTEX_EXIT(&conn->conn_data_lock);
2407 case RX_PACKET_TYPE_CHALLENGE:
2408 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2409 MUTEX_ENTER(&conn->conn_data_lock);
2411 MUTEX_EXIT(&conn->conn_data_lock);
2413 case RX_PACKET_TYPE_RESPONSE:
2414 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2415 MUTEX_ENTER(&conn->conn_data_lock);
2417 MUTEX_EXIT(&conn->conn_data_lock);
2419 case RX_PACKET_TYPE_PARAMS:
2420 case RX_PACKET_TYPE_PARAMS+1:
2421 case RX_PACKET_TYPE_PARAMS+2:
2422 /* ignore these packet types for now */
2423 MUTEX_ENTER(&conn->conn_data_lock);
2425 MUTEX_EXIT(&conn->conn_data_lock);
2430 /* Should not reach here, unless the peer is broken: send an
2432 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2433 MUTEX_ENTER(&conn->conn_data_lock);
2434 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2436 MUTEX_EXIT(&conn->conn_data_lock);
2441 channel = np->header.cid & RX_CHANNELMASK;
2442 call = conn->call[channel];
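/*
 * Note on the decomposition used here (derived from the header constants,
 * not new behaviour): the low RX_CHANNELMASK bits of the packet's cid pick
 * one of the RX_MAXCALLS channels on the connection, while the remaining
 * bits (cid & RX_CIDMASK) identify the connection itself, as matched in
 * rxi_FindConnection above.  With the stock four channels, a cid of
 * 0x1000000e would select channel 2 on connection id 0x1000000c.
 */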
2443 #ifdef RX_ENABLE_LOCKS
2445 MUTEX_ENTER(&call->lock);
2446 /* Test to see if call struct is still attached to conn. */
2447 if (call != conn->call[channel]) {
2449 MUTEX_EXIT(&call->lock);
2450 if (type == RX_SERVER_CONNECTION) {
2451 call = conn->call[channel];
2452 /* If we started with no call attached and there is one now,
2453 * another thread is also running this routine and has gotten
2454 * the connection channel. We should drop this packet in the tests
2455 * below. If there was a call on this connection and it's now
2456 * gone, then we'll be making a new call below.
2457 * If there was previously a call and it's now different then
2458 * the old call was freed and another thread running this routine
2459 * has created a call on this channel. One of these two threads
2460 * has a packet for the old call and the code below handles those
2464 MUTEX_ENTER(&call->lock);
2467 /* This packet can't be for this call. If the new call address is
2468 * 0 then no call is running on this channel. If there is a call
2469 * then, since this is a client connection we're getting data for,
2470 * it must be for the previous call.
2472 MUTEX_ENTER(&rx_stats_mutex);
2473 rx_stats.spuriousPacketsRead++;
2474 MUTEX_EXIT(&rx_stats_mutex);
2475 MUTEX_ENTER(&conn->conn_data_lock);
2477 MUTEX_EXIT(&conn->conn_data_lock);
2482 currentCallNumber = conn->callNumber[channel];
2484 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2485 if (np->header.callNumber < currentCallNumber) {
2486 MUTEX_ENTER(&rx_stats_mutex);
2487 rx_stats.spuriousPacketsRead++;
2488 MUTEX_EXIT(&rx_stats_mutex);
2489 #ifdef RX_ENABLE_LOCKS
2491 MUTEX_EXIT(&call->lock);
2493 MUTEX_ENTER(&conn->conn_data_lock);
2495 MUTEX_EXIT(&conn->conn_data_lock);
2499 MUTEX_ENTER(&conn->conn_call_lock);
2500 call = rxi_NewCall(conn, channel);
2501 MUTEX_EXIT(&conn->conn_call_lock);
2502 *call->callNumber = np->header.callNumber;
2503 call->state = RX_STATE_PRECALL;
2504 clock_GetTime(&call->queueTime);
2505 hzero(call->bytesSent);
2506 hzero(call->bytesRcvd);
2507 rxi_KeepAliveOn(call);
2509 else if (np->header.callNumber != currentCallNumber) {
2510 /* Wait until the transmit queue is idle before deciding
2511 * whether to reset the current call. Chances are that the
2512 * call will be in either DALLY or HOLD state once the TQ_BUSY
2515 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2516 while ((call->state == RX_STATE_ACTIVE) &&
2517 (call->flags & RX_CALL_TQ_BUSY)) {
2518 call->flags |= RX_CALL_TQ_WAIT;
2519 #ifdef RX_ENABLE_LOCKS
2520 CV_WAIT(&call->cv_tq, &call->lock);
2521 #else /* RX_ENABLE_LOCKS */
2522 osi_rxSleep(&call->tq);
2523 #endif /* RX_ENABLE_LOCKS */
2525 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2526 /* If the new call cannot be taken right now send a busy and set
2527 * the error condition in this call, so that it terminates as
2528 * quickly as possible */
2529 if (call->state == RX_STATE_ACTIVE) {
2530 struct rx_packet *tp;
2532 rxi_CallError(call, RX_CALL_DEAD);
2533 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2534 MUTEX_EXIT(&call->lock);
2535 MUTEX_ENTER(&conn->conn_data_lock);
2537 MUTEX_EXIT(&conn->conn_data_lock);
2540 rxi_ResetCall(call, 0);
2541 *call->callNumber = np->header.callNumber;
2542 call->state = RX_STATE_PRECALL;
2543 clock_GetTime(&call->queueTime);
2544 hzero(call->bytesSent);
2545 hzero(call->bytesRcvd);
2547 * If the number of queued calls exceeds the overload
2548 * threshold then abort this call.
2550 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2551 struct rx_packet *tp;
2553 rxi_CallError(call, rx_BusyError);
2554 tp = rxi_SendCallAbort(call, np, 1, 0);
2555 MUTEX_EXIT(&call->lock);
2556 MUTEX_ENTER(&conn->conn_data_lock);
2558 MUTEX_EXIT(&conn->conn_data_lock);
2561 rxi_KeepAliveOn(call);
2564 /* Continuing call; do nothing here. */
2566 } else { /* we're the client */
2567 /* Ignore all incoming acknowledgements for calls in DALLY state */
2568 if ( call && (call->state == RX_STATE_DALLY)
2569 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2570 MUTEX_ENTER(&rx_stats_mutex);
2571 rx_stats.ignorePacketDally++;
2572 MUTEX_EXIT(&rx_stats_mutex);
2573 #ifdef RX_ENABLE_LOCKS
2575 MUTEX_EXIT(&call->lock);
2578 MUTEX_ENTER(&conn->conn_data_lock);
2580 MUTEX_EXIT(&conn->conn_data_lock);
2584 /* Ignore anything that's not relevant to the current call. If there
2585 * isn't a current call, then no packet is relevant. */
2586 if (!call || (np->header.callNumber != currentCallNumber)) {
2587 MUTEX_ENTER(&rx_stats_mutex);
2588 rx_stats.spuriousPacketsRead++;
2589 MUTEX_EXIT(&rx_stats_mutex);
2590 #ifdef RX_ENABLE_LOCKS
2592 MUTEX_EXIT(&call->lock);
2595 MUTEX_ENTER(&conn->conn_data_lock);
2597 MUTEX_EXIT(&conn->conn_data_lock);
2600 /* If the service security object index stamped in the packet does not
2601 * match the connection's security index, ignore the packet */
2602 if (np->header.securityIndex != conn->securityIndex) {
2603 #ifdef RX_ENABLE_LOCKS
2604 MUTEX_EXIT(&call->lock);
2606 MUTEX_ENTER(&conn->conn_data_lock);
2608 MUTEX_EXIT(&conn->conn_data_lock);
2612 /* If we're receiving the response, then all transmit packets are
2613 * implicitly acknowledged. Get rid of them. */
2614 if (np->header.type == RX_PACKET_TYPE_DATA) {
2615 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2616 /* XXX Hack. Because we must release the global rx lock when
2617 * sending packets (osi_NetSend) we drop all acks while we're
2618 * traversing the tq in rxi_Start sending packets out because
2619 * packets may move to the freePacketQueue as a result of being here!
2620 * So we drop these packets until we're safely out of the
2621 * traversing. Really ugly!
2622 * For fine grain RX locking, we set the acked field in the
2623 * packets and let rxi_Start remove them from the transmit queue.
2625 if (call->flags & RX_CALL_TQ_BUSY) {
2626 #ifdef RX_ENABLE_LOCKS
2627 rxi_SetAcksInTransmitQueue(call);
2630 return np; /* xmitting; drop packet */
2634 rxi_ClearTransmitQueue(call, 0);
2636 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2637 rxi_ClearTransmitQueue(call, 0);
2638 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2640 if (np->header.type == RX_PACKET_TYPE_ACK) {
2641 /* now check to see if this is an ack packet acknowledging that the
2642 * server actually *lost* some hard-acked data. If this happens we
2643 * ignore this packet, as it may indicate that the server restarted in
2644 * the middle of a call. It is also possible that this is an old ack
2645 * packet. We don't abort the connection in this case, because this
2646 * *might* just be an old ack packet. The right way to detect a server
2647 * restart in the midst of a call is to notice that the server epoch
2649 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2650 * XXX unacknowledged. I think that this is off-by-one, but
2651 * XXX I don't dare change it just yet, since it will
2652 * XXX interact badly with the server-restart detection
2653 * XXX code in receiveackpacket. */
2654 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2655 MUTEX_ENTER(&rx_stats_mutex);
2656 rx_stats.spuriousPacketsRead++;
2657 MUTEX_EXIT(&rx_stats_mutex);
2658 MUTEX_EXIT(&call->lock);
2659 MUTEX_ENTER(&conn->conn_data_lock);
2661 MUTEX_EXIT(&conn->conn_data_lock);
2665 } /* else not a data packet */
2668 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2669 /* Set remote user defined status from packet */
2670 call->remoteStatus = np->header.userStatus;
2672 /* Note the gap between the expected next packet and the actual
2673 * packet that arrived, when the new packet has a smaller serial number
2674 * than expected. Rioses frequently reorder packets all by themselves,
2675 * so this will be quite important with very large window sizes.
2676 * Skew is checked against 0 here to avoid any dependence on the type of
2677 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2679 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2680 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2681 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2683 MUTEX_ENTER(&conn->conn_data_lock);
2684 skew = conn->lastSerial - np->header.serial;
2685 conn->lastSerial = np->header.serial;
2686 MUTEX_EXIT(&conn->conn_data_lock);
2688 register struct rx_peer *peer;
2690 if (skew > peer->inPacketSkew) {
2691 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2692 peer->inPacketSkew = skew;
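/*
 * The note above suggests smoothing inPacketSkew rather than keeping the
 * peak.  A sketch of what that could look like, using the same 1/8 gain
 * mentioned there (a suggestion only; the current code records the
 * maximum):
 *
 *     if (skew > 0)
 *         peer->inPacketSkew = (7 * peer->inPacketSkew + skew) / 8;
 */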
2696 /* Now do packet type-specific processing */
2697 switch (np->header.type) {
2698 case RX_PACKET_TYPE_DATA:
2699 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2702 case RX_PACKET_TYPE_ACK:
2703 /* Respond immediately to ack packets requesting acknowledgement
2705 if (np->header.flags & RX_REQUEST_ACK) {
2707 (void) rxi_SendCallAbort(call, 0, 1, 0);
2709 (void) rxi_SendAck(call, 0, np->header.serial,
2710 RX_ACK_PING_RESPONSE, 1);
2712 np = rxi_ReceiveAckPacket(call, np, 1);
2714 case RX_PACKET_TYPE_ABORT:
2715 /* An abort packet: reset the connection, passing the error up to
2717 /* What if error is zero? */
2718 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2720 case RX_PACKET_TYPE_BUSY:
2723 case RX_PACKET_TYPE_ACKALL:
2724 /* All packets acknowledged, so we can drop all packets previously
2725 * readied for sending */
2726 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2727 /* XXX Hack. Because we can't release the global rx lock when
2728 * sending packets (osi_NetSend) we drop all ack pkts while we're
2729 * traversing the tq in rxi_Start sending packets out because
2730 * packets may move to the freePacketQueue as a result of being
2731 * here! So we drop these packets until we're safely out of the
2732 * traversing. Really ugly!
2733 * For fine grain RX locking, we set the acked field in the packets
2734 * and let rxi_Start remove the packets from the transmit queue.
2736 if (call->flags & RX_CALL_TQ_BUSY) {
2737 #ifdef RX_ENABLE_LOCKS
2738 rxi_SetAcksInTransmitQueue(call);
2740 #else /* RX_ENABLE_LOCKS */
2742 return np; /* xmitting; drop packet */
2743 #endif /* RX_ENABLE_LOCKS */
2745 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2746 rxi_ClearTransmitQueue(call, 0);
2749 /* Should not reach here, unless the peer is broken: send an abort
2751 rxi_CallError(call, RX_PROTOCOL_ERROR);
2752 np = rxi_SendCallAbort(call, np, 1, 0);
2755 /* Note when this last legitimate packet was received, for keep-alive
2756 * processing. Note, we delay getting the time until now in the hope that
2757 * the packet will be delivered to the user before any get time is required
2758 * (if not, then the time won't actually be re-evaluated here). */
2759 call->lastReceiveTime = clock_Sec();
2760 MUTEX_EXIT(&call->lock);
2761 MUTEX_ENTER(&conn->conn_data_lock);
2763 MUTEX_EXIT(&conn->conn_data_lock);
2767 /* return true if this is an "interesting" connection from the point of view
2768 of someone trying to debug the system */
2769 int rxi_IsConnInteresting(struct rx_connection *aconn)
2772 register struct rx_call *tcall;
2774 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2776 for(i=0;i<RX_MAXCALLS;i++) {
2777 tcall = aconn->call[i];
2779 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2781 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2789 /* if this is one of the last few packets AND it wouldn't be used by the
2790 receiving call to immediately satisfy a read request, then drop it on
2791 the floor, since accepting it might prevent a lock-holding thread from
2792 making progress in its reading. If a call has been cleared while in
2793 the precall state then ignore all subsequent packets until the call
2794 is assigned to a thread. */
2796 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2799 MUTEX_ENTER(&rx_stats_mutex);
2800 if (((ap->header.seq != 1) &&
2801 (acall->flags & RX_CALL_CLEARED) &&
2802 (acall->state == RX_STATE_PRECALL)) ||
2803 ((rx_nFreePackets < rxi_dataQuota+2) &&
2804 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2805 && (acall->flags & RX_CALL_READER_WAIT)))) {
2808 MUTEX_EXIT(&rx_stats_mutex);
2813 static void rxi_CheckReachEvent(struct rxevent *event,
2814 struct rx_connection *conn, struct rx_call *acall)
2816 struct rx_call *call = acall;
2820 MUTEX_ENTER(&conn->conn_data_lock);
2821 conn->checkReachEvent = NULL;
2822 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2823 if (event) conn->refCount--;
2824 MUTEX_EXIT(&conn->conn_data_lock);
2828 MUTEX_ENTER(&conn->conn_call_lock);
2829 MUTEX_ENTER(&conn->conn_data_lock);
2830 for (i=0; i<RX_MAXCALLS; i++) {
2831 struct rx_call *tc = conn->call[i];
2832 if (tc && tc->state == RX_STATE_PRECALL) {
2838 /* Indicate that rxi_CheckReachEvent is no longer running by
2839 * clearing the flag. Must be atomic under conn_data_lock to
2840 * avoid a new call slipping by: rxi_CheckConnReach holds
2841 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2843 conn->flags &= ~RX_CONN_ATTACHWAIT;
2844 MUTEX_EXIT(&conn->conn_data_lock);
2845 MUTEX_EXIT(&conn->conn_call_lock);
2849 if (call != acall) MUTEX_ENTER(&call->lock);
2850 rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
2851 if (call != acall) MUTEX_EXIT(&call->lock);
2853 clock_GetTime(&when);
2854 when.sec += RX_CHECKREACH_TIMEOUT;
2855 MUTEX_ENTER(&conn->conn_data_lock);
2856 if (!conn->checkReachEvent) {
2858 conn->checkReachEvent =
2859 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2861 MUTEX_EXIT(&conn->conn_data_lock);
2866 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2868 struct rx_service *service = conn->service;
2869 struct rx_peer *peer = conn->peer;
2870 afs_uint32 now, lastReach;
2872 if (service->checkReach == 0)
2876 MUTEX_ENTER(&peer->peer_lock);
2877 lastReach = peer->lastReachTime;
2878 MUTEX_EXIT(&peer->peer_lock);
2879 if (now - lastReach < RX_CHECKREACH_TTL)
2882 MUTEX_ENTER(&conn->conn_data_lock);
2883 if (conn->flags & RX_CONN_ATTACHWAIT) {
2884 MUTEX_EXIT(&conn->conn_data_lock);
2887 conn->flags |= RX_CONN_ATTACHWAIT;
2888 MUTEX_EXIT(&conn->conn_data_lock);
2889 if (!conn->checkReachEvent)
2890 rxi_CheckReachEvent(NULL, conn, call);
2895 /* try to attach call, if authentication is complete */
2896 static void TryAttach(register struct rx_call *acall,
2897 register osi_socket socket, register int *tnop,
2898 register struct rx_call **newcallp, int reachOverride)
2900 struct rx_connection *conn = acall->conn;
2902 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2903 /* Don't attach until we have any req'd. authentication. */
2904 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2905 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2906 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2907 /* Note: this does not necessarily succeed; there
2908 * may not be any proc available
2912 rxi_ChallengeOn(acall->conn);
2917 /* A data packet has been received off the interface. This packet is
2918 * appropriate to the call (the call is in the right state, etc.). This
2919 * routine can return a packet to the caller, for re-use */
2921 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
2922 register struct rx_packet *np, int istack, osi_socket socket,
2923 afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2925 int ackNeeded = 0; /* 0 means no, otherwise ack_reason */
2929 afs_uint32 seq, serial, flags;
2931 struct rx_packet *tnp;
2933 MUTEX_ENTER(&rx_stats_mutex);
2934 rx_stats.dataPacketsRead++;
2935 MUTEX_EXIT(&rx_stats_mutex);
2938 /* If there are no packet buffers, drop this new packet, unless we can find
2939 * packet buffers from inactive calls */
2941 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2942 MUTEX_ENTER(&rx_freePktQ_lock);
2943 rxi_NeedMorePackets = TRUE;
2944 MUTEX_EXIT(&rx_freePktQ_lock);
2945 MUTEX_ENTER(&rx_stats_mutex);
2946 rx_stats.noPacketBuffersOnRead++;
2947 MUTEX_EXIT(&rx_stats_mutex);
2948 call->rprev = np->header.serial;
2949 rxi_calltrace(RX_TRACE_DROP, call);
2950 dpf (("packet %x dropped on receipt - quota problems", np));
2952 rxi_ClearReceiveQueue(call);
2953 clock_GetTime(&when);
2954 clock_Add(&when, &rx_softAckDelay);
2955 if (!call->delayedAckEvent ||
2956 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2957 rxevent_Cancel(call->delayedAckEvent, call,
2958 RX_CALL_REFCOUNT_DELAY);
2959 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2960 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2963 /* we've damaged this call already, might as well do it in. */
2969 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2970 * packet is one of several packets transmitted as a single
2971 * datagram. Do not send any soft or hard acks until all packets
2972 * in a jumbogram have been processed. Send negative acks right away.
2974 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2975 /* tnp is non-null when there are more packets in the
2976 * current jumbo gram */
2983 seq = np->header.seq;
2984 serial = np->header.serial;
2985 flags = np->header.flags;
2987 /* If the call is in an error state, send an abort message */
2989 return rxi_SendCallAbort(call, np, istack, 0);
2991 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2992 * AFS 3.5 jumbogram. */
2993 if (flags & RX_JUMBO_PACKET) {
2994 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2999 if (np->header.spare != 0) {
3000 MUTEX_ENTER(&call->conn->conn_data_lock);
3001 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3002 MUTEX_EXIT(&call->conn->conn_data_lock);
3005 /* The usual case is that this is the expected next packet */
3006 if (seq == call->rnext) {
3008 /* Check to make sure it is not a duplicate of one already queued */
3009 if (queue_IsNotEmpty(&call->rq)
3010 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3011 MUTEX_ENTER(&rx_stats_mutex);
3012 rx_stats.dupPacketsRead++;
3013 MUTEX_EXIT(&rx_stats_mutex);
3014 dpf (("packet %x dropped on receipt - duplicate", np));
3015 rxevent_Cancel(call->delayedAckEvent, call,
3016 RX_CALL_REFCOUNT_DELAY);
3017 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3023 /* It's the next packet. Stick it on the receive queue
3024 * for this call. Set newPackets to make sure we wake
3025 * the reader once all packets have been processed */
3026 queue_Prepend(&call->rq, np);
3028 np = NULL; /* We can't use this anymore */
3031 /* If an ack is requested then set a flag to make sure we
3032 * send an acknowledgement for this packet */
3033 if (flags & RX_REQUEST_ACK) {
3034 ackNeeded = RX_ACK_REQUESTED;
3037 /* Keep track of whether we have received the last packet */
3038 if (flags & RX_LAST_PACKET) {
3039 call->flags |= RX_CALL_HAVE_LAST;
3043 /* Check whether we have all of the packets for this call */
3044 if (call->flags & RX_CALL_HAVE_LAST) {
3045 afs_uint32 tseq; /* temporary sequence number */
3046 struct rx_packet *tp; /* Temporary packet pointer */
3047 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3049 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3050 if (tseq != tp->header.seq)
3052 if (tp->header.flags & RX_LAST_PACKET) {
3053 call->flags |= RX_CALL_RECEIVE_DONE;
3060 /* Provide asynchronous notification for those who want it
3061 * (e.g. multi rx) */
3062 if (call->arrivalProc) {
3063 (*call->arrivalProc)(call, call->arrivalProcHandle,
3064 (int) call->arrivalProcArg);
3065 call->arrivalProc = (VOID (*)()) 0;
3068 /* Update last packet received */
3071 /* If there is no server process serving this call, grab
3072 * one, if available. We only need to do this once. If a
3073 * server thread is available, this thread becomes a server
3074 * thread and the server thread becomes a listener thread. */
3076 TryAttach(call, socket, tnop, newcallp, 0);
3079 /* This is not the expected next packet. */
3081 /* Determine whether this is a new or old packet, and if it's
3082 * a new one, whether it fits into the current receive window.
3083 * Also figure out whether the packet was delivered in sequence.
3084 * We use the prev variable to determine whether the new packet
3085 * is the successor of its immediate predecessor in the
3086 * receive queue, and the missing flag to determine whether
3087 * any of this packet's predecessors are missing. */
3089 afs_uint32 prev; /* "Previous packet" sequence number */
3090 struct rx_packet *tp; /* Temporary packet pointer */
3091 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3092 int missing; /* Are any predecessors missing? */
3094 /* If the new packet's sequence number has been sent to the
3095 * application already, then this is a duplicate */
3096 if (seq < call->rnext) {
3097 MUTEX_ENTER(&rx_stats_mutex);
3098 rx_stats.dupPacketsRead++;
3099 MUTEX_EXIT(&rx_stats_mutex);
3100 rxevent_Cancel(call->delayedAckEvent, call,
3101 RX_CALL_REFCOUNT_DELAY);
3102 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3108 /* If the sequence number is greater than what can be
3109 * accommodated by the current window, then send a negative
3110 * acknowledge and drop the packet */
3111 if ((call->rnext + call->rwind) <= seq) {
3112 rxevent_Cancel(call->delayedAckEvent, call,
3113 RX_CALL_REFCOUNT_DELAY);
3114 np = rxi_SendAck(call, np, serial,
3115 RX_ACK_EXCEEDS_WINDOW, istack);
3121 /* Look for the packet in the queue of old received packets */
3122 for (prev = call->rnext - 1, missing = 0,
3123 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3124 /*Check for duplicate packet */
3125 if (seq == tp->header.seq) {
3126 MUTEX_ENTER(&rx_stats_mutex);
3127 rx_stats.dupPacketsRead++;
3128 MUTEX_EXIT(&rx_stats_mutex);
3129 rxevent_Cancel(call->delayedAckEvent, call,
3130 RX_CALL_REFCOUNT_DELAY);
3131 np = rxi_SendAck(call, np, serial,
3132 RX_ACK_DUPLICATE, istack);
3137 /* If we find a higher sequence packet, break out and
3138 * insert the new packet here. */
3139 if (seq < tp->header.seq) break;
3140 /* Check for missing packet */
3141 if (tp->header.seq != prev+1) {
3145 prev = tp->header.seq;
3148 /* Keep track of whether we have received the last packet. */
3149 if (flags & RX_LAST_PACKET) {
3150 call->flags |= RX_CALL_HAVE_LAST;
3153 /* It's within the window: add it to the receive queue.
3154 * tp is left by the previous loop either pointing at the
3155 * packet before which to insert the new packet, or at the
3156 * queue head if the queue is empty or the packet should be
3158 queue_InsertBefore(tp, np);
3162 /* Check whether we have all of the packets for this call */
3163 if ((call->flags & RX_CALL_HAVE_LAST)
3164 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3165 afs_uint32 tseq; /* temporary sequence number */
3167 for (tseq = call->rnext,
3168 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3169 if (tseq != tp->header.seq)
3171 if (tp->header.flags & RX_LAST_PACKET) {
3172 call->flags |= RX_CALL_RECEIVE_DONE;
3179 /* We need to send an ack if the packet is out of sequence,
3180 * or if an ack was requested by the peer. */
3181 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3182 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3185 /* Acknowledge the last packet for each call */
3186 if (flags & RX_LAST_PACKET) {
3197 * If the receiver is waiting for an iovec, fill the iovec
3198 * using the data from the receive queue */
3199 if (call->flags & RX_CALL_IOVEC_WAIT) {
3200 didHardAck = rxi_FillReadVec(call, serial);
3201 /* the call may have been aborted */
3210 /* Wakeup the reader if any */
3211 if ((call->flags & RX_CALL_READER_WAIT) &&
3212 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3213 (call->iovNext >= call->iovMax) ||
3214 (call->flags & RX_CALL_RECEIVE_DONE))) {
3215 call->flags &= ~RX_CALL_READER_WAIT;
3216 #ifdef RX_ENABLE_LOCKS
3217 CV_BROADCAST(&call->cv_rq);
3219 osi_rxWakeup(&call->rq);
3225 * Send an ack when requested by the peer, or once every
3226 * rxi_SoftAckRate packets until the last packet has been
3227 * received. Always send a soft ack for the last packet in
3228 * the server's reply. */
3230 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3231 np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3232 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3233 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3234 np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3235 } else if (call->nSoftAcks) {
3236 clock_GetTime(&when);
3237 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3238 clock_Add(&when, &rx_lastAckDelay);
3240 clock_Add(&when, &rx_softAckDelay);
3242 if (!call->delayedAckEvent ||
3243 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3244 rxevent_Cancel(call->delayedAckEvent, call,
3245 RX_CALL_REFCOUNT_DELAY);
3246 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3247 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3250 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3251 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3258 static void rxi_ComputeRate();
3261 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3263 struct rx_peer *peer = conn->peer;
3265 MUTEX_ENTER(&peer->peer_lock);
3266 peer->lastReachTime = clock_Sec();
3267 MUTEX_EXIT(&peer->peer_lock);
3269 MUTEX_ENTER(&conn->conn_data_lock);
3270 if (conn->flags & RX_CONN_ATTACHWAIT) {
3273 conn->flags &= ~RX_CONN_ATTACHWAIT;
3274 MUTEX_EXIT(&conn->conn_data_lock);
3276 for (i=0; i<RX_MAXCALLS; i++) {
3277 struct rx_call *call = conn->call[i];
3279 if (call != acall) MUTEX_ENTER(&call->lock);
3280 /* tnop can be null if newcallp is null */
3281 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3282 if (call != acall) MUTEX_EXIT(&call->lock);
3286 MUTEX_EXIT(&conn->conn_data_lock);
3289 /* rxi_ComputePeerNetStats
3291 * Called exclusively by rxi_ReceiveAckPacket to compute network link
3292 * estimates (like RTT and throughput) based on ack packets. Caller
3293 * must ensure that the packet in question is the right one (i.e.
3294 * serial number matches).
3297 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3298 struct rx_ackPacket *ap, struct rx_packet *np)
3300 struct rx_peer *peer = call->conn->peer;
3302 /* Use RTT if not delayed by client. */
3303 if (ap->reason != RX_ACK_DELAY)
3304 rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3306 rxi_ComputeRate(peer, call, p, np, ap->reason);
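/*
 * Worked example of why RX_ACK_DELAY samples are excluded above: if the
 * true network round trip is 20 ms but the peer held the ack for its soft
 * ack delay (say 100 ms), timing that ack would report roughly 120 ms and
 * inflate the retransmit timeout.  Only acks stimulated immediately by
 * packet arrival are useful as RTT samples.  (The 20 ms and 100 ms figures
 * are illustrative, not rx defaults.)
 */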
3310 /* The real smarts of the whole thing. */
3311 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
3312 struct rx_packet *np, int istack)
3314 struct rx_ackPacket *ap;
3316 register struct rx_packet *tp;
3317 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3318 register struct rx_connection *conn = call->conn;
3319 struct rx_peer *peer = conn->peer;
3322 /* because there are CM's that are bogus, sending weird values for this. */
3323 afs_uint32 skew = 0;
3328 int newAckCount = 0;
3329 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3330 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3332 MUTEX_ENTER(&rx_stats_mutex);
3333 rx_stats.ackPacketsRead++;
3334 MUTEX_EXIT(&rx_stats_mutex);
3335 ap = (struct rx_ackPacket *) rx_DataOf(np);
3336 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3338 return np; /* truncated ack packet */
3340 /* depends on ack packet struct */
3341 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3342 first = ntohl(ap->firstPacket);
3343 serial = ntohl(ap->serial);
3344 /* temporarily disabled -- needs to degrade over time
3345 skew = ntohs(ap->maxSkew); */
3347 /* Ignore ack packets received out of order */
3348 if (first < call->tfirst) {
3352 if (np->header.flags & RX_SLOW_START_OK) {
3353 call->flags |= RX_CALL_SLOW_START_OK;
3356 if (ap->reason == RX_ACK_PING_RESPONSE)
3357 rxi_UpdatePeerReach(conn, call);
3362 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3363 ap->reason, ntohl(ap->previousPacket),
3364 (unsigned int) np->header.seq, (unsigned int) serial,
3365 (unsigned int) skew, ntohl(ap->firstPacket));
3368 for (offset = 0; offset < nAcks; offset++)
3369 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3375 /* Update the outgoing packet skew value to the latest value of
3376 * the peer's incoming packet skew value. The ack packet, of
3377 * course, could arrive out of order, but that won't affect things
3379 MUTEX_ENTER(&peer->peer_lock);
3380 peer->outPacketSkew = skew;
3382 /* Check for packets that no longer need to be transmitted, and
3383 * discard them. This only applies to packets positively
3384 * acknowledged as having been sent to the peer's upper level.
3385 * All other packets must be retained. So only packets with
3386 * sequence numbers < ap->firstPacket are candidates. */
3387 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3388 if (tp->header.seq >= first) break;
3389 call->tfirst = tp->header.seq + 1;
3390 if (serial && (tp->header.serial == serial ||
3391 tp->firstSerial == serial))
3392 rxi_ComputePeerNetStats(call, tp, ap, np);
3393 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3394 /* XXX Hack. Because we have to release the global rx lock when sending
3395 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3396 * in rxi_Start sending packets out because packets may move to the
3397 * freePacketQueue as a result of being here! So we drop these packets until
3398 * we're safely out of the traversing. Really ugly!
3399 * To make it even uglier, if we're using fine grain locking, we can
3400 * set the ack bits in the packets and have rxi_Start remove the packets
3401 * when it's done transmitting.
3403 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3406 if (call->flags & RX_CALL_TQ_BUSY) {
3407 #ifdef RX_ENABLE_LOCKS
3408 tp->flags |= RX_PKTFLAG_ACKED;
3409 call->flags |= RX_CALL_TQ_SOME_ACKED;
3410 #else /* RX_ENABLE_LOCKS */
3412 #endif /* RX_ENABLE_LOCKS */
3414 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3417 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3422 /* Give rate detector a chance to respond to ping requests */
3423 if (ap->reason == RX_ACK_PING_RESPONSE) {
3424 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3428 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3430 /* Now go through explicit acks/nacks and record the results in
3431 * the waiting packets. These are packets that can't be released
3432 * yet, even with a positive acknowledge. This positive
3433 * acknowledge only means the packet has been received by the
3434 * peer, not that it will be retained long enough to be sent to
3435 * the peer's upper level. In addition, reset the transmit timers
3436 * of any missing packets (those packets that must be missing
3437 * because this packet was out of sequence) */
3439 call->nSoftAcked = 0;
3440 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3441 /* Update round trip time if the ack was stimulated on receipt
3443 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3444 #ifdef RX_ENABLE_LOCKS
3445 if (tp->header.seq >= first)
3446 #endif /* RX_ENABLE_LOCKS */
3447 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3448 if (serial && (tp->header.serial == serial ||
3449 tp->firstSerial == serial))
3450 rxi_ComputePeerNetStats(call, tp, ap, np);
3452 /* Set the acknowledge flag per packet based on the
3453 * information in the ack packet. An acknowledged packet can
3454 * be downgraded when the server has discarded a packet it
3455 * soacked previously, or when an ack packet is received
3456 * out of sequence. */
3457 if (tp->header.seq < first) {
3458 /* Implicit ack information */
3459 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3462 tp->flags |= RX_PKTFLAG_ACKED;
3464 else if (tp->header.seq < first + nAcks) {
3465 /* Explicit ack information: set it in the packet appropriately */
3466 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3467 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3469 tp->flags |= RX_PKTFLAG_ACKED;
3477 tp->flags &= ~RX_PKTFLAG_ACKED;
3482 tp->flags &= ~RX_PKTFLAG_ACKED;
3486 /* If packet isn't yet acked, and it has been transmitted at least
3487 * once, reset retransmit time using latest timeout
3488 * i.e., this should readjust the retransmit timer for all outstanding
3489 * packets... So we don't just retransmit when we should know better*/
3491 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3492 tp->retryTime = tp->timeSent;
3493 clock_Add(&tp->retryTime, &peer->timeout);
3494 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3495 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
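/*
 * Worked example of the arithmetic above: tp->backoff counts roughly
 * quarter-seconds, and (backoff << 8) converts that to milliseconds, since
 * 1 << 8 = 256 ms is about a quarter second.  With an illustrative
 * peer->timeout of 1.5 seconds and backoff = 3, retryTime becomes
 * timeSent + 1500 ms + 768 ms, i.e. about 2.27 seconds after the packet
 * was last sent.
 */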
3499 /* If the window has been extended by this acknowledge packet,
3500 * then wakeup a sender waiting in alloc for window space, or try
3501 * sending packets now, if he's been sitting on packets due to
3502 * lack of window space */
3503 if (call->tnext < (call->tfirst + call->twind)) {
3504 #ifdef RX_ENABLE_LOCKS
3505 CV_SIGNAL(&call->cv_twind);
3507 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3508 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3509 osi_rxWakeup(&call->twind);
3512 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3513 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3517 /* if the ack packet has a receivelen field hanging off it,
3518 * update our state */
3519 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3522 /* If the ack packet has a "recommended" size that is less than
3523 * what I am using now, reduce my size to match */
3524 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3525 sizeof(afs_int32), &tSize);
3526 tSize = (afs_uint32) ntohl(tSize);
3527 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3529 /* Get the maximum packet size to send to this peer */
3530 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3532 tSize = (afs_uint32)ntohl(tSize);
3533 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3534 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3536 /* sanity check - peer might have restarted with different params.
3537 * If peer says "send less", dammit, send less... Peer should never
3538 * be unable to accept packets of the size that prior AFS versions would
3539 * send without asking. */
3540 if (peer->maxMTU != tSize) {
3541 peer->maxMTU = tSize;
3542 peer->MTU = MIN(tSize, peer->MTU);
3543 call->MTU = MIN(call->MTU, tSize);
3547 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3549 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3550 sizeof(afs_int32), &tSize);
3551 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3552 if (tSize < call->twind) { /* smaller than our send */
3553 call->twind = tSize; /* window, we must send less... */
3554 call->ssthresh = MIN(call->twind, call->ssthresh);
3557 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3558 * network MTU confused with the loopback MTU. Calculate the
3559 * maximum MTU here for use in the slow start code below.
3561 maxMTU = peer->maxMTU;
3562 /* Did peer restart with older RX version? */
3563 if (peer->maxDgramPackets > 1) {
3564 peer->maxDgramPackets = 1;
3566 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3568 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3569 sizeof(afs_int32), &tSize);
3570 tSize = (afs_uint32) ntohl(tSize);
3572 * As of AFS 3.5 we set the send window to match the receive window.
3574 if (tSize < call->twind) {
3575 call->twind = tSize;
3576 call->ssthresh = MIN(call->twind, call->ssthresh);
3577 } else if (tSize > call->twind) {
3578 call->twind = tSize;
3582 * As of AFS 3.5, a jumbogram is more than one fixed size
3583 * packet transmitted in a single UDP datagram. If the remote
3584 * MTU is smaller than our local MTU then never send a datagram
3585 * larger than the natural MTU.
3587 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3588 sizeof(afs_int32), &tSize);
3589 maxDgramPackets = (afs_uint32) ntohl(tSize);
3590 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3591 maxDgramPackets = MIN(maxDgramPackets,
3592 (int)(peer->ifDgramPackets));
3593 maxDgramPackets = MIN(maxDgramPackets, tSize);
3594 if (maxDgramPackets > 1) {
3595 peer->maxDgramPackets = maxDgramPackets;
3596 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3598 peer->maxDgramPackets = 1;
3599 call->MTU = peer->natMTU;
3601 } else if (peer->maxDgramPackets > 1) {
3602 /* Restarted with lower version of RX */
3603 peer->maxDgramPackets = 1;
3605 } else if (peer->maxDgramPackets > 1 ||
3606 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3607 /* Restarted with lower version of RX */
3608 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3609 peer->natMTU = OLD_MAX_PACKET_SIZE;
3610 peer->MTU = OLD_MAX_PACKET_SIZE;
3611 peer->maxDgramPackets = 1;
3612 peer->nDgramPackets = 1;
3614 call->MTU = OLD_MAX_PACKET_SIZE;
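/*
 * Summary of the optional ack trailer parsed above (layout inferred from
 * the read offsets in this routine): after the ack array the packet may
 * carry up to four afs_int32 values, all in network byte order --
 *
 *     word 0  largest packet the peer will accept from us   (-> maxMTU)
 *     word 1  peer's recommended "natural" packet size      (-> natMTU)
 *     word 2  peer's receive window                          (-> twind)
 *     word 3  peer's max packets per jumbogram               (-> maxDgramPackets)
 *
 * Exactly three words appears to mark an AFS 3.4a peer (window, but no
 * 3.5-style jumbograms); four or more marks an AFS 3.5 peer.
 */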
3619 * Calculate how many datagrams were successfully received after
3620 * the first missing packet and adjust the negative ack counter
3625 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3626 if (call->nNacks < nNacked) {
3627 call->nNacks = nNacked;
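/*
 * Worked example for the conversion above: nNacked counts packets, but the
 * congestion logic wants whole datagrams, so the count is rounded up with
 * (n + d - 1) / d.  For instance, 5 nacked packets at 4 packets per
 * datagram give (5 + 3) / 4 = 2 datagrams.
 */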
3636 if (call->flags & RX_CALL_FAST_RECOVER) {
3638 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3640 call->flags &= ~RX_CALL_FAST_RECOVER;
3641 call->cwind = call->nextCwind;
3642 call->nextCwind = 0;
3645 call->nCwindAcks = 0;
3647 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3648 /* Three negative acks in a row trigger congestion recovery */
3649 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3650 MUTEX_EXIT(&peer->peer_lock);
3651 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3652 /* someone else is waiting to start recovery */
3655 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3656 while (call->flags & RX_CALL_TQ_BUSY) {
3657 call->flags |= RX_CALL_TQ_WAIT;
3658 #ifdef RX_ENABLE_LOCKS
3659 CV_WAIT(&call->cv_tq, &call->lock);
3660 #else /* RX_ENABLE_LOCKS */
3661 osi_rxSleep(&call->tq);
3662 #endif /* RX_ENABLE_LOCKS */
3664 MUTEX_ENTER(&peer->peer_lock);
3665 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3666 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3667 call->flags |= RX_CALL_FAST_RECOVER;
3668 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3669 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3671 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3672 call->nextCwind = call->ssthresh;
3675 peer->MTU = call->MTU;
3676 peer->cwind = call->nextCwind;
3677 peer->nDgramPackets = call->nDgramPackets;
3679 call->congestSeq = peer->congestSeq;
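/*
 * Worked example of the fast-recovery arithmetic above, with illustrative
 * values cwind = 16, twind = 32 and a nack threshold of 3: ssthresh
 * becomes MAX(4, MIN(16, 32)) >> 1 = 8, the congestion window restarts at
 * 8 + 3 = 11 (subject to the cap applied above), the datagram size is
 * halved but never drops below 1 (MAX(2, n) >> 1), and nextCwind records
 * the new ssthresh for when recovery completes.
 */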
3680 /* Reset the resend times on the packets that were nacked