2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX: Extended Remote Procedure Call */
12 #include <afsconfig.h>
14 #include "afs/param.h"
16 #include <afs/param.h>
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
29 #include <net/net_globals.h>
30 #endif /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #undef RXDEBUG /* turn off debugging */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
53 #endif /* AFS_ALPHA_ENV */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
64 #include "rx_globals.h"
66 #define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS 211 /* Stop AFS process */
68 #define AFSOP_STOP_BKG 212 /* Stop BKG process */
70 extern afs_int32 afs_termState;
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
77 # include <sys/types.h>
84 # include <sys/socket.h>
85 # include <sys/file.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
107 int (*registerProgram)() = 0;
108 int (*swapNameProgram)() = 0;
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
116 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
118 afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119 afs_int32 rxi_start_in_error;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
124 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125 * currently allocated within rx. This number is used to allocate the
126 * memory required to return the statistics when queried.
129 static unsigned int rxi_rpc_peer_stat_cnt;
132 * rxi_rpc_process_stat_cnt counts the total number of local process stat
133 * structures currently allocated within rx. The number is used to allocate
134 * the memory required to return the statistics when queried.
137 static unsigned int rxi_rpc_process_stat_cnt;
139 #if !defined(offsetof)
140 #include <stddef.h> /* for definition of offsetof() */
143 #ifdef AFS_PTHREAD_ENV
147 * Use procedural initialization of mutexes/condition variables
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
172 static void rxi_InitPthread(void) {
173 assert(pthread_mutex_init(&rx_clock_mutex,
174 (const pthread_mutexattr_t*)0)==0);
175 assert(pthread_mutex_init(&rxi_connCacheMutex,
176 (const pthread_mutexattr_t*)0)==0);
177 assert(pthread_mutex_init(&rx_init_mutex,
178 (const pthread_mutexattr_t*)0)==0);
179 assert(pthread_mutex_init(&epoch_mutex,
180 (const pthread_mutexattr_t*)0)==0);
181 assert(pthread_mutex_init(&rx_event_mutex,
182 (const pthread_mutexattr_t*)0)==0);
183 assert(pthread_mutex_init(&des_init_mutex,
184 (const pthread_mutexattr_t*)0)==0);
185 assert(pthread_mutex_init(&des_random_mutex,
186 (const pthread_mutexattr_t*)0)==0);
187 assert(pthread_mutex_init(&osi_malloc_mutex,
188 (const pthread_mutexattr_t*)0)==0);
189 assert(pthread_mutex_init(&event_handler_mutex,
190 (const pthread_mutexattr_t*)0)==0);
191 assert(pthread_mutex_init(&listener_mutex,
192 (const pthread_mutexattr_t*)0)==0);
193 assert(pthread_mutex_init(&rx_if_init_mutex,
194 (const pthread_mutexattr_t*)0)==0);
195 assert(pthread_mutex_init(&rx_if_mutex,
196 (const pthread_mutexattr_t*)0)==0);
197 assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198 (const pthread_mutexattr_t*)0)==0);
199 assert(pthread_mutex_init(&rxkad_random_mutex,
200 (const pthread_mutexattr_t*)0)==0);
201 assert(pthread_mutex_init(&rxkad_stats_mutex,
202 (const pthread_mutexattr_t*)0)==0);
203 assert(pthread_mutex_init(&rx_debug_mutex,
204 (const pthread_mutexattr_t*)0)==0);
206 assert(pthread_cond_init(&rx_event_handler_cond,
207 (const pthread_condattr_t*)0)==0);
208 assert(pthread_cond_init(&rx_listener_cond,
209 (const pthread_condattr_t*)0)==0);
210 assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
213 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
214 #define INIT_PTHREAD_LOCKS \
215 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
217 * The rx_stats_mutex mutex protects the following global variables:
222 * rxi_lowConnRefCount
223 * rxi_lowPeerRefCount
232 #define INIT_PTHREAD_LOCKS
236 /* Variables for handling the minProcs implementation. availProcs gives the
237 * number of threads available in the pool at this moment (not counting dudes
238 * executing right now). totalMin gives the total number of procs required
239 * for handling all minProcs requests. minDeficit is a dynamic variable
240 * tracking the # of procs required to satisfy all of the remaining minProcs
242 * For fine grain locking to work, the quota check and the reservation of
243 * a server thread have to be made while rxi_availProcs and rxi_minDeficit
244 * are locked. To this end, the code has been modified under #ifdef
245 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246 * same time. A new function, ReturnToServerPool() returns the allocation.
248 * A call can be on several queues (but only one at a time). When
249 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250 * that no one else is touching the queue. To this end, we store the address
251 * of the queue lock in the call structure (under the call lock) when we
252 * put the call on a queue, and we clear the call_queue_lock when the
253 * call is removed from a queue (once the call lock has been obtained).
254 * This allows rxi_ResetCall to safely synchronize with others wishing
255 * to manipulate the queue.
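 *
 * A minimal sketch of that protocol (illustrative only, assuming a call
 * queue guarded by a hypothetical someQueue_lock; not part of the build):
 *
 *     MUTEX_ENTER(&call->lock);
 *     MUTEX_ENTER(&someQueue_lock);
 *     queue_Append(&someQueue, call);
 *     SET_CALL_QUEUE_LOCK(call, &someQueue_lock);
 *     MUTEX_EXIT(&someQueue_lock);
 *     MUTEX_EXIT(&call->lock);
 *
 * and later, to take the call off whatever queue it is on, with the call
 * lock already held:
 *
 *     if (call->call_queue_lock) {
 *         afs_kmutex_t *qlock = call->call_queue_lock;
 *         MUTEX_ENTER(qlock);
 *         queue_Remove(call);
 *         CLEAR_CALL_QUEUE_LOCK(call);
 *         MUTEX_EXIT(qlock);
 *     }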
258 #ifdef RX_ENABLE_LOCKS
259 static afs_kmutex_t rx_rpc_stats;
260 void rxi_StartUnlocked();
263 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are
264 ** pretty good that the next packet coming in is from the same connection
265 ** as the last packet, since we send multiple packets in a transmit window.
267 struct rx_connection *rxLastConn = 0;
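/* A hedged sketch of the fast path this cache enables in rxi_FindConnection
 * (comment only; the real code also re-checks the peer's host/port before
 * trusting the cached pointer):
 *
 *     conn = rxLastConn;
 *     if (conn && conn->epoch == epoch && conn->cid == (cid & RX_CIDMASK))
 *         return conn;          (cache hit: the hash-table walk is skipped)
 */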
269 #ifdef RX_ENABLE_LOCKS
270 /* The locking hierarchy for rx fine grain locking is composed of these
273 * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
274 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
275 * call->lock - locks call data fields.
276 * These are independent of each other:
277 * rx_freeCallQueue_lock
282 * serverQueueEntry->lock
284 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
285 * peer->lock - locks peer data fields.
286 * conn_data_lock - ensures that no more than one thread updates a conn data
287 * field at the same time.
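 *
 * A hedged sketch of honoring that ordering (illustrative only): when a
 * thread needs both a connection's conn_call_lock and an individual call's
 * lock, it takes conn_call_lock first, as rx_EndCall does below:
 *
 *     MUTEX_ENTER(&conn->conn_call_lock);
 *     MUTEX_ENTER(&call->lock);
 *     ... inspect or update the call ...
 *     MUTEX_EXIT(&call->lock);
 *     MUTEX_EXIT(&conn->conn_call_lock);
 *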
295 * Do we need a lock to protect the peer field in the conn structure?
296 * conn->peer was previously a constant for all intents and so has no
297 * lock protecting this field. The multihomed client delta introduced
298 * an RX code change: the peer field in the connection structure is now
299 * set to the remote interface from which the last packet for this
300 * connection was sent out. This may become an issue if further changes
303 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
304 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
306 /* rxdb_fileID is used to identify the lock location, along with line#. */
307 static int rxdb_fileID = RXDB_FILE_RX;
308 #endif /* RX_LOCKS_DB */
309 #else /* RX_ENABLE_LOCKS */
310 #define SET_CALL_QUEUE_LOCK(C, L)
311 #define CLEAR_CALL_QUEUE_LOCK(C)
312 #endif /* RX_ENABLE_LOCKS */
313 struct rx_serverQueueEntry *rx_waitForPacket = 0;
315 /* ------------Exported Interfaces------------- */
317 /* This function allows rxkad to set the epoch to a suitably random number
318 * which rx_NewConnection will use in the future. The principal purpose is to
319 * get rxnull connections to use the same epoch as the rxkad connections do, at
320 * least once the first rxkad connection is established. This is important now
321 * that the host/port addresses aren't used in FindConnection: the uniqueness
322 * of epoch/cid matters and the start time won't do. */
324 #ifdef AFS_PTHREAD_ENV
326 * This mutex protects the following global variables:
330 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
331 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
335 #endif /* AFS_PTHREAD_ENV */
337 void rx_SetEpoch (afs_uint32 epoch)
344 /* Initialize rx. A port number may be mentioned, in which case this
345 * becomes the default port number for any service installed later.
346 * If 0 is provided for the port number, a random port will be chosen
347 * by the kernel. Whether this will ever overlap anything in
348 * /etc/services is anybody's guess... Returns 0 on success, -1 on
350 static int rxinit_status = 1;
351 #ifdef AFS_PTHREAD_ENV
353 * This mutex protects the following global variables:
357 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
358 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
361 #define UNLOCK_RX_INIT
364 int rx_Init(u_int port)
371 char *htable, *ptable;
374 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
375 __djgpp_set_quiet_socket(1);
382 if (rxinit_status == 0) {
383 tmp_status = rxinit_status;
385 return tmp_status; /* Already started; return previous error code. */
389 if (afs_winsockInit()<0)
395 * Initialize anything necessary to provide a non-preemptive threading
398 rxi_InitializeThreadSupport();
401 /* Allocate and initialize a socket for client and perhaps server
404 rx_socket = rxi_GetUDPSocket((u_short)port);
405 if (rx_socket == OSI_NULLSOCKET) {
411 #ifdef RX_ENABLE_LOCKS
414 #endif /* RX_LOCKS_DB */
415 MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
416 MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
417 MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
418 MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
419 MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
421 CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
422 MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
423 MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
424 MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
426 MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
428 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
430 rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
431 #endif /* KERNEL && AFS_HPUX110_ENV */
432 #else /* RX_ENABLE_LOCKS */
433 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
434 mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
435 #endif /* AFS_GLOBAL_SUNLOCK */
436 #endif /* RX_ENABLE_LOCKS */
439 rx_connDeadTime = 12;
440 rx_tranquil = 0; /* reset flag */
441 memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
443 osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
444 PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
445 memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
446 ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
447 PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
448 memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
450 /* Malloc up a bunch of packets & buffers */
452 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
453 queue_Init(&rx_freePacketQueue);
454 rxi_NeedMorePackets = FALSE;
455 rxi_MorePackets(rx_nPackets);
463 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
464 tv.tv_sec = clock_now.sec;
465 tv.tv_usec = clock_now.usec;
466 srand((unsigned int) tv.tv_usec);
473 #if defined(KERNEL) && !defined(UKERNEL)
474 /* Really, this should never happen in a real kernel */
477 struct sockaddr_in addr;
478 int addrlen = sizeof(addr);
479 if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
483 rx_port = addr.sin_port;
486 rx_stats.minRtt.sec = 9999999;
488 rx_SetEpoch (tv.tv_sec | 0x80000000);
490 rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
491 * will provide a more random value. */
493 MUTEX_ENTER(&rx_stats_mutex);
494 rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
495 MUTEX_EXIT(&rx_stats_mutex);
496 /* *Slightly* random start time for the cid. This is just to help
497 * out with the hashing function at the peer */
498 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
499 rx_connHashTable = (struct rx_connection **) htable;
500 rx_peerHashTable = (struct rx_peer **) ptable;
502 rx_lastAckDelay.sec = 0;
503 rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
504 rx_hardAckDelay.sec = 0;
505 rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
506 rx_softAckDelay.sec = 0;
507 rx_softAckDelay.usec = 100000; /* 100 milliseconds */
509 rxevent_Init(20, rxi_ReScheduleEvents);
511 /* Initialize various global queues */
512 queue_Init(&rx_idleServerQueue);
513 queue_Init(&rx_incomingCallQueue);
514 queue_Init(&rx_freeCallQueue);
516 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
517 /* Initialize our list of usable IP addresses. */
521 /* Start listener process (exact function is dependent on the
522 * implementation environment--kernel or user space) */
527 tmp_status = rxinit_status = 0;
532 /* called with unincremented nRequestsRunning to see if it is OK to start
533 * a new thread in this service. Could be "no" for two reasons: over the
534 * max quota, or would prevent others from reaching their min quota.
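 *
 * Worked example of that rule (illustrative numbers only): a service with
 * minProcs = 2, maxProcs = 4 and nRequestsRunning = 3 is under its max
 * quota but already past its min quota, so a new thread may be granted
 * only if rxi_availProcs > rxi_minDeficit, i.e. only if there are more
 * idle threads than are still owed to other services' minProcs guarantees.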
536 #ifdef RX_ENABLE_LOCKS
537 /* This version of QuotaOK reserves quota if it's ok while the
538 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
540 static int QuotaOK(register struct rx_service *aservice)
542 /* check if over max quota */
543 if (aservice->nRequestsRunning >= aservice->maxProcs) {
547 /* under min quota, we're OK */
548 /* otherwise, can use only if there are enough to allow everyone
549 * to go to their min quota after this guy starts.
551 MUTEX_ENTER(&rx_stats_mutex);
552 if ((aservice->nRequestsRunning < aservice->minProcs) ||
553 (rxi_availProcs > rxi_minDeficit)) {
554 aservice->nRequestsRunning++;
555 /* just started call in minProcs pool, need fewer to maintain
557 if (aservice->nRequestsRunning <= aservice->minProcs)
560 MUTEX_EXIT(&rx_stats_mutex);
563 MUTEX_EXIT(&rx_stats_mutex);
568 static void ReturnToServerPool(register struct rx_service *aservice)
570 aservice->nRequestsRunning--;
571 MUTEX_ENTER(&rx_stats_mutex);
572 if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
574 MUTEX_EXIT(&rx_stats_mutex);
577 #else /* RX_ENABLE_LOCKS */
578 static int QuotaOK(register struct rx_service *aservice)
581 /* under min quota, we're OK */
582 if (aservice->nRequestsRunning < aservice->minProcs) return 1;
584 /* check if over max quota */
585 if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
587 /* otherwise, can use only if there are enough to allow everyone
588 * to go to their min quota after this guy starts.
590 if (rxi_availProcs > rxi_minDeficit) rc = 1;
593 #endif /* RX_ENABLE_LOCKS */
596 /* Called by rx_StartServer to start up lwp's to service calls.
597 NExistingProcs gives the number of procs already existing, and which
598 therefore needn't be created. */
599 void rxi_StartServerProcs(int nExistingProcs)
601 register struct rx_service *service;
606 /* For each service, reserve N processes, where N is the "minimum"
607 number of processes that MUST be able to execute a request in parallel,
608 at any time, for that service. Also compute the maximum difference
609 between any service's maximum number of processes that can run
610 (i.e. the maximum number that ever will be run, and a guarantee
611 that this number will run if other services aren't running), and its
612 minimum number. The result is the extra number of processes that
613 we need in order to provide the latter guarantee */
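/* Worked example (illustrative numbers only): with two services, one with
 * minProcs = 2 / maxProcs = 6 and the other with minProcs = 1 / maxProcs = 3,
 * the loop below computes nProcs = 2 + 1 = 3 and maxdiff = max(6-2, 3-1) = 4,
 * so 3 + 4 = 7 procs are wanted before nExistingProcs is subtracted. */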
614 for (i=0; i<RX_MAX_SERVICES; i++) {
616 service = rx_services[i];
617 if (service == (struct rx_service *) 0) break;
618 nProcs += service->minProcs;
619 diff = service->maxProcs - service->minProcs;
620 if (diff > maxdiff) maxdiff = diff;
622 nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
623 nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
624 for (i = 0; i<nProcs; i++) {
625 rxi_StartServerProc(rx_ServerProc, rx_stackSize);
630 /* This routine must be called if any services are exported. If the
631 * donateMe flag is set, the calling process is donated to the server
633 void rx_StartServer(int donateMe)
635 register struct rx_service *service;
636 register int i, nProcs=0;
642 /* Start server processes, if necessary (exact function is dependent
643 * on the implementation environment--kernel or user space). DonateMe
644 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
645 * case, one less new proc will be created by rx_StartServerProcs.
647 rxi_StartServerProcs(donateMe);
649 /* count up the # of threads in minProcs, and set the min deficit to
650 * be that value, too.
652 for (i=0; i<RX_MAX_SERVICES; i++) {
653 service = rx_services[i];
654 if (service == (struct rx_service *) 0) break;
655 MUTEX_ENTER(&rx_stats_mutex);
656 rxi_totalMin += service->minProcs;
657 /* below works even if a thread is running, since minDeficit would
658 * still have been decremented and later re-incremented.
660 rxi_minDeficit += service->minProcs;
661 MUTEX_EXIT(&rx_stats_mutex);
664 /* Turn on reaping of idle server connections */
665 rxi_ReapConnections();
674 #ifdef AFS_PTHREAD_ENV
676 pid = (pid_t) pthread_self();
677 #else /* AFS_PTHREAD_ENV */
679 LWP_CurrentProcess(&pid);
680 #endif /* AFS_PTHREAD_ENV */
682 sprintf(name,"srv_%d", ++nProcs);
684 (*registerProgram)(pid, name);
686 #endif /* AFS_NT40_ENV */
687 rx_ServerProc(); /* Never returns */
692 /* Create a new client connection to the specified service, using the
693 * specified security object to implement the security model for this
695 struct rx_connection *rx_NewConnection(register afs_uint32 shost,
696 u_short sport, u_short sservice,
697 register struct rx_securityClass *securityObject, int serviceSecurityIndex)
701 register struct rx_connection *conn;
706 dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
707 shost, sport, sservice, securityObject, serviceSecurityIndex));
709 /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
710 * the case of kmem_alloc? */
711 conn = rxi_AllocConnection();
712 #ifdef RX_ENABLE_LOCKS
713 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
714 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
715 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
719 MUTEX_ENTER(&rx_connHashTable_lock);
720 cid = (rx_nextCid += RX_MAXCALLS);
721 conn->type = RX_CLIENT_CONNECTION;
723 conn->epoch = rx_epoch;
724 conn->peer = rxi_FindPeer(shost, sport, 0, 1);
725 conn->serviceId = sservice;
726 conn->securityObject = securityObject;
727 /* This doesn't work in all compilers with void (they're buggy), so fake it
729 conn->securityData = (VOID *) 0;
730 conn->securityIndex = serviceSecurityIndex;
731 rx_SetConnDeadTime(conn, rx_connDeadTime);
732 conn->ackRate = RX_FAST_ACK_RATE;
734 conn->specific = NULL;
735 conn->challengeEvent = NULL;
736 conn->delayedAbortEvent = NULL;
737 conn->abortCount = 0;
740 RXS_NewConnection(securityObject, conn);
741 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
743 conn->refCount++; /* no lock required since only this thread knows... */
744 conn->next = rx_connHashTable[hashindex];
745 rx_connHashTable[hashindex] = conn;
746 MUTEX_ENTER(&rx_stats_mutex);
747 rx_stats.nClientConns++;
748 MUTEX_EXIT(&rx_stats_mutex);
750 MUTEX_EXIT(&rx_connHashTable_lock);
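/* Illustrative client-side usage sketch (comment only; assumes a user-space
 * program, rxnull security, and placeholder HOST/PORT/SERVICE_ID values;
 * error handling omitted):
 *
 *     struct rx_securityClass *so;
 *     struct rx_connection *tconn;
 *     struct rx_call *tcall;
 *
 *     rx_Init(0);                                  (0: kernel picks a port)
 *     so = rxnull_NewClientSecurityObject();
 *     tconn = rx_NewConnection(htonl(HOST), htons(PORT), SERVICE_ID, so, 0);
 *     tcall = rx_NewCall(tconn);
 *     ... send the request with rx_Write, read the reply with rx_Read ...
 *     rx_EndCall(tcall, 0);
 *     rx_DestroyConnection(tconn);
 */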
756 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
758 /* The idea is to set the dead time to a value that allows several
759 * keepalives to be dropped without timing out the connection. */
760 conn->secondsUntilDead = MAX(seconds, 6);
761 conn->secondsUntilPing = conn->secondsUntilDead/6;
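/* Worked example: rx_SetConnDeadTime(conn, 50) yields secondsUntilDead = 50
 * and secondsUntilPing = 50/6 = 8 (integer division), i.e. roughly six
 * keepalive opportunities fit within one dead-time interval. */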
764 int rxi_lowPeerRefCount = 0;
765 int rxi_lowConnRefCount = 0;
768 * Clean up a connection that was destroyed in rxi_DestroyConnectionNoLock.
769 * NOTE: must not be called with rx_connHashTable_lock held.
771 void rxi_CleanupConnection(struct rx_connection *conn)
773 /* Notify the service exporter, if requested, that this connection
774 * is being destroyed */
775 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
776 (*conn->service->destroyConnProc)(conn);
778 /* Notify the security module that this connection is being destroyed */
779 RXS_DestroyConnection(conn->securityObject, conn);
781 /* If this is the last connection using the rx_peer struct, set its
782 * idle time to now. rxi_ReapConnections will reap it if it's still
783 * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
785 MUTEX_ENTER(&rx_peerHashTable_lock);
786 if (--conn->peer->refCount <= 0) {
787 conn->peer->idleWhen = clock_Sec();
788 if (conn->peer->refCount < 0) {
789 conn->peer->refCount = 0;
790 MUTEX_ENTER(&rx_stats_mutex);
791 rxi_lowPeerRefCount ++;
792 MUTEX_EXIT(&rx_stats_mutex);
795 MUTEX_EXIT(&rx_peerHashTable_lock);
797 MUTEX_ENTER(&rx_stats_mutex);
798 if (conn->type == RX_SERVER_CONNECTION)
799 rx_stats.nServerConns--;
801 rx_stats.nClientConns--;
802 MUTEX_EXIT(&rx_stats_mutex);
805 if (conn->specific) {
807 for (i = 0 ; i < conn->nSpecific ; i++) {
808 if (conn->specific[i] && rxi_keyCreate_destructor[i])
809 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
810 conn->specific[i] = NULL;
812 free(conn->specific);
814 conn->specific = NULL;
818 MUTEX_DESTROY(&conn->conn_call_lock);
819 MUTEX_DESTROY(&conn->conn_data_lock);
820 CV_DESTROY(&conn->conn_call_cv);
822 rxi_FreeConnection(conn);
825 /* Destroy the specified connection */
826 void rxi_DestroyConnection(register struct rx_connection *conn)
828 MUTEX_ENTER(&rx_connHashTable_lock);
829 rxi_DestroyConnectionNoLock(conn);
830 /* conn should be at the head of the cleanup list */
831 if (conn == rx_connCleanup_list) {
832 rx_connCleanup_list = rx_connCleanup_list->next;
833 MUTEX_EXIT(&rx_connHashTable_lock);
834 rxi_CleanupConnection(conn);
836 #ifdef RX_ENABLE_LOCKS
838 MUTEX_EXIT(&rx_connHashTable_lock);
840 #endif /* RX_ENABLE_LOCKS */
843 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
845 register struct rx_connection **conn_ptr;
846 register int havecalls = 0;
847 struct rx_packet *packet;
854 MUTEX_ENTER(&conn->conn_data_lock);
855 if (conn->refCount > 0)
858 MUTEX_ENTER(&rx_stats_mutex);
859 rxi_lowConnRefCount++;
860 MUTEX_EXIT(&rx_stats_mutex);
863 if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
864 /* Busy; wait till the last guy is done before proceeding */
865 MUTEX_EXIT(&conn->conn_data_lock);
870 /* If the client previously called rx_NewCall, but it is still
871 * waiting, treat this as a running call, and wait to destroy the
872 * connection later when the call completes. */
873 if ((conn->type == RX_CLIENT_CONNECTION) &&
874 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
875 conn->flags |= RX_CONN_DESTROY_ME;
876 MUTEX_EXIT(&conn->conn_data_lock);
880 MUTEX_EXIT(&conn->conn_data_lock);
882 /* Check for extant references to this connection */
883 for (i = 0; i<RX_MAXCALLS; i++) {
884 register struct rx_call *call = conn->call[i];
887 if (conn->type == RX_CLIENT_CONNECTION) {
888 MUTEX_ENTER(&call->lock);
889 if (call->delayedAckEvent) {
890 /* Push the final acknowledgment out now--there
891 * won't be a subsequent call to acknowledge the
892 * last reply packets */
893 rxevent_Cancel(call->delayedAckEvent, call,
894 RX_CALL_REFCOUNT_DELAY);
895 if (call->state == RX_STATE_PRECALL ||
896 call->state == RX_STATE_ACTIVE) {
897 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
899 rxi_AckAll(NULL, call, 0);
902 MUTEX_EXIT(&call->lock);
906 #ifdef RX_ENABLE_LOCKS
908 if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
909 MUTEX_EXIT(&conn->conn_data_lock);
912 /* Someone is accessing a packet right now. */
916 #endif /* RX_ENABLE_LOCKS */
919 /* Don't destroy the connection if there are any call
920 * structures still in use */
921 MUTEX_ENTER(&conn->conn_data_lock);
922 conn->flags |= RX_CONN_DESTROY_ME;
923 MUTEX_EXIT(&conn->conn_data_lock);
928 if (conn->delayedAbortEvent) {
929 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
930 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
932 MUTEX_ENTER(&conn->conn_data_lock);
933 rxi_SendConnectionAbort(conn, packet, 0, 1);
934 MUTEX_EXIT(&conn->conn_data_lock);
935 rxi_FreePacket(packet);
939 /* Remove from connection hash table before proceeding */
940 conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
941 conn->epoch, conn->type) ];
942 for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
943 if (*conn_ptr == conn) {
944 *conn_ptr = conn->next;
948 /* if the conn that we are destroying was the last connection, then we
949 * clear rxLastConn as well */
950 if ( rxLastConn == conn )
953 /* Make sure the connection is completely reset before deleting it. */
954 /* get rid of pending events that could zap us later */
955 if (conn->challengeEvent)
956 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
957 if (conn->checkReachEvent)
958 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
960 /* Add the connection to the list of destroyed connections that
961 * need to be cleaned up. This is necessary to avoid deadlocks
962 * in the routines we call to inform others that this connection is
963 * being destroyed. */
964 conn->next = rx_connCleanup_list;
965 rx_connCleanup_list = conn;
968 /* Externally available version */
969 void rx_DestroyConnection(register struct rx_connection *conn)
975 rxi_DestroyConnection (conn);
980 /* Start a new rx remote procedure call, on the specified connection.
981 * If wait is set to 1, wait for a free call channel; otherwise return
982 * 0. Maxtime gives the maximum number of seconds this call may take,
983 * after rx_MakeCall returns. After this time interval, a call to any
984 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
985 * For fine grain locking, we hold the conn_call_lock in order to
986 * ensure that we don't get signalled after we found a call in an active
987 * state and before we go to sleep.
989 struct rx_call *rx_NewCall(register struct rx_connection *conn)
992 register struct rx_call *call;
993 struct clock queueTime;
997 dpf (("rx_NewCall(conn %x)\n", conn));
1000 clock_GetTime(&queueTime);
1002 MUTEX_ENTER(&conn->conn_call_lock);
1005 * Check if there are others waiting for a new call.
1006 * If so, let them go first to avoid starving them.
1007 * This is a fairly simple scheme, and might not be
1008 * a complete solution for large numbers of waiters.
1010 if (conn->makeCallWaiters) {
1011 #ifdef RX_ENABLE_LOCKS
1012 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1019 for (i=0; i<RX_MAXCALLS; i++) {
1020 call = conn->call[i];
1022 MUTEX_ENTER(&call->lock);
1023 if (call->state == RX_STATE_DALLY) {
1024 rxi_ResetCall(call, 0);
1025 (*call->callNumber)++;
1028 MUTEX_EXIT(&call->lock);
1031 call = rxi_NewCall(conn, i);
1035 if (i < RX_MAXCALLS) {
1038 MUTEX_ENTER(&conn->conn_data_lock);
1039 conn->flags |= RX_CONN_MAKECALL_WAITING;
1040 MUTEX_EXIT(&conn->conn_data_lock);
1042 conn->makeCallWaiters++;
1043 #ifdef RX_ENABLE_LOCKS
1044 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1048 conn->makeCallWaiters--;
1051 * Wake up anyone else who might be giving us a chance to
1052 * run (see code above that avoids resource starvation).
1054 #ifdef RX_ENABLE_LOCKS
1055 CV_BROADCAST(&conn->conn_call_cv);
1060 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1062 /* Client is initially in send mode */
1063 call->state = RX_STATE_ACTIVE;
1064 call->mode = RX_MODE_SENDING;
1066 /* remember start time for call in case we have hard dead time limit */
1067 call->queueTime = queueTime;
1068 clock_GetTime(&call->startTime);
1069 hzero(call->bytesSent);
1070 hzero(call->bytesRcvd);
1072 /* Turn on busy protocol. */
1073 rxi_KeepAliveOn(call);
1075 MUTEX_EXIT(&call->lock);
1076 MUTEX_EXIT(&conn->conn_call_lock);
1080 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1081 /* Now, if TQ wasn't cleared earlier, do it now. */
1083 MUTEX_ENTER(&call->lock);
1084 while (call->flags & RX_CALL_TQ_BUSY) {
1085 call->flags |= RX_CALL_TQ_WAIT;
1086 #ifdef RX_ENABLE_LOCKS
1087 CV_WAIT(&call->cv_tq, &call->lock);
1088 #else /* RX_ENABLE_LOCKS */
1089 osi_rxSleep(&call->tq);
1090 #endif /* RX_ENABLE_LOCKS */
1092 if (call->flags & RX_CALL_TQ_CLEARME) {
1093 rxi_ClearTransmitQueue(call, 0);
1094 queue_Init(&call->tq);
1096 MUTEX_EXIT(&call->lock);
1098 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1103 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1106 register struct rx_call *tcall;
1110 for(i=0; i<RX_MAXCALLS; i++) {
1111 if ((tcall = aconn->call[i])) {
1112 if ((tcall->state == RX_STATE_ACTIVE)
1113 || (tcall->state == RX_STATE_PRECALL)) {
1123 int rxi_GetCallNumberVector(register struct rx_connection *aconn,
1124 register afs_int32 *aint32s)
1127 register struct rx_call *tcall;
1131 for(i=0; i<RX_MAXCALLS; i++) {
1132 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1133 aint32s[i] = aconn->callNumber[i]+1;
1135 aint32s[i] = aconn->callNumber[i];
1141 int rxi_SetCallNumberVector(register struct rx_connection *aconn,
1142 register afs_int32 *aint32s)
1145 register struct rx_call *tcall;
1149 for(i=0; i<RX_MAXCALLS; i++) {
1150 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1151 aconn->callNumber[i] = aint32s[i] - 1;
1153 aconn->callNumber[i] = aint32s[i];
1159 /* Advertise a new service. A service is named locally by a UDP port
1160 * number plus a 16-bit service id. Returns (struct rx_service *) 0
1163 char *serviceName; Name for identification purposes (e.g. the
1164 service name might be used for probing for
1166 struct rx_service *rx_NewService(u_short port, u_short serviceId,
1168 struct rx_securityClass **securityObjects,
1169 int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1171 osi_socket socket = OSI_NULLSOCKET;
1172 register struct rx_service *tservice;
1178 if (serviceId == 0) {
1179 (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
1185 (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1192 tservice = rxi_AllocService();
1195 for (i = 0; i<RX_MAX_SERVICES; i++) {
1196 register struct rx_service *service = rx_services[i];
1198 if (port == service->servicePort) {
1199 if (service->serviceId == serviceId) {
1200 /* The identical service has already been
1201 * installed; if the caller was intending to
1202 * change the security classes used by this
1203 * service, he/she loses. */
1204 (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1207 rxi_FreeService(tservice);
1210 /* Different service, same port: re-use the socket
1211 * which is bound to the same port */
1212 socket = service->socket;
1215 if (socket == OSI_NULLSOCKET) {
1216 /* If we don't already have a socket (from another
1217 * service on same port) get a new one */
1218 socket = rxi_GetUDPSocket(port);
1219 if (socket == OSI_NULLSOCKET) {
1222 rxi_FreeService(tservice);
1227 service->socket = socket;
1228 service->servicePort = port;
1229 service->serviceId = serviceId;
1230 service->serviceName = serviceName;
1231 service->nSecurityObjects = nSecurityObjects;
1232 service->securityObjects = securityObjects;
1233 service->minProcs = 0;
1234 service->maxProcs = 1;
1235 service->idleDeadTime = 60;
1236 service->connDeadTime = rx_connDeadTime;
1237 service->executeRequestProc = serviceProc;
1238 service->checkReach = 0;
1239 rx_services[i] = service; /* not visible until now */
1247 rxi_FreeService(tservice);
1248 (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
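/* Illustrative server-side usage sketch (comment only; assumes a user-space
 * program, rxnull security, an arbitrary port/service id, and a placeholder
 * dispatcher ExecuteRequest; error handling omitted):
 *
 *     struct rx_securityClass *sc[1];
 *     struct rx_service *svc;
 *
 *     rx_Init(htons(1234));
 *     sc[0] = rxnull_NewServerSecurityObject();
 *     svc = rx_NewService(0, 7, "demo", sc, 1, ExecuteRequest);
 *     rx_SetMinProcs(svc, 2);
 *     rx_SetMaxProcs(svc, 4);
 *     rx_StartServer(1);             (donateMe = 1: this thread serves too)
 */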
1252 /* Generic request processing loop. This routine should be called
1253 * by the implementation dependent rx_ServerProc. If socketp is
1254 * non-null, it will be set to the file descriptor that this thread
1255 * is now listening on. If socketp is null, this routine will never
1257 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1259 register struct rx_call *call;
1260 register afs_int32 code;
1261 register struct rx_service *tservice = NULL;
1268 call = rx_GetCall(threadID, tservice, socketp);
1269 if (socketp && *socketp != OSI_NULLSOCKET) {
1270 /* We are now a listener thread */
1275 /* if server is restarting (typically a smooth shutdown) then do not
1276 * allow any new calls.
1279 if ( rx_tranquil && (call != NULL) ) {
1284 MUTEX_ENTER(&call->lock);
1286 rxi_CallError(call, RX_RESTARTING);
1287 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1289 MUTEX_EXIT(&call->lock);
1295 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1296 #ifdef RX_ENABLE_LOCKS
1298 #endif /* RX_ENABLE_LOCKS */
1299 afs_termState = AFSOP_STOP_AFS;
1300 afs_osi_Wakeup(&afs_termState);
1301 #ifdef RX_ENABLE_LOCKS
1303 #endif /* RX_ENABLE_LOCKS */
1308 tservice = call->conn->service;
1310 if (tservice->beforeProc) (*tservice->beforeProc)(call);
1312 code = call->conn->service->executeRequestProc(call);
1314 if (tservice->afterProc) (*tservice->afterProc)(call, code);
1316 rx_EndCall(call, code);
1317 MUTEX_ENTER(&rx_stats_mutex);
1319 MUTEX_EXIT(&rx_stats_mutex);
1324 void rx_WakeupServerProcs(void)
1326 struct rx_serverQueueEntry *np, *tqp;
1331 MUTEX_ENTER(&rx_serverPool_lock);
1333 #ifdef RX_ENABLE_LOCKS
1334 if (rx_waitForPacket)
1335 CV_BROADCAST(&rx_waitForPacket->cv);
1336 #else /* RX_ENABLE_LOCKS */
1337 if (rx_waitForPacket)
1338 osi_rxWakeup(rx_waitForPacket);
1339 #endif /* RX_ENABLE_LOCKS */
1340 MUTEX_ENTER(&freeSQEList_lock);
1341 for (np = rx_FreeSQEList; np; np = tqp) {
1342 tqp = *(struct rx_serverQueueEntry **)np;
1343 #ifdef RX_ENABLE_LOCKS
1344 CV_BROADCAST(&np->cv);
1345 #else /* RX_ENABLE_LOCKS */
1347 #endif /* RX_ENABLE_LOCKS */
1349 MUTEX_EXIT(&freeSQEList_lock);
1350 for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1351 #ifdef RX_ENABLE_LOCKS
1352 CV_BROADCAST(&np->cv);
1353 #else /* RX_ENABLE_LOCKS */
1355 #endif /* RX_ENABLE_LOCKS */
1357 MUTEX_EXIT(&rx_serverPool_lock);
1363 * One thing that seems to happen is that all the server threads get
1364 * tied up on some empty or slow call, and then a whole bunch of calls
1365 * arrive at once, using up the packet pool, so now there are more
1366 * empty calls. The most critical resources here are server threads
1367 * and the free packet pool. The "doreclaim" code seems to help in
1368 * general. I think that eventually we arrive in this state: there
1369 * are lots of pending calls which do have all their packets present,
1370 * so they won't be reclaimed, are multi-packet calls, so they won't
1371 * be scheduled until later, and thus are tying up most of the free
1372 * packet pool for a very long time.
1374 * 1. schedule multi-packet calls if all the packets are present.
1375 * Probably CPU-bound operation, useful to return packets to pool.
1376 * Do what if there is a full window, but the last packet isn't here?
1377 * 3. preserve one thread which *only* runs "best" calls, otherwise
1378 * it sleeps and waits for that type of call.
1379 * 4. Don't necessarily reserve a whole window for each thread. In fact,
1380 * the current dataquota business is badly broken. The quota isn't adjusted
1381 * to reflect how many packets are presently queued for a running call.
1382 * So, when we schedule a queued call with a full window of packets queued
1383 * up for it, that *should* free up a window full of packets for other 2d-class
1384 * calls to be able to use from the packet pool. But it doesn't.
1386 * NB. Most of the time, this code doesn't run -- since idle server threads
1387 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1388 * as a new call arrives.
1390 /* Sleep until a call arrives. Returns a pointer to the call, ready
1391 * for an rx_Read. */
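/* Minimal worker-loop sketch built on this routine (illustrative only;
 * rxi_ServerProc above is the real version, and threadID is a placeholder):
 *
 *     for (;;) {
 *         call = rx_GetCall(threadID, NULL, NULL);
 *         code = (*call->conn->service->executeRequestProc)(call);
 *         rx_EndCall(call, code);
 *     }
 */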
1392 #ifdef RX_ENABLE_LOCKS
1393 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1395 struct rx_serverQueueEntry *sq;
1396 register struct rx_call *call = (struct rx_call *) 0;
1397 struct rx_service *service = NULL;
1400 MUTEX_ENTER(&freeSQEList_lock);
1402 if ((sq = rx_FreeSQEList)) {
1403 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1404 MUTEX_EXIT(&freeSQEList_lock);
1405 } else { /* otherwise allocate a new one and return that */
1406 MUTEX_EXIT(&freeSQEList_lock);
1407 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1408 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1409 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1412 MUTEX_ENTER(&rx_serverPool_lock);
1413 if (cur_service != NULL) {
1414 ReturnToServerPool(cur_service);
1417 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1418 register struct rx_call *tcall, *ncall, *choice2 = NULL;
1420 /* Scan for eligible incoming calls. A call is not eligible
1421 * if the maximum number of calls for its service type are
1422 * already executing */
1423 /* One thread will process calls FCFS (to prevent starvation),
1424 * while the other threads may run ahead looking for calls which
1425 * have all their input data available immediately. This helps
1426 * keep threads from blocking, waiting for data from the client. */
1427 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1428 service = tcall->conn->service;
1429 if (!QuotaOK(service)) {
1432 if (!tno || !tcall->queue_item_header.next) {
1433 /* If we're thread 0, then we'll just use
1434 * this call. If we haven't been able to find an optimal
1435 * choice, and we're at the end of the list, then use a
1436 * 2d choice if one has been identified. Otherwise... */
1437 call = (choice2 ? choice2 : tcall);
1438 service = call->conn->service;
1439 } else if (!queue_IsEmpty(&tcall->rq)) {
1440 struct rx_packet *rp;
1441 rp = queue_First(&tcall->rq, rx_packet);
1442 if (rp->header.seq == 1) {
1443 if (!meltdown_1pkt ||
1444 (rp->header.flags & RX_LAST_PACKET)) {
1446 } else if (rxi_2dchoice && !choice2 &&
1447 !(tcall->flags & RX_CALL_CLEARED) &&
1448 (tcall->rprev > rxi_HardAckRate)) {
1450 } else rxi_md2cnt++;
1456 ReturnToServerPool(service);
1463 MUTEX_EXIT(&rx_serverPool_lock);
1464 MUTEX_ENTER(&call->lock);
1466 if (call->state != RX_STATE_PRECALL || call->error) {
1467 MUTEX_EXIT(&call->lock);
1468 MUTEX_ENTER(&rx_serverPool_lock);
1469 ReturnToServerPool(service);
1474 if (queue_IsEmpty(&call->rq) ||
1475 queue_First(&call->rq, rx_packet)->header.seq != 1)
1476 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1478 CLEAR_CALL_QUEUE_LOCK(call);
1479 call->flags &= ~RX_CALL_WAIT_PROC;
1480 MUTEX_ENTER(&rx_stats_mutex);
1482 MUTEX_EXIT(&rx_stats_mutex);
1486 /* If there are no eligible incoming calls, add this process
1487 * to the idle server queue, to wait for one */
1491 *socketp = OSI_NULLSOCKET;
1493 sq->socketp = socketp;
1494 queue_Append(&rx_idleServerQueue, sq);
1495 #ifndef AFS_AIX41_ENV
1496 rx_waitForPacket = sq;
1497 #endif /* AFS_AIX41_ENV */
1499 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1501 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1502 MUTEX_EXIT(&rx_serverPool_lock);
1503 return (struct rx_call *)0;
1506 } while (!(call = sq->newcall) &&
1507 !(socketp && *socketp != OSI_NULLSOCKET));
1508 MUTEX_EXIT(&rx_serverPool_lock);
1510 MUTEX_ENTER(&call->lock);
1516 MUTEX_ENTER(&freeSQEList_lock);
1517 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1518 rx_FreeSQEList = sq;
1519 MUTEX_EXIT(&freeSQEList_lock);
1522 clock_GetTime(&call->startTime);
1523 call->state = RX_STATE_ACTIVE;
1524 call->mode = RX_MODE_RECEIVING;
1526 rxi_calltrace(RX_CALL_START, call);
1527 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1528 call->conn->service->servicePort,
1529 call->conn->service->serviceId, call));
1531 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1532 MUTEX_EXIT(&call->lock);
1534 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1539 #else /* RX_ENABLE_LOCKS */
1540 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1542 struct rx_serverQueueEntry *sq;
1543 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1544 struct rx_service *service = NULL;
1549 MUTEX_ENTER(&freeSQEList_lock);
1551 if ((sq = rx_FreeSQEList)) {
1552 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1553 MUTEX_EXIT(&freeSQEList_lock);
1554 } else { /* otherwise allocate a new one and return that */
1555 MUTEX_EXIT(&freeSQEList_lock);
1556 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1557 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1558 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1560 MUTEX_ENTER(&sq->lock);
1562 if (cur_service != NULL) {
1563 cur_service->nRequestsRunning--;
1564 if (cur_service->nRequestsRunning < cur_service->minProcs)
1568 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1569 register struct rx_call *tcall, *ncall;
1570 /* Scan for eligible incoming calls. A call is not eligible
1571 * if the maximum number of calls for its service type are
1572 * already executing */
1573 /* One thread will process calls FCFS (to prevent starvation),
1574 * while the other threads may run ahead looking for calls which
1575 * have all their input data available immediately. This helps
1576 * keep threads from blocking, waiting for data from the client. */
1577 choice2 = (struct rx_call *) 0;
1578 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1579 service = tcall->conn->service;
1580 if (QuotaOK(service)) {
1581 if (!tno || !tcall->queue_item_header.next ) {
1582 /* If we're thread 0, then we'll just use
1583 * this call. If we haven't been able to find an optimal
1584 * choice, and we're at the end of the list, then use a
1585 * 2d choice if one has been identified. Otherwise... */
1586 call = (choice2 ? choice2 : tcall);
1587 service = call->conn->service;
1588 } else if (!queue_IsEmpty(&tcall->rq)) {
1589 struct rx_packet *rp;
1590 rp = queue_First(&tcall->rq, rx_packet);
1591 if (rp->header.seq == 1
1592 && (!meltdown_1pkt ||
1593 (rp->header.flags & RX_LAST_PACKET))) {
1595 } else if (rxi_2dchoice && !choice2 &&
1596 !(tcall->flags & RX_CALL_CLEARED) &&
1597 (tcall->rprev > rxi_HardAckRate)) {
1599 } else rxi_md2cnt++;
1609 /* we can't schedule a call if there's no data!!! */
1610 /* send an ack if there's no data, if we're missing the
1611 * first packet, or we're missing something between first
1612 * and last -- there's a "hole" in the incoming data. */
1613 if (queue_IsEmpty(&call->rq) ||
1614 queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1615 call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1616 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1618 call->flags &= (~RX_CALL_WAIT_PROC);
1619 service->nRequestsRunning++;
1620 /* just started call in minProcs pool, need fewer to maintain
1622 if (service->nRequestsRunning <= service->minProcs)
1626 /* MUTEX_EXIT(&call->lock); */
1629 /* If there are no eligible incoming calls, add this process
1630 * to the idle server queue, to wait for one */
1633 *socketp = OSI_NULLSOCKET;
1635 sq->socketp = socketp;
1636 queue_Append(&rx_idleServerQueue, sq);
1640 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1643 return (struct rx_call *)0;
1646 } while (!(call = sq->newcall) &&
1647 !(socketp && *socketp != OSI_NULLSOCKET));
1649 MUTEX_EXIT(&sq->lock);
1651 MUTEX_ENTER(&freeSQEList_lock);
1652 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1653 rx_FreeSQEList = sq;
1654 MUTEX_EXIT(&freeSQEList_lock);
1657 clock_GetTime(&call->startTime);
1658 call->state = RX_STATE_ACTIVE;
1659 call->mode = RX_MODE_RECEIVING;
1661 rxi_calltrace(RX_CALL_START, call);
1662 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1663 call->conn->service->servicePort,
1664 call->conn->service->serviceId, call));
1666 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1674 #endif /* RX_ENABLE_LOCKS */
1678 /* Establish a procedure to be called when a packet arrives for a
1679 * call. This routine will be called at most once after each call,
1680 * and will also be called if there is an error condition on the call or
1681 * the call is complete. Used by multi rx to build a selection
1682 * function which determines which of several calls is likely to be a
1683 * good one to read from.
1684 * NOTE: the way this is currently implemented it is probably only a
1685 * good idea to (1) use it immediately after a newcall (clients only)
1686 * and (2) only use it once. Other uses currently void your warranty
1688 void rx_SetArrivalProc(register struct rx_call *call,
1689 register VOID (*proc)(register struct rx_call *call,
1690 register struct multi_handle *mh, register int index),
1691 register VOID *handle, register VOID *arg)
1693 call->arrivalProc = proc;
1694 call->arrivalProcHandle = handle;
1695 call->arrivalProcArg = arg;
1698 /* Call is finished (possibly prematurely). Return rc to the peer, if
1699 * appropriate, and return the final error code from the conversation
1702 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1704 register struct rx_connection *conn = call->conn;
1705 register struct rx_service *service;
1706 register struct rx_packet *tp; /* Temporary packet pointer */
1707 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1711 dpf(("rx_EndCall(call %x)\n", call));
1715 MUTEX_ENTER(&call->lock);
1717 if (rc == 0 && call->error == 0) {
1718 call->abortCode = 0;
1719 call->abortCount = 0;
1722 call->arrivalProc = (VOID (*)()) 0;
1723 if (rc && call->error == 0) {
1724 rxi_CallError(call, rc);
1725 /* Send an abort message to the peer if this error code has
1726 * only just been set. If it was set previously, assume the
1727 * peer has already been sent the error code or will request it
1729 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1731 if (conn->type == RX_SERVER_CONNECTION) {
1732 /* Make sure reply or at least dummy reply is sent */
1733 if (call->mode == RX_MODE_RECEIVING) {
1734 rxi_WriteProc(call, 0, 0);
1736 if (call->mode == RX_MODE_SENDING) {
1737 rxi_FlushWrite(call);
1739 service = conn->service;
1740 rxi_calltrace(RX_CALL_END, call);
1741 /* Call goes to hold state until reply packets are acknowledged */
1742 if (call->tfirst + call->nSoftAcked < call->tnext) {
1743 call->state = RX_STATE_HOLD;
1745 call->state = RX_STATE_DALLY;
1746 rxi_ClearTransmitQueue(call, 0);
1747 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1748 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1751 else { /* Client connection */
1753 /* Make sure server receives input packets, in the case where
1754 * no reply arguments are expected */
1755 if ((call->mode == RX_MODE_SENDING)
1756 || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1757 (void) rxi_ReadProc(call, &dummy, 1);
1760 /* If we had an outstanding delayed ack, be nice to the server
1761 * and force-send it now.
1763 if (call->delayedAckEvent) {
1764 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1765 call->delayedAckEvent = NULL;
1766 rxi_SendDelayedAck(NULL, call, NULL);
1769 /* We need to release the call lock since it's lower than the
1770 * conn_call_lock and we don't want to hold the conn_call_lock
1771 * over the rx_ReadProc call. The conn_call_lock needs to be held
1772 * here for the case where rx_NewCall is perusing the calls on
1773 * the connection structure. We don't want to signal until
1774 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1775 * have checked this call, found it active and by the time it
1776 * goes to sleep, will have missed the signal.
1778 MUTEX_EXIT(&call->lock);
1779 MUTEX_ENTER(&conn->conn_call_lock);
1780 MUTEX_ENTER(&call->lock);
1781 MUTEX_ENTER(&conn->conn_data_lock);
1782 conn->flags |= RX_CONN_BUSY;
1783 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1784 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1785 MUTEX_EXIT(&conn->conn_data_lock);
1786 #ifdef RX_ENABLE_LOCKS
1787 CV_BROADCAST(&conn->conn_call_cv);
1792 #ifdef RX_ENABLE_LOCKS
1794 MUTEX_EXIT(&conn->conn_data_lock);
1796 #endif /* RX_ENABLE_LOCKS */
1797 call->state = RX_STATE_DALLY;
1799 error = call->error;
1801 /* currentPacket, nLeft, and nFree must be zeroed here, because
1802 * ResetCall cannot: ResetCall may be called at splnet(), in the
1803 * kernel version, and may interrupt the macros rx_Read or
1804 * rx_Write, which run at normal priority for efficiency. */
1805 if (call->currentPacket) {
1806 rxi_FreePacket(call->currentPacket);
1807 call->currentPacket = (struct rx_packet *) 0;
1808 call->nLeft = call->nFree = call->curlen = 0;
1811 call->nLeft = call->nFree = call->curlen = 0;
1813 /* Free any packets from the last call to ReadvProc/WritevProc */
1814 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1819 CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1820 MUTEX_EXIT(&call->lock);
1821 if (conn->type == RX_CLIENT_CONNECTION) {
1822 MUTEX_EXIT(&conn->conn_call_lock);
1823 conn->flags &= ~RX_CONN_BUSY;
1828 * Map errors to the local host's errno.h format.
1830 error = ntoh_syserr_conv(error);
1834 #if !defined(KERNEL)
1836 /* Call this routine when shutting down a server or client (especially
1837 * clients). This will allow Rx to gracefully garbage collect server
1838 * connections, and reduce the number of retries that a server might
1839 * make to a dead client.
1840 * This is not quite right, since some calls may still be ongoing and
1841 * we can't lock them to destroy them. */
1842 void rx_Finalize(void)
1844 register struct rx_connection **conn_ptr, **conn_end;
1848 if (rxinit_status == 1) {
1850 return; /* Already shutdown. */
1852 rxi_DeleteCachedConnections();
1853 if (rx_connHashTable) {
1854 MUTEX_ENTER(&rx_connHashTable_lock);
1855 for (conn_ptr = &rx_connHashTable[0],
1856 conn_end = &rx_connHashTable[rx_hashTableSize];
1857 conn_ptr < conn_end; conn_ptr++) {
1858 struct rx_connection *conn, *next;
1859 for (conn = *conn_ptr; conn; conn = next) {
1861 if (conn->type == RX_CLIENT_CONNECTION) {
1862 /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1864 /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1865 #ifdef RX_ENABLE_LOCKS
1866 rxi_DestroyConnectionNoLock(conn);
1867 #else /* RX_ENABLE_LOCKS */
1868 rxi_DestroyConnection(conn);
1869 #endif /* RX_ENABLE_LOCKS */
1873 #ifdef RX_ENABLE_LOCKS
1874 while (rx_connCleanup_list) {
1875 struct rx_connection *conn;
1876 conn = rx_connCleanup_list;
1877 rx_connCleanup_list = rx_connCleanup_list->next;
1878 MUTEX_EXIT(&rx_connHashTable_lock);
1879 rxi_CleanupConnection(conn);
1880 MUTEX_ENTER(&rx_connHashTable_lock);
1882 MUTEX_EXIT(&rx_connHashTable_lock);
1883 #endif /* RX_ENABLE_LOCKS */
1892 /* if we wake up the packet waiter too often, we can get into a loop with two
1893 AllocSendPackets each waking the other up (from ReclaimPacket calls) */
1894 void rxi_PacketsUnWait(void)
1896 if (!rx_waitingForPackets) {
1900 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1901 return; /* still over quota */
1904 rx_waitingForPackets = 0;
1905 #ifdef RX_ENABLE_LOCKS
1906 CV_BROADCAST(&rx_waitingForPackets_cv);
1908 osi_rxWakeup(&rx_waitingForPackets);
1914 /* ------------------Internal interfaces------------------------- */
1916 /* Return this process's service structure for the
1917 * specified socket and service */
1918 struct rx_service *rxi_FindService(register osi_socket socket,
1919 register u_short serviceId)
1921 register struct rx_service **sp;
1922 for (sp = &rx_services[0]; *sp; sp++) {
1923 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1929 /* Allocate a call structure, for the indicated channel of the
1930 * supplied connection. The mode and state of the call must be set by
1931 * the caller. Returns the call with mutex locked. */
1932 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1933 register int channel)
1935 register struct rx_call *call;
1936 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1937 register struct rx_call *cp; /* Call pointer temp */
1938 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1939 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1941 /* Grab an existing call structure, or allocate a new one.
1942 * Existing call structures are assumed to have been left reset by
1944 MUTEX_ENTER(&rx_freeCallQueue_lock);
1946 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1948 * EXCEPT that the TQ might not yet be cleared out.
1949 * Skip over those with in-use TQs.
1952 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1953 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1959 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1960 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1961 call = queue_First(&rx_freeCallQueue, rx_call);
1962 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1964 MUTEX_ENTER(&rx_stats_mutex);
1965 rx_stats.nFreeCallStructs--;
1966 MUTEX_EXIT(&rx_stats_mutex);
1967 MUTEX_EXIT(&rx_freeCallQueue_lock);
1968 MUTEX_ENTER(&call->lock);
1969 CLEAR_CALL_QUEUE_LOCK(call);
1970 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1971 /* Now, if TQ wasn't cleared earlier, do it now. */
1972 if (call->flags & RX_CALL_TQ_CLEARME) {
1973 rxi_ClearTransmitQueue(call, 0);
1974 queue_Init(&call->tq);
1976 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1977 /* Bind the call to its connection structure */
1979 rxi_ResetCall(call, 1);
1982 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1984 MUTEX_EXIT(&rx_freeCallQueue_lock);
1985 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1986 MUTEX_ENTER(&call->lock);
1987 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1988 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1989 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1991 MUTEX_ENTER(&rx_stats_mutex);
1992 rx_stats.nCallStructs++;
1993 MUTEX_EXIT(&rx_stats_mutex);
1994 /* Initialize once-only items */
1995 queue_Init(&call->tq);
1996 queue_Init(&call->rq);
1997 queue_Init(&call->iovq);
1998 /* Bind the call to its connection structure (prereq for reset) */
2000 rxi_ResetCall(call, 1);
2002 call->channel = channel;
2003 call->callNumber = &conn->callNumber[channel];
2004 /* Note that the next expected call number is retained (in
2005 * conn->callNumber[i]), even if we reallocate the call structure
2007 conn->call[channel] = call;
2008 /* if the channel's never been used (== 0), we should start at 1, otherwise
2009 the call number is valid from the last time this channel was used */
2010 if (*call->callNumber == 0) *call->callNumber = 1;
2015 /* A call has been inactive long enough that we can throw away
2016 * state, including the call structure, which is placed on the call
2018 * Call is locked upon entry.
2019 * haveCTLock set if called from rxi_ReapConnections
2021 #ifdef RX_ENABLE_LOCKS
2022 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2023 #else /* RX_ENABLE_LOCKS */
2024 void rxi_FreeCall(register struct rx_call *call)
2025 #endif /* RX_ENABLE_LOCKS */
2027 register int channel = call->channel;
2028 register struct rx_connection *conn = call->conn;
2031 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2032 (*call->callNumber)++;
2033 rxi_ResetCall(call, 0);
2034 call->conn->call[channel] = (struct rx_call *) 0;
2036 MUTEX_ENTER(&rx_freeCallQueue_lock);
2037 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2038 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2039 /* A call may be free even though its transmit queue is still in use.
2040 * Since we search the call list from head to tail, put busy calls at
2041 * the head of the list, and idle calls at the tail.
2043 if (call->flags & RX_CALL_TQ_BUSY)
2044 queue_Prepend(&rx_freeCallQueue, call);
2046 queue_Append(&rx_freeCallQueue, call);
2047 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2048 queue_Append(&rx_freeCallQueue, call);
2049 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2050 MUTEX_ENTER(&rx_stats_mutex);
2051 rx_stats.nFreeCallStructs++;
2052 MUTEX_EXIT(&rx_stats_mutex);
2054 MUTEX_EXIT(&rx_freeCallQueue_lock);
2056 /* Destroy the connection if it was previously slated for
2057 * destruction, i.e. the Rx client code previously called
2058 * rx_DestroyConnection (client connections), or
2059 * rxi_ReapConnections called the same routine (server
2060 * connections). Only do this, however, if there are no
2061 * outstanding calls. Note that for fine grain locking, there appears
2062 * to be a deadlock in that rxi_FreeCall has a call locked and
2063 * DestroyConnectionNoLock locks each call in the conn. But note a
2064 * few lines up where we have removed this call from the conn.
2065 * If someone else destroys a connection, they either have no
2066 * call lock held or are going through this section of code.
2068 if (conn->flags & RX_CONN_DESTROY_ME) {
2069 MUTEX_ENTER(&conn->conn_data_lock);
2071 MUTEX_EXIT(&conn->conn_data_lock);
2072 #ifdef RX_ENABLE_LOCKS
if (haveCTLock)
2074 rxi_DestroyConnectionNoLock(conn);
else
2076 rxi_DestroyConnection(conn);
2077 #else /* RX_ENABLE_LOCKS */
2078 rxi_DestroyConnection(conn);
2079 #endif /* RX_ENABLE_LOCKS */
2083 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2084 char *rxi_Alloc(register size_t size)
2088 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2089 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2092 int glockOwner = ISAFS_GLOCK();
2096 MUTEX_ENTER(&rx_stats_mutex);
2097 rxi_Alloccnt++; rxi_Allocsize += size;
2098 MUTEX_EXIT(&rx_stats_mutex);
2099 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2100 if (size > AFS_SMALLOCSIZ) {
2101 p = (char *) osi_AllocMediumSpace(size);
2103 p = (char *) osi_AllocSmall(size, 1);
2104 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2109 p = (char *) osi_Alloc(size);
2111 if (!p) osi_Panic("rxi_Alloc error");
2116 void rxi_Free(void *addr, register size_t size)
2118 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2119 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2122 int glockOwner = ISAFS_GLOCK();
2126 MUTEX_ENTER(&rx_stats_mutex);
2127 rxi_Alloccnt--; rxi_Allocsize -= size;
2128 MUTEX_EXIT(&rx_stats_mutex);
2129 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2130 if (size > AFS_SMALLOCSIZ)
2131 osi_FreeMediumSpace(addr);
2133 osi_FreeSmall(addr);
2134 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2139 osi_Free(addr, size);
2143 /* Find the peer process represented by the supplied (host,port)
2144 * combination. If there is no appropriate active peer structure, a
2145 * new one will be allocated and initialized
2146 * The origPeer, if set, is a pointer to a peer structure on which the
2147 * refcount will be decremented. This is used to replace the peer
2148 * structure hanging off a connection structure */
2149 struct rx_peer *rxi_FindPeer(register afs_uint32 host,
2150 register u_short port, struct rx_peer *origPeer, int create)
2152 register struct rx_peer *pp;
2154 hashIndex = PEER_HASH(host, port);
2155 MUTEX_ENTER(&rx_peerHashTable_lock);
2156 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2157 if ((pp->host == host) && (pp->port == port)) break;
2161 pp = rxi_AllocPeer(); /* This bzero's *pp */
2162 pp->host = host; /* set here or in InitPeerParams is zero */
2164 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2165 queue_Init(&pp->congestionQueue);
2166 queue_Init(&pp->rpcStats);
2167 pp->next = rx_peerHashTable[hashIndex];
2168 rx_peerHashTable[hashIndex] = pp;
2169 rxi_InitPeerParams(pp);
2170 MUTEX_ENTER(&rx_stats_mutex);
2171 rx_stats.nPeerStructs++;
2172 MUTEX_EXIT(&rx_stats_mutex);
2179 origPeer->refCount--;
2180 MUTEX_EXIT(&rx_peerHashTable_lock);
2185 /* Find the connection at (host, port) started at epoch, and with the
2186 * given connection id. Creates the server connection if necessary.
2187 * The type specifies whether a client connection or a server
2188 * connection is desired. In both cases, (host, port) specify the
2189 * peer's (host, port) pair. Client connections are not made
2190 * automatically by this routine. The parameter socket gives the
2191 * socket descriptor on which the packet was received. This is used,
2192 * in the case of server connections, to check that *new* connections
2193 * come via a valid (port, serviceId). Finally, the securityIndex
2194 * parameter must match the existing index for the connection. If a
2195 * server connection is created, it will be created using the supplied
2196 * index, if the index is valid for this service */
2197 struct rx_connection *rxi_FindConnection(osi_socket socket,
2198 register afs_int32 host, register u_short port, u_short serviceId,
2199 afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2201 int hashindex, flag;
2202 register struct rx_connection *conn;
2203 hashindex = CONN_HASH(host, port, cid, epoch, type);
2204 MUTEX_ENTER(&rx_connHashTable_lock);
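/* Try the one-entry connection cache (rxLastConn) first; if it misses, fall back to scanning this hash chain. */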
2205 rxLastConn ? (conn = rxLastConn, flag = 0) :
2206 (conn = rx_connHashTable[hashindex], flag = 1);
2208 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2209 && (epoch == conn->epoch)) {
2210 register struct rx_peer *pp = conn->peer;
2211 if (securityIndex != conn->securityIndex) {
2212 /* this isn't supposed to happen, but someone could forge a packet
2213 like this, and there seems to be some CM bug that makes this
2214 happen from time to time -- in which case, the fileserver asserts. */
2216 MUTEX_EXIT(&rx_connHashTable_lock);
2217 return (struct rx_connection *) 0;
2219 if (pp->host == host && pp->port == port)
2221 if (type == RX_CLIENT_CONNECTION && pp->port == port)
2223 if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2228 /* the connection rxLastConn that was used the last time is not the
2229 ** one we are looking for now. Hence, start searching in the hash */
2231 conn = rx_connHashTable[hashindex];
2237 struct rx_service *service;
2238 if (type == RX_CLIENT_CONNECTION) {
2239 MUTEX_EXIT(&rx_connHashTable_lock);
2240 return (struct rx_connection *) 0;
2242 service = rxi_FindService(socket, serviceId);
2243 if (!service || (securityIndex >= service->nSecurityObjects)
2244 || (service->securityObjects[securityIndex] == 0)) {
2245 MUTEX_EXIT(&rx_connHashTable_lock);
2246 return (struct rx_connection *) 0;
2248 conn = rxi_AllocConnection(); /* This bzero's the connection */
2249 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2251 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2253 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2254 conn->next = rx_connHashTable[hashindex];
2255 rx_connHashTable[hashindex] = conn;
2256 conn->peer = rxi_FindPeer(host, port, 0, 1);
2257 conn->type = RX_SERVER_CONNECTION;
2258 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2259 conn->epoch = epoch;
2260 conn->cid = cid & RX_CIDMASK;
2261 /* conn->serial = conn->lastSerial = 0; */
2262 /* conn->timeout = 0; */
2263 conn->ackRate = RX_FAST_ACK_RATE;
2264 conn->service = service;
2265 conn->serviceId = serviceId;
2266 conn->securityIndex = securityIndex;
2267 conn->securityObject = service->securityObjects[securityIndex];
2268 conn->nSpecific = 0;
2269 conn->specific = NULL;
2270 rx_SetConnDeadTime(conn, service->connDeadTime);
2271 rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2272 /* Notify security object of the new connection */
2273 RXS_NewConnection(conn->securityObject, conn);
2274 /* XXXX Connection timeout? */
2275 if (service->newConnProc) (*service->newConnProc)(conn);
2276 MUTEX_ENTER(&rx_stats_mutex);
2277 rx_stats.nServerConns++;
2278 MUTEX_EXIT(&rx_stats_mutex);
2281 MUTEX_ENTER(&conn->conn_data_lock);
2283 MUTEX_EXIT(&conn->conn_data_lock);
2285 rxLastConn = conn; /* store this connection as the last conn used */
2286 MUTEX_EXIT(&rx_connHashTable_lock);
2290 /* There are two packet tracing routines available for testing and monitoring
2291 * Rx. One is called just after every packet is received and the other is
2292 * called just before every packet is sent. Received packets have had their
2293 * headers decoded, and packets to be sent have not yet had their headers
2294 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2295 * containing the network address. Both can be modified. The return value, if
2296 * non-zero, indicates that the packet should be dropped. */
2298 int (*rx_justReceived)() = 0;
2299 int (*rx_almostSent)() = 0;
2301 /* A packet has been received off the interface. Np is the packet, socket is
2302 * the socket number it was received from (useful in determining which service
2303 * this packet corresponds to), and (host, port) reflect the host,port of the
2304 * sender. This call returns the packet to the caller if it is finished with
2305 * it, rather than de-allocating it, just as a small performance hack */
2307 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np,
2308 osi_socket socket, afs_uint32 host, u_short port,
2309 int *tnop, struct rx_call **newcallp)
2311 register struct rx_call *call;
2312 register struct rx_connection *conn;
2314 afs_uint32 currentCallNumber;
2320 struct rx_packet *tnp;
2323 /* We don't print out the packet until now because (1) the time may not be
2324 * accurate enough until now in the lwp implementation (rx_Listener only gets
2325 * the time after the packet is read) and (2) from a protocol point of view,
2326 * this is the first time the packet has been seen */
2327 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2328 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2329 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2330 np->header.serial, packetType, host, port, np->header.serviceId,
2331 np->header.epoch, np->header.cid, np->header.callNumber,
2332 np->header.seq, np->header.flags, np));
2335 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2336 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2339 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2340 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2343 /* If an input tracer function is defined, call it with the packet and
2344 * network address. Note this function may modify its arguments. */
2345 if (rx_justReceived) {
2346 struct sockaddr_in addr;
2348 addr.sin_family = AF_INET;
2349 addr.sin_port = port;
2350 addr.sin_addr.s_addr = host;
2351 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2352 addr.sin_len = sizeof(addr);
2353 #endif /* STRUCT_SOCKADDR_HAS_SA_LEN */
2354 drop = (*rx_justReceived) (np, &addr);
2355 /* drop packet if return value is non-zero */
2356 if (drop) return np;
2357 port = addr.sin_port; /* in case fcn changed addr */
2358 host = addr.sin_addr.s_addr;
2362 /* If packet was not sent by the client, then *we* must be the client */
2363 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2364 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2366 /* Find the connection (or fabricate one, if we're the server & if
2367 * necessary) associated with this packet */
2368 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2369 np->header.cid, np->header.epoch, type,
2370 np->header.securityIndex);
2373 /* If no connection found or fabricated, just ignore the packet.
2374 * (An argument could be made for sending an abort packet for the connection.) */
2379 MUTEX_ENTER(&conn->conn_data_lock);
2380 if (conn->maxSerial < np->header.serial)
2381 conn->maxSerial = np->header.serial;
2382 MUTEX_EXIT(&conn->conn_data_lock);
2384 /* If the connection is in an error state, send an abort packet and ignore
2385 * the incoming packet */
2387 /* Don't respond to an abort packet--we don't want loops! */
2388 MUTEX_ENTER(&conn->conn_data_lock);
2389 if (np->header.type != RX_PACKET_TYPE_ABORT)
2390 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2392 MUTEX_EXIT(&conn->conn_data_lock);
2396 /* Check for connection-only requests (i.e. not call specific). */
2397 if (np->header.callNumber == 0) {
2398 switch (np->header.type) {
2399 case RX_PACKET_TYPE_ABORT:
2400 /* What if the supplied error is zero? */
2401 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2402 MUTEX_ENTER(&conn->conn_data_lock);
2404 MUTEX_EXIT(&conn->conn_data_lock);
2406 case RX_PACKET_TYPE_CHALLENGE:
2407 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2408 MUTEX_ENTER(&conn->conn_data_lock);
2410 MUTEX_EXIT(&conn->conn_data_lock);
2412 case RX_PACKET_TYPE_RESPONSE:
2413 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2414 MUTEX_ENTER(&conn->conn_data_lock);
2416 MUTEX_EXIT(&conn->conn_data_lock);
2418 case RX_PACKET_TYPE_PARAMS:
2419 case RX_PACKET_TYPE_PARAMS+1:
2420 case RX_PACKET_TYPE_PARAMS+2:
2421 /* ignore these packet types for now */
2422 MUTEX_ENTER(&conn->conn_data_lock);
2424 MUTEX_EXIT(&conn->conn_data_lock);
2429 /* Should not reach here, unless the peer is broken: send an abort packet. */
2431 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2432 MUTEX_ENTER(&conn->conn_data_lock);
2433 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2435 MUTEX_EXIT(&conn->conn_data_lock);
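/* Call-specific packet: the channel number is carried in the low bits of the connection id. */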
2440 channel = np->header.cid & RX_CHANNELMASK;
2441 call = conn->call[channel];
2442 #ifdef RX_ENABLE_LOCKS
2444 MUTEX_ENTER(&call->lock);
2445 /* Test to see if call struct is still attached to conn. */
2446 if (call != conn->call[channel]) {
2448 MUTEX_EXIT(&call->lock);
2449 if (type == RX_SERVER_CONNECTION) {
2450 call = conn->call[channel];
2451 /* If we started with no call attached and there is one now,
2452 * another thread is also running this routine and has gotten
2453 * the connection channel. We should drop this packet in the tests
2454 * below. If there was a call on this connection and it's now
2455 * gone, then we'll be making a new call below.
2456 * If there was previously a call and it's now different then
2457 * the old call was freed and another thread running this routine
2458 * has created a call on this channel. One of these two threads
2459 * has a packet for the old call and the code below handles those cases. */
2463 MUTEX_ENTER(&call->lock);
2466 /* This packet can't be for this call. If the new call address is
2467 * 0 then no call is running on this channel. If there is a call
2468 * then, since this is a client connection we're getting data for
2469 * it must be for the previous call.
2471 MUTEX_ENTER(&rx_stats_mutex);
2472 rx_stats.spuriousPacketsRead++;
2473 MUTEX_EXIT(&rx_stats_mutex);
2474 MUTEX_ENTER(&conn->conn_data_lock);
2476 MUTEX_EXIT(&conn->conn_data_lock);
2481 currentCallNumber = conn->callNumber[channel];
2483 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2484 if (np->header.callNumber < currentCallNumber) {
2485 MUTEX_ENTER(&rx_stats_mutex);
2486 rx_stats.spuriousPacketsRead++;
2487 MUTEX_EXIT(&rx_stats_mutex);
2488 #ifdef RX_ENABLE_LOCKS
2490 MUTEX_EXIT(&call->lock);
2492 MUTEX_ENTER(&conn->conn_data_lock);
2494 MUTEX_EXIT(&conn->conn_data_lock);
2498 MUTEX_ENTER(&conn->conn_call_lock);
2499 call = rxi_NewCall(conn, channel);
2500 MUTEX_EXIT(&conn->conn_call_lock);
2501 *call->callNumber = np->header.callNumber;
2502 call->state = RX_STATE_PRECALL;
2503 clock_GetTime(&call->queueTime);
2504 hzero(call->bytesSent);
2505 hzero(call->bytesRcvd);
2506 rxi_KeepAliveOn(call);
2508 else if (np->header.callNumber != currentCallNumber) {
2509 /* Wait until the transmit queue is idle before deciding
2510 * whether to reset the current call. Chances are that the
2511 * call will be in either DALLY or HOLD state once the TQ_BUSY
2514 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2515 while ((call->state == RX_STATE_ACTIVE) &&
2516 (call->flags & RX_CALL_TQ_BUSY)) {
2517 call->flags |= RX_CALL_TQ_WAIT;
2518 #ifdef RX_ENABLE_LOCKS
2519 CV_WAIT(&call->cv_tq, &call->lock);
2520 #else /* RX_ENABLE_LOCKS */
2521 osi_rxSleep(&call->tq);
2522 #endif /* RX_ENABLE_LOCKS */
2524 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2525 /* If the new call cannot be taken right now send a busy and set
2526 * the error condition in this call, so that it terminates as
2527 * quickly as possible */
2528 if (call->state == RX_STATE_ACTIVE) {
2529 struct rx_packet *tp;
2531 rxi_CallError(call, RX_CALL_DEAD);
2532 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2533 MUTEX_EXIT(&call->lock);
2534 MUTEX_ENTER(&conn->conn_data_lock);
2536 MUTEX_EXIT(&conn->conn_data_lock);
2539 rxi_ResetCall(call, 0);
2540 *call->callNumber = np->header.callNumber;
2541 call->state = RX_STATE_PRECALL;
2542 clock_GetTime(&call->queueTime);
2543 hzero(call->bytesSent);
2544 hzero(call->bytesRcvd);
2546 /* If the number of queued calls exceeds the overload
2547 * threshold then abort this call. */
2549 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2550 struct rx_packet *tp;
2552 rxi_CallError(call, rx_BusyError);
2553 tp = rxi_SendCallAbort(call, np, 1, 0);
2554 MUTEX_EXIT(&call->lock);
2555 MUTEX_ENTER(&conn->conn_data_lock);
2557 MUTEX_EXIT(&conn->conn_data_lock);
2560 rxi_KeepAliveOn(call);
2563 /* Continuing call; do nothing here. */
2565 } else { /* we're the client */
2566 /* Ignore all incoming acknowledgements for calls in DALLY state */
2567 if ( call && (call->state == RX_STATE_DALLY)
2568 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2569 MUTEX_ENTER(&rx_stats_mutex);
2570 rx_stats.ignorePacketDally++;
2571 MUTEX_EXIT(&rx_stats_mutex);
2572 #ifdef RX_ENABLE_LOCKS
2574 MUTEX_EXIT(&call->lock);
2577 MUTEX_ENTER(&conn->conn_data_lock);
2579 MUTEX_EXIT(&conn->conn_data_lock);
2583 /* Ignore anything that's not relevant to the current call. If there
2584 * isn't a current call, then no packet is relevant. */
2585 if (!call || (np->header.callNumber != currentCallNumber)) {
2586 MUTEX_ENTER(&rx_stats_mutex);
2587 rx_stats.spuriousPacketsRead++;
2588 MUTEX_EXIT(&rx_stats_mutex);
2589 #ifdef RX_ENABLE_LOCKS
2591 MUTEX_EXIT(&call->lock);
2594 MUTEX_ENTER(&conn->conn_data_lock);
2596 MUTEX_EXIT(&conn->conn_data_lock);
2599 /* If the service security object index stamped in the packet does not
2600 * match the connection's security index, ignore the packet */
2601 if (np->header.securityIndex != conn->securityIndex) {
2602 #ifdef RX_ENABLE_LOCKS
2603 MUTEX_EXIT(&call->lock);
2605 MUTEX_ENTER(&conn->conn_data_lock);
2607 MUTEX_EXIT(&conn->conn_data_lock);
2611 /* If we're receiving the response, then all transmit packets are
2612 * implicitly acknowledged. Get rid of them. */
2613 if (np->header.type == RX_PACKET_TYPE_DATA) {
2614 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2615 /* XXX Hack. Because we must release the global rx lock when
2616 * sending packets (osi_NetSend) we drop all acks while we're
2617 * traversing the tq in rxi_Start sending packets out because
2618 * packets may move to the freePacketQueue as result of being here!
2619 * So we drop these packets until we're safely out of the
2620 * traversing. Really ugly!
2621 * For fine grain RX locking, we set the acked field in the
2622 * packets and let rxi_Start remove them from the transmit queue.
2624 if (call->flags & RX_CALL_TQ_BUSY) {
2625 #ifdef RX_ENABLE_LOCKS
2626 rxi_SetAcksInTransmitQueue(call);
2629 return np; /* xmitting; drop packet */
2633 rxi_ClearTransmitQueue(call, 0);
2635 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2636 rxi_ClearTransmitQueue(call, 0);
2637 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2639 if (np->header.type == RX_PACKET_TYPE_ACK) {
2640 /* now check to see if this is an ack packet acknowledging that the
2641 * server actually *lost* some hard-acked data. If this happens we
2642 * ignore this packet, as it may indicate that the server restarted in
2643 * the middle of a call. It is also possible that this is an old ack
2644 * packet. We don't abort the connection in this case, because this
2645 * *might* just be an old ack packet. The right way to detect a server
2646 * restart in the midst of a call is to notice that the server epoch has changed. */
2648 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2649 * XXX unacknowledged. I think that this is off-by-one, but
2650 * XXX I don't dare change it just yet, since it will
2651 * XXX interact badly with the server-restart detection
2652 * XXX code in receiveackpacket. */
2653 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2654 MUTEX_ENTER(&rx_stats_mutex);
2655 rx_stats.spuriousPacketsRead++;
2656 MUTEX_EXIT(&rx_stats_mutex);
2657 MUTEX_EXIT(&call->lock);
2658 MUTEX_ENTER(&conn->conn_data_lock);
2660 MUTEX_EXIT(&conn->conn_data_lock);
2664 } /* else not a data packet */
2667 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2668 /* Set remote user defined status from packet */
2669 call->remoteStatus = np->header.userStatus;
2671 /* Note the gap between the expected next packet and the actual
2672 * packet that arrived, when the new packet has a smaller serial number
2673 * than expected. Rioses frequently reorder packets all by themselves,
2674 * so this will be quite important with very large window sizes.
2675 * Skew is checked against 0 here to avoid any dependence on the type of
2676 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always true.
2678 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2679 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2680 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2682 MUTEX_ENTER(&conn->conn_data_lock);
2683 skew = conn->lastSerial - np->header.serial;
2684 conn->lastSerial = np->header.serial;
2685 MUTEX_EXIT(&conn->conn_data_lock);
2687 register struct rx_peer *peer;
2689 if (skew > peer->inPacketSkew) {
2690 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2691 peer->inPacketSkew = skew;
2695 /* Now do packet type-specific processing */
2696 switch (np->header.type) {
2697 case RX_PACKET_TYPE_DATA:
2698 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2701 case RX_PACKET_TYPE_ACK:
2702 /* Respond immediately to ack packets requesting acknowledgement
2704 if (np->header.flags & RX_REQUEST_ACK) {
2706 (void) rxi_SendCallAbort(call, 0, 1, 0);
2708 (void) rxi_SendAck(call, 0, np->header.serial,
2709 RX_ACK_PING_RESPONSE, 1);
2711 np = rxi_ReceiveAckPacket(call, np, 1);
2713 case RX_PACKET_TYPE_ABORT:
2714 /* An abort packet: reset the connection, passing the error up to
2716 /* What if error is zero? */
2717 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2719 case RX_PACKET_TYPE_BUSY:
2722 case RX_PACKET_TYPE_ACKALL:
2723 /* All packets acknowledged, so we can drop all packets previously
2724 * readied for sending */
2725 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2726 /* XXX Hack. Because we can't release the global rx lock when
2727 * sending packets (osi_NetSend) we drop all ack pkts while we're
2728 * traversing the tq in rxi_Start sending packets out because
2729 * packets may move to the freePacketQueue as result of being
2730 * here! So we drop these packets until we're safely out of the
2731 * traversing. Really ugly!
2732 * For fine grain RX locking, we set the acked field in the packets
2733 * and let rxi_Start remove the packets from the transmit queue.
2735 if (call->flags & RX_CALL_TQ_BUSY) {
2736 #ifdef RX_ENABLE_LOCKS
2737 rxi_SetAcksInTransmitQueue(call);
2739 #else /* RX_ENABLE_LOCKS */
2741 return np; /* xmitting; drop packet */
2742 #endif /* RX_ENABLE_LOCKS */
2744 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2745 rxi_ClearTransmitQueue(call, 0);
2748 /* Should not reach here, unless the peer is broken: send an abort packet. */
2750 rxi_CallError(call, RX_PROTOCOL_ERROR);
2751 np = rxi_SendCallAbort(call, np, 1, 0);
2754 /* Note when this last legitimate packet was received, for keep-alive
2755 * processing. Note, we delay getting the time until now in the hope that
2756 * the packet will be delivered to the user before any get time is required
2757 * (if not, then the time won't actually be re-evaluated here). */
2758 call->lastReceiveTime = clock_Sec();
2759 MUTEX_EXIT(&call->lock);
2760 MUTEX_ENTER(&conn->conn_data_lock);
2762 MUTEX_EXIT(&conn->conn_data_lock);
2766 /* return true if this is an "interesting" connection from the point of view
2767 of someone trying to debug the system */
2768 int rxi_IsConnInteresting(struct rx_connection *aconn)
2771 register struct rx_call *tcall;
2773 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2775 for(i=0;i<RX_MAXCALLS;i++) {
2776 tcall = aconn->call[i];
2778 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2780 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2788 /* if this is one of the last few packets AND it wouldn't be used by the
2789 receiving call to immediately satisfy a read request, then drop it on
2790 the floor, since accepting it might prevent a lock-holding thread from
2791 making progress in its reading. If a call has been cleared while in
2792 the precall state then ignore all subsequent packets until the call
2793 is assigned to a thread. */
2795 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2798 MUTEX_ENTER(&rx_stats_mutex);
2799 if (((ap->header.seq != 1) &&
2800 (acall->flags & RX_CALL_CLEARED) &&
2801 (acall->state == RX_STATE_PRECALL)) ||
2802 ((rx_nFreePackets < rxi_dataQuota+2) &&
2803 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2804 && (acall->flags & RX_CALL_READER_WAIT)))) {
2807 MUTEX_EXIT(&rx_stats_mutex);
2812 static void rxi_CheckReachEvent(struct rxevent *event,
2813 struct rx_connection *conn, struct rx_call *acall)
2815 struct rx_call *call = acall;
2819 MUTEX_ENTER(&conn->conn_data_lock);
2820 conn->checkReachEvent = NULL;
2821 waiting = conn->flags & RX_CONN_ATTACHWAIT;
2822 if (event) conn->refCount--;
2823 MUTEX_EXIT(&conn->conn_data_lock);
2827 MUTEX_ENTER(&conn->conn_call_lock);
2828 MUTEX_ENTER(&conn->conn_data_lock);
2829 for (i=0; i<RX_MAXCALLS; i++) {
2830 struct rx_call *tc = conn->call[i];
2831 if (tc && tc->state == RX_STATE_PRECALL) {
2837 /* Indicate that rxi_CheckReachEvent is no longer running by
2838 * clearing the flag. Must be atomic under conn_data_lock to
2839 * avoid a new call slipping by: rxi_CheckConnReach holds
2840 * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2842 conn->flags &= ~RX_CONN_ATTACHWAIT;
2843 MUTEX_EXIT(&conn->conn_data_lock);
2844 MUTEX_EXIT(&conn->conn_call_lock);
2848 if (call != acall) MUTEX_ENTER(&call->lock);
2849 rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
2850 if (call != acall) MUTEX_EXIT(&call->lock);
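/* Schedule another reachability check, unless one is already pending. */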
2852 clock_GetTime(&when);
2853 when.sec += RX_CHECKREACH_TIMEOUT;
2854 MUTEX_ENTER(&conn->conn_data_lock);
2855 if (!conn->checkReachEvent) {
2857 conn->checkReachEvent =
2858 rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2860 MUTEX_EXIT(&conn->conn_data_lock);
2865 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2867 struct rx_service *service = conn->service;
2868 struct rx_peer *peer = conn->peer;
2869 afs_uint32 now, lastReach;
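/* A checkReach value of zero means this service does not require reachability checks. */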
2871 if (service->checkReach == 0)
2875 MUTEX_ENTER(&peer->peer_lock);
2876 lastReach = peer->lastReachTime;
2877 MUTEX_EXIT(&peer->peer_lock);
2878 if (now - lastReach < RX_CHECKREACH_TTL)
2881 MUTEX_ENTER(&conn->conn_data_lock);
2882 if (conn->flags & RX_CONN_ATTACHWAIT) {
2883 MUTEX_EXIT(&conn->conn_data_lock);
2886 conn->flags |= RX_CONN_ATTACHWAIT;
2887 MUTEX_EXIT(&conn->conn_data_lock);
2888 if (!conn->checkReachEvent)
2889 rxi_CheckReachEvent(NULL, conn, call);
2894 /* try to attach call, if authentication is complete */
2895 static void TryAttach(register struct rx_call *acall,
2896 register osi_socket socket, register int *tnop,
2897 register struct rx_call **newcallp, int reachOverride)
2899 struct rx_connection *conn = acall->conn;
2901 if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2902 /* Don't attach until we have any required authentication. */
2903 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2904 if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2905 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2906 /* Note: this does not necessarily succeed; there
2907 * may not be any proc available. */
2911 rxi_ChallengeOn(acall->conn);
2916 /* A data packet has been received off the interface. This packet is
2917 * appropriate to the call (the call is in the right state, etc.). This
2918 * routine can return a packet to the caller, for re-use */
2920 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call,
2921 register struct rx_packet *np, int istack, osi_socket socket,
2922 afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2924 int ackNeeded = 0; /* 0 means no, otherwise ack_reason */
2928 afs_uint32 seq, serial, flags;
2930 struct rx_packet *tnp;
2932 MUTEX_ENTER(&rx_stats_mutex);
2933 rx_stats.dataPacketsRead++;
2934 MUTEX_EXIT(&rx_stats_mutex);
2937 /* If there are no packet buffers, drop this new packet, unless we can find
2938 * packet buffers from inactive calls */
2940 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2941 MUTEX_ENTER(&rx_freePktQ_lock);
2942 rxi_NeedMorePackets = TRUE;
2943 MUTEX_EXIT(&rx_freePktQ_lock);
2944 MUTEX_ENTER(&rx_stats_mutex);
2945 rx_stats.noPacketBuffersOnRead++;
2946 MUTEX_EXIT(&rx_stats_mutex);
2947 call->rprev = np->header.serial;
2948 rxi_calltrace(RX_TRACE_DROP, call);
2949 dpf (("packet %x dropped on receipt - quota problems", np));
2951 rxi_ClearReceiveQueue(call);
2952 clock_GetTime(&when);
2953 clock_Add(&when, &rx_softAckDelay);
2954 if (!call->delayedAckEvent ||
2955 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2956 rxevent_Cancel(call->delayedAckEvent, call,
2957 RX_CALL_REFCOUNT_DELAY);
2958 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2959 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2962 /* we've damaged this call already, might as well do it in. */
2968 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2969 * packet is one of several packets transmitted as a single
2970 * datagram. Do not send any soft or hard acks until all packets
2971 * in a jumbogram have been processed. Send negative acks right away.
2973 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2974 /* tnp is non-null when there are more packets in the
2975 * current jumbogram */
2982 seq = np->header.seq;
2983 serial = np->header.serial;
2984 flags = np->header.flags;
2986 /* If the call is in an error state, send an abort message */
2988 return rxi_SendCallAbort(call, np, istack, 0);
2990 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2991 * AFS 3.5 jumbogram. */
2992 if (flags & RX_JUMBO_PACKET) {
2993 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2998 if (np->header.spare != 0) {
2999 MUTEX_ENTER(&call->conn->conn_data_lock);
3000 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3001 MUTEX_EXIT(&call->conn->conn_data_lock);
3004 /* The usual case is that this is the expected next packet */
3005 if (seq == call->rnext) {
3007 /* Check to make sure it is not a duplicate of one already queued */
3008 if (queue_IsNotEmpty(&call->rq)
3009 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3010 MUTEX_ENTER(&rx_stats_mutex);
3011 rx_stats.dupPacketsRead++;
3012 MUTEX_EXIT(&rx_stats_mutex);
3013 dpf (("packet %x dropped on receipt - duplicate", np));
3014 rxevent_Cancel(call->delayedAckEvent, call,
3015 RX_CALL_REFCOUNT_DELAY);
3016 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3022 /* It's the next packet. Stick it on the receive queue
3023 * for this call. Set newPackets to make sure we wake
3024 * the reader once all packets have been processed */
3025 queue_Prepend(&call->rq, np);
3027 np = NULL; /* We can't use this anymore */
3030 /* If an ack is requested then set a flag to make sure we
3031 * send an acknowledgement for this packet */
3032 if (flags & RX_REQUEST_ACK) {
3033 ackNeeded = RX_ACK_REQUESTED;
3036 /* Keep track of whether we have received the last packet */
3037 if (flags & RX_LAST_PACKET) {
3038 call->flags |= RX_CALL_HAVE_LAST;
3042 /* Check whether we have all of the packets for this call */
3043 if (call->flags & RX_CALL_HAVE_LAST) {
3044 afs_uint32 tseq; /* temporary sequence number */
3045 struct rx_packet *tp; /* Temporary packet pointer */
3046 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3048 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3049 if (tseq != tp->header.seq)
3051 if (tp->header.flags & RX_LAST_PACKET) {
3052 call->flags |= RX_CALL_RECEIVE_DONE;
3059 /* Provide asynchronous notification for those who want it
3060 * (e.g. multi rx) */
3061 if (call->arrivalProc) {
3062 (*call->arrivalProc)(call, call->arrivalProcHandle,
3063 (int) call->arrivalProcArg);
3064 call->arrivalProc = (VOID (*)()) 0;
3067 /* Update last packet received */
3070 /* If there is no server process serving this call, grab
3071 * one, if available. We only need to do this once. If a
3072 * server thread is available, this thread becomes a server
3073 * thread and the server thread becomes a listener thread. */
3075 TryAttach(call, socket, tnop, newcallp, 0);
3078 /* This is not the expected next packet. */
3080 /* Determine whether this is a new or old packet, and if it's
3081 * a new one, whether it fits into the current receive window.
3082 * Also figure out whether the packet was delivered in sequence.
3083 * We use the prev variable to determine whether the new packet
3084 * is the successor of its immediate predecessor in the
3085 * receive queue, and the missing flag to determine whether
3086 * any of this packet's predecessors are missing. */
3088 afs_uint32 prev; /* "Previous packet" sequence number */
3089 struct rx_packet *tp; /* Temporary packet pointer */
3090 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3091 int missing; /* Are any predecessors missing? */
3093 /* If the new packet's sequence number has been sent to the
3094 * application already, then this is a duplicate */
3095 if (seq < call->rnext) {
3096 MUTEX_ENTER(&rx_stats_mutex);
3097 rx_stats.dupPacketsRead++;
3098 MUTEX_EXIT(&rx_stats_mutex);
3099 rxevent_Cancel(call->delayedAckEvent, call,
3100 RX_CALL_REFCOUNT_DELAY);
3101 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3107 /* If the sequence number is greater than what can be
3108 * accommodated by the current window, then send a negative
3109 * acknowledge and drop the packet */
3110 if ((call->rnext + call->rwind) <= seq) {
3111 rxevent_Cancel(call->delayedAckEvent, call,
3112 RX_CALL_REFCOUNT_DELAY);
3113 np = rxi_SendAck(call, np, serial,
3114 RX_ACK_EXCEEDS_WINDOW, istack);
3120 /* Look for the packet in the queue of old received packets */
3121 for (prev = call->rnext - 1, missing = 0,
3122 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3123 /*Check for duplicate packet */
3124 if (seq == tp->header.seq) {
3125 MUTEX_ENTER(&rx_stats_mutex);
3126 rx_stats.dupPacketsRead++;
3127 MUTEX_EXIT(&rx_stats_mutex);
3128 rxevent_Cancel(call->delayedAckEvent, call,
3129 RX_CALL_REFCOUNT_DELAY);
3130 np = rxi_SendAck(call, np, serial,
3131 RX_ACK_DUPLICATE, istack);
3136 /* If we find a higher sequence packet, break out and
3137 * insert the new packet here. */
3138 if (seq < tp->header.seq) break;
3139 /* Check for missing packet */
3140 if (tp->header.seq != prev+1) {
3144 prev = tp->header.seq;
3147 /* Keep track of whether we have received the last packet. */
3148 if (flags & RX_LAST_PACKET) {
3149 call->flags |= RX_CALL_HAVE_LAST;
3152 /* It's within the window: add it to the receive queue.
3153 * tp is left by the previous loop either pointing at the
3154 * packet before which to insert the new packet, or at the
3155 * queue head if the queue is empty or the packet should be appended. */
3157 queue_InsertBefore(tp, np);
3161 /* Check whether we have all of the packets for this call */
3162 if ((call->flags & RX_CALL_HAVE_LAST)
3163 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3164 afs_uint32 tseq; /* temporary sequence number */
3166 for (tseq = call->rnext,
3167 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3168 if (tseq != tp->header.seq)
3170 if (tp->header.flags & RX_LAST_PACKET) {
3171 call->flags |= RX_CALL_RECEIVE_DONE;
3178 /* We need to send an ack if the packet is out of sequence,
3179 * or if an ack was requested by the peer. */
3180 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3181 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3184 /* Acknowledge the last packet for each call */
3185 if (flags & RX_LAST_PACKET) {
3196 /* If the receiver is waiting for an iovec, fill the iovec
3197 * using the data from the receive queue */
3198 if (call->flags & RX_CALL_IOVEC_WAIT) {
3199 didHardAck = rxi_FillReadVec(call, serial);
3200 /* the call may have been aborted */
3209 /* Wakeup the reader if any */
3210 if ((call->flags & RX_CALL_READER_WAIT) &&
3211 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3212 (call->iovNext >= call->iovMax) ||
3213 (call->flags & RX_CALL_RECEIVE_DONE))) {
3214 call->flags &= ~RX_CALL_READER_WAIT;
3215 #ifdef RX_ENABLE_LOCKS
3216 CV_BROADCAST(&call->cv_rq);
3218 osi_rxWakeup(&call->rq);
3224 * Send an ack when requested by the peer, or once every
3225 * rxi_SoftAckRate packets until the last packet has been
3226 * received. Always send a soft ack for the last packet in
3227 * the server's reply. */
3229 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3230 np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3231 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3232 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3233 np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3234 } else if (call->nSoftAcks) {
3235 clock_GetTime(&when);
3236 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3237 clock_Add(&when, &rx_lastAckDelay);
3239 clock_Add(&when, &rx_softAckDelay);
3241 if (!call->delayedAckEvent ||
3242 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3243 rxevent_Cancel(call->delayedAckEvent, call,
3244 RX_CALL_REFCOUNT_DELAY);
3245 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3246 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3249 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3250 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3257 static void rxi_ComputeRate();
3260 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3262 struct rx_peer *peer = conn->peer;
3264 MUTEX_ENTER(&peer->peer_lock);
3265 peer->lastReachTime = clock_Sec();
3266 MUTEX_EXIT(&peer->peer_lock);
3268 MUTEX_ENTER(&conn->conn_data_lock);
3269 if (conn->flags & RX_CONN_ATTACHWAIT) {
3272 conn->flags &= ~RX_CONN_ATTACHWAIT;
3273 MUTEX_EXIT(&conn->conn_data_lock);
3275 for (i=0; i<RX_MAXCALLS; i++) {
3276 struct rx_call *call = conn->call[i];
3278 if (call != acall) MUTEX_ENTER(&call->lock);
3279 /* tnop can be null if newcallp is null */
3280 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3281 if (call != acall) MUTEX_EXIT(&call->lock);
3285 MUTEX_EXIT(&conn->conn_data_lock);
3288 /* rxi_ComputePeerNetStats
3290 * Called exclusively by rxi_ReceiveAckPacket to compute network link
3291 * estimates (like RTT and throughput) based on ack packets. Caller
3292 * must ensure that the packet in question is the right one (i.e.
3293 * serial number matches).
3296 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3297 struct rx_ackPacket *ap, struct rx_packet *np)
3299 struct rx_peer *peer = call->conn->peer;
3301 /* Use RTT if not delayed by client. */
3302 if (ap->reason != RX_ACK_DELAY)
3303 rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3305 rxi_ComputeRate(peer, call, p, np, ap->reason);
3309 /* The real smarts of the whole thing. */
3310 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call,
3311 struct rx_packet *np, int istack)
3313 struct rx_ackPacket *ap;
3315 register struct rx_packet *tp;
3316 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3317 register struct rx_connection *conn = call->conn;
3318 struct rx_peer *peer = conn->peer;
3321 /* because there are CM's that are bogus, sending weird values for this. */
3322 afs_uint32 skew = 0;
3327 int newAckCount = 0;
3328 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3329 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3331 MUTEX_ENTER(&rx_stats_mutex);
3332 rx_stats.ackPacketsRead++;
3333 MUTEX_EXIT(&rx_stats_mutex);
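/* Decode the ack packet: the rx_ackPacket header is followed by ap->nAcks per-packet
 * ack/nack bytes; make sure those bytes are contiguous in this packet before indexing them. */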
3334 ap = (struct rx_ackPacket *) rx_DataOf(np);
3335 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
if (nbytes < 0)
3337 return np; /* truncated ack packet */
3339 /* depends on ack packet struct */
3340 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3341 first = ntohl(ap->firstPacket);
3342 serial = ntohl(ap->serial);
3343 /* temporarily disabled -- needs to degrade over time
3344 skew = ntohs(ap->maxSkew); */
3346 /* Ignore ack packets received out of order */
3347 if (first < call->tfirst) {
3351 if (np->header.flags & RX_SLOW_START_OK) {
3352 call->flags |= RX_CALL_SLOW_START_OK;
3355 if (ap->reason == RX_ACK_PING_RESPONSE)
3356 rxi_UpdatePeerReach(conn, call);
3361 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3362 ap->reason, ntohl(ap->previousPacket),
3363 (unsigned int) np->header.seq, (unsigned int) serial,
3364 (unsigned int) skew, ntohl(ap->firstPacket));
3367 for (offset = 0; offset < nAcks; offset++)
3368 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3374 /* Update the outgoing packet skew value to the latest value of
3375 * the peer's incoming packet skew value. The ack packet, of
3376 * course, could arrive out of order, but that won't affect things much. */
3378 MUTEX_ENTER(&peer->peer_lock);
3379 peer->outPacketSkew = skew;
3381 /* Check for packets that no longer need to be transmitted, and
3382 * discard them. This only applies to packets positively
3383 * acknowledged as having been sent to the peer's upper level.
3384 * All other packets must be retained. So only packets with
3385 * sequence numbers < ap->firstPacket are candidates. */
3386 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3387 if (tp->header.seq >= first) break;
3388 call->tfirst = tp->header.seq + 1;
3389 if (serial && (tp->header.serial == serial ||
3390 tp->firstSerial == serial))
3391 rxi_ComputePeerNetStats(call, tp, ap, np);
3392 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3393 /* XXX Hack. Because we have to release the global rx lock when sending
3394 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3395 * in rxi_Start sending packets out because packets may move to the
3396 * freePacketQueue as result of being here! So we drop these packets until
3397 * we're safely out of the traversing. Really ugly!
3398 * To make it even uglier, if we're using fine grain locking, we can
3399 * set the ack bits in the packets and have rxi_Start remove the packets
3400 * when it's done transmitting.
3402 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3405 if (call->flags & RX_CALL_TQ_BUSY) {
3406 #ifdef RX_ENABLE_LOCKS
3407 tp->flags |= RX_PKTFLAG_ACKED;
3408 call->flags |= RX_CALL_TQ_SOME_ACKED;
3409 #else /* RX_ENABLE_LOCKS */
3411 #endif /* RX_ENABLE_LOCKS */
3413 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3416 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3421 /* Give rate detector a chance to respond to ping requests */
3422 if (ap->reason == RX_ACK_PING_RESPONSE) {
3423 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3427 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3429 /* Now go through explicit acks/nacks and record the results in
3430 * the waiting packets. These are packets that can't be released
3431 * yet, even with a positive acknowledge. This positive
3432 * acknowledge only means the packet has been received by the
3433 * peer, not that it will be retained long enough to be sent to
3434 * the peer's upper level. In addition, reset the transmit timers
3435 * of any missing packets (those packets that must be missing
3436 * because this packet was out of sequence) */
3438 call->nSoftAcked = 0;
3439 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3440 /* Update round trip time if the ack was stimulated on receipt
3442 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3443 #ifdef RX_ENABLE_LOCKS
3444 if (tp->header.seq >= first)
3445 #endif /* RX_ENABLE_LOCKS */
3446 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3447 if (serial && (tp->header.serial == serial ||
3448 tp->firstSerial == serial))
3449 rxi_ComputePeerNetStats(call, tp, ap, np);
3451 /* Set the acknowledge flag per packet based on the
3452 * information in the ack packet. An acknowledged packet can
3453 * be downgraded when the server has discarded a packet it
3454 * soft-acked previously, or when an ack packet is received
3455 * out of sequence. */
3456 if (tp->header.seq < first) {
3457 /* Implicit ack information */
3458 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3461 tp->flags |= RX_PKTFLAG_ACKED;
3463 else if (tp->header.seq < first + nAcks) {
3464 /* Explicit ack information: set it in the packet appropriately */
3465 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3466 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3468 tp->flags |= RX_PKTFLAG_ACKED;
3476 tp->flags &= ~RX_PKTFLAG_ACKED;
3481 tp->flags &= ~RX_PKTFLAG_ACKED;
3485 /* If packet isn't yet acked, and it has been transmitted at least
3486 * once, reset retransmit time using latest timeout
3487 * i.e., this should readjust the retransmit timer for all outstanding
3488 * packets... so we don't just retransmit when we should know better. */
3490 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3491 tp->retryTime = tp->timeSent;
3492 clock_Add(&tp->retryTime, &peer->timeout);
3493 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3494 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3498 /* If the window has been extended by this acknowledge packet,
3499 * then wakeup a sender waiting in alloc for window space, or try
3500 * sending packets now, if he's been sitting on packets due to
3501 * lack of window space */
3502 if (call->tnext < (call->tfirst + call->twind)) {
3503 #ifdef RX_ENABLE_LOCKS
3504 CV_SIGNAL(&call->cv_twind);
3506 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3507 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3508 osi_rxWakeup(&call->twind);
3511 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3512 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3516 /* if the ack packet has a receivelen field hanging off it,
3517 * update our state */
3518 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3521 /* If the ack packet has a "recommended" size that is less than
3522 * what I am using now, reduce my size to match */
3523 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3524 sizeof(afs_int32), &tSize);
3525 tSize = (afs_uint32) ntohl(tSize);
3526 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3528 /* Get the maximum packet size to send to this peer */
3529 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3531 tSize = (afs_uint32)ntohl(tSize);
3532 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3533 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3535 /* sanity check - peer might have restarted with different params.
3536 * If peer says "send less", dammit, send less... Peer should never
3537 * be unable to accept packets of the size that prior AFS versions would
3538 * send without asking. */
3539 if (peer->maxMTU != tSize) {
3540 peer->maxMTU = tSize;
3541 peer->MTU = MIN(tSize, peer->MTU);
3542 call->MTU = MIN(call->MTU, tSize);
3546 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3548 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3549 sizeof(afs_int32), &tSize);
3550 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3551 if (tSize < call->twind) { /* smaller than our send */
3552 call->twind = tSize; /* window, we must send less... */
3553 call->ssthresh = MIN(call->twind, call->ssthresh);
3556 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3557 * network MTU confused with the loopback MTU. Calculate the
3558 * maximum MTU here for use in the slow start code below.
3560 maxMTU = peer->maxMTU;
3561 /* Did peer restart with older RX version? */
3562 if (peer->maxDgramPackets > 1) {
3563 peer->maxDgramPackets = 1;
3565 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3567 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3568 sizeof(afs_int32), &tSize);
3569 tSize = (afs_uint32) ntohl(tSize);
3571 * As of AFS 3.5 we set the send window to match the receive window.
3573 if (tSize < call->twind) {
3574 call->twind = tSize;
3575 call->ssthresh = MIN(call->twind, call->ssthresh);
3576 } else if (tSize > call->twind) {
3577 call->twind = tSize;
3581 * As of AFS 3.5, a jumbogram is more than one fixed size
3582 * packet transmitted in a single UDP datagram. If the remote
3583 * MTU is smaller than our local MTU then never send a datagram
3584 * larger than the natural MTU.
3586 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3587 sizeof(afs_int32), &tSize);
3588 maxDgramPackets = (afs_uint32) ntohl(tSize);
3589 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3590 maxDgramPackets = MIN(maxDgramPackets,
3591 (int)(peer->ifDgramPackets));
3592 maxDgramPackets = MIN(maxDgramPackets, tSize);
3593 if (maxDgramPackets > 1) {
3594 peer->maxDgramPackets = maxDgramPackets;
3595 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3597 peer->maxDgramPackets = 1;
3598 call->MTU = peer->natMTU;
3600 } else if (peer->maxDgramPackets > 1) {
3601 /* Restarted with lower version of RX */
3602 peer->maxDgramPackets = 1;
3604 } else if (peer->maxDgramPackets > 1 ||
3605 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3606 /* Restarted with lower version of RX */
3607 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3608 peer->natMTU = OLD_MAX_PACKET_SIZE;
3609 peer->MTU = OLD_MAX_PACKET_SIZE;
3610 peer->maxDgramPackets = 1;
3611 peer->nDgramPackets = 1;
3613 call->MTU = OLD_MAX_PACKET_SIZE;
3618 /* Calculate how many datagrams were successfully received after
3619 * the first missing packet and adjust the negative ack counter accordingly. */
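/* Round the per-packet nack count up to whole datagrams. */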
3624 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3625 if (call->nNacks < nNacked) {
3626 call->nNacks = nNacked;
3635 if (call->flags & RX_CALL_FAST_RECOVER) {
3637 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3639 call->flags &= ~RX_CALL_FAST_RECOVER;
3640 call->cwind = call->nextCwind;
3641 call->nextCwind = 0;
3644 call->nCwindAcks = 0;
3646 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3647 /* Three negative acks in a row trigger congestion recovery */
3648 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3649 MUTEX_EXIT(&peer->peer_lock);
3650 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3651 /* someone else is waiting to start recovery */
3654 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3655 while (call->flags & RX_CALL_TQ_BUSY) {
3656 call->flags |= RX_CALL_TQ_WAIT;
3657 #ifdef RX_ENABLE_LOCKS
3658 CV_WAIT(&call->cv_tq, &call->lock);
3659 #else /* RX_ENABLE_LOCKS */
3660 osi_rxSleep(&call->tq);
3661 #endif /* RX_ENABLE_LOCKS */
3663 MUTEX_ENTER(&peer->peer_lock);
3664 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3665 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3666 call->flags |= RX_CALL_FAST_RECOVER;
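/* Fast-recovery style adjustment: the new slow-start threshold is half the usable
 * window (but at least 2), cwind allows a few extra packets for the retransmit
 * burst, and the number of packets per datagram is halved. */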
3667 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3668 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3670 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3671 call->nextCwind = call->ssthresh;
3674 peer->MTU = call->MTU;
3675 peer->cwind = call->nextCwind;
3676 peer->nDgramPackets = call->nDgramPackets;
3678 call->congestSeq = peer->congestSeq;
3679 /* Reset the resend times on the packets that were nacked
3680 * so we will retransmit as soon as the window permits. */
3681 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3683 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3684 clock_Zero(&tp->retryTime);
3686 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3691 /* If cwind is smaller than ssthresh, then increase
3692 * the window one packet for each ack we receive (exponential growth).
3694 * If cwind is greater than or equal to ssthresh then increase
3695 * the congestion window by one packet for each cwind acks we
3696 * receive (linear growth). */
3697 if (call->cwind < call->ssthresh) {
3698 call->cwind = MIN((int)call->ssthresh,
3699 (int)(call->cwind + newAckCount));
3700 call->nCwindAcks = 0;
3702 call->nCwindAcks += newAckCount;
3703 if (call->nCwindAcks >= call->cwind) {
3704 call->nCwindAcks = 0;
3705 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3709 * If we have received several acknowledgements in a row then
3710 * it is time to increase the size of our datagrams
3712 if ((int)call->nAcks > rx_nDgramThreshold) {
3713 if (peer->maxDgramPackets > 1) {
3714 if (call->nDgramPackets < peer->maxDgramPackets) {
3715 call->nDgramPackets++;
3717 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3718 } else if (call->MTU < peer->maxMTU) {
3719 call->MTU += peer->natMTU;
3720 call->MTU = MIN(call->MTU, peer->maxMTU);
3726 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3728 /* Servers need to hold the call until all response packets have
3729 * been acknowledged. Soft acks are good enough since clients
3730 * are not allowed to clear their receive queues. */
3731 if (call->state == RX_STATE_HOLD &&
3732 call->tfirst + call->nSoftAcked >= call->tnext) {
3733 call->state = RX_STATE_DALLY;
3734 rxi_ClearTransmitQueue(call, 0);
3735 } else if (!queue_IsEmpty(&call->tq)) {
3736 rxi_Start(0, call, istack);
3741 /* Received a response to a challenge packet */
3742 struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn,
3743 register struct rx_packet *np, int istack)
3747 /* Ignore the packet if we're the client */
3748 if (conn->type == RX_CLIENT_CONNECTION) return np;
3750 /* If already authenticated, ignore the packet (it's probably a retry) */
3751 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3754 /* Otherwise, have the security object evaluate the response packet */
3755 error = RXS_CheckResponse(conn->securityObject, conn, np);
3757 /* If the response is invalid, reset the connection, sending
3758 * an abort to the peer */
3762 rxi_ConnectionError(conn, error);
3763 MUTEX_ENTER(&conn->conn_data_lock);
3764 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3765 MUTEX_EXIT(&conn->conn_data_lock);
3769 /* If the response is valid, any calls waiting to attach
3770 * servers can now do so */
3773 for (i=0; i<RX_MAXCALLS; i++) {
3774 struct rx_call *call = conn->call[i];
3776 MUTEX_ENTER(&call->lock);
3777 if (call->state == RX_STATE_PRECALL)
3778 rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
3779 /* tnop can be null if newcallp is null */
3780 MUTEX_EXIT(&call->lock);
3784 /* Update the peer reachability information, just in case
3785 * some calls went into attach-wait while we were waiting
3786 * for authentication. */
3788 rxi_UpdatePeerReach(conn, NULL);
3793 /* A client has received an authentication challenge: the security
3794 * object is asked to cough up a respectable response packet to send
3795 * back to the server. The server is responsible for retrying the
3796 * challenge if it fails to get a response. */
3798 struct rx_packet *rxi_ReceiveChallengePacket(register struct rx_connection *conn,
3799 register struct rx_packet *np, int istack)
3803 /* Ignore the challenge if we're the server */
3804 if (conn->type == RX_SERVER_CONNECTION) return np;
3806 /* Ignore the challenge if the connection is otherwise idle; someone's
3807 * trying to use us as an oracle. */
3808 if (!rxi_HasActiveCalls(conn)) return np;
3810 /* Send the security object the challenge packet. It is expected to fill
3811 * in the response. */
3812 error = RXS_GetResponse(conn->securityObject, conn, np);
3814 /* If the security object is unable to return a valid response, reset the
3815 * connection and send an abort to the peer. Otherwise send the response
3816 * packet to the peer connection. */
3818 rxi_ConnectionError(conn, error);
3819 MUTEX_ENTER(&conn->conn_data_lock);
3820 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3821 MUTEX_EXIT(&conn->conn_data_lock);
3824 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3825 RX_PACKET_TYPE_RESPONSE, NULL, -1, istack);
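/* Taken together, the two routines above implement the security handshake:
 * the server issues a CHALLENGE, the client answers it here by having its
 * security object build a RESPONSE, and rxi_ReceiveResponsePacket validates
 * that response on the server, attaching any calls that were parked in
 * RX_STATE_PRECALL once authentication succeeds. */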
3831 /* Find an available server process to service the current request in
3832 * the given call structure. If one isn't available, queue up this
3833 * call so it eventually gets one */
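/* Two outcomes are possible: if the service is over quota or no idle server
 * thread exists, the call is queued on rx_incomingCallQueue with
 * RX_CALL_WAIT_PROC set; otherwise the first entry on rx_idleServerQueue is
 * handed the call (or, with hot threads enabled, this thread takes the call
 * itself and the idle thread inherits the listener socket). */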
3834 void rxi_AttachServerProc(register struct rx_call *call,
3835 register osi_socket socket, register int *tnop, register struct rx_call **newcallp)
3837 register struct rx_serverQueueEntry *sq;
3838 register struct rx_service *service = call->conn->service;
3839 register int haveQuota = 0;
3841 /* May already be attached */
3842 if (call->state == RX_STATE_ACTIVE) return;
3844 MUTEX_ENTER(&rx_serverPool_lock);
3846 haveQuota = QuotaOK(service);
3847 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3848 /* If there are no processes available to service this call,
3849 * put the call on the incoming call queue (unless it's
3850 * already on the queue).
3852 #ifdef RX_ENABLE_LOCKS
3854 ReturnToServerPool(service);
3855 #endif /* RX_ENABLE_LOCKS */
3857 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3858 call->flags |= RX_CALL_WAIT_PROC;
3859 MUTEX_ENTER(&rx_stats_mutex);
3861 MUTEX_EXIT(&rx_stats_mutex);
3862 rxi_calltrace(RX_CALL_ARRIVAL, call);
3863 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3864 queue_Append(&rx_incomingCallQueue, call);
3868 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3870 /* If hot threads are enabled, and both newcallp and sq->socketp
3871 * are non-null, then this thread will process the call, and the
3872 * idle server thread will start listening on this thread's socket.
3875 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3878 *sq->socketp = socket;
3879 clock_GetTime(&call->startTime);
3880 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3884 if (call->flags & RX_CALL_WAIT_PROC) {
3885 /* Conservative: I don't think this should happen */
3886 call->flags &= ~RX_CALL_WAIT_PROC;
3887 MUTEX_ENTER(&rx_stats_mutex);
3889 MUTEX_EXIT(&rx_stats_mutex);
3892 call->state = RX_STATE_ACTIVE;
3893 call->mode = RX_MODE_RECEIVING;
3894 if (call->flags & RX_CALL_CLEARED) {
3895 /* send an ack now to start the packet flow up again */
3896 call->flags &= ~RX_CALL_CLEARED;
3897 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
3899 #ifdef RX_ENABLE_LOCKS
3902 service->nRequestsRunning++;
3903 if (service->nRequestsRunning <= service->minProcs)
3909 MUTEX_EXIT(&rx_serverPool_lock);
3912 /* Delay the sending of an acknowledge event for a short while, while
3913 * a new call is being prepared (in the case of a client) or a reply
3914 * is being prepared (in the case of a server). Rather than sending
3915 * an ack packet, an ACKALL packet is sent. */
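/* An ACKALL tells the peer that every packet of this call has been received,
 * so it may drop its entire transmit queue for the call without waiting for
 * individual acknowledgements. */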
3916 void rxi_AckAll(struct rxevent *event, register struct rx_call *call, char *dummy)
3918 #ifdef RX_ENABLE_LOCKS
3920 MUTEX_ENTER(&call->lock);
3921 call->delayedAckEvent = NULL;
3922 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3924 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3925 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3927 MUTEX_EXIT(&call->lock);
3928 #else /* RX_ENABLE_LOCKS */
3929 if (event) call->delayedAckEvent = NULL;
3930 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3931 RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3932 #endif /* RX_ENABLE_LOCKS */
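/* Event handler for a delayed acknowledgement scheduled earlier with
 * rxevent_Post; when the delay expires, send an ordinary ack with reason
 * RX_ACK_DELAY. */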
3935 void rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call, char *dummy)
3937 #ifdef RX_ENABLE_LOCKS
3939 MUTEX_ENTER(&call->lock);
3940 if (event == call->delayedAckEvent)
3941 call->delayedAckEvent = NULL;
3942 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3944 (void) rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
3946 MUTEX_EXIT(&call->lock);
3947 #else /* RX_ENABLE_LOCKS */
3948 if (event) call->delayedAckEvent = NULL;
3949 (void) rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
3950 #endif /* RX_ENABLE_LOCKS */
3954 #ifdef RX_ENABLE_LOCKS
3955 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3956 * clearing them out.
3958 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call)
3960 register struct rx_packet *p, *tp;
3963 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3966 p->flags |= RX_PKTFLAG_ACKED;
3970 call->flags |= RX_CALL_TQ_CLEARME;
3971 call->flags |= RX_CALL_TQ_SOME_ACKED;
3974 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3975 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3976 call->tfirst = call->tnext;
3977 call->nSoftAcked = 0;
3979 if (call->flags & RX_CALL_FAST_RECOVER) {
3980 call->flags &= ~RX_CALL_FAST_RECOVER;
3981 call->cwind = call->nextCwind;
3982 call->nextCwind = 0;
3985 CV_SIGNAL(&call->cv_twind);
3987 #endif /* RX_ENABLE_LOCKS */
3989 /* Clear out the transmit queue for the current call (all packets have
3990 * been received by peer) */
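/* If another thread (rxi_Start) currently owns the transmit queue
 * (RX_CALL_TQ_BUSY under AFS_GLOBAL_RXLOCK_KERNEL), the packets are only
 * marked RX_PKTFLAG_ACKED and RX_CALL_TQ_CLEARME is set; the owning thread
 * frees them once it is finished with the queue. */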
3991 void rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
3993 register struct rx_packet *p, *tp;
3995 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3996 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3998 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4001 p->flags |= RX_PKTFLAG_ACKED;
4005 call->flags |= RX_CALL_TQ_CLEARME;
4006 call->flags |= RX_CALL_TQ_SOME_ACKED;
4009 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4010 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4016 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4017 call->flags &= ~RX_CALL_TQ_CLEARME;
4019 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4021 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4022 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4023 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4024 call->nSoftAcked = 0;
4026 if (call->flags & RX_CALL_FAST_RECOVER) {
4027 call->flags &= ~RX_CALL_FAST_RECOVER;
4028 call->cwind = call->nextCwind;
4031 #ifdef RX_ENABLE_LOCKS
4032 CV_SIGNAL(&call->cv_twind);
4034 osi_rxWakeup(&call->twind);
4038 void rxi_ClearReceiveQueue(register struct rx_call *call)
4040 register struct rx_packet *p, *tp;
4041 if (queue_IsNotEmpty(&call->rq)) {
4042 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4047 rx_packetReclaims++;
4049 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4051 if (call->state == RX_STATE_PRECALL) {
4052 call->flags |= RX_CALL_CLEARED;
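/* RX_CALL_CLEARED records that a precall's receive queue was discarded;
 * when a server thread later attaches (see rxi_AttachServerProc above), it
 * notices the flag and sends an ack to restart the packet flow. */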
4056 /* Send an abort packet for the specified call */
4057 struct rx_packet *rxi_SendCallAbort(register struct rx_call *call,
4058 struct rx_packet *packet, int istack, int force)
4066 /* Clients should never delay abort messages */
4067 if (rx_IsClientConn(call->conn))
4070 if (call->abortCode != call->error) {
4071 call->abortCode = call->error;
4072 call->abortCount = 0;
4075 if (force || rxi_callAbortThreshhold == 0 ||
4076 call->abortCount < rxi_callAbortThreshhold) {
4077 if (call->delayedAbortEvent) {
4078 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4080 error = htonl(call->error);
4082 packet = rxi_SendSpecial(call, call->conn, packet,
4083 RX_PACKET_TYPE_ABORT, (char *)&error,
4084 sizeof(error), istack);
4085 } else if (!call->delayedAbortEvent) {
4086 clock_GetTime(&when);
4087 clock_Addmsec(&when, rxi_callAbortDelay);
4088 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4089 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4095 /* Send an abort packet for the specified connection. Packet is an
4096 * optional pointer to a packet that can be used to send the abort.
4097 * Once the number of abort messages reaches the threshold, an
4098 * event is scheduled to send the abort. Setting the force flag
4099 * overrides sending delayed abort messages.
4101 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4102 * to send the abort packet.
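/* In other words: aborts go out immediately until the threshold is crossed;
 * after that at most one abort is scheduled rxi_connAbortDelay msec in the
 * future, so a misbehaving peer cannot provoke a flood of abort packets. */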
4104 struct rx_packet *rxi_SendConnectionAbort(register struct rx_connection *conn,
4105 struct rx_packet *packet, int istack, int force)
4113 /* Clients should never delay abort messages */
4114 if (rx_IsClientConn(conn))
4117 if (force || rxi_connAbortThreshhold == 0 ||
4118 conn->abortCount < rxi_connAbortThreshhold) {
4119 if (conn->delayedAbortEvent) {
4120 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4122 error = htonl(conn->error);
4124 MUTEX_EXIT(&conn->conn_data_lock);
4125 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4126 RX_PACKET_TYPE_ABORT, (char *)&error,
4127 sizeof(error), istack);
4128 MUTEX_ENTER(&conn->conn_data_lock);
4129 } else if (!conn->delayedAbortEvent) {
4130 clock_GetTime(&when);
4131 clock_Addmsec(&when, rxi_connAbortDelay);
4132 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4138 /* Associate an error with all of the calls owned by a connection. Called
4139 * with error non-zero. This is only for really fatal things, like
4140 * bad authentication responses. The connection itself is set in
4141 * error at this point, so that future packets received will be rejected.
4143 void rxi_ConnectionError(register struct rx_connection *conn,
4144 register afs_int32 error)
4148 MUTEX_ENTER(&conn->conn_data_lock);
4149 if (conn->challengeEvent)
4150 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4151 if (conn->checkReachEvent) {
4152 rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4153 conn->checkReachEvent = 0;
4154 conn->flags &= ~RX_CONN_ATTACHWAIT;
4157 MUTEX_EXIT(&conn->conn_data_lock);
4158 for (i=0; i<RX_MAXCALLS; i++) {
4159 struct rx_call *call = conn->call[i];
4161 MUTEX_ENTER(&call->lock);
4162 rxi_CallError(call, error);
4163 MUTEX_EXIT(&call->lock);
4166 conn->error = error;
4167 MUTEX_ENTER(&rx_stats_mutex);
4168 rx_stats.fatalErrors++;
4169 MUTEX_EXIT(&rx_stats_mutex);
4173 void rxi_CallError(register struct rx_call *call, afs_int32 error)
4175 if (call->error) error = call->error;
4176 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4177 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4178 rxi_ResetCall(call, 0);
4181 rxi_ResetCall(call, 0);
4183 call->error = error;
4184 call->mode = RX_MODE_ERROR;
4187 /* Reset various fields in a call structure, and wakeup waiting
4188 * processes. Some fields aren't changed: state & mode are not
4189 * touched (these must be set by the caller), and bufptr, nLeft, and
4190 * nFree are not reset, since these fields are manipulated by
4191 * unprotected macros, and may only be reset by non-interrupting code.
4194 /* this code requires that call->conn be set properly as a pre-condition. */
4195 #endif /* ADAPT_WINDOW */
4197 void rxi_ResetCall(register struct rx_call *call, register int newcall)
4200 register struct rx_peer *peer;
4201 struct rx_packet *packet;
4203 /* Notify anyone who is waiting for asynchronous packet arrival */
4204 if (call->arrivalProc) {
4205 (*call->arrivalProc)(call, call->arrivalProcHandle, (int) call->arrivalProcArg);
4206 call->arrivalProc = (VOID (*)()) 0;
4209 if (call->delayedAbortEvent) {
4210 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4211 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4213 rxi_SendCallAbort(call, packet, 0, 1);
4214 rxi_FreePacket(packet);
4219 * Update the peer with the congestion information in this call
4220 * so other calls on this connection can pick up where this call
4221 * left off. If the congestion sequence numbers don't match then
4222 * another call experienced a retransmission.
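/* Only write the figures back if no other call has bumped peer->congestSeq
 * in the meantime; a mismatch means a retransmission occurred elsewhere and
 * this call's window and MTU estimates are stale. */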
4224 peer = call->conn->peer;
4225 MUTEX_ENTER(&peer->peer_lock);
4227 if (call->congestSeq == peer->congestSeq) {
4228 peer->cwind = MAX(peer->cwind, call->cwind);
4229 peer->MTU = MAX(peer->MTU, call->MTU);
4230 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4233 call->abortCode = 0;
4234 call->abortCount = 0;
4236 if (peer->maxDgramPackets > 1) {
4237 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4239 call->MTU = peer->MTU;
4241 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4242 call->ssthresh = rx_maxSendWindow;
4243 call->nDgramPackets = peer->nDgramPackets;
4244 call->congestSeq = peer->congestSeq;
4245 MUTEX_EXIT(&peer->peer_lock);
4247 flags = call->flags;
4248 rxi_ClearReceiveQueue(call);
4249 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4250 if (call->flags & RX_CALL_TQ_BUSY) {
4251 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4252 call->flags |= (flags & RX_CALL_TQ_WAIT);
4254 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4256 rxi_ClearTransmitQueue(call, 0);
4257 queue_Init(&call->tq);
4260 queue_Init(&call->rq);
4262 call->rwind = rx_initReceiveWindow;
4263 call->twind = rx_initSendWindow;
4264 call->nSoftAcked = 0;
4265 call->nextCwind = 0;
4268 call->nCwindAcks = 0;
4269 call->nSoftAcks = 0;
4270 call->nHardAcks = 0;
4272 call->tfirst = call->rnext = call->tnext = 1;
4274 call->lastAcked = 0;
4275 call->localStatus = call->remoteStatus = 0;
4277 if (flags & RX_CALL_READER_WAIT) {
4278 #ifdef RX_ENABLE_LOCKS
4279 CV_BROADCAST(&call->cv_rq);
4281 osi_rxWakeup(&call->rq);
4284 if (flags & RX_CALL_WAIT_PACKETS) {
4285 MUTEX_ENTER(&rx_freePktQ_lock);
4286 rxi_PacketsUnWait(); /* XXX */
4287 MUTEX_EXIT(&rx_freePktQ_lock);
4290 #ifdef RX_ENABLE_LOCKS
4291 CV_SIGNAL(&call->cv_twind);
4293 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4294 osi_rxWakeup(&call->twind);
4297 #ifdef RX_ENABLE_LOCKS
4298 /* The following ensures that we don't mess with any queue while some
4299 * other thread might also be doing so. The call_queue_lock field is
4300 * only modified under the call lock. If the call is in the process
4301 * of being removed from a queue, the call is not locked until the
4302 * queue lock is dropped, and only then is the call_queue_lock field
4303 * zeroed out. So it's safe to lock the queue if call_queue_lock is set.
4304 * Note that any other routine which removes a call from a queue has to
4305 * obtain the queue lock before examining the queue and removing the call.
4307 if (call->call_queue_lock) {
4308 MUTEX_ENTER(call->call_queue_lock);
4309 if (queue_IsOnQueue(call)) {
4311 if (flags & RX_CALL_WAIT_PROC) {
4312 MUTEX_ENTER(&rx_stats_mutex);
4314 MUTEX_EXIT(&rx_stats_mutex);
4317 MUTEX_EXIT(call->call_queue_lock);
4318 CLEAR_CALL_QUEUE_LOCK(call);
4320 #else /* RX_ENABLE_LOCKS */
4321 if (queue_IsOnQueue(call)) {
4323 if (flags & RX_CALL_WAIT_PROC)
4326 #endif /* RX_ENABLE_LOCKS */
4328 rxi_KeepAliveOff(call);
4329 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4332 /* Send an acknowledge for the indicated packet (seq,serial) of the
4333 * indicated call, for the indicated reason (reason). This
4334 * acknowledge will specifically acknowledge receiving the packet, and
4335 * will also specify which other packets for this call have been
4336 * received. This routine returns the packet that was used to the
4337 * caller. The caller is responsible for freeing it or re-using it.
4338 * This acknowledgement also returns the highest sequence number
4339 * actually read out by the higher level to the sender; the sender
4340 * promises to keep around packets that have not been read by the
4341 * higher level yet (unless, of course, the sender decides to abort
4342 * the call altogether). Any of p, seq, serial, pflags, or reason may
4343 * be set to zero without ill effect. That is, if they are zero, they
4344 * will not convey any information.
4345 * NOW there is a trailer field, after the ack where it will safely be
4346 * ignored by mundanes, which indicates the maximum size packet this
4347 * host can swallow. */
4349 /* register struct rx_packet *optionalPacket;  use to send ack (or null) */
4350 /* int seq;     Sequence number of the packet we are acking */
4351 /* int serial;  Serial number of the packet */
4352 /* int pflags;  Flags field from packet header */
4353 /* int reason;  Reason an acknowledge was prompted */
4356 struct rx_packet *rxi_SendAck(register struct rx_call *call,
4357 register struct rx_packet *optionalPacket, int serial,
4358 int reason, int istack)
4360 struct rx_ackPacket *ap;
4361 register struct rx_packet *rqp;
4362 register struct rx_packet *nxp; /* For queue_Scan */
4363 register struct rx_packet *p;
4368 * Open the receive window once a thread starts reading packets
4370 if (call->rnext > 1) {
4371 call->rwind = rx_maxReceiveWindow;
4374 call->nHardAcks = 0;
4375 call->nSoftAcks = 0;
4376 if (call->rnext > call->lastAcked)
4377 call->lastAcked = call->rnext;
4381 rx_computelen(p, p->length); /* reset length, you never know */