2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX: Extended Remote Procedure Call */
12 #include <afsconfig.h>
14 #include "afs/param.h"
16 #include <afs/param.h>
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
29 #include <net/net_globals.h>
30 #endif /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #undef RXDEBUG /* turn off debugging */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
53 #endif /* AFS_ALPHA_ENV */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
64 #include "rx_globals.h"
66 #define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS 211 /* Stop AFS process */
68 #define AFSOP_STOP_BKG 212 /* Stop BKG process */
70 extern afs_int32 afs_termState;
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
77 # include <sys/types.h>
84 # include <sys/socket.h>
85 # include <sys/file.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
107 int (*registerProgram)() = 0;
108 int (*swapNameProgram)() = 0;
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
116 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
118 afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119 afs_int32 rxi_start_in_error;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
124 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125 * currently allocated within rx. This number is used to allocate the
126 * memory required to return the statistics when queried.
129 static unsigned int rxi_rpc_peer_stat_cnt;
132 * rxi_rpc_process_stat_cnt counts the total number of local process stat
133 * structures currently allocated within rx. The number is used to allocate
134 * the memory required to return the statistics when queried.
137 static unsigned int rxi_rpc_process_stat_cnt;
139 #if !defined(offsetof)
140 #include <stddef.h> /* for definition of offsetof() */
143 #ifdef AFS_PTHREAD_ENV
147 * Use procedural initialization of mutexes/condition variables
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
172 static void rxi_InitPthread(void) {
173 assert(pthread_mutex_init(&rx_clock_mutex,
174 (const pthread_mutexattr_t*)0)==0);
175 assert(pthread_mutex_init(&rxi_connCacheMutex,
176 (const pthread_mutexattr_t*)0)==0);
177 assert(pthread_mutex_init(&rx_init_mutex,
178 (const pthread_mutexattr_t*)0)==0);
179 assert(pthread_mutex_init(&epoch_mutex,
180 (const pthread_mutexattr_t*)0)==0);
181 assert(pthread_mutex_init(&rx_event_mutex,
182 (const pthread_mutexattr_t*)0)==0);
183 assert(pthread_mutex_init(&des_init_mutex,
184 (const pthread_mutexattr_t*)0)==0);
185 assert(pthread_mutex_init(&des_random_mutex,
186 (const pthread_mutexattr_t*)0)==0);
187 assert(pthread_mutex_init(&osi_malloc_mutex,
188 (const pthread_mutexattr_t*)0)==0);
189 assert(pthread_mutex_init(&event_handler_mutex,
190 (const pthread_mutexattr_t*)0)==0);
191 assert(pthread_mutex_init(&listener_mutex,
192 (const pthread_mutexattr_t*)0)==0);
193 assert(pthread_mutex_init(&rx_if_init_mutex,
194 (const pthread_mutexattr_t*)0)==0);
195 assert(pthread_mutex_init(&rx_if_mutex,
196 (const pthread_mutexattr_t*)0)==0);
197 assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198 (const pthread_mutexattr_t*)0)==0);
199 assert(pthread_mutex_init(&rxkad_random_mutex,
200 (const pthread_mutexattr_t*)0)==0);
201 assert(pthread_mutex_init(&rxkad_stats_mutex,
202 (const pthread_mutexattr_t*)0)==0);
203 assert(pthread_mutex_init(&rx_debug_mutex,
204 (const pthread_mutexattr_t*)0)==0);
206 assert(pthread_cond_init(&rx_event_handler_cond,
207 (const pthread_condattr_t*)0)==0);
208 assert(pthread_cond_init(&rx_listener_cond,
209 (const pthread_condattr_t*)0)==0);
210 assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
213 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
214 #define INIT_PTHREAD_LOCKS \
215 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
217 * The rx_stats_mutex protects the following global variables:
222 * rxi_lowConnRefCount
223 * rxi_lowPeerRefCount
232 #define INIT_PTHREAD_LOCKS
236 /* Variables for handling the minProcs implementation. availProcs gives the
237 * number of threads available in the pool at this moment (not counting dudes
238 * executing right now). totalMin gives the total number of procs required
239 * for handling all minProcs requests. minDeficit is a dynamic variable
240 * tracking the # of procs required to satisfy all of the remaining minProcs
242 * For fine grain locking to work, the quota check and the reservation of
243 * a server thread has to come while rxi_availProcs and rxi_minDeficit
244 * are locked. To this end, the code has been modified under #ifdef
245 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246 * same time. A new function, ReturnToServerPool() returns the allocation.
248 * A call can be on several queues (but only one at a time). When
249 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250 * that no one else is touching the queue. To this end, we store the address
251 * of the queue lock in the call structure (under the call lock) when we
252 * put the call on a queue, and we clear the call_queue_lock when the
253 * call is removed from a queue (once the call lock has been obtained).
254 * This allows rxi_ResetCall to safely synchronize with others wishing
255 * to manipulate the queue.
258 #ifdef RX_ENABLE_LOCKS
259 static int rxi_ServerThreadSelectingCall;
260 static afs_kmutex_t rx_rpc_stats;
261 void rxi_StartUnlocked();
264 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are
265 ** pretty good that the next packet coming in is from the same connection
266 ** as the last packet, since we send multiple packets in a transmit window.
268 struct rx_connection *rxLastConn = 0;
270 #ifdef RX_ENABLE_LOCKS
271 /* The locking hierarchy for rx fine grain locking is composed of these
274 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
275 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
276 * call->lock - locks call data fields.
277 * These are independent of each other:
278 * rx_freeCallQueue_lock
283 * serverQueueEntry->lock
285 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
286 * peer->lock - locks peer data fields.
287 * conn_data_lock - ensures that no more than one thread is updating a conn data
288 * field at the same time.
296 * Do we need a lock to protect the peer field in the conn structure?
297 * conn->peer was previously a constant for all intents and purposes and so had
298 * no lock protecting this field. The multihomed client delta introduced
299 * an RX code change: the peer field in the connection structure is now set
300 * to the remote interface from which the last packet for this
301 * connection was sent out. This may become an issue if further changes
304 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
305 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
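/* A minimal sketch (purely illustrative, not code from this file) of the
 * call_queue_lock protocol described above.  "some_queue" and
 * "some_queue_lock" are hypothetical names; the queue_* macros are from
 * rx_queue.h.
 *
 *	MUTEX_ENTER(&call->lock);
 *	MUTEX_ENTER(&some_queue_lock);
 *	queue_Append(&some_queue, call);
 *	SET_CALL_QUEUE_LOCK(call, &some_queue_lock);
 *	MUTEX_EXIT(&some_queue_lock);
 *	MUTEX_EXIT(&call->lock);
 *
 * and later, to remove the call (e.g. in rxi_ResetCall), with call->lock held:
 *
 *	afs_kmutex_t *q_lock = call->call_queue_lock;
 *	if (q_lock) {
 *	    MUTEX_ENTER(q_lock);
 *	    queue_Remove(call);
 *	    CLEAR_CALL_QUEUE_LOCK(call);
 *	    MUTEX_EXIT(q_lock);
 *	}
 */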
307 /* rxdb_fileID is used to identify the lock location, along with line#. */
308 static int rxdb_fileID = RXDB_FILE_RX;
309 #endif /* RX_LOCKS_DB */
310 #else /* RX_ENABLE_LOCKS */
311 #define SET_CALL_QUEUE_LOCK(C, L)
312 #define CLEAR_CALL_QUEUE_LOCK(C)
313 #endif /* RX_ENABLE_LOCKS */
314 struct rx_serverQueueEntry *rx_waitForPacket = 0;
316 /* ------------Exported Interfaces------------- */
318 /* This function allows rxkad to set the epoch to a suitably random number
319 * which rx_NewConnection will use in the future. The principal purpose is to
320 * get rxnull connections to use the same epoch as the rxkad connections do, at
321 * least once the first rxkad connection is established. This is important now
322 * that the host/port addresses aren't used in FindConnection: the uniqueness
323 * of epoch/cid matters and the start time won't do. */
325 #ifdef AFS_PTHREAD_ENV
327 * This mutex protects the following global variables:
331 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
332 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
336 #endif /* AFS_PTHREAD_ENV */
338 void rx_SetEpoch (afs_uint32 epoch)
345 /* Initialize rx. A port number may be mentioned, in which case this
346 * becomes the default port number for any service installed later.
347 * If 0 is provided for the port number, a random port will be chosen
348 * by the kernel. Whether this will ever overlap anything in
349 * /etc/services is anybody's guess... Returns 0 on success, -1 on
351 static int rxinit_status = 1;
352 #ifdef AFS_PTHREAD_ENV
354 * This mutex protects the following global variables:
358 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
359 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
362 #define UNLOCK_RX_INIT
365 int rx_Init(u_int port)
372 char *htable, *ptable;
375 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
376 __djgpp_set_quiet_socket(1);
383 if (rxinit_status == 0) {
384 tmp_status = rxinit_status;
386 return tmp_status; /* Already started; return previous error code. */
390 if (afs_winsockInit()<0)
396 * Initialize anything necessary to provide a non-preemptive threading
399 rxi_InitializeThreadSupport();
402 /* Allocate and initialize a socket for client and perhaps server
405 rx_socket = rxi_GetUDPSocket((u_short)port);
406 if (rx_socket == OSI_NULLSOCKET) {
412 #ifdef RX_ENABLE_LOCKS
415 #endif /* RX_LOCKS_DB */
416 MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
417 MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
418 MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
419 MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
420 MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
422 CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
423 MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
424 MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
425 MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
427 MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
429 CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
430 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
432 rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
433 #endif /* KERNEL && AFS_HPUX110_ENV */
434 #else /* RX_ENABLE_LOCKS */
435 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
436 mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
437 #endif /* AFS_GLOBAL_SUNLOCK */
438 #endif /* RX_ENABLE_LOCKS */
441 rx_connDeadTime = 12;
442 rx_tranquil = 0; /* reset flag */
443 memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
445 osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
446 PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
447 memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
448 ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
449 PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
450 memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
452 /* Malloc up a bunch of packets & buffers */
454 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
455 queue_Init(&rx_freePacketQueue);
456 rxi_NeedMorePackets = FALSE;
457 rxi_MorePackets(rx_nPackets);
465 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
466 tv.tv_sec = clock_now.sec;
467 tv.tv_usec = clock_now.usec;
468 srand((unsigned int) tv.tv_usec);
475 #if defined(KERNEL) && !defined(UKERNEL)
476 /* Really, this should never happen in a real kernel */
479 struct sockaddr_in addr;
480 int addrlen = sizeof(addr);
481 if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
485 rx_port = addr.sin_port;
488 rx_stats.minRtt.sec = 9999999;
490 rx_SetEpoch (tv.tv_sec | 0x80000000);
492 rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
493 * will provide a more random value. */
495 MUTEX_ENTER(&rx_stats_mutex);
496 rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
497 MUTEX_EXIT(&rx_stats_mutex);
498 /* *Slightly* random start time for the cid. This is just to help
499 * out with the hashing function at the peer */
500 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
501 rx_connHashTable = (struct rx_connection **) htable;
502 rx_peerHashTable = (struct rx_peer **) ptable;
504 rx_lastAckDelay.sec = 0;
505 rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
506 rx_hardAckDelay.sec = 0;
507 rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
508 rx_softAckDelay.sec = 0;
509 rx_softAckDelay.usec = 100000; /* 100 milliseconds */
511 rxevent_Init(20, rxi_ReScheduleEvents);
513 /* Initialize various global queues */
514 queue_Init(&rx_idleServerQueue);
515 queue_Init(&rx_incomingCallQueue);
516 queue_Init(&rx_freeCallQueue);
518 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
519 /* Initialize our list of usable IP addresses. */
523 /* Start listener process (exact function is dependent on the
524 * implementation environment--kernel or user space) */
529 tmp_status = rxinit_status = 0;
534 /* called with unincremented nRequestsRunning to see if it is OK to start
535 * a new thread in this service. Could be "no" for two reasons: over the
536 * max quota, or would prevent others from reaching their min quota.
538 #ifdef RX_ENABLE_LOCKS
539 /* This version of QuotaOK reserves quota if it's ok while the
540 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
542 static int QuotaOK(register struct rx_service *aservice)
544 /* check if over max quota */
545 if (aservice->nRequestsRunning >= aservice->maxProcs) {
549 /* under min quota, we're OK */
550 /* otherwise, can use only if there are enough to allow everyone
551 * to go to their min quota after this guy starts.
553 MUTEX_ENTER(&rx_stats_mutex);
554 if ((aservice->nRequestsRunning < aservice->minProcs) ||
555 (rxi_availProcs > rxi_minDeficit)) {
556 aservice->nRequestsRunning++;
557 /* just started call in minProcs pool, need fewer to maintain
559 if (aservice->nRequestsRunning <= aservice->minProcs)
562 MUTEX_EXIT(&rx_stats_mutex);
565 MUTEX_EXIT(&rx_stats_mutex);
570 static void ReturnToServerPool(register struct rx_service *aservice)
572 aservice->nRequestsRunning--;
573 MUTEX_ENTER(&rx_stats_mutex);
574 if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
576 MUTEX_EXIT(&rx_stats_mutex);
579 #else /* RX_ENABLE_LOCKS */
580 static int QuotaOK(register struct rx_service *aservice)
583 /* under min quota, we're OK */
584 if (aservice->nRequestsRunning < aservice->minProcs) return 1;
586 /* check if over max quota */
587 if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
589 /* otherwise, can use only if there are enough to allow everyone
590 * to go to their min quota after this guy starts.
592 if (rxi_availProcs > rxi_minDeficit) rc = 1;
595 #endif /* RX_ENABLE_LOCKS */
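/* A worked example of the quota check above (numbers purely illustrative):
 * suppose a service has minProcs = 2, maxProcs = 4, with one request already
 * running, and suppose rxi_availProcs = 3 and rxi_minDeficit = 2.  QuotaOK
 * succeeds, either because nRequestsRunning (1) < minProcs (2) or because
 * rxi_availProcs (3) > rxi_minDeficit (2).  If nRequestsRunning had already
 * reached maxProcs (4), the new request would be refused outright. */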
598 /* Called by rx_StartServer to start up lwp's to service calls.
599 NExistingProcs gives the number of procs already existing, and which
600 therefore needn't be created. */
601 void rxi_StartServerProcs(int nExistingProcs)
603 register struct rx_service *service;
608 /* For each service, reserve N processes, where N is the "minimum"
609 number of processes that MUST be able to execute a request in parallel,
610 at any time, for that service. Also compute the maximum difference
611 between any service's maximum number of processes that can run
612 (i.e. the maximum number that ever will be run, and a guarantee
613 that this number will run if other services aren't running), and its
614 minimum number. The result is the extra number of processes that
615 we need in order to provide the latter guarantee */
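/* Worked example of the computation below (illustrative numbers only): with
   two services, one having minProcs = 2, maxProcs = 6 and the other
   minProcs = 1, maxProcs = 3, nProcs starts at 2 + 1 = 3 and
   maxdiff = max(6-2, 3-1) = 4, giving nProcs = 7; if nExistingProcs is 1,
   six new server procs are started. */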
616 for (i=0; i<RX_MAX_SERVICES; i++) {
618 service = rx_services[i];
619 if (service == (struct rx_service *) 0) break;
620 nProcs += service->minProcs;
621 diff = service->maxProcs - service->minProcs;
622 if (diff > maxdiff) maxdiff = diff;
624 nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
625 nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
626 for (i = 0; i<nProcs; i++) {
627 rxi_StartServerProc(rx_ServerProc, rx_stackSize);
632 /* This routine must be called if any services are exported. If the
633 * donateMe flag is set, the calling process is donated to the server
635 void rx_StartServer(int donateMe)
637 register struct rx_service *service;
638 register int i, nProcs=0;
644 /* Start server processes, if necessary (exact function is dependent
645 * on the implementation environment--kernel or user space). DonateMe
646 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
647 * case, one less new proc will be created by rx_StartServerProcs.
649 rxi_StartServerProcs(donateMe);
651 /* count up the # of threads in minProcs, and set the min deficit to
652 * be that value, too.
654 for (i=0; i<RX_MAX_SERVICES; i++) {
655 service = rx_services[i];
656 if (service == (struct rx_service *) 0) break;
657 MUTEX_ENTER(&rx_stats_mutex);
658 rxi_totalMin += service->minProcs;
659 /* below works even if a thread is running, since minDeficit would
660 * still have been decremented and later re-incremented.
662 rxi_minDeficit += service->minProcs;
663 MUTEX_EXIT(&rx_stats_mutex);
666 /* Turn on reaping of idle server connections */
667 rxi_ReapConnections();
676 #ifdef AFS_PTHREAD_ENV
678 pid = (pid_t) pthread_self();
679 #else /* AFS_PTHREAD_ENV */
681 LWP_CurrentProcess(&pid);
682 #endif /* AFS_PTHREAD_ENV */
684 sprintf(name,"srv_%d", ++nProcs);
686 (*registerProgram)(pid, name);
688 #endif /* AFS_NT40_ENV */
689 rx_ServerProc(); /* Never returns */
694 /* Create a new client connection to the specified service, using the
695 * specified security object to implement the security model for this
697 struct rx_connection *rx_NewConnection(register afs_uint32 shost,
698 u_short sport, u_short sservice,
699 register struct rx_securityClass *securityObject, int serviceSecurityIndex)
703 register struct rx_connection *conn;
708 dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
709 shost, sport, sservice, securityObject, serviceSecurityIndex));
711 /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
712 * the case of kmem_alloc? */
713 conn = rxi_AllocConnection();
714 #ifdef RX_ENABLE_LOCKS
715 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
716 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
717 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
721 MUTEX_ENTER(&rx_connHashTable_lock);
722 cid = (rx_nextCid += RX_MAXCALLS);
723 conn->type = RX_CLIENT_CONNECTION;
725 conn->epoch = rx_epoch;
726 conn->peer = rxi_FindPeer(shost, sport, 0, 1);
727 conn->serviceId = sservice;
728 conn->securityObject = securityObject;
729 /* This doesn't work in all compilers with void (they're buggy), so fake it
731 conn->securityData = (VOID *) 0;
732 conn->securityIndex = serviceSecurityIndex;
733 rx_SetConnDeadTime(conn, rx_connDeadTime);
734 conn->ackRate = RX_FAST_ACK_RATE;
736 conn->specific = NULL;
737 conn->challengeEvent = NULL;
738 conn->delayedAbortEvent = NULL;
739 conn->abortCount = 0;
742 RXS_NewConnection(securityObject, conn);
743 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
745 conn->refCount++; /* no lock required since only this thread knows... */
746 conn->next = rx_connHashTable[hashindex];
747 rx_connHashTable[hashindex] = conn;
748 MUTEX_ENTER(&rx_stats_mutex);
749 rx_stats.nClientConns++;
750 MUTEX_EXIT(&rx_stats_mutex);
752 MUTEX_EXIT(&rx_connHashTable_lock);
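/* A minimal client-side usage sketch for the routine above.  Purely
 * illustrative: error handling is omitted, and the host/port/service numbers
 * and the rxnull security constructor (rxnull_NewClientSecurityObject) are
 * assumptions, not definitions from this file.
 *
 *	struct rx_connection *conn;
 *	struct rx_call *call;
 *	afs_int32 code;
 *
 *	if (rx_Init(0) < 0)				// 0: let the kernel pick a port
 *	    return -1;
 *	conn = rx_NewConnection(htonl(0x7f000001),	// host, network byte order
 *				htons(1234),		// assumed service port
 *				4711,			// assumed service id
 *				rxnull_NewClientSecurityObject(),
 *				0);			// security index 0 (rxnull)
 *	call = rx_NewCall(conn);
 *	// ... rx_Write() the request, then rx_Read() the reply ...
 *	code = rx_EndCall(call, 0);
 *	rx_DestroyConnection(conn);
 */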
758 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
760 /* The idea is to set the dead time to a value that allows several
761 * keepalives to be dropped without timing out the connection. */
762 conn->secondsUntilDead = MAX(seconds, 6);
763 conn->secondsUntilPing = conn->secondsUntilDead/6;
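    /* For example, rx_SetConnDeadTime(conn, 12) gives secondsUntilDead = 12
     * and secondsUntilPing = 2, so roughly six keepalive intervals fit into
     * the dead time; any value below 6 is rounded up to 6 seconds. */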
766 int rxi_lowPeerRefCount = 0;
767 int rxi_lowConnRefCount = 0;
770 * Clean up a connection that was destroyed in rxi_DestroyConnectionNoLock.
771 * NOTE: must not be called with rx_connHashTable_lock held.
773 void rxi_CleanupConnection(struct rx_connection *conn)
775 /* Notify the service exporter, if requested, that this connection
776 * is being destroyed */
777 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
778 (*conn->service->destroyConnProc)(conn);
780 /* Notify the security module that this connection is being destroyed */
781 RXS_DestroyConnection(conn->securityObject, conn);
783 /* If this is the last connection using the rx_peer struct, set its
784 * idle time to now. rxi_ReapConnections will reap it if it's still
785 * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
787 MUTEX_ENTER(&rx_peerHashTable_lock);
788 if (--conn->peer->refCount <= 0) {
789 conn->peer->idleWhen = clock_Sec();
790 if (conn->peer->refCount < 0) {
791 conn->peer->refCount = 0;
792 MUTEX_ENTER(&rx_stats_mutex);
793 rxi_lowPeerRefCount ++;
794 MUTEX_EXIT(&rx_stats_mutex);
797 MUTEX_EXIT(&rx_peerHashTable_lock);
799 MUTEX_ENTER(&rx_stats_mutex);
800 if (conn->type == RX_SERVER_CONNECTION)
801 rx_stats.nServerConns--;
803 rx_stats.nClientConns--;
804 MUTEX_EXIT(&rx_stats_mutex);
807 if (conn->specific) {
809 for (i = 0 ; i < conn->nSpecific ; i++) {
810 if (conn->specific[i] && rxi_keyCreate_destructor[i])
811 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
812 conn->specific[i] = NULL;
814 free(conn->specific);
816 conn->specific = NULL;
820 MUTEX_DESTROY(&conn->conn_call_lock);
821 MUTEX_DESTROY(&conn->conn_data_lock);
822 CV_DESTROY(&conn->conn_call_cv);
824 rxi_FreeConnection(conn);
827 /* Destroy the specified connection */
828 void rxi_DestroyConnection(register struct rx_connection *conn)
830 MUTEX_ENTER(&rx_connHashTable_lock);
831 rxi_DestroyConnectionNoLock(conn);
832 /* conn should be at the head of the cleanup list */
833 if (conn == rx_connCleanup_list) {
834 rx_connCleanup_list = rx_connCleanup_list->next;
835 MUTEX_EXIT(&rx_connHashTable_lock);
836 rxi_CleanupConnection(conn);
838 #ifdef RX_ENABLE_LOCKS
840 MUTEX_EXIT(&rx_connHashTable_lock);
842 #endif /* RX_ENABLE_LOCKS */
845 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
847 register struct rx_connection **conn_ptr;
848 register int havecalls = 0;
849 struct rx_packet *packet;
856 MUTEX_ENTER(&conn->conn_data_lock);
857 if (conn->refCount > 0)
860 MUTEX_ENTER(&rx_stats_mutex);
861 rxi_lowConnRefCount++;
862 MUTEX_EXIT(&rx_stats_mutex);
865 if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
866 /* Busy; wait till the last guy before proceeding */
867 MUTEX_EXIT(&conn->conn_data_lock);
872 /* If the client previously called rx_NewCall, but it is still
873 * waiting, treat this as a running call, and wait to destroy the
874 * connection later when the call completes. */
875 if ((conn->type == RX_CLIENT_CONNECTION) &&
876 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
877 conn->flags |= RX_CONN_DESTROY_ME;
878 MUTEX_EXIT(&conn->conn_data_lock);
882 MUTEX_EXIT(&conn->conn_data_lock);
884 /* Check for extant references to this connection */
885 for (i = 0; i<RX_MAXCALLS; i++) {
886 register struct rx_call *call = conn->call[i];
889 if (conn->type == RX_CLIENT_CONNECTION) {
890 MUTEX_ENTER(&call->lock);
891 if (call->delayedAckEvent) {
892 /* Push the final acknowledgment out now--there
893 * won't be a subsequent call to acknowledge the
894 * last reply packets */
895 rxevent_Cancel(call->delayedAckEvent, call,
896 RX_CALL_REFCOUNT_DELAY);
897 if (call->state == RX_STATE_PRECALL ||
898 call->state == RX_STATE_ACTIVE) {
899 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
901 rxi_AckAll(NULL, call, 0);
904 MUTEX_EXIT(&call->lock);
908 #ifdef RX_ENABLE_LOCKS
910 if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
911 MUTEX_EXIT(&conn->conn_data_lock);
914 /* Someone is accessing a packet right now. */
918 #endif /* RX_ENABLE_LOCKS */
921 /* Don't destroy the connection if there are any call
922 * structures still in use */
923 MUTEX_ENTER(&conn->conn_data_lock);
924 conn->flags |= RX_CONN_DESTROY_ME;
925 MUTEX_EXIT(&conn->conn_data_lock);
930 if (conn->delayedAbortEvent) {
931 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
932 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
934 MUTEX_ENTER(&conn->conn_data_lock);
935 rxi_SendConnectionAbort(conn, packet, 0, 1);
936 MUTEX_EXIT(&conn->conn_data_lock);
937 rxi_FreePacket(packet);
941 /* Remove from connection hash table before proceeding */
942 conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
943 conn->epoch, conn->type) ];
944 for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
945 if (*conn_ptr == conn) {
946 *conn_ptr = conn->next;
950 /* if the conn that we are destroying was the last connection, then we
951 * clear rxLastConn as well */
952 if ( rxLastConn == conn )
955 /* Make sure the connection is completely reset before deleting it. */
956 /* get rid of pending events that could zap us later */
957 if (conn->challengeEvent)
958 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
959 if (conn->checkReachEvent)
960 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
962 /* Add the connection to the list of destroyed connections that
963 * need to be cleaned up. This is necessary to avoid deadlocks
964 * in the routines we call to inform others that this connection is
965 * being destroyed. */
966 conn->next = rx_connCleanup_list;
967 rx_connCleanup_list = conn;
970 /* Externally available version */
971 void rx_DestroyConnection(register struct rx_connection *conn)
977 rxi_DestroyConnection (conn);
982 /* Start a new rx remote procedure call, on the specified connection.
983 * If wait is set to 1, wait for a free call channel; otherwise return
984 * 0. Maxtime gives the maximum number of seconds this call may take,
985 * after rx_MakeCall returns. After this time interval, a call to any
986 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
987 * For fine grain locking, we hold the conn_call_lock in order to
988 * ensure that we don't get signalled after we find a call in an active
989 * state and before we go to sleep.
991 struct rx_call *rx_NewCall(register struct rx_connection *conn)
994 register struct rx_call *call;
995 struct clock queueTime;
999 dpf (("rx_NewCall(conn %x)\n", conn));
1002 clock_GetTime(&queueTime);
1004 MUTEX_ENTER(&conn->conn_call_lock);
1007 * Check if there are others waiting for a new call.
1008 * If so, let them go first to avoid starving them.
1009 * This is a fairly simple scheme, and might not be
1010 * a complete solution for large numbers of waiters.
1012 if (conn->makeCallWaiters) {
1013 #ifdef RX_ENABLE_LOCKS
1014 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1021 for (i=0; i<RX_MAXCALLS; i++) {
1022 call = conn->call[i];
1024 MUTEX_ENTER(&call->lock);
1025 if (call->state == RX_STATE_DALLY) {
1026 rxi_ResetCall(call, 0);
1027 (*call->callNumber)++;
1030 MUTEX_EXIT(&call->lock);
1033 call = rxi_NewCall(conn, i);
1037 if (i < RX_MAXCALLS) {
1040 MUTEX_ENTER(&conn->conn_data_lock);
1041 conn->flags |= RX_CONN_MAKECALL_WAITING;
1042 MUTEX_EXIT(&conn->conn_data_lock);
1044 conn->makeCallWaiters++;
1045 #ifdef RX_ENABLE_LOCKS
1046 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1050 conn->makeCallWaiters--;
1053 * Wake up anyone else who might be giving us a chance to
1054 * run (see code above that avoids resource starvation).
1056 #ifdef RX_ENABLE_LOCKS
1057 CV_BROADCAST(&conn->conn_call_cv);
1062 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1064 /* Client is initially in send mode */
1065 call->state = RX_STATE_ACTIVE;
1066 call->mode = RX_MODE_SENDING;
1068 /* remember start time for call in case we have hard dead time limit */
1069 call->queueTime = queueTime;
1070 clock_GetTime(&call->startTime);
1071 hzero(call->bytesSent);
1072 hzero(call->bytesRcvd);
1074 /* Turn on busy protocol. */
1075 rxi_KeepAliveOn(call);
1077 MUTEX_EXIT(&call->lock);
1078 MUTEX_EXIT(&conn->conn_call_lock);
1082 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1083 /* Now, if TQ wasn't cleared earlier, do it now. */
1085 MUTEX_ENTER(&call->lock);
1086 while (call->flags & RX_CALL_TQ_BUSY) {
1087 call->flags |= RX_CALL_TQ_WAIT;
1088 #ifdef RX_ENABLE_LOCKS
1089 CV_WAIT(&call->cv_tq, &call->lock);
1090 #else /* RX_ENABLE_LOCKS */
1091 osi_rxSleep(&call->tq);
1092 #endif /* RX_ENABLE_LOCKS */
1094 if (call->flags & RX_CALL_TQ_CLEARME) {
1095 rxi_ClearTransmitQueue(call, 0);
1096 queue_Init(&call->tq);
1098 MUTEX_EXIT(&call->lock);
1100 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1105 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1108 register struct rx_call *tcall;
1112 for(i=0; i<RX_MAXCALLS; i++) {
1113 if ((tcall = aconn->call[i])) {
1114 if ((tcall->state == RX_STATE_ACTIVE)
1115 || (tcall->state == RX_STATE_PRECALL)) {
1125 int rxi_GetCallNumberVector(register struct rx_connection *aconn,
1126 register afs_int32 *aint32s)
1129 register struct rx_call *tcall;
1133 for(i=0; i<RX_MAXCALLS; i++) {
1134 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1135 aint32s[i] = aconn->callNumber[i]+1;
1137 aint32s[i] = aconn->callNumber[i];
1143 int rxi_SetCallNumberVector(register struct rx_connection *aconn,
1144 register afs_int32 *aint32s)
1147 register struct rx_call *tcall;
1151 for(i=0; i<RX_MAXCALLS; i++) {
1152 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1153 aconn->callNumber[i] = aint32s[i] - 1;
1155 aconn->callNumber[i] = aint32s[i];
1161 /* Advertise a new service. A service is named locally by a UDP port
1162 * number plus a 16-bit service id. Returns (struct rx_service *) 0
1165 char *serviceName; Name for identification purposes (e.g. the
1166 service name might be used for probing for
1168 struct rx_service *rx_NewService(u_short port, u_short serviceId,
1170 struct rx_securityClass **securityObjects,
1171 int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1173 osi_socket socket = OSI_NULLSOCKET;
1174 register struct rx_service *tservice;
1180 if (serviceId == 0) {
1181 (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
1187 (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1194 tservice = rxi_AllocService();
1197 for (i = 0; i<RX_MAX_SERVICES; i++) {
1198 register struct rx_service *service = rx_services[i];
1200 if (port == service->servicePort) {
1201 if (service->serviceId == serviceId) {
1202 /* The identical service has already been
1203 * installed; if the caller was intending to
1204 * change the security classes used by this
1205 * service, he/she loses. */
1206 (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1209 rxi_FreeService(tservice);
1212 /* Different service, same port: re-use the socket
1213 * which is bound to the same port */
1214 socket = service->socket;
1217 if (socket == OSI_NULLSOCKET) {
1218 /* If we don't already have a socket (from another
1219 * service on same port) get a new one */
1220 socket = rxi_GetUDPSocket(port);
1221 if (socket == OSI_NULLSOCKET) {
1224 rxi_FreeService(tservice);
1229 service->socket = socket;
1230 service->servicePort = port;
1231 service->serviceId = serviceId;
1232 service->serviceName = serviceName;
1233 service->nSecurityObjects = nSecurityObjects;
1234 service->securityObjects = securityObjects;
1235 service->minProcs = 0;
1236 service->maxProcs = 1;
1237 service->idleDeadTime = 60;
1238 service->connDeadTime = rx_connDeadTime;
1239 service->executeRequestProc = serviceProc;
1240 service->checkReach = 0;
1241 rx_services[i] = service; /* not visible until now */
1249 rxi_FreeService(tservice);
1250 (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
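/* A minimal server-side usage sketch for rx_NewService and rx_StartServer.
 * Purely illustrative: the request handler, the port/service id, and the
 * rxnull server security constructor (rxnull_NewServerSecurityObject) are
 * assumptions, not definitions from this file.
 *
 *	static afs_int32 ExampleRequestProc(struct rx_call *acall) {
 *	    // rx_Read() the arguments, rx_Write() the reply
 *	    return 0;
 *	}
 *
 *	struct rx_securityClass *sc[1];
 *	struct rx_service *svc;
 *
 *	rx_Init(htons(1234));			// assumed port, network byte order
 *	sc[0] = rxnull_NewServerSecurityObject();
 *	svc = rx_NewService(0, 4711, "example", sc, 1, ExampleRequestProc);
 *	svc->minProcs = 2;			// thread quotas, tuned before starting
 *	svc->maxProcs = 4;
 *	rx_StartServer(1);			// donate this thread; never returns
 */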
1254 /* Generic request processing loop. This routine should be called
1255 * by the implementation dependent rx_ServerProc. If socketp is
1256 * non-null, it will be set to the file descriptor that this thread
1257 * is now listening on. If socketp is null, this routine will never
1259 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1261 register struct rx_call *call;
1262 register afs_int32 code;
1263 register struct rx_service *tservice = NULL;
1270 call = rx_GetCall(threadID, tservice, socketp);
1271 if (socketp && *socketp != OSI_NULLSOCKET) {
1272 /* We are now a listener thread */
1277 /* if the server is restarting (typically a smooth shutdown) then do not
1278 * allow any new calls.
1281 if ( rx_tranquil && (call != NULL) ) {
1286 MUTEX_ENTER(&call->lock);
1288 rxi_CallError(call, RX_RESTARTING);
1289 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1291 MUTEX_EXIT(&call->lock);
1297 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1298 #ifdef RX_ENABLE_LOCKS
1300 #endif /* RX_ENABLE_LOCKS */
1301 afs_termState = AFSOP_STOP_AFS;
1302 afs_osi_Wakeup(&afs_termState);
1303 #ifdef RX_ENABLE_LOCKS
1305 #endif /* RX_ENABLE_LOCKS */
1310 tservice = call->conn->service;
1312 if (tservice->beforeProc) (*tservice->beforeProc)(call);
1314 code = call->conn->service->executeRequestProc(call);
1316 if (tservice->afterProc) (*tservice->afterProc)(call, code);
1318 rx_EndCall(call, code);
1319 MUTEX_ENTER(&rx_stats_mutex);
1321 MUTEX_EXIT(&rx_stats_mutex);
1326 void rx_WakeupServerProcs(void)
1328 struct rx_serverQueueEntry *np, *tqp;
1333 MUTEX_ENTER(&rx_serverPool_lock);
1335 #ifdef RX_ENABLE_LOCKS
1336 if (rx_waitForPacket)
1337 CV_BROADCAST(&rx_waitForPacket->cv);
1338 #else /* RX_ENABLE_LOCKS */
1339 if (rx_waitForPacket)
1340 osi_rxWakeup(rx_waitForPacket);
1341 #endif /* RX_ENABLE_LOCKS */
1342 MUTEX_ENTER(&freeSQEList_lock);
1343 for (np = rx_FreeSQEList; np; np = tqp) {
1344 tqp = *(struct rx_serverQueueEntry **)np;
1345 #ifdef RX_ENABLE_LOCKS
1346 CV_BROADCAST(&np->cv);
1347 #else /* RX_ENABLE_LOCKS */
1349 #endif /* RX_ENABLE_LOCKS */
1351 MUTEX_EXIT(&freeSQEList_lock);
1352 for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1353 #ifdef RX_ENABLE_LOCKS
1354 CV_BROADCAST(&np->cv);
1355 #else /* RX_ENABLE_LOCKS */
1357 #endif /* RX_ENABLE_LOCKS */
1359 MUTEX_EXIT(&rx_serverPool_lock);
1365 * One thing that seems to happen is that all the server threads get
1366 * tied up on some empty or slow call, and then a whole bunch of calls
1367 * arrive at once, using up the packet pool, so now there are more
1368 * empty calls. The most critical resources here are server threads
1369 * and the free packet pool. The "doreclaim" code seems to help in
1370 * general. I think that eventually we arrive in this state: there
1371 * are lots of pending calls which do have all their packets present,
1372 * so they won't be reclaimed, are multi-packet calls, so they won't
1373 * be scheduled until later, and thus are tying up most of the free
1374 * packet pool for a very long time.
1376 * 1. schedule multi-packet calls if all the packets are present.
1377 * Probably CPU-bound operation, useful to return packets to pool.
1378 * What to do if there is a full window, but the last packet isn't here?
1379 * 3. preserve one thread which *only* runs "best" calls, otherwise
1380 * it sleeps and waits for that type of call.
1381 * 4. Don't necessarily reserve a whole window for each thread. In fact,
1382 * the current dataquota business is badly broken. The quota isn't adjusted
1383 * to reflect how many packets are presently queued for a running call.
1384 * So, when we schedule a queued call with a full window of packets queued
1385 * up for it, that *should* free up a window full of packets for other 2d-class
1386 * calls to be able to use from the packet pool. But it doesn't.
1388 * NB. Most of the time, this code doesn't run -- since idle server threads
1389 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1390 * as a new call arrives.
1392 /* Sleep until a call arrives. Returns a pointer to the call, ready
1393 * for an rx_Read. */
1394 #ifdef RX_ENABLE_LOCKS
1395 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1397 struct rx_serverQueueEntry *sq;
1398 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1399 struct rx_service *service = NULL;
1402 MUTEX_ENTER(&freeSQEList_lock);
1404 if ((sq = rx_FreeSQEList)) {
1405 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1406 MUTEX_EXIT(&freeSQEList_lock);
1407 } else { /* otherwise allocate a new one and return that */
1408 MUTEX_EXIT(&freeSQEList_lock);
1409 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1410 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1411 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1414 MUTEX_ENTER(&rx_serverPool_lock);
1415 if (cur_service != NULL) {
1416 ReturnToServerPool(cur_service);
1419 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1420 register struct rx_call *tcall, *ncall;
1421 choice2 = (struct rx_call *) 0;
1422 /* Scan for eligible incoming calls. A call is not eligible
1423 * if the maximum number of calls for its service type are
1424 * already executing */
1425 /* One thread will process calls FCFS (to prevent starvation),
1426 * while the other threads may run ahead looking for calls which
1427 * have all their input data available immediately. This helps
1428 * keep threads from blocking, waiting for data from the client. */
1429 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1430 service = tcall->conn->service;
1431 if (!QuotaOK(service)) {
1434 if (!tno || !tcall->queue_item_header.next ) {
1435 /* If we're thread 0, then we'll just use
1436 * this call. If we haven't been able to find an optimal
1437 * choice, and we're at the end of the list, then use a
1438 * 2d choice if one has been identified. Otherwise... */
1439 call = (choice2 ? choice2 : tcall);
1440 service = call->conn->service;
1441 } else if (!queue_IsEmpty(&tcall->rq)) {
1442 struct rx_packet *rp;
1443 rp = queue_First(&tcall->rq, rx_packet);
1444 if (rp->header.seq == 1) {
1445 if (!meltdown_1pkt ||
1446 (rp->header.flags & RX_LAST_PACKET)) {
1448 } else if (rxi_2dchoice && !choice2 &&
1449 !(tcall->flags & RX_CALL_CLEARED) &&
1450 (tcall->rprev > rxi_HardAckRate)) {
1452 } else rxi_md2cnt++;
1458 ReturnToServerPool(service);
1465 rxi_ServerThreadSelectingCall = 1;
1466 MUTEX_EXIT(&rx_serverPool_lock);
1467 MUTEX_ENTER(&call->lock);
1468 MUTEX_ENTER(&rx_serverPool_lock);
1470 if (queue_IsEmpty(&call->rq) ||
1471 queue_First(&call->rq, rx_packet)->header.seq != 1)
1472 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1474 CLEAR_CALL_QUEUE_LOCK(call);
1476 MUTEX_EXIT(&call->lock);
1477 ReturnToServerPool(service);
1478 rxi_ServerThreadSelectingCall = 0;
1479 CV_SIGNAL(&rx_serverPool_cv);
1480 call = (struct rx_call*)0;
1483 call->flags &= (~RX_CALL_WAIT_PROC);
1484 MUTEX_ENTER(&rx_stats_mutex);
1486 MUTEX_EXIT(&rx_stats_mutex);
1487 rxi_ServerThreadSelectingCall = 0;
1488 CV_SIGNAL(&rx_serverPool_cv);
1489 MUTEX_EXIT(&rx_serverPool_lock);
1493 /* If there are no eligible incoming calls, add this process
1494 * to the idle server queue, to wait for one */
1498 *socketp = OSI_NULLSOCKET;
1500 sq->socketp = socketp;
1501 queue_Append(&rx_idleServerQueue, sq);
1502 #ifndef AFS_AIX41_ENV
1503 rx_waitForPacket = sq;
1504 #endif /* AFS_AIX41_ENV */
1506 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1508 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1509 MUTEX_EXIT(&rx_serverPool_lock);
1510 return (struct rx_call *)0;
1513 } while (!(call = sq->newcall) &&
1514 !(socketp && *socketp != OSI_NULLSOCKET));
1515 MUTEX_EXIT(&rx_serverPool_lock);
1517 MUTEX_ENTER(&call->lock);
1523 MUTEX_ENTER(&freeSQEList_lock);
1524 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1525 rx_FreeSQEList = sq;
1526 MUTEX_EXIT(&freeSQEList_lock);
1529 clock_GetTime(&call->startTime);
1530 call->state = RX_STATE_ACTIVE;
1531 call->mode = RX_MODE_RECEIVING;
1533 rxi_calltrace(RX_CALL_START, call);
1534 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1535 call->conn->service->servicePort,
1536 call->conn->service->serviceId, call));
1538 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1539 MUTEX_EXIT(&call->lock);
1541 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1546 #else /* RX_ENABLE_LOCKS */
1547 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1549 struct rx_serverQueueEntry *sq;
1550 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1551 struct rx_service *service = NULL;
1556 MUTEX_ENTER(&freeSQEList_lock);
1558 if ((sq = rx_FreeSQEList)) {
1559 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1560 MUTEX_EXIT(&freeSQEList_lock);
1561 } else { /* otherwise allocate a new one and return that */
1562 MUTEX_EXIT(&freeSQEList_lock);
1563 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1564 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1565 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1567 MUTEX_ENTER(&sq->lock);
1569 if (cur_service != NULL) {
1570 cur_service->nRequestsRunning--;
1571 if (cur_service->nRequestsRunning < cur_service->minProcs)
1575 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1576 register struct rx_call *tcall, *ncall;
1577 /* Scan for eligible incoming calls. A call is not eligible
1578 * if the maximum number of calls for its service type are
1579 * already executing */
1580 /* One thread will process calls FCFS (to prevent starvation),
1581 * while the other threads may run ahead looking for calls which
1582 * have all their input data available immediately. This helps
1583 * keep threads from blocking, waiting for data from the client. */
1584 choice2 = (struct rx_call *) 0;
1585 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1586 service = tcall->conn->service;
1587 if (QuotaOK(service)) {
1588 if (!tno || !tcall->queue_item_header.next ) {
1589 /* If we're thread 0, then we'll just use
1590 * this call. If we haven't been able to find an optimal
1591 * choice, and we're at the end of the list, then use a
1592 * 2d choice if one has been identified. Otherwise... */
1593 call = (choice2 ? choice2 : tcall);
1594 service = call->conn->service;
1595 } else if (!queue_IsEmpty(&tcall->rq)) {
1596 struct rx_packet *rp;
1597 rp = queue_First(&tcall->rq, rx_packet);
1598 if (rp->header.seq == 1
1599 && (!meltdown_1pkt ||
1600 (rp->header.flags & RX_LAST_PACKET))) {
1602 } else if (rxi_2dchoice && !choice2 &&
1603 !(tcall->flags & RX_CALL_CLEARED) &&
1604 (tcall->rprev > rxi_HardAckRate)) {
1606 } else rxi_md2cnt++;
1616 /* we can't schedule a call if there's no data!!! */
1617 /* send an ack if there's no data, if we're missing the
1618 * first packet, or we're missing something between first
1619 * and last -- there's a "hole" in the incoming data. */
1620 if (queue_IsEmpty(&call->rq) ||
1621 queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1622 call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1623 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1625 call->flags &= (~RX_CALL_WAIT_PROC);
1626 service->nRequestsRunning++;
1627 /* just started call in minProcs pool, need fewer to maintain
1629 if (service->nRequestsRunning <= service->minProcs)
1633 /* MUTEX_EXIT(&call->lock); */
1636 /* If there are no eligible incoming calls, add this process
1637 * to the idle server queue, to wait for one */
1640 *socketp = OSI_NULLSOCKET;
1642 sq->socketp = socketp;
1643 queue_Append(&rx_idleServerQueue, sq);
1647 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1650 return (struct rx_call *)0;
1653 } while (!(call = sq->newcall) &&
1654 !(socketp && *socketp != OSI_NULLSOCKET));
1656 MUTEX_EXIT(&sq->lock);
1658 MUTEX_ENTER(&freeSQEList_lock);
1659 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1660 rx_FreeSQEList = sq;
1661 MUTEX_EXIT(&freeSQEList_lock);
1664 clock_GetTime(&call->startTime);
1665 call->state = RX_STATE_ACTIVE;
1666 call->mode = RX_MODE_RECEIVING;
1668 rxi_calltrace(RX_CALL_START, call);
1669 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1670 call->conn->service->servicePort,
1671 call->conn->service->serviceId, call));
1673 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1681 #endif /* RX_ENABLE_LOCKS */
1685 /* Establish a procedure to be called when a packet arrives for a
1686 * call. This routine will be called at most once after each call,
1687 * and will also be called if there is an error condition on the call or
1688 * the call is complete. Used by multi rx to build a selection
1689 * function which determines which of several calls is likely to be a
1690 * good one to read from.
1691 * NOTE: the way this is currently implemented it is probably only a
1692 * good idea to (1) use it immediately after a newcall (clients only)
1693 * and (2) only use it once. Other uses currently void your warranty
1695 void rx_SetArrivalProc(register struct rx_call *call,
1696 register VOID (*proc)(register struct rx_call *call,
1697 register struct multi_handle *mh, register int index),
1698 register VOID *handle, register VOID *arg)
1700 call->arrivalProc = proc;
1701 call->arrivalProcHandle = handle;
1702 call->arrivalProcArg = arg;
1705 /* Call is finished (possibly prematurely). Return rc to the peer, if
1706 * appropriate, and return the final error code from the conversation
1709 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1711 register struct rx_connection *conn = call->conn;
1712 register struct rx_service *service;
1713 register struct rx_packet *tp; /* Temporary packet pointer */
1714 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1718 dpf(("rx_EndCall(call %x)\n", call));
1722 MUTEX_ENTER(&call->lock);
1724 if (rc == 0 && call->error == 0) {
1725 call->abortCode = 0;
1726 call->abortCount = 0;
1729 call->arrivalProc = (VOID (*)()) 0;
1730 if (rc && call->error == 0) {
1731 rxi_CallError(call, rc);
1732 /* Send an abort message to the peer if this error code has
1733 * only just been set. If it was set previously, assume the
1734 * peer has already been sent the error code or will request it
1736 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1738 if (conn->type == RX_SERVER_CONNECTION) {
1739 /* Make sure reply or at least dummy reply is sent */
1740 if (call->mode == RX_MODE_RECEIVING) {
1741 rxi_WriteProc(call, 0, 0);
1743 if (call->mode == RX_MODE_SENDING) {
1744 rxi_FlushWrite(call);
1746 service = conn->service;
1747 rxi_calltrace(RX_CALL_END, call);
1748 /* Call goes to hold state until reply packets are acknowledged */
1749 if (call->tfirst + call->nSoftAcked < call->tnext) {
1750 call->state = RX_STATE_HOLD;
1752 call->state = RX_STATE_DALLY;
1753 rxi_ClearTransmitQueue(call, 0);
1754 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1755 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1758 else { /* Client connection */
1760 /* Make sure server receives input packets, in the case where
1761 * no reply arguments are expected */
1762 if ((call->mode == RX_MODE_SENDING)
1763 || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1764 (void) rxi_ReadProc(call, &dummy, 1);
1767 /* If we had an outstanding delayed ack, be nice to the server
1768 * and force-send it now.
1770 if (call->delayedAckEvent) {
1771 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1772 call->delayedAckEvent = NULL;
1773 rxi_SendDelayedAck(NULL, call, NULL);
1776 /* We need to release the call lock since it's lower than the
1777 * conn_call_lock and we don't want to hold the conn_call_lock
1778 * over the rx_ReadProc call. The conn_call_lock needs to be held
1779 * here for the case where rx_NewCall is perusing the calls on
1780 * the connection structure. We don't want to signal until
1781 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1782 * have checked this call, found it active and by the time it
1783 * goes to sleep, will have missed the signal.
1785 MUTEX_EXIT(&call->lock);
1786 MUTEX_ENTER(&conn->conn_call_lock);
1787 MUTEX_ENTER(&call->lock);
1788 MUTEX_ENTER(&conn->conn_data_lock);
1789 conn->flags |= RX_CONN_BUSY;
1790 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1791 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1792 MUTEX_EXIT(&conn->conn_data_lock);
1793 #ifdef RX_ENABLE_LOCKS
1794 CV_BROADCAST(&conn->conn_call_cv);
1799 #ifdef RX_ENABLE_LOCKS
1801 MUTEX_EXIT(&conn->conn_data_lock);
1803 #endif /* RX_ENABLE_LOCKS */
1804 call->state = RX_STATE_DALLY;
1806 error = call->error;
1808 /* currentPacket, nLeft, and nFree must be zeroed here, because
1809 * ResetCall cannot: ResetCall may be called at splnet(), in the
1810 * kernel version, and may interrupt the macros rx_Read or
1811 * rx_Write, which run at normal priority for efficiency. */
1812 if (call->currentPacket) {
1813 rxi_FreePacket(call->currentPacket);
1814 call->currentPacket = (struct rx_packet *) 0;
1815 call->nLeft = call->nFree = call->curlen = 0;
1818 call->nLeft = call->nFree = call->curlen = 0;
1820 /* Free any packets from the last call to ReadvProc/WritevProc */
1821 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1826 CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1827 MUTEX_EXIT(&call->lock);
1828 if (conn->type == RX_CLIENT_CONNECTION) {
1829 MUTEX_EXIT(&conn->conn_call_lock);
1830 conn->flags &= ~RX_CONN_BUSY;
1835 * Map errors to the local host's errno.h format.
1837 error = ntoh_syserr_conv(error);
1841 #if !defined(KERNEL)
1843 /* Call this routine when shutting down a server or client (especially
1844 * clients). This will allow Rx to gracefully garbage collect server
1845 * connections, and reduce the number of retries that a server might
1846 * make to a dead client.
1847 * This is not quite right, since some calls may still be ongoing and
1848 * we can't lock them to destroy them. */
1849 void rx_Finalize(void)
1851 register struct rx_connection **conn_ptr, **conn_end;
1855 if (rxinit_status == 1) {
1857 return; /* Already shutdown. */
1859 rxi_DeleteCachedConnections();
1860 if (rx_connHashTable) {
1861 MUTEX_ENTER(&rx_connHashTable_lock);
1862 for (conn_ptr = &rx_connHashTable[0],
1863 conn_end = &rx_connHashTable[rx_hashTableSize];
1864 conn_ptr < conn_end; conn_ptr++) {
1865 struct rx_connection *conn, *next;
1866 for (conn = *conn_ptr; conn; conn = next) {
1868 if (conn->type == RX_CLIENT_CONNECTION) {
1869 /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1871 /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1872 #ifdef RX_ENABLE_LOCKS
1873 rxi_DestroyConnectionNoLock(conn);
1874 #else /* RX_ENABLE_LOCKS */
1875 rxi_DestroyConnection(conn);
1876 #endif /* RX_ENABLE_LOCKS */
1880 #ifdef RX_ENABLE_LOCKS
1881 while (rx_connCleanup_list) {
1882 struct rx_connection *conn;
1883 conn = rx_connCleanup_list;
1884 rx_connCleanup_list = rx_connCleanup_list->next;
1885 MUTEX_EXIT(&rx_connHashTable_lock);
1886 rxi_CleanupConnection(conn);
1887 MUTEX_ENTER(&rx_connHashTable_lock);
1889 MUTEX_EXIT(&rx_connHashTable_lock);
1890 #endif /* RX_ENABLE_LOCKS */
1899 /* if we wake up the packet waiter too often, we can get into a loop with two
1900 AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1901 void rxi_PacketsUnWait(void)
1903 if (!rx_waitingForPackets) {
1907 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1908 return; /* still over quota */
1911 rx_waitingForPackets = 0;
1912 #ifdef RX_ENABLE_LOCKS
1913 CV_BROADCAST(&rx_waitingForPackets_cv);
1915 osi_rxWakeup(&rx_waitingForPackets);
1921 /* ------------------Internal interfaces------------------------- */
1923 /* Return this process's service structure for the
1924 * specified socket and service */
1925 struct rx_service *rxi_FindService(register osi_socket socket,
1926 register u_short serviceId)
1928 register struct rx_service **sp;
1929 for (sp = &rx_services[0]; *sp; sp++) {
1930 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1936 /* Allocate a call structure, for the indicated channel of the
1937 * supplied connection. The mode and state of the call must be set by
1938 * the caller. Returns the call with mutex locked. */
1939 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1940 register int channel)
1942 register struct rx_call *call;
1943 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1944 register struct rx_call *cp; /* Call pointer temp */
1945 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1946 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1948 /* Grab an existing call structure, or allocate a new one.
1949 * Existing call structures are assumed to have been left reset by
1951 MUTEX_ENTER(&rx_freeCallQueue_lock);
1953 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1955 * EXCEPT that the TQ might not yet be cleared out.
1956 * Skip over those with in-use TQs.
1959 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1960 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1966 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1967 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1968 call = queue_First(&rx_freeCallQueue, rx_call);
1969 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1971 MUTEX_ENTER(&rx_stats_mutex);
1972 rx_stats.nFreeCallStructs--;
1973 MUTEX_EXIT(&rx_stats_mutex);
1974 MUTEX_EXIT(&rx_freeCallQueue_lock);
1975 MUTEX_ENTER(&call->lock);
1976 CLEAR_CALL_QUEUE_LOCK(call);
1977 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1978 /* Now, if TQ wasn't cleared earlier, do it now. */
1979 if (call->flags & RX_CALL_TQ_CLEARME) {
1980 rxi_ClearTransmitQueue(call, 0);
1981 queue_Init(&call->tq);
1983 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1984 /* Bind the call to its connection structure */
1986 rxi_ResetCall(call, 1);
1989 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1991 MUTEX_EXIT(&rx_freeCallQueue_lock);
1992 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1993 MUTEX_ENTER(&call->lock);
1994 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1995 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1996 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1998 MUTEX_ENTER(&rx_stats_mutex);
1999 rx_stats.nCallStructs++;
2000 MUTEX_EXIT(&rx_stats_mutex);
2001 /* Initialize once-only items */
2002 queue_Init(&call->tq);
2003 queue_Init(&call->rq);
2004 queue_Init(&call->iovq);
2005 /* Bind the call to its connection structure (prereq for reset) */
2007 rxi_ResetCall(call, 1);
2009 call->channel = channel;
2010 call->callNumber = &conn->callNumber[channel];
2011 /* Note that the next expected call number is retained (in
2012 * conn->callNumber[i]), even if we reallocate the call structure
2014 conn->call[channel] = call;
2015 /* if the channel's never been used (== 0), we should start at 1, otherwise
2016 the call number is valid from the last time this channel was used */
2017 if (*call->callNumber == 0) *call->callNumber = 1;
2022 /* A call has been inactive long enough that we can throw away
2023 * state, including the call structure, which is placed on the call
2025 * Call is locked upon entry.
2026 * haveCTLock set if called from rxi_ReapConnections
2028 #ifdef RX_ENABLE_LOCKS
2029 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2030 #else /* RX_ENABLE_LOCKS */
2031 void rxi_FreeCall(register struct rx_call *call)
2032 #endif /* RX_ENABLE_LOCKS */
2034 register int channel = call->channel;
2035 register struct rx_connection *conn = call->conn;
2038 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2039 (*call->callNumber)++;
2040 rxi_ResetCall(call, 0);
2041 call->conn->call[channel] = (struct rx_call *) 0;
2043 MUTEX_ENTER(&rx_freeCallQueue_lock);
2044 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2045 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2046 /* A call may be free even though its transmit queue is still in use.
2047 * Since we search the call list from head to tail, put busy calls at
2048 * the head of the list, and idle calls at the tail.
2050 if (call->flags & RX_CALL_TQ_BUSY)
2051 queue_Prepend(&rx_freeCallQueue, call);
2053 queue_Append(&rx_freeCallQueue, call);
2054 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2055 queue_Append(&rx_freeCallQueue, call);
2056 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2057 MUTEX_ENTER(&rx_stats_mutex);
2058 rx_stats.nFreeCallStructs++;
2059 MUTEX_EXIT(&rx_stats_mutex);
2061 MUTEX_EXIT(&rx_freeCallQueue_lock);
2063 /* Destroy the connection if it was previously slated for
2064 * destruction, i.e. the Rx client code previously called
2065 * rx_DestroyConnection (client connections), or
2066 * rxi_ReapConnections called the same routine (server
2067 * connections). Only do this, however, if there are no
2068 * outstanding calls. Note that for fine grain locking, there appears
2069 * to be a deadlock in that rxi_FreeCall has a call locked and
2070 * DestroyConnectionNoLock locks each call in the conn. But note a
2071 * few lines up where we have removed this call from the conn.
2072 * If someone else destroys a connection, they either have no
2073 * call lock held or are going through this section of code.
2075 if (conn->flags & RX_CONN_DESTROY_ME) {
2076 MUTEX_ENTER(&conn->conn_data_lock);
2078 MUTEX_EXIT(&conn->conn_data_lock);
2079 #ifdef RX_ENABLE_LOCKS
2081 rxi_DestroyConnectionNoLock(conn);
2083 rxi_DestroyConnection(conn);
2084 #else /* RX_ENABLE_LOCKS */
2085 rxi_DestroyConnection(conn);
2086 #endif /* RX_ENABLE_LOCKS */
2090 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2091 char *rxi_Alloc(register size_t size)
2095 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2096 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2099 int glockOwner = ISAFS_GLOCK();
2103 MUTEX_ENTER(&rx_stats_mutex);
2104 rxi_Alloccnt++; rxi_Allocsize += size;
2105 MUTEX_EXIT(&rx_stats_mutex);
2106 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2107 if (size > AFS_SMALLOCSIZ) {
2108 p = (char *) osi_AllocMediumSpace(size);
2110 p = (char *) osi_AllocSmall(size, 1);
2111 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2116 p = (char *) osi_Alloc(size);
2118 if (!p) osi_Panic("rxi_Alloc error");
2123 void rxi_Free(void *addr, register size_t size)
2125 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2126 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2129 int glockOwner = ISAFS_GLOCK();
2133 MUTEX_ENTER(&rx_stats_mutex);
2134 rxi_Alloccnt--; rxi_Allocsize -= size;
2135 MUTEX_EXIT(&rx_stats_mutex);
2136 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2137 if (size > AFS_SMALLOCSIZ)
2138 osi_FreeMediumSpace(addr);
2140 osi_FreeSmall(addr);
2141 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2146 osi_Free(addr, size);
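/* Usage sketch (illustrative, not in the original source): rxi_Alloc and
 * rxi_Free form a size-tracked allocator pair, so callers must pass the
 * same size to both.  With a hypothetical structure rx_foo:
 *
 *   struct rx_foo *p = (struct rx_foo *) rxi_Alloc(sizeof(struct rx_foo));
 *   ...
 *   rxi_Free(p, sizeof(struct rx_foo));
 *
 * The running totals rxi_Alloccnt and rxi_Allocsize above are maintained
 * under rx_stats_mutex purely for accounting. */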
2150 /* Find the peer process represented by the supplied (host,port)
2151 * combination. If there is no appropriate active peer structure, a
2152 * new one will be allocated and initialized.
2153 * The origPeer, if set, is a pointer to a peer structure on which the
2154 * refcount will be decremented. This is used to replace the peer
2155 * structure hanging off a connection structure */
2156 struct rx_peer *rxi_FindPeer(register afs_uint32 host,
2157 register u_short port, struct rx_peer *origPeer, int create)
2159 register struct rx_peer *pp;
2161 hashIndex = PEER_HASH(host, port);
2162 MUTEX_ENTER(&rx_peerHashTable_lock);
2163 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2164 if ((pp->host == host) && (pp->port == port)) break;
2168 pp = rxi_AllocPeer(); /* This bzero's *pp */
2169 pp->host = host; /* set here; it would otherwise remain zero */
2171 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2172 queue_Init(&pp->congestionQueue);
2173 queue_Init(&pp->rpcStats);
2174 pp->next = rx_peerHashTable[hashIndex];
2175 rx_peerHashTable[hashIndex] = pp;
2176 rxi_InitPeerParams(pp);
2177 MUTEX_ENTER(&rx_stats_mutex);
2178 rx_stats.nPeerStructs++;
2179 MUTEX_EXIT(&rx_stats_mutex);
2186 origPeer->refCount--;
2187 MUTEX_EXIT(&rx_peerHashTable_lock);
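/* Usage sketch (illustrative, not in the original source): the common
 * call site passes no origPeer and asks for creation, as when a server
 * connection is fabricated below:
 *
 *   conn->peer = rxi_FindPeer(host, port, 0, 1);
 *
 * When origPeer is supplied, its refCount is dropped while
 * rx_peerHashTable_lock is still held, so a connection can be switched
 * to a different peer structure in a single locked pass. */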
2192 /* Find the connection at (host, port) started at epoch, and with the
2193 * given connection id. Creates the server connection if necessary.
2194 * The type specifies whether a client connection or a server
2195 * connection is desired. In both cases, (host, port) specify the
2196 * peer's (host, port) pair. Client connections are not made
2197 * automatically by this routine. The parameter socket gives the
2198 * socket descriptor on which the packet was received. This is used,
2199 * in the case of server connections, to check that *new* connections
2200 * come via a valid (port, serviceId). Finally, the securityIndex
2201 * parameter must match the existing index for the connection. If a
2202 * server connection is created, it will be created using the supplied
2203 * index, if the index is valid for this service */
2204 struct rx_connection *rxi_FindConnection(osi_socket socket,
2205 register afs_int32 host, register u_short port, u_short serviceId,
2206 afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2208 int hashindex, flag;
2209 register struct rx_connection *conn;
2210 hashindex = CONN_HASH(host, port, cid, epoch, type);
2211 MUTEX_ENTER(&rx_connHashTable_lock);
2212 rxLastConn ? (conn = rxLastConn, flag = 0) :
2213 (conn = rx_connHashTable[hashindex], flag = 1);
2215 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2216 && (epoch == conn->epoch)) {
2217 register struct rx_peer *pp = conn->peer;
2218 if (securityIndex != conn->securityIndex) {
2219 /* this isn't supposed to happen, but someone could forge a packet
2220 like this, and there seems to be some CM bug that makes this
2221 happen from time to time -- in which case, the fileserver
2223 MUTEX_EXIT(&rx_connHashTable_lock);
2224 return (struct rx_connection *) 0;
2226 if (pp->host == host && pp->port == port)
2228 if (type == RX_CLIENT_CONNECTION && pp->port == port)
2230 if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2235 /* the connection rxLastConn that was used last time is not the
2236 ** one we are looking for now. Hence, start searching in the hash table */
2238 conn = rx_connHashTable[hashindex];
2244 struct rx_service *service;
2245 if (type == RX_CLIENT_CONNECTION) {
2246 MUTEX_EXIT(&rx_connHashTable_lock);
2247 return (struct rx_connection *) 0;
2249 service = rxi_FindService(socket, serviceId);
2250 if (!service || (securityIndex >= service->nSecurityObjects)
2251 || (service->securityObjects[securityIndex] == 0)) {
2252 MUTEX_EXIT(&rx_connHashTable_lock);
2253 return (struct rx_connection *) 0;
2255 conn = rxi_AllocConnection(); /* This bzero's the connection */
2256 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2258 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2260 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2261 conn->next = rx_connHashTable[hashindex];
2262 rx_connHashTable[hashindex] = conn;
2263 conn->peer = rxi_FindPeer(host, port, 0, 1);
2264 conn->type = RX_SERVER_CONNECTION;
2265 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2266 conn->epoch = epoch;
2267 conn->cid = cid & RX_CIDMASK;
2268 /* conn->serial = conn->lastSerial = 0; */
2269 /* conn->timeout = 0; */
2270 conn->ackRate = RX_FAST_ACK_RATE;
2271 conn->service = service;
2272 conn->serviceId = serviceId;
2273 conn->securityIndex = securityIndex;
2274 conn->securityObject = service->securityObjects[securityIndex];
2275 conn->nSpecific = 0;
2276 conn->specific = NULL;
2277 rx_SetConnDeadTime(conn, service->connDeadTime);
2278 /* Notify security object of the new connection */
2279 RXS_NewConnection(conn->securityObject, conn);
2280 /* XXXX Connection timeout? */
2281 if (service->newConnProc) (*service->newConnProc)(conn);
2282 MUTEX_ENTER(&rx_stats_mutex);
2283 rx_stats.nServerConns++;
2284 MUTEX_EXIT(&rx_stats_mutex);
2287 MUTEX_ENTER(&conn->conn_data_lock);
2289 MUTEX_EXIT(&conn->conn_data_lock);
2291 rxLastConn = conn; /* store this connection as the last conn used */
2292 MUTEX_EXIT(&rx_connHashTable_lock);
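/* Reader's note (descriptive, not part of the source): the lookup above
 * is a one-entry cache in front of a chained hash table.  rxLastConn is
 * tried first when set; on a miss the chain at
 * rx_connHashTable[CONN_HASH(host, port, cid, epoch, type)] is walked,
 * matching on type, (cid & RX_CIDMASK), epoch and the peer checks shown
 * earlier.  Only server connections are fabricated on a miss; a missing
 * client connection simply yields 0. */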
2296 /* There are two packet tracing routines available for testing and monitoring
2297 * Rx. One is called just after every packet is received and the other is
2298 * called just before every packet is sent. Received packets have had their
2299 * headers decoded, and packets to be sent have not yet had their headers
2300 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2301 * containing the network address. Both can be modified. The return value, if
2302 * non-zero, indicates that the packet should be dropped. */
2304 int (*rx_justReceived)() = 0;
2305 int (*rx_almostSent)() = 0;
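/* Illustrative sketch (hypothetical names, not in the source): installing
 * a receive-side tracer.  The hook sees the decoded packet and the
 * sender's address, may modify either, and returns non-zero to drop the
 * packet.
 *
 *   static int my_tracer(struct rx_packet *p, struct sockaddr_in *addr)
 *   {
 *       return 0;
 *   }
 *
 *   rx_justReceived = my_tracer;
 *
 * rx_almostSent works the same way for outgoing packets, before their
 * headers are encoded. */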
2307 /* A packet has been received off the interface. Np is the packet, socket is
2308 * the socket number it was received from (useful in determining which service
2309 * this packet corresponds to), and (host, port) reflect the host,port of the
2310 * sender. This call returns the packet to the caller if it is finished with
2311 * it, rather than de-allocating it, just as a small performance hack */
2313 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
2314 osi_socket socket, afs_uint32 host, u_short port,
2315 int *tnop, struct rx_call **newcallp)
2317 register struct rx_call *call;
2318 register struct rx_connection *conn;
2320 afs_uint32 currentCallNumber;
2326 struct rx_packet *tnp;
2329 /* We don't print out the packet until now because (1) the time may not be
2330 * accurate enough until now in the lwp implementation (rx_Listener only gets
2331 * the time after the packet is read) and (2) from a protocol point of view,
2332 * this is the first time the packet has been seen */
2333 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2334 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2335 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2336 np->header.serial, packetType, host, port, np->header.serviceId,
2337 np->header.epoch, np->header.cid, np->header.callNumber,
2338 np->header.seq, np->header.flags, np));
2341 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2342 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2345 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2346 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2349 /* If an input tracer function is defined, call it with the packet and
2350 * network address. Note this function may modify its arguments. */
2351 if (rx_justReceived) {
2352 struct sockaddr_in addr;
2354 addr.sin_family = AF_INET;
2355 addr.sin_port = port;
2356 addr.sin_addr.s_addr = host;
2357 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2358 addr.sin_len = sizeof(addr);
2359 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2360 drop = (*rx_justReceived) (np, &addr);
2361 /* drop packet if return value is non-zero */
2362 if (drop) return np;
2363 port = addr.sin_port; /* in case fcn changed addr */
2364 host = addr.sin_addr.s_addr;
2368 /* If packet was not sent by the client, then *we* must be the client */
2369 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2370 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2372 /* Find the connection (or fabricate one, if we're the server & if
2373 * necessary) associated with this packet */
2374 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2375 np->header.cid, np->header.epoch, type,
2376 np->header.securityIndex);
2379 /* If no connection found or fabricated, just ignore the packet.
2380 * (An argument could be made for sending an abort packet for
2385 MUTEX_ENTER(&conn->conn_data_lock);
2386 if (conn->maxSerial < np->header.serial)
2387 conn->maxSerial = np->header.serial;
2388 MUTEX_EXIT(&conn->conn_data_lock);
2390 /* If the connection is in an error state, send an abort packet and ignore
2391 * the incoming packet */
2393 /* Don't respond to an abort packet--we don't want loops! */
2394 MUTEX_ENTER(&conn->conn_data_lock);
2395 if (np->header.type != RX_PACKET_TYPE_ABORT)
2396 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2398 MUTEX_EXIT(&conn->conn_data_lock);
2402 /* Check for connection-only requests (i.e. not call specific). */
2403 if (np->header.callNumber == 0) {
2404 switch (np->header.type) {
2405 case RX_PACKET_TYPE_ABORT:
2406 /* What if the supplied error is zero? */
2407 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2408 MUTEX_ENTER(&conn->conn_data_lock);
2410 MUTEX_EXIT(&conn->conn_data_lock);
2412 case RX_PACKET_TYPE_CHALLENGE:
2413 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2414 MUTEX_ENTER(&conn->conn_data_lock);
2416 MUTEX_EXIT(&conn->conn_data_lock);
2418 case RX_PACKET_TYPE_RESPONSE:
2419 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2420 MUTEX_ENTER(&conn->conn_data_lock);
2422 MUTEX_EXIT(&conn->conn_data_lock);
2424 case RX_PACKET_TYPE_PARAMS:
2425 case RX_PACKET_TYPE_PARAMS+1:
2426 case RX_PACKET_TYPE_PARAMS+2:
2427 /* ignore these packet types for now */
2428 MUTEX_ENTER(&conn->conn_data_lock);
2430 MUTEX_EXIT(&conn->conn_data_lock);
2435 /* Should not reach here, unless the peer is broken: send an
2437 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2438 MUTEX_ENTER(&conn->conn_data_lock);
2439 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2441 MUTEX_EXIT(&conn->conn_data_lock);
2446 channel = np->header.cid & RX_CHANNELMASK;
2447 call = conn->call[channel];
2448 #ifdef RX_ENABLE_LOCKS
2450 MUTEX_ENTER(&call->lock);
2451 /* Test to see if call struct is still attached to conn. */
2452 if (call != conn->call[channel]) {
2454 MUTEX_EXIT(&call->lock);
2455 if (type == RX_SERVER_CONNECTION) {
2456 call = conn->call[channel];
2457 /* If we started with no call attached and there is one now,
2458 * another thread is also running this routine and has gotten
2459 * the connection channel. We should drop this packet in the tests
2460 * below. If there was a call on this connection and it's now
2461 * gone, then we'll be making a new call below.
2462 * If there was previously a call and it's now different then
2463 * the old call was freed and another thread running this routine
2464 * has created a call on this channel. One of these two threads
2465 * has a packet for the old call and the code below handles those
2469 MUTEX_ENTER(&call->lock);
2472 /* This packet can't be for this call. If the new call address is
2473 * 0 then no call is running on this channel. If there is a call
2474 * then, since this is a client connection we're receiving data for,
2475 * the packet must be for the previous call.
2477 MUTEX_ENTER(&rx_stats_mutex);
2478 rx_stats.spuriousPacketsRead++;
2479 MUTEX_EXIT(&rx_stats_mutex);
2480 MUTEX_ENTER(&conn->conn_data_lock);
2482 MUTEX_EXIT(&conn->conn_data_lock);
2487 currentCallNumber = conn->callNumber[channel];
2489 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2490 if (np->header.callNumber < currentCallNumber) {
2491 MUTEX_ENTER(&rx_stats_mutex);
2492 rx_stats.spuriousPacketsRead++;
2493 MUTEX_EXIT(&rx_stats_mutex);
2494 #ifdef RX_ENABLE_LOCKS
2496 MUTEX_EXIT(&call->lock);
2498 MUTEX_ENTER(&conn->conn_data_lock);
2500 MUTEX_EXIT(&conn->conn_data_lock);
2504 MUTEX_ENTER(&conn->conn_call_lock);
2505 call = rxi_NewCall(conn, channel);
2506 MUTEX_EXIT(&conn->conn_call_lock);
2507 *call->callNumber = np->header.callNumber;
2508 call->state = RX_STATE_PRECALL;
2509 clock_GetTime(&call->queueTime);
2510 hzero(call->bytesSent);
2511 hzero(call->bytesRcvd);
2512 rxi_KeepAliveOn(call);
2514 else if (np->header.callNumber != currentCallNumber) {
2515 /* Wait until the transmit queue is idle before deciding
2516 * whether to reset the current call. Chances are that the
2517 * call will be in either DALLY or HOLD state once the TQ_BUSY
2520 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2521 while ((call->state == RX_STATE_ACTIVE) &&
2522 (call->flags & RX_CALL_TQ_BUSY)) {
2523 call->flags |= RX_CALL_TQ_WAIT;
2524 #ifdef RX_ENABLE_LOCKS
2525 CV_WAIT(&call->cv_tq, &call->lock);
2526 #else /* RX_ENABLE_LOCKS */
2527 osi_rxSleep(&call->tq);
2528 #endif /* RX_ENABLE_LOCKS */
2530 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2531 /* If the new call cannot be taken right now send a busy and set
2532 * the error condition in this call, so that it terminates as
2533 * quickly as possible */
2534 if (call->state == RX_STATE_ACTIVE) {
2535 struct rx_packet *tp;
2537 rxi_CallError(call, RX_CALL_DEAD);
2538 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2539 MUTEX_EXIT(&call->lock);
2540 MUTEX_ENTER(&conn->conn_data_lock);
2542 MUTEX_EXIT(&conn->conn_data_lock);
2545 rxi_ResetCall(call, 0);
2546 *call->callNumber = np->header.callNumber;
2547 call->state = RX_STATE_PRECALL;
2548 clock_GetTime(&call->queueTime);
2549 hzero(call->bytesSent);
2550 hzero(call->bytesRcvd);
2552 * If the number of queued calls exceeds the overload
2553 * threshold then abort this call.
2555 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2556 struct rx_packet *tp;
2558 rxi_CallError(call, rx_BusyError);
2559 tp = rxi_SendCallAbort(call, np, 1, 0);
2560 MUTEX_EXIT(&call->lock);
2561 MUTEX_ENTER(&conn->conn_data_lock);
2563 MUTEX_EXIT(&conn->conn_data_lock);
2566 rxi_KeepAliveOn(call);
2569 /* Continuing call; do nothing here. */
2571 } else { /* we're the client */
2572 /* Ignore all incoming acknowledgements for calls in DALLY state */
2573 if ( call && (call->state == RX_STATE_DALLY)
2574 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2575 MUTEX_ENTER(&rx_stats_mutex);
2576 rx_stats.ignorePacketDally++;
2577 MUTEX_EXIT(&rx_stats_mutex);
2578 #ifdef RX_ENABLE_LOCKS
2580 MUTEX_EXIT(&call->lock);
2583 MUTEX_ENTER(&conn->conn_data_lock);
2585 MUTEX_EXIT(&conn->conn_data_lock);
2589 /* Ignore anything that's not relevant to the current call. If there
2590 * isn't a current call, then no packet is relevant. */
2591 if (!call || (np->header.callNumber != currentCallNumber)) {
2592 MUTEX_ENTER(&rx_stats_mutex);
2593 rx_stats.spuriousPacketsRead++;
2594 MUTEX_EXIT(&rx_stats_mutex);
2595 #ifdef RX_ENABLE_LOCKS
2597 MUTEX_EXIT(&call->lock);
2600 MUTEX_ENTER(&conn->conn_data_lock);
2602 MUTEX_EXIT(&conn->conn_data_lock);
2605 /* If the service security object index stamped in the packet does not
2606 * match the connection's security index, ignore the packet */
2607 if (np->header.securityIndex != conn->securityIndex) {
2608 #ifdef RX_ENABLE_LOCKS
2609 MUTEX_EXIT(&call->lock);
2611 MUTEX_ENTER(&conn->conn_data_lock);
2613 MUTEX_EXIT(&conn->conn_data_lock);
2617 /* If we're receiving the response, then all transmit packets are
2618 * implicitly acknowledged. Get rid of them. */
2619 if (np->header.type == RX_PACKET_TYPE_DATA) {
2620 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2621 /* XXX Hack. Because we must release the global rx lock when
2622 * sending packets (osi_NetSend) we drop all acks while we're
2623 * traversing the tq in rxi_Start sending packets out because
2624 * packets may move to the freePacketQueue as a result of being here!
2625 * So we drop these packets until we're safely out of the
2626 * traversal. Really ugly!
2627 * For fine grain RX locking, we set the acked field in the
2628 * packets and let rxi_Start remove them from the transmit queue.
2630 if (call->flags & RX_CALL_TQ_BUSY) {
2631 #ifdef RX_ENABLE_LOCKS
2632 rxi_SetAcksInTransmitQueue(call);
2635 return np; /* xmitting; drop packet */
2639 rxi_ClearTransmitQueue(call, 0);
2641 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2642 rxi_ClearTransmitQueue(call, 0);
2643 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2645 if (np->header.type == RX_PACKET_TYPE_ACK) {
2646 /* now check to see if this is an ack packet acknowledging that the
2647 * server actually *lost* some hard-acked data. If this happens we
2648 * ignore this packet, as it may indicate that the server restarted in
2649 * the middle of a call. It is also possible that this is an old ack
2650 * packet. We don't abort the connection in this case, because this
2651 * *might* just be an old ack packet. The right way to detect a server
2652 * restart in the midst of a call is to notice that the server epoch
2654 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2655 * XXX unacknowledged. I think that this is off-by-one, but
2656 * XXX I don't dare change it just yet, since it will
2657 * XXX interact badly with the server-restart detection
2658 * XXX code in receiveackpacket. */
2659 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2660 MUTEX_ENTER(&rx_stats_mutex);
2661 rx_stats.spuriousPacketsRead++;
2662 MUTEX_EXIT(&rx_stats_mutex);
2663 MUTEX_EXIT(&call->lock);
2664 MUTEX_ENTER(&conn->conn_data_lock);
2666 MUTEX_EXIT(&conn->conn_data_lock);
2670 } /* else not a data packet */
2673 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2674 /* Set remote user defined status from packet */
2675 call->remoteStatus = np->header.userStatus;
2677 /* Note the gap between the expected next packet and the actual
2678 * packet that arrived, when the new packet has a smaller serial number
2679 * than expected. Rioses frequently reorder packets all by themselves,
2680 * so this will be quite important with very large window sizes.
2681 * Skew is checked against 0 here to avoid any dependence on the type of
2682 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2684 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2685 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2686 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2688 MUTEX_ENTER(&conn->conn_data_lock);
2689 skew = conn->lastSerial - np->header.serial;
2690 conn->lastSerial = np->header.serial;
2691 MUTEX_EXIT(&conn->conn_data_lock);
2693 register struct rx_peer *peer;
2695 if (skew > peer->inPacketSkew) {
2696 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2697 peer->inPacketSkew = skew;
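/* Sketch of the smoothing suggested in the comment above (an assumption,
 * not implemented here): an exponentially weighted running skew with a
 * beta of 1/8 would replace the simple maximum, e.g.
 *
 *   peer->inPacketSkew -= peer->inPacketSkew >> 3;
 *   peer->inPacketSkew += skew >> 3;
 *
 * keeping the field a smoothed estimate rather than a high-water mark. */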
2701 /* Now do packet type-specific processing */
2702 switch (np->header.type) {
2703 case RX_PACKET_TYPE_DATA:
2704 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2707 case RX_PACKET_TYPE_ACK:
2708 /* Respond immediately to ack packets requesting acknowledgement
2710 if (np->header.flags & RX_REQUEST_ACK) {
2712 (void) rxi_SendCallAbort(call, 0, 1, 0);
2714 (void) rxi_SendAck(call, 0, 0, np->header.serial, 0,
2715 RX_ACK_PING_RESPONSE, 1);
2717 np = rxi_ReceiveAckPacket(call, np, 1);
2719 case RX_PACKET_TYPE_ABORT:
2720 /* An abort packet: reset the connection, passing the error up to
2722 /* What if error is zero? */
2723 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2725 case RX_PACKET_TYPE_BUSY:
2728 case RX_PACKET_TYPE_ACKALL:
2729 /* All packets acknowledged, so we can drop all packets previously
2730 * readied for sending */
2731 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2732 /* XXX Hack. Because we can't release the global rx lock when
2733 * sending packets (osi_NetSend), we drop all ack pkts while we're
2734 * traversing the tq in rxi_Start sending packets out, because
2735 * packets may move to the freePacketQueue as a result of being
2736 * here! So we drop these packets until we're safely out of the
2737 * traversal. Really ugly!
2738 * For fine grain RX locking, we set the acked field in the packets
2739 * and let rxi_Start remove the packets from the transmit queue.
2741 if (call->flags & RX_CALL_TQ_BUSY) {
2742 #ifdef RX_ENABLE_LOCKS
2743 rxi_SetAcksInTransmitQueue(call);
2745 #else /* RX_ENABLE_LOCKS */
2747 return np; /* xmitting; drop packet */
2748 #endif /* RX_ENABLE_LOCKS */
2750 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2751 rxi_ClearTransmitQueue(call, 0);
2754 /* Should not reach here, unless the peer is broken: send an abort
2756 rxi_CallError(call, RX_PROTOCOL_ERROR);
2757 np = rxi_SendCallAbort(call, np, 1, 0);
2760 /* Note when this last legitimate packet was received, for keep-alive
2761 * processing. Note, we delay getting the time until now in the hope that
2762 * the packet will be delivered to the user before any get time is required
2763 * (if not, then the time won't actually be re-evaluated here). */
2764 call->lastReceiveTime = clock_Sec();
2765 MUTEX_EXIT(&call->lock);
2766 MUTEX_ENTER(&conn->conn_data_lock);
2768 MUTEX_EXIT(&conn->conn_data_lock);
2772 /* return true if this is an "interesting" connection from the point of view
2773 of someone trying to debug the system */
2774 int rxi_IsConnInteresting(struct rx_connection *aconn)
2777 register struct rx_call *tcall;
2779 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2781 for(i=0;i<RX_MAXCALLS;i++) {
2782 tcall = aconn->call[i];
2784 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2786 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2794 /* if this is one of the last few packets AND it wouldn't be used by the
2795 receiving call to immediately satisfy a read request, then drop it on
2796 the floor, since accepting it might prevent a lock-holding thread from
2797 making progress in its reading. If a call has been cleared while in
2798 the precall state then ignore all subsequent packets until the call
2799 is assigned to a thread. */
2801 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2804 MUTEX_ENTER(&rx_stats_mutex);
2805 if (((ap->header.seq != 1) &&
2806 (acall->flags & RX_CALL_CLEARED) &&
2807 (acall->state == RX_STATE_PRECALL)) ||
2808 ((rx_nFreePackets < rxi_dataQuota+2) &&
2809 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2810 && (acall->flags & RX_CALL_READER_WAIT)))) {
2813 MUTEX_EXIT(&rx_stats_mutex);
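/* In short: TooLow returns non-zero when the packet should be dropped,
 * either because the call was cleared while still in PRECALL (and this
 * is not the call's first packet), or because free packet buffers are
 * nearly exhausted and the packet would not immediately satisfy a
 * waiting reader.  rxi_ReceiveDataPacket uses it, together with
 * rxi_OverQuota, to shed load before queueing new data. */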
2818 static void rxi_CheckReachEvent(struct rxevent *event,
2819 struct rx_connection *conn, struct rx_call *acall)
2821 struct rx_call *call = acall;
2825 MUTEX_ENTER(&conn->conn_data_lock);
2826 conn->checkReachEvent = NULL;
2827 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2828 if (event) conn->refCount--;
2829 MUTEX_EXIT(&conn->conn_data_lock);
2833 MUTEX_ENTER(&conn->conn_call_lock);
2834 MUTEX_ENTER(&conn->conn_data_lock);
2835 for (i=0; i<RX_MAXCALLS; i++) {
2836 struct rx_call *tc = conn->call[i];
2837 if (tc && tc->state == RX_STATE_PRECALL) {
2843 /* Indicate that rxi_CheckReachEvent is no longer running by
2844 * clearing the flag. Must be atomic under conn_data_lock to
2845 * avoid a new call slipping by: rxi_CheckConnReach holds
2846 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2848 conn->flags &= ~RX_CONN_ATTACHWAIT;
2849 MUTEX_EXIT(&conn->conn_data_lock);
2850 MUTEX_EXIT(&conn->conn_call_lock);
2854 if (call != acall) MUTEX_ENTER(&call->lock);
2855 rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
2856 if (call != acall) MUTEX_EXIT(&call->lock);
2858 clock_GetTime(&when);
2859 when.sec += RX_CHECKREACH_TIMEOUT;
2860 MUTEX_ENTER(&conn->conn_data_lock);
2861 if (!conn->checkReachEvent) {
2863 conn->checkReachEvent =
2864 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2866 MUTEX_EXIT(&conn->conn_data_lock);
2871 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2873 struct rx_service *service = conn->service;
2874 struct rx_peer *peer = conn->peer;
2875 afs_uint32 now, lastReach;
2877 if (service->checkReach == 0)
2881 MUTEX_ENTER(&peer->peer_lock);
2882 lastReach = peer->lastReachTime;
2883 MUTEX_EXIT(&peer->peer_lock);
2884 if (now - lastReach < RX_CHECKREACH_TTL)
2887 MUTEX_ENTER(&conn->conn_data_lock);
2888 if (conn->flags & RX_CONN_ATTACHWAIT) {
2889 MUTEX_EXIT(&conn->conn_data_lock);
2892 conn->flags |= RX_CONN_ATTACHWAIT;
2893 MUTEX_EXIT(&conn->conn_data_lock);
2894 if (!conn->checkReachEvent)
2895 rxi_CheckReachEvent(NULL, conn, call);
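/* Summary (descriptive): rxi_CheckConnReach returns 0, allowing
 * TryAttach to proceed, when the service requests no reachability check
 * or the peer was heard from within RX_CHECKREACH_TTL.  Otherwise the
 * connection is marked RX_CONN_ATTACHWAIT and rxi_CheckReachEvent pings
 * the peer, re-arming itself every RX_CHECKREACH_TIMEOUT seconds until a
 * PING_RESPONSE reaches rxi_UpdatePeerReach (below), which records
 * peer->lastReachTime and retries the attach. */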
2900 /* try to attach call, if authentication is complete */
2901 static void TryAttach(register struct rx_call *acall,
2902 register osi_socket socket, register int *tnop,
2903 register struct rx_call **newcallp, int reachOverride)
2905 struct rx_connection *conn = acall->conn;
2907 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2908 /* Don't attach until we have any req'd. authentication. */
2909 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2910 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2911 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2912 /* Note: this does not necessarily succeed; there
2913 * may not be any proc available
2917 rxi_ChallengeOn(acall->conn);
2922 /* A data packet has been received off the interface. This packet is
2923 * appropriate to the call (the call is in the right state, etc.). This
2924 * routine can return a packet to the caller, for re-use */
2926 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
2927 register struct rx_packet *np, int istack, osi_socket socket,
2928 afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2934 afs_uint32 seq, serial, flags;
2936 struct rx_packet *tnp;
2938 MUTEX_ENTER(&rx_stats_mutex);
2939 rx_stats.dataPacketsRead++;
2940 MUTEX_EXIT(&rx_stats_mutex);
2943 /* If there are no packet buffers, drop this new packet, unless we can find
2944 * packet buffers from inactive calls */
2946 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2947 MUTEX_ENTER(&rx_freePktQ_lock);
2948 rxi_NeedMorePackets = TRUE;
2949 MUTEX_EXIT(&rx_freePktQ_lock);
2950 MUTEX_ENTER(&rx_stats_mutex);
2951 rx_stats.noPacketBuffersOnRead++;
2952 MUTEX_EXIT(&rx_stats_mutex);
2953 call->rprev = np->header.serial;
2954 rxi_calltrace(RX_TRACE_DROP, call);
2955 dpf (("packet %x dropped on receipt - quota problems", np));
2957 rxi_ClearReceiveQueue(call);
2958 clock_GetTime(&when);
2959 clock_Add(&when, &rx_softAckDelay);
2960 if (!call->delayedAckEvent ||
2961 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2962 rxevent_Cancel(call->delayedAckEvent, call,
2963 RX_CALL_REFCOUNT_DELAY);
2964 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2965 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2968 /* we've damaged this call already, might as well do it in. */
2974 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2975 * packet is one of several packets transmitted as a single
2976 * datagram. Do not send any soft or hard acks until all packets
2977 * in a jumbogram have been processed. Send negative acks right away.
2979 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2980 /* tnp is non-null when there are more packets in the
2981 * current jumbogram */
2988 seq = np->header.seq;
2989 serial = np->header.serial;
2990 flags = np->header.flags;
2992 /* If the call is in an error state, send an abort message */
2994 return rxi_SendCallAbort(call, np, istack, 0);
2996 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2997 * AFS 3.5 jumbogram. */
2998 if (flags & RX_JUMBO_PACKET) {
2999 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3004 if (np->header.spare != 0) {
3005 MUTEX_ENTER(&call->conn->conn_data_lock);
3006 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3007 MUTEX_EXIT(&call->conn->conn_data_lock);
3010 /* The usual case is that this is the expected next packet */
3011 if (seq == call->rnext) {
3013 /* Check to make sure it is not a duplicate of one already queued */
3014 if (queue_IsNotEmpty(&call->rq)
3015 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3016 MUTEX_ENTER(&rx_stats_mutex);
3017 rx_stats.dupPacketsRead++;
3018 MUTEX_EXIT(&rx_stats_mutex);
3019 dpf (("packet %x dropped on receipt - duplicate", np));
3020 rxevent_Cancel(call->delayedAckEvent, call,
3021 RX_CALL_REFCOUNT_DELAY);
3022 np = rxi_SendAck(call, np, seq, serial,
3023 flags, RX_ACK_DUPLICATE, istack);
3029 /* It's the next packet. Stick it on the receive queue
3030 * for this call. Set newPackets to make sure we wake
3031 * the reader once all packets have been processed */
3032 queue_Prepend(&call->rq, np);
3034 np = NULL; /* We can't use this anymore */
3037 /* If an ack is requested then set a flag to make sure we
3038 * send an acknowledgement for this packet */
3039 if (flags & RX_REQUEST_ACK) {
3043 /* Keep track of whether we have received the last packet */
3044 if (flags & RX_LAST_PACKET) {
3045 call->flags |= RX_CALL_HAVE_LAST;
3049 /* Check whether we have all of the packets for this call */
3050 if (call->flags & RX_CALL_HAVE_LAST) {
3051 afs_uint32 tseq; /* temporary sequence number */
3052 struct rx_packet *tp; /* Temporary packet pointer */
3053 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3055 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3056 if (tseq != tp->header.seq)
3058 if (tp->header.flags & RX_LAST_PACKET) {
3059 call->flags |= RX_CALL_RECEIVE_DONE;
3066 /* Provide asynchronous notification for those who want it
3067 * (e.g. multi rx) */
3068 if (call->arrivalProc) {
3069 (*call->arrivalProc)(call, call->arrivalProcHandle,
3070 (int) call->arrivalProcArg);
3071 call->arrivalProc = (VOID (*)()) 0;
3074 /* Update last packet received */
3077 /* If there is no server process serving this call, grab
3078 * one, if available. We only need to do this once. If a
3079 * server thread is available, this thread becomes a server
3080 * thread and the server thread becomes a listener thread. */
3082 TryAttach(call, socket, tnop, newcallp, 0);
3085 /* This is not the expected next packet. */
3087 /* Determine whether this is a new or old packet, and if it's
3088 * a new one, whether it fits into the current receive window.
3089 * Also figure out whether the packet was delivered in sequence.
3090 * We use the prev variable to determine whether the new packet
3091 * is the successor of its immediate predecessor in the
3092 * receive queue, and the missing flag to determine whether
3093 * any of this packet's predecessors are missing. */
3095 afs_uint32 prev; /* "Previous packet" sequence number */
3096 struct rx_packet *tp; /* Temporary packet pointer */
3097 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3098 int missing; /* Are any predecessors missing? */
3100 /* If the new packet's sequence number has been sent to the
3101 * application already, then this is a duplicate */
3102 if (seq < call->rnext) {
3103 MUTEX_ENTER(&rx_stats_mutex);
3104 rx_stats.dupPacketsRead++;
3105 MUTEX_EXIT(&rx_stats_mutex);
3106 rxevent_Cancel(call->delayedAckEvent, call,
3107 RX_CALL_REFCOUNT_DELAY);
3108 np = rxi_SendAck(call, np, seq, serial,
3109 flags, RX_ACK_DUPLICATE, istack);
3115 /* If the sequence number is greater than what can be
3116 * accommodated by the current window, then send a negative
3117 * acknowledge and drop the packet */
3118 if ((call->rnext + call->rwind) <= seq) {
3119 rxevent_Cancel(call->delayedAckEvent, call,
3120 RX_CALL_REFCOUNT_DELAY);
3121 np = rxi_SendAck(call, np, seq, serial,
3122 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3128 /* Look for the packet in the queue of old received packets */
3129 for (prev = call->rnext - 1, missing = 0,
3130 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3131 /*Check for duplicate packet */
3132 if (seq == tp->header.seq) {
3133 MUTEX_ENTER(&rx_stats_mutex);
3134 rx_stats.dupPacketsRead++;
3135 MUTEX_EXIT(&rx_stats_mutex);
3136 rxevent_Cancel(call->delayedAckEvent, call,
3137 RX_CALL_REFCOUNT_DELAY);
3138 np = rxi_SendAck(call, np, seq, serial,
3139 flags, RX_ACK_DUPLICATE, istack);
3144 /* If we find a higher sequence packet, break out and
3145 * insert the new packet here. */
3146 if (seq < tp->header.seq) break;
3147 /* Check for missing packet */
3148 if (tp->header.seq != prev+1) {
3152 prev = tp->header.seq;
3155 /* Keep track of whether we have received the last packet. */
3156 if (flags & RX_LAST_PACKET) {
3157 call->flags |= RX_CALL_HAVE_LAST;
3160 /* It's within the window: add it to the receive queue.
3161 * tp is left by the previous loop either pointing at the
3162 * packet before which to insert the new packet, or at the
3163 * queue head if the queue is empty or the packet should be
3165 queue_InsertBefore(tp, np);
3169 /* Check whether we have all of the packets for this call */
3170 if ((call->flags & RX_CALL_HAVE_LAST)
3171 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3172 afs_uint32 tseq; /* temporary sequence number */
3174 for (tseq = call->rnext,
3175 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3176 if (tseq != tp->header.seq)
3178 if (tp->header.flags & RX_LAST_PACKET) {
3179 call->flags |= RX_CALL_RECEIVE_DONE;
3186 /* We need to send an ack if the packet is out of sequence,
3187 * or if an ack was requested by the peer. */
3188 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3192 /* Acknowledge the last packet for each call */
3193 if (flags & RX_LAST_PACKET) {
3204 * If the receiver is waiting for an iovec, fill the iovec
3205 * using the data from the receive queue */
3206 if (call->flags & RX_CALL_IOVEC_WAIT) {
3207 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3208 /* the call may have been aborted */
3217 /* Wakeup the reader if any */
3218 if ((call->flags & RX_CALL_READER_WAIT) &&
3219 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3220 (call->iovNext >= call->iovMax) ||
3221 (call->flags & RX_CALL_RECEIVE_DONE))) {
3222 call->flags &= ~RX_CALL_READER_WAIT;
3223 #ifdef RX_ENABLE_LOCKS
3224 CV_BROADCAST(&call->cv_rq);
3226 osi_rxWakeup(&call->rq);
3232 * Send an ack when requested by the peer, or once every
3233 * rxi_SoftAckRate packets until the last packet has been
3234 * received. Always send a soft ack for the last packet in
3235 * the server's reply. */
3237 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3238 np = rxi_SendAck(call, np, seq, serial, flags,
3239 RX_ACK_REQUESTED, istack);
3240 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3241 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3242 np = rxi_SendAck(call, np, seq, serial, flags,
3243 RX_ACK_IDLE, istack);
3244 } else if (call->nSoftAcks) {
3245 clock_GetTime(&when);
3246 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3247 clock_Add(&when, &rx_lastAckDelay);
3249 clock_Add(&when, &rx_softAckDelay);
3251 if (!call->delayedAckEvent ||
3252 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3253 rxevent_Cancel(call->delayedAckEvent, call,
3254 RX_CALL_REFCOUNT_DELAY);
3255 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3256 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3259 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3260 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3267 static void rxi_ComputeRate();
3270 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3272 struct rx_peer *peer = conn->peer;
3274 MUTEX_ENTER(&peer->peer_lock);
3275 peer->lastReachTime = clock_Sec();
3276 MUTEX_EXIT(&peer->peer_lock);
3278 MUTEX_ENTER(&conn->conn_data_lock);
3279 if (conn->flags & RX_CONN_ATTACHWAIT) {
3282 conn->flags &= ~RX_CONN_ATTACHWAIT;
3283 MUTEX_EXIT(&conn->conn_data_lock);
3285 for (i=0; i<RX_MAXCALLS; i++) {
3286 struct rx_call *call = conn->call[i];
3288 if (call != acall) MUTEX_ENTER(&call->lock);
3289 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3290 if (call != acall) MUTEX_EXIT(&call->lock);
3294 MUTEX_EXIT(&conn->conn_data_lock);
3297 /* rxi_ComputePeerNetStats
3299 * Called exclusively by rxi_ReceiveAckPacket to compute network link
3300 * estimates (like RTT and throughput) based on ack packets. Caller
3301 * must ensure that the packet in question is the right one (i.e.
3302 * serial number matches).
3305 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3306 struct rx_ackPacket *ap, struct rx_packet *np)
3308 struct rx_peer *peer = call->conn->peer;
3310 /* Use RTT if not delayed by client. */
3311 if (ap->reason != RX_ACK_DELAY)
3312 rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3314 rxi_ComputeRate(peer, call, p, np, ap->reason);
3318 /* The real smarts of the whole thing. */
3319 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
3320 struct rx_packet *np, int istack)
3322 struct rx_ackPacket *ap;
3324 register struct rx_packet *tp;
3325 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3326 register struct rx_connection *conn = call->conn;
3327 struct rx_peer *peer = conn->peer;
3330 /* because there are CM's that are bogus, sending weird values for this. */
3331 afs_uint32 skew = 0;
3336 int newAckCount = 0;
3337 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3338 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3340 MUTEX_ENTER(&rx_stats_mutex);
3341 rx_stats.ackPacketsRead++;
3342 MUTEX_EXIT(&rx_stats_mutex);
3343 ap = (struct rx_ackPacket *) rx_DataOf(np);
3344 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3346 return np; /* truncated ack packet */
3348 /* depends on ack packet struct */
3349 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3350 first = ntohl(ap->firstPacket);
3351 serial = ntohl(ap->serial);
3352 /* temporarily disabled -- needs to degrade over time
3353 skew = ntohs(ap->maxSkew); */
3355 /* Ignore ack packets received out of order */
3356 if (first < call->tfirst) {
3360 if (np->header.flags & RX_SLOW_START_OK) {
3361 call->flags |= RX_CALL_SLOW_START_OK;
3364 if (ap->reason == RX_ACK_PING_RESPONSE)
3365 rxi_UpdatePeerReach(conn, call);
3370 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3371 ap->reason, ntohl(ap->previousPacket),
3372 (unsigned int) np->header.seq, (unsigned int) serial,
3373 (unsigned int) skew, ntohl(ap->firstPacket));
3376 for (offset = 0; offset < nAcks; offset++)
3377 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3383 /* Update the outgoing packet skew value to the latest value of
3384 * the peer's incoming packet skew value. The ack packet, of
3385 * course, could arrive out of order, but that won't affect things
3387 MUTEX_ENTER(&peer->peer_lock);
3388 peer->outPacketSkew = skew;
3390 /* Check for packets that no longer need to be transmitted, and
3391 * discard them. This only applies to packets positively
3392 * acknowledged as having been sent to the peer's upper level.
3393 * All other packets must be retained. So only packets with
3394 * sequence numbers < ap->firstPacket are candidates. */
3395 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3396 if (tp->header.seq >= first) break;
3397 call->tfirst = tp->header.seq + 1;
3398 if (serial && (tp->header.serial == serial ||
3399 tp->firstSerial == serial))
3400 rxi_ComputePeerNetStats(call, tp, ap, np);
3401 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3402 /* XXX Hack. Because we have to release the global rx lock when sending
3403 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3404 * in rxi_Start sending packets out because packets may move to the
3405 * freePacketQueue as a result of being here! So we drop these packets until
3406 * we're safely out of the traversal. Really ugly!
3407 * To make it even uglier, if we're using fine grain locking, we can
3408 * set the ack bits in the packets and have rxi_Start remove the packets
3409 * when it's done transmitting.
3411 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3414 if (call->flags & RX_CALL_TQ_BUSY) {
3415 #ifdef RX_ENABLE_LOCKS
3416 tp->flags |= RX_PKTFLAG_ACKED;
3417 call->flags |= RX_CALL_TQ_SOME_ACKED;
3418 #else /* RX_ENABLE_LOCKS */
3420 #endif /* RX_ENABLE_LOCKS */
3422 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3425 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3430 /* Give rate detector a chance to respond to ping requests */
3431 if (ap->reason == RX_ACK_PING_RESPONSE) {
3432 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3436 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3438 /* Now go through explicit acks/nacks and record the results in
3439 * the waiting packets. These are packets that can't be released
3440 * yet, even with a positive acknowledge. This positive
3441 * acknowledge only means the packet has been received by the
3442 * peer, not that it will be retained long enough to be sent to
3443 * the peer's upper level. In addition, reset the transmit timers
3444 * of any missing packets (those packets that must be missing
3445 * because this packet was out of sequence) */
3447 call->nSoftAcked = 0;
3448 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3449 /* Update round trip time if the ack was stimulated on receipt
3451 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3452 #ifdef RX_ENABLE_LOCKS
3453 if (tp->header.seq >= first)
3454 #endif /* RX_ENABLE_LOCKS */
3455 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3456 if (serial && (tp->header.serial == serial ||
3457 tp->firstSerial == serial))
3458 rxi_ComputePeerNetStats(call, tp, ap, np);
3460 /* Set the acknowledge flag per packet based on the
3461 * information in the ack packet. An acknowledged packet can
3462 * be downgraded when the server has discarded a packet it
3463 * soacked previously, or when an ack packet is received
3464 * out of sequence. */
3465 if (tp->header.seq < first) {
3466 /* Implicit ack information */
3467 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3470 tp->flags |= RX_PKTFLAG_ACKED;
3472 else if (tp->header.seq < first + nAcks) {
3473 /* Explicit ack information: set it in the packet appropriately */
3474 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3475 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3477 tp->flags |= RX_PKTFLAG_ACKED;
3485 tp->flags &= ~RX_PKTFLAG_ACKED;
3490 tp->flags &= ~RX_PKTFLAG_ACKED;
3494 /* If packet isn't yet acked, and it has been transmitted at least
3495 * once, reset retransmit time using latest timeout
3496 * i.e., this should readjust the retransmit timer for all outstanding
3497 * packets... So we don't just retransmit when we should know better. */
3499 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3500 tp->retryTime = tp->timeSent;
3501 clock_Add(&tp->retryTime, &peer->timeout);
3502 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3503 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
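/* Worked example of the shift above (reader's note): tp->backoff counts
 * quarter seconds and one quarter second is approximated as 256 ms, so a
 * backoff of 3 adds 3 << 8 = 768 ms on top of peer->timeout before the
 * packet is again eligible for retransmission. */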
3507 /* If the window has been extended by this acknowledge packet,
3508 * then wakeup a sender waiting in alloc for window space, or try
3509 * sending packets now, if he's been sitting on packets due to
3510 * lack of window space */
3511 if (call->tnext < (call->tfirst + call->twind)) {
3512 #ifdef RX_ENABLE_LOCKS
3513 CV_SIGNAL(&call->cv_twind);
3515 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3516 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3517 osi_rxWakeup(&call->twind);
3520 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3521 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3525 /* if the ack packet has a receivelen field hanging off it,
3526 * update our state */
3527 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3530 /* If the ack packet has a "recommended" size that is less than
3531 * what I am using now, reduce my size to match */
3532 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3533 sizeof(afs_int32), &tSize);
3534 tSize = (afs_uint32) ntohl(tSize);
3535 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3537 /* Get the maximum packet size to send to this peer */
3538 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3540 tSize = (afs_uint32)ntohl(tSize);
3541 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3542 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3544 /* sanity check - peer might have restarted with different params.
3545 * If peer says "send less", dammit, send less... Peer should never
3546 * be unable to accept packets of the size that prior AFS versions would
3547 * send without asking. */
3548 if (peer->maxMTU != tSize) {
3549 peer->maxMTU = tSize;
3550 peer->MTU = MIN(tSize, peer->MTU);
3551 call->MTU = MIN(call->MTU, tSize);
3555 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3557 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3558 sizeof(afs_int32), &tSize);
3559 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3560 if (tSize < call->twind) { /* smaller than our send */
3561 call->twind = tSize; /* window, we must send less... */
3562 call->ssthresh = MIN(call->twind, call->ssthresh);
3565 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3566 * network MTU confused with the loopback MTU. Calculate the
3567 * maximum MTU here for use in the slow start code below.
3569 maxMTU = peer->maxMTU;
3570 /* Did peer restart with older RX version? */
3571 if (peer->maxDgramPackets > 1) {
3572 peer->maxDgramPackets = 1;
3574 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3576 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3577 sizeof(afs_int32), &tSize);
3578 tSize = (afs_uint32) ntohl(tSize);
3580 * As of AFS 3.5 we set the send window to match the receive window.
3582 if (tSize < call->twind) {
3583 call->twind = tSize;
3584 call->ssthresh = MIN(call->twind, call->ssthresh);
3585 } else if (tSize > call->twind) {
3586 call->twind = tSize;
3590 * As of AFS 3.5, a jumbogram is more than one fixed size
3591 * packet transmitted in a single UDP datagram. If the remote
3592 * MTU is smaller than our local MTU then never send a datagram
3593 * larger than the natural MTU.
3595 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3596 sizeof(afs_int32), &tSize);
3597 maxDgramPackets = (afs_uint32) ntohl(tSize);
3598 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3599 maxDgramPackets = MIN(maxDgramPackets,
3600 (int)(peer->ifDgramPackets));
3601 maxDgramPackets = MIN(maxDgramPackets, tSize);
3602 if (maxDgramPackets > 1) {
3603 peer->maxDgramPackets = maxDgramPackets;
3604 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3606 peer->maxDgramPackets = 1;
3607 call->MTU = peer->natMTU;
3609 } else if (peer->maxDgramPackets > 1) {
3610 /* Restarted with lower version of RX */
3611 peer->maxDgramPackets = 1;
3613 } else if (peer->maxDgramPackets > 1 ||
3614 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3615 /* Restarted with lower version of RX */
3616 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3617 peer->natMTU = OLD_MAX_PACKET_SIZE;
3618 peer->MTU = OLD_MAX_PACKET_SIZE;
3619 peer->maxDgramPackets = 1;
3620 peer->nDgramPackets = 1;
3622 call->MTU = OLD_MAX_PACKET_SIZE;
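/* Layout note for the ack trailer parsed above (descriptive): after the
 * nAcks ack bytes an ack packet may carry up to four afs_int32 values,
 * each read at rx_AckDataSize(ap->nAcks) + k*sizeof(afs_int32):
 *   k = 0  maximum packet size to send to this peer (bounds peer->maxMTU)
 *   k = 1  peer's advertised receive size (feeds peer->natMTU)
 *   k = 2  peer's receive window (call->twind is adjusted to match)
 *   k = 3  maximum packets per jumbogram (peer->maxDgramPackets, AFS 3.5)
 * The np->length checks decide which of these trailers is present. */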
3627 * Calculate how many datagrams were successfully received after
3628 * the first missing packet and adjust the negative ack counter
3633 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3634 if (call->nNacks < nNacked) {
3635 call->nNacks = nNacked;
3644 if (call->flags & RX_CALL_FAST_RECOVER) {
3646 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3648 call->flags &= ~RX_CALL_FAST_RECOVER;
3649 call->cwind = call->nextCwind;
3650 call->nextCwind = 0;
3653 call->nCwindAcks = 0;
3655 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3656 /* Three negative acks in a row trigger congestion recovery */
3657 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3658 MUTEX_EXIT(&peer->peer_lock);
3659 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3660 /* someone else is waiting to start recovery */
3663 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3664 while (call->flags & RX_CALL_TQ_BUSY) {
3665 call->flags |= RX_CALL_TQ_WAIT;
3666 #ifdef RX_ENABLE_LOCKS
3667 CV_WAIT(&call->cv_tq, &call->lock);
3668 #else /* RX_ENABLE_LOCKS */
3669 osi_rxSleep(&call->tq);
3670 #endif /* RX_ENABLE_LOCKS */
3672 MUTEX_ENTER(&peer->peer_lock);
3673 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3674 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3675 call->flags |= RX_CALL_FAST_RECOVER;
3676 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3677 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3679 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3680 call->nextCwind = call->ssthresh;
3683 peer->MTU = call->MTU;
3684 peer->cwind = call->nextCwind;
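/* Worked example of the recovery arithmetic above (reader's note, with
 * illustrative values): if cwind is 32 and twind is 15, entering fast
 * recovery sets
 *   ssthresh = MAX(4, MIN(32, 15)) >> 1 = 7
 *   cwind    = MIN(7 + rx_nackThreshold, rx_maxSendWindow)
 *   nDgramPackets is halved (never below 1)
 * and nextCwind = ssthresh, so once recovery completes the send window
 * reopens at the slow-start threshold. */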