2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX: Extended Remote Procedure Call */
12 #include <afsconfig.h>
14 #include "../afs/param.h"
16 #include <afs/param.h>
22 #include "../afs/sysincludes.h"
23 #include "../afs/afsincludes.h"
25 #include "../h/types.h"
26 #include "../h/time.h"
27 #include "../h/stat.h"
29 #include <net/net_globals.h>
30 #endif /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
32 #include "../h/socket.h"
34 #include "../netinet/in.h"
35 #include "../afs/afs_args.h"
36 #include "../afs/afs_osi.h"
37 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
38 #include "../h/systm.h"
41 #undef RXDEBUG /* turn off debugging */
43 #if defined(AFS_SGI_ENV)
44 #include "../sys/debug.h"
46 #include "../afsint/afsint.h"
53 #endif /* AFS_ALPHA_ENV */
55 #include "../afs/sysincludes.h"
56 #include "../afs/afsincludes.h"
58 #include "../afs/lock.h"
59 #include "../rx/rx_kmutex.h"
60 #include "../rx/rx_kernel.h"
61 #include "../rx/rx_clock.h"
62 #include "../rx/rx_queue.h"
64 #include "../rx/rx_globals.h"
65 #include "../rx/rx_trace.h"
66 #define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS 211 /* Stop AFS process */
68 #define AFSOP_STOP_BKG 212 /* Stop BKG process */
69 #include "../afsint/afsint.h"
70 extern afs_int32 afs_termState;
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "../afsint/rxgen_consts.h"
77 # include <sys/types.h>
84 # include <sys/socket.h>
85 # include <sys/file.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include "rx_internal.h"
105 # include <afs/rxgen_consts.h>
108 int (*registerProgram)() = 0;
109 int (*swapNameProgram)() = 0;
111 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
113 afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
114 afs_int32 rxi_start_in_error;
116 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
119 * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
120 * currently allocated within rx. This number is used to allocate the
121 * memory required to return the statistics when queried.
124 static unsigned int rxi_rpc_peer_stat_cnt;
127 * rxi_rpc_process_stat_cnt counts the total number of local process stat
128 * structures currently allocated within rx. The number is used to allocate
129 * the memory required to return the statistics when queried.
132 static unsigned int rxi_rpc_process_stat_cnt;
134 #if !defined(offsetof)
135 #include <stddef.h> /* for definition of offsetof() */
138 #ifdef AFS_PTHREAD_ENV
142 * Use procedural initialization of mutexes/condition variables
146 extern pthread_mutex_t rxkad_stats_mutex;
147 extern pthread_mutex_t des_init_mutex;
148 extern pthread_mutex_t des_random_mutex;
149 extern pthread_mutex_t rx_clock_mutex;
150 extern pthread_mutex_t rxi_connCacheMutex;
151 extern pthread_mutex_t rx_event_mutex;
152 extern pthread_mutex_t osi_malloc_mutex;
153 extern pthread_mutex_t event_handler_mutex;
154 extern pthread_mutex_t listener_mutex;
155 extern pthread_mutex_t rx_if_init_mutex;
156 extern pthread_mutex_t rx_if_mutex;
157 extern pthread_mutex_t rxkad_client_uid_mutex;
158 extern pthread_mutex_t rxkad_random_mutex;
160 extern pthread_cond_t rx_event_handler_cond;
161 extern pthread_cond_t rx_listener_cond;
163 static pthread_mutex_t epoch_mutex;
164 static pthread_mutex_t rx_init_mutex;
165 static pthread_mutex_t rx_debug_mutex;
167 static void rxi_InitPthread(void) {
168 assert(pthread_mutex_init(&rx_clock_mutex,
169 (const pthread_mutexattr_t*)0)==0);
170 assert(pthread_mutex_init(&rxi_connCacheMutex,
171 (const pthread_mutexattr_t*)0)==0);
172 assert(pthread_mutex_init(&rx_init_mutex,
173 (const pthread_mutexattr_t*)0)==0);
174 assert(pthread_mutex_init(&epoch_mutex,
175 (const pthread_mutexattr_t*)0)==0);
176 assert(pthread_mutex_init(&rx_event_mutex,
177 (const pthread_mutexattr_t*)0)==0);
178 assert(pthread_mutex_init(&des_init_mutex,
179 (const pthread_mutexattr_t*)0)==0);
180 assert(pthread_mutex_init(&des_random_mutex,
181 (const pthread_mutexattr_t*)0)==0);
182 assert(pthread_mutex_init(&osi_malloc_mutex,
183 (const pthread_mutexattr_t*)0)==0);
184 assert(pthread_mutex_init(&event_handler_mutex,
185 (const pthread_mutexattr_t*)0)==0);
186 assert(pthread_mutex_init(&listener_mutex,
187 (const pthread_mutexattr_t*)0)==0);
188 assert(pthread_mutex_init(&rx_if_init_mutex,
189 (const pthread_mutexattr_t*)0)==0);
190 assert(pthread_mutex_init(&rx_if_mutex,
191 (const pthread_mutexattr_t*)0)==0);
192 assert(pthread_mutex_init(&rxkad_client_uid_mutex,
193 (const pthread_mutexattr_t*)0)==0);
194 assert(pthread_mutex_init(&rxkad_random_mutex,
195 (const pthread_mutexattr_t*)0)==0);
196 assert(pthread_mutex_init(&rxkad_stats_mutex,
197 (const pthread_mutexattr_t*)0)==0);
198 assert(pthread_mutex_init(&rx_debug_mutex,
199 (const pthread_mutexattr_t*)0)==0);
201 assert(pthread_cond_init(&rx_event_handler_cond,
202 (const pthread_condattr_t*)0)==0);
203 assert(pthread_cond_init(&rx_listener_cond,
204 (const pthread_condattr_t*)0)==0);
205 assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
208 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
209 #define INIT_PTHREAD_LOCKS \
210 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
212 * The rx_stats_mutex mutex protects the following global variables:
217 * rxi_lowConnRefCount
218 * rxi_lowPeerRefCount
227 #define INIT_PTHREAD_LOCKS
231 /* Variables for handling the minProcs implementation. availProcs gives the
232 * number of threads available in the pool at this moment (not counting dudes
233 * executing right now). totalMin gives the total number of procs required
234 * for handling all minProcs requests. minDeficit is a dynamic variable
235 * tracking the # of procs required to satisfy all of the remaining minProcs
237 * For fine grain locking to work, the quota check and the reservation of
238 * a server thread has to come while rxi_availProcs and rxi_minDeficit
239 * are locked. To this end, the code has been modified under #ifdef
240 * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
241 * same time. A new function, ReturnToServerPool() returns the allocation.
243 * A call can be on several queues (but only one at a time). When
244 * rxi_ResetCall wants to remove the call from a queue, it has to ensure
245 * that no one else is touching the queue. To this end, we store the address
246 * of the queue lock in the call structure (under the call lock) when we
247 * put the call on a queue, and we clear the call_queue_lock when the
248 * call is removed from a queue (once the call lock has been obtained).
249 * This allows rxi_ResetCall to safely synchronize with others wishing
250 * to manipulate the queue.
253 #ifdef RX_ENABLE_LOCKS
254 static int rxi_ServerThreadSelectingCall;
255 static afs_kmutex_t rx_rpc_stats;
256 void rxi_StartUnlocked();
259 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are
260 ** pretty good that the next packet coming in is from the same connection
261 ** as the last packet, since we're sending multiple packets in a transmit window.
263 struct rx_connection *rxLastConn = 0;
265 #ifdef RX_ENABLE_LOCKS
266 /* The locking hierarchy for rx fine grain locking is composed of five
268 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
269 * call->lock - locks call data fields.
270 * Most any other lock - these are all independent of each other.....
272 * rx_freeCallQueue_lock
274 * rx_connHashTable_lock
277 * rx_peerHashTable_lock - locked under rx_connHashTable_lock
280 * peer_lock - locks peer data fields.
281 * conn_data_lock - ensures that no more than one thread is updating a conn
282 * data field at the same time.
283 * Do we need a lock to protect the peer field in the conn structure?
284 * conn->peer was previously a constant for all intents and purposes and so
285 * has no lock protecting this field. The multihomed client delta introduced
286 * an RX code change: change the peer field in the connection structure
287 * to that remote interface from which the last packet for this
288 * connection was sent out. This may become an issue if further changes
291 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
292 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
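/* Illustrative sketch (not taken verbatim from this file; the queue and lock
 * names below are assumptions): under RX_ENABLE_LOCKS the intent is to record
 * the queue's lock when a call is put on a queue, and clear it when the call
 * is removed, so rxi_ResetCall can later synchronize on the right lock:
 *
 *     MUTEX_ENTER(&rx_serverPool_lock);
 *     queue_Append(&rx_incomingCallQueue, call);
 *     SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
 *     MUTEX_EXIT(&rx_serverPool_lock);
 *
 *     ... later, with the same lock and the call lock held ...
 *     queue_Remove(call);
 *     CLEAR_CALL_QUEUE_LOCK(call);
 */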
294 /* rxdb_fileID is used to identify the lock location, along with line#. */
295 static int rxdb_fileID = RXDB_FILE_RX;
296 #endif /* RX_LOCKS_DB */
297 static void rxi_SetAcksInTransmitQueue();
298 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
299 #else /* RX_ENABLE_LOCKS */
300 #define SET_CALL_QUEUE_LOCK(C, L)
301 #define CLEAR_CALL_QUEUE_LOCK(C)
302 #endif /* RX_ENABLE_LOCKS */
303 static void rxi_DestroyConnectionNoLock();
304 struct rx_serverQueueEntry *rx_waitForPacket = 0;
306 /* ------------Exported Interfaces------------- */
308 /* This function allows rxkad to set the epoch to a suitably random number
309 * which rx_NewConnection will use in the future. The principal purpose is to
310 * get rxnull connections to use the same epoch as the rxkad connections do, at
311 * least once the first rxkad connection is established. This is important now
312 * that the host/port addresses aren't used in FindConnection: the uniqueness
313 * of epoch/cid matters and the start time won't do. */
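/* Illustrative sketch (an assumption, not taken from rxkad itself): a security
 * layer might install a suitably random epoch once at startup, before creating
 * connections, e.g.
 *
 *     struct clock now;
 *     clock_GetTime(&now);
 *     rx_SetEpoch(((afs_uint32) now.sec ^ (afs_uint32) now.usec) | 0x80000000);
 *
 * The high bit mirrors the convention used in rx_Init below; the actual
 * randomness source rxkad uses is not shown here. */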
315 #ifdef AFS_PTHREAD_ENV
317 * This mutex protects the following global variables:
321 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
322 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
326 #endif /* AFS_PTHREAD_ENV */
328 void rx_SetEpoch (epoch)
336 /* Initialize rx. A port number may be mentioned, in which case this
337 * becomes the default port number for any service installed later.
338 * If 0 is provided for the port number, a random port will be chosen
339 * by the kernel. Whether this will ever overlap anything in
340 * /etc/services is anybody's guess... Returns 0 on success, -1 on failure. */
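/* Illustrative client sketch (hypothetical; the service id constant and error
 * handling are assumptions). A minimal client of this API might look roughly
 * like:
 *
 *     struct rx_connection *conn;
 *     struct rx_call *call;
 *     afs_int32 code;
 *
 *     if (rx_Init(0) < 0)                       -- let the kernel pick a port
 *         return -1;
 *     conn = rx_NewConnection(host, htons(7000), MY_SERVICE_ID,
 *                             rxnull_NewClientSecurityObject(), 0);
 *     call = rx_NewCall(conn);
 *     ... rx_Write / rx_Read on the call ...
 *     code = rx_EndCall(call, 0);
 *     rx_DestroyConnection(conn);
 */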
342 static int rxinit_status = 1;
343 #ifdef AFS_PTHREAD_ENV
345 * This mutex protects the following global variables:
349 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
350 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
353 #define UNLOCK_RX_INIT
356 int rx_Init(u_int port)
363 char *htable, *ptable;
366 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
367 __djgpp_set_quiet_socket(1);
374 if (rxinit_status == 0) {
375 tmp_status = rxinit_status;
377 return tmp_status; /* Already started; return previous error code. */
381 if (afs_winsockInit()<0)
387 * Initialize anything necessary to provide a non-preemptive threading
390 rxi_InitializeThreadSupport();
393 /* Allocate and initialize a socket for client and perhaps server
396 rx_socket = rxi_GetUDPSocket((u_short)port);
397 if (rx_socket == OSI_NULLSOCKET) {
403 #ifdef RX_ENABLE_LOCKS
406 #endif /* RX_LOCKS_DB */
407 MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
408 MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
409 MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
410 MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
411 MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
413 CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
414 MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
415 MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
416 MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
418 MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
420 CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
421 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
423 rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
424 #endif /* KERNEL && AFS_HPUX110_ENV */
425 #else /* RX_ENABLE_LOCKS */
426 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
427 mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
428 #endif /* AFS_GLOBAL_SUNLOCK */
429 #endif /* RX_ENABLE_LOCKS */
432 rx_connDeadTime = 12;
433 rx_tranquil = 0; /* reset flag */
434 memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
436 osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
437 PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
438 memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
439 ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
440 PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
441 memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
443 /* Malloc up a bunch of packets & buffers */
445 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
446 queue_Init(&rx_freePacketQueue);
447 rxi_NeedMorePackets = FALSE;
448 rxi_MorePackets(rx_nPackets);
456 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
457 tv.tv_sec = clock_now.sec;
458 tv.tv_usec = clock_now.usec;
459 srand((unsigned int) tv.tv_usec);
466 #if defined(KERNEL) && !defined(UKERNEL)
467 /* Really, this should never happen in a real kernel */
470 struct sockaddr_in addr;
471 int addrlen = sizeof(addr);
472 if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
476 rx_port = addr.sin_port;
479 rx_stats.minRtt.sec = 9999999;
481 rx_SetEpoch (tv.tv_sec | 0x80000000);
483 rx_SetEpoch (tv.tv_sec); /* Start time of this package, rxkad
484 * will provide a more random value. */
486 MUTEX_ENTER(&rx_stats_mutex);
487 rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
488 MUTEX_EXIT(&rx_stats_mutex);
489 /* *Slightly* random start time for the cid. This is just to help
490 * out with the hashing function at the peer */
491 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
492 rx_connHashTable = (struct rx_connection **) htable;
493 rx_peerHashTable = (struct rx_peer **) ptable;
495 rx_lastAckDelay.sec = 0;
496 rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
497 rx_hardAckDelay.sec = 0;
498 rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
499 rx_softAckDelay.sec = 0;
500 rx_softAckDelay.usec = 100000; /* 100 milliseconds */
502 rxevent_Init(20, rxi_ReScheduleEvents);
504 /* Initialize various global queues */
505 queue_Init(&rx_idleServerQueue);
506 queue_Init(&rx_incomingCallQueue);
507 queue_Init(&rx_freeCallQueue);
509 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
510 /* Initialize our list of usable IP addresses. */
514 /* Start listener process (exact function is dependent on the
515 * implementation environment--kernel or user space) */
520 tmp_status = rxinit_status = 0;
525 /* called with unincremented nRequestsRunning to see if it is OK to start
526 * a new thread in this service. Could be "no" for two reasons: over the
527 * max quota, or would prevent others from reaching their min quota.
529 #ifdef RX_ENABLE_LOCKS
530 /* This version of QuotaOK reserves quota if it's ok while the
531 * rx_serverPool_lock is held. Return quota using ReturnToServerPool().
533 static int QuotaOK(aservice)
534 register struct rx_service *aservice;
536 /* check if over max quota */
537 if (aservice->nRequestsRunning >= aservice->maxProcs) {
541 /* under min quota, we're OK */
542 /* otherwise, can use only if there are enough to allow everyone
543 * to go to their min quota after this guy starts.
545 MUTEX_ENTER(&rx_stats_mutex);
546 if ((aservice->nRequestsRunning < aservice->minProcs) ||
547 (rxi_availProcs > rxi_minDeficit)) {
548 aservice->nRequestsRunning++;
549 /* just started call in minProcs pool, need fewer to maintain
551 if (aservice->nRequestsRunning <= aservice->minProcs)
554 MUTEX_EXIT(&rx_stats_mutex);
557 MUTEX_EXIT(&rx_stats_mutex);
561 static void ReturnToServerPool(aservice)
562 register struct rx_service *aservice;
564 aservice->nRequestsRunning--;
565 MUTEX_ENTER(&rx_stats_mutex);
566 if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
568 MUTEX_EXIT(&rx_stats_mutex);
571 #else /* RX_ENABLE_LOCKS */
572 static int QuotaOK(aservice)
573 register struct rx_service *aservice; {
575 /* under min quota, we're OK */
576 if (aservice->nRequestsRunning < aservice->minProcs) return 1;
578 /* check if over max quota */
579 if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
581 /* otherwise, can use only if there are enough to allow everyone
582 * to go to their min quota after this guy starts.
584 if (rxi_availProcs > rxi_minDeficit) rc = 1;
587 #endif /* RX_ENABLE_LOCKS */
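/* Worked example of the quota check above (numbers are hypothetical): a
 * service with minProcs = 2 and maxProcs = 4 that already has 4 requests
 * running is refused outright. With 1 request running it is under its minimum,
 * so it is always allowed. With 3 running it is allowed only if
 * rxi_availProcs > rxi_minDeficit (e.g. availProcs = 5, minDeficit = 3 ->
 * allowed; availProcs = 3, minDeficit = 3 -> refused), i.e. granting the
 * thread still leaves enough idle threads for every other service to reach
 * its own minProcs. */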
590 /* Called by rx_StartServer to start up lwp's to service calls.
591 NExistingProcs gives the number of procs already existing, and which
592 therefore needn't be created. */
593 void rxi_StartServerProcs(nExistingProcs)
596 register struct rx_service *service;
601 /* For each service, reserve N processes, where N is the "minimum"
602 number of processes that MUST be able to execute a request in parallel,
603 at any time, for that process. Also compute the maximum difference
604 between any service's maximum number of processes that can run
605 (i.e. the maximum number that ever will be run, and a guarantee
606 that this number will run if other services aren't running), and its
607 minimum number. The result is the extra number of processes that
608 we need in order to provide the latter guarantee */
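/* Worked example of the computation below (hypothetical services): with
 * service A (minProcs = 2, maxProcs = 6) and service B (minProcs = 3,
 * maxProcs = 4), nProcs = 2 + 3 + max(6-2, 4-3) = 9; if nExistingProcs is 1
 * (this thread), 8 new server procs are started. */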
609 for (i=0; i<RX_MAX_SERVICES; i++) {
611 service = rx_services[i];
612 if (service == (struct rx_service *) 0) break;
613 nProcs += service->minProcs;
614 diff = service->maxProcs - service->minProcs;
615 if (diff > maxdiff) maxdiff = diff;
617 nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
618 nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
619 for (i = 0; i<nProcs; i++) {
620 rxi_StartServerProc(rx_ServerProc, rx_stackSize);
625 /* This routine must be called if any services are exported. If the
626 * donateMe flag is set, the calling process is donated to the server
628 void rx_StartServer(donateMe)
630 register struct rx_service *service;
631 register int i, nProcs=0;
637 /* Start server processes, if necessary (exact function is dependent
638 * on the implementation environment--kernel or user space). DonateMe
639 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
640 * case, one less new proc will be created by rx_StartServerProcs.
642 rxi_StartServerProcs(donateMe);
644 /* count up the # of threads in minProcs, and set the min deficit to
645 * be that value, too.
647 for (i=0; i<RX_MAX_SERVICES; i++) {
648 service = rx_services[i];
649 if (service == (struct rx_service *) 0) break;
650 MUTEX_ENTER(&rx_stats_mutex);
651 rxi_totalMin += service->minProcs;
652 /* below works even if a thread is running, since minDeficit would
653 * still have been decremented and later re-incremented.
655 rxi_minDeficit += service->minProcs;
656 MUTEX_EXIT(&rx_stats_mutex);
659 /* Turn on reaping of idle server connections */
660 rxi_ReapConnections();
669 #ifdef AFS_PTHREAD_ENV
671 pid = (pid_t) pthread_self();
672 #else /* AFS_PTHREAD_ENV */
674 LWP_CurrentProcess(&pid);
675 #endif /* AFS_PTHREAD_ENV */
677 sprintf(name,"srv_%d", ++nProcs);
679 (*registerProgram)(pid, name);
681 #endif /* AFS_NT40_ENV */
682 rx_ServerProc(); /* Never returns */
687 /* Create a new client connection to the specified service, using the
688 * specified security object to implement the security model for this
690 struct rx_connection *
691 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
692 register afs_uint32 shost; /* Server host */
693 u_short sport; /* Server port */
694 u_short sservice; /* Server service id */
695 register struct rx_securityClass *securityObject;
696 int serviceSecurityIndex;
700 register struct rx_connection *conn;
705 dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
706 shost, sport, sservice, securityObject, serviceSecurityIndex));
708 /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
709 * the case of kmem_alloc? */
710 conn = rxi_AllocConnection();
711 #ifdef RX_ENABLE_LOCKS
712 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
713 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",MUTEX_DEFAULT,0);
714 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
718 MUTEX_ENTER(&rx_connHashTable_lock);
719 cid = (rx_nextCid += RX_MAXCALLS);
720 conn->type = RX_CLIENT_CONNECTION;
722 conn->epoch = rx_epoch;
723 conn->peer = rxi_FindPeer(shost, sport, 0, 1);
724 conn->serviceId = sservice;
725 conn->securityObject = securityObject;
726 /* This doesn't work in all compilers with void (they're buggy), so fake it
728 conn->securityData = (VOID *) 0;
729 conn->securityIndex = serviceSecurityIndex;
730 rx_SetConnDeadTime(conn, rx_connDeadTime);
731 conn->ackRate = RX_FAST_ACK_RATE;
733 conn->specific = NULL;
734 conn->challengeEvent = (struct rxevent *)0;
735 conn->delayedAbortEvent = (struct rxevent *)0;
736 conn->abortCount = 0;
739 RXS_NewConnection(securityObject, conn);
740 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
742 conn->refCount++; /* no lock required since only this thread knows... */
743 conn->next = rx_connHashTable[hashindex];
744 rx_connHashTable[hashindex] = conn;
745 MUTEX_ENTER(&rx_stats_mutex);
746 rx_stats.nClientConns++;
747 MUTEX_EXIT(&rx_stats_mutex);
749 MUTEX_EXIT(&rx_connHashTable_lock);
755 void rx_SetConnDeadTime(conn, seconds)
756 register struct rx_connection *conn;
757 register int seconds;
759 /* The idea is to set the dead time to a value that allows several
760 * keepalives to be dropped without timing out the connection. */
761 conn->secondsUntilDead = MAX(seconds, 6);
762 conn->secondsUntilPing = conn->secondsUntilDead/6;
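/* Worked example (values follow directly from the code above): a caller
 * passing seconds = 12 gets secondsUntilDead = 12 and secondsUntilPing = 2,
 * so roughly six keepalive pings can be lost before the connection is
 * declared dead; passing seconds = 3 is clamped up to 6 and 1. */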
765 int rxi_lowPeerRefCount = 0;
766 int rxi_lowConnRefCount = 0;
769 * Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
770 * NOTE: must not be called with rx_connHashTable_lock held.
772 void rxi_CleanupConnection(conn)
773 struct rx_connection *conn;
777 /* Notify the service exporter, if requested, that this connection
778 * is being destroyed */
779 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
780 (*conn->service->destroyConnProc)(conn);
782 /* Notify the security module that this connection is being destroyed */
783 RXS_DestroyConnection(conn->securityObject, conn);
785 /* If this is the last connection using the rx_peer struct, set its
786 * idle time to now. rxi_ReapConnections will reap it if it's still
787 * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
789 MUTEX_ENTER(&rx_peerHashTable_lock);
790 if (--conn->peer->refCount <= 0) {
791 conn->peer->idleWhen = clock_Sec();
792 if (conn->peer->refCount < 0) {
793 conn->peer->refCount = 0;
794 MUTEX_ENTER(&rx_stats_mutex);
795 rxi_lowPeerRefCount ++;
796 MUTEX_EXIT(&rx_stats_mutex);
799 MUTEX_EXIT(&rx_peerHashTable_lock);
801 MUTEX_ENTER(&rx_stats_mutex);
802 if (conn->type == RX_SERVER_CONNECTION)
803 rx_stats.nServerConns--;
805 rx_stats.nClientConns--;
806 MUTEX_EXIT(&rx_stats_mutex);
809 if (conn->specific) {
810 for (i = 0 ; i < conn->nSpecific ; i++) {
811 if (conn->specific[i] && rxi_keyCreate_destructor[i])
812 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
813 conn->specific[i] = NULL;
815 free(conn->specific);
817 conn->specific = NULL;
821 MUTEX_DESTROY(&conn->conn_call_lock);
822 MUTEX_DESTROY(&conn->conn_data_lock);
823 CV_DESTROY(&conn->conn_call_cv);
825 rxi_FreeConnection(conn);
828 /* Destroy the specified connection */
829 void rxi_DestroyConnection(conn)
830 register struct rx_connection *conn;
832 MUTEX_ENTER(&rx_connHashTable_lock);
833 rxi_DestroyConnectionNoLock(conn);
834 /* conn should be at the head of the cleanup list */
835 if (conn == rx_connCleanup_list) {
836 rx_connCleanup_list = rx_connCleanup_list->next;
837 MUTEX_EXIT(&rx_connHashTable_lock);
838 rxi_CleanupConnection(conn);
840 #ifdef RX_ENABLE_LOCKS
842 MUTEX_EXIT(&rx_connHashTable_lock);
844 #endif /* RX_ENABLE_LOCKS */
847 static void rxi_DestroyConnectionNoLock(conn)
848 register struct rx_connection *conn;
850 register struct rx_connection **conn_ptr;
851 register int havecalls = 0;
852 struct rx_packet *packet;
859 MUTEX_ENTER(&conn->conn_data_lock);
860 if (conn->refCount > 0)
863 MUTEX_ENTER(&rx_stats_mutex);
864 rxi_lowConnRefCount++;
865 MUTEX_EXIT(&rx_stats_mutex);
868 if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
869 /* Busy; wait till the last guy before proceeding */
870 MUTEX_EXIT(&conn->conn_data_lock);
875 /* If the client previously called rx_NewCall, but it is still
876 * waiting, treat this as a running call, and wait to destroy the
877 * connection later when the call completes. */
878 if ((conn->type == RX_CLIENT_CONNECTION) &&
879 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
880 conn->flags |= RX_CONN_DESTROY_ME;
881 MUTEX_EXIT(&conn->conn_data_lock);
885 MUTEX_EXIT(&conn->conn_data_lock);
887 /* Check for extant references to this connection */
888 for (i = 0; i<RX_MAXCALLS; i++) {
889 register struct rx_call *call = conn->call[i];
892 if (conn->type == RX_CLIENT_CONNECTION) {
893 MUTEX_ENTER(&call->lock);
894 if (call->delayedAckEvent) {
895 /* Push the final acknowledgment out now--there
896 * won't be a subsequent call to acknowledge the
897 * last reply packets */
898 rxevent_Cancel(call->delayedAckEvent, call,
899 RX_CALL_REFCOUNT_DELAY);
900 if (call->state == RX_STATE_PRECALL ||
901 call->state == RX_STATE_ACTIVE) {
902 rxi_SendDelayedAck(call->delayedAckEvent, call, 0);
904 rxi_AckAll((struct rxevent *)0, call, 0);
907 MUTEX_EXIT(&call->lock);
911 #ifdef RX_ENABLE_LOCKS
913 if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
914 MUTEX_EXIT(&conn->conn_data_lock);
917 /* Someone is accessing a packet right now. */
921 #endif /* RX_ENABLE_LOCKS */
924 /* Don't destroy the connection if there are any call
925 * structures still in use */
926 MUTEX_ENTER(&conn->conn_data_lock);
927 conn->flags |= RX_CONN_DESTROY_ME;
928 MUTEX_EXIT(&conn->conn_data_lock);
933 if (conn->delayedAbortEvent) {
934 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
935 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
937 MUTEX_ENTER(&conn->conn_data_lock);
938 rxi_SendConnectionAbort(conn, packet, 0, 1);
939 MUTEX_EXIT(&conn->conn_data_lock);
940 rxi_FreePacket(packet);
944 /* Remove from connection hash table before proceeding */
945 conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
946 conn->epoch, conn->type) ];
947 for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
948 if (*conn_ptr == conn) {
949 *conn_ptr = conn->next;
953 /* if the conn that we are destroying was the last connection, then we
954 * clear rxLastConn as well */
955 if ( rxLastConn == conn )
958 /* Make sure the connection is completely reset before deleting it. */
959 /* get rid of pending events that could zap us later */
960 if (conn->challengeEvent) {
961 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
964 /* Add the connection to the list of destroyed connections that
965 * need to be cleaned up. This is necessary to avoid deadlocks
966 * in the routines we call to inform others that this connection is
967 * being destroyed. */
968 conn->next = rx_connCleanup_list;
969 rx_connCleanup_list = conn;
972 /* Externally available version */
973 void rx_DestroyConnection(conn)
974 register struct rx_connection *conn;
980 rxi_DestroyConnection (conn);
985 /* Start a new rx remote procedure call, on the specified connection.
986 * If wait is set to 1, wait for a free call channel; otherwise return
987 * 0. Maxtime gives the maximum number of seconds this call may take,
988 * after rx_MakeCall returns. After this time interval, a call to any
989 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
990 * For fine grain locking, we hold the conn_call_lock in order to
991 * ensure that we don't get signalled after we have found a call in an active
992 * state and before we go to sleep.
994 struct rx_call *rx_NewCall(conn)
995 register struct rx_connection *conn;
998 register struct rx_call *call;
999 struct clock queueTime;
1003 dpf (("rx_MakeCall(conn %x)\n", conn));
1006 clock_GetTime(&queueTime);
1008 MUTEX_ENTER(&conn->conn_call_lock);
1011 * Check if there are others waiting for a new call.
1012 * If so, let them go first to avoid starving them.
1013 * This is a fairly simple scheme, and might not be
1014 * a complete solution for large numbers of waiters.
1016 if (conn->makeCallWaiters) {
1017 #ifdef RX_ENABLE_LOCKS
1018 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1025 for (i=0; i<RX_MAXCALLS; i++) {
1026 call = conn->call[i];
1028 MUTEX_ENTER(&call->lock);
1029 if (call->state == RX_STATE_DALLY) {
1030 rxi_ResetCall(call, 0);
1031 (*call->callNumber)++;
1034 MUTEX_EXIT(&call->lock);
1037 call = rxi_NewCall(conn, i);
1041 if (i < RX_MAXCALLS) {
1044 MUTEX_ENTER(&conn->conn_data_lock);
1045 conn->flags |= RX_CONN_MAKECALL_WAITING;
1046 MUTEX_EXIT(&conn->conn_data_lock);
1048 conn->makeCallWaiters++;
1049 #ifdef RX_ENABLE_LOCKS
1050 CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1054 conn->makeCallWaiters--;
1057 * Wake up anyone else who might be giving us a chance to
1058 * run (see code above that avoids resource starvation).
1060 #ifdef RX_ENABLE_LOCKS
1061 CV_BROADCAST(&conn->conn_call_cv);
1066 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1068 /* Client is initially in send mode */
1069 call->state = RX_STATE_ACTIVE;
1070 call->mode = RX_MODE_SENDING;
1072 /* remember start time for call in case we have hard dead time limit */
1073 call->queueTime = queueTime;
1074 clock_GetTime(&call->startTime);
1075 hzero(call->bytesSent);
1076 hzero(call->bytesRcvd);
1078 /* Turn on busy protocol. */
1079 rxi_KeepAliveOn(call);
1081 MUTEX_EXIT(&call->lock);
1082 MUTEX_EXIT(&conn->conn_call_lock);
1086 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1087 /* Now, if TQ wasn't cleared earlier, do it now. */
1089 MUTEX_ENTER(&call->lock);
1090 while (call->flags & RX_CALL_TQ_BUSY) {
1091 call->flags |= RX_CALL_TQ_WAIT;
1092 #ifdef RX_ENABLE_LOCKS
1093 CV_WAIT(&call->cv_tq, &call->lock);
1094 #else /* RX_ENABLE_LOCKS */
1095 osi_rxSleep(&call->tq);
1096 #endif /* RX_ENABLE_LOCKS */
1098 if (call->flags & RX_CALL_TQ_CLEARME) {
1099 rxi_ClearTransmitQueue(call, 0);
1100 queue_Init(&call->tq);
1102 MUTEX_EXIT(&call->lock);
1104 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1110 rxi_HasActiveCalls(aconn)
1111 register struct rx_connection *aconn; {
1113 register struct rx_call *tcall;
1117 for(i=0; i<RX_MAXCALLS; i++) {
1118 if ((tcall = aconn->call[i])) {
1119 if ((tcall->state == RX_STATE_ACTIVE)
1120 || (tcall->state == RX_STATE_PRECALL)) {
1131 rxi_GetCallNumberVector(aconn, aint32s)
1132 register struct rx_connection *aconn;
1133 register afs_int32 *aint32s; {
1135 register struct rx_call *tcall;
1139 for(i=0; i<RX_MAXCALLS; i++) {
1140 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1141 aint32s[i] = aconn->callNumber[i]+1;
1143 aint32s[i] = aconn->callNumber[i];
1150 rxi_SetCallNumberVector(aconn, aint32s)
1151 register struct rx_connection *aconn;
1152 register afs_int32 *aint32s; {
1154 register struct rx_call *tcall;
1158 for(i=0; i<RX_MAXCALLS; i++) {
1159 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1160 aconn->callNumber[i] = aint32s[i] - 1;
1162 aconn->callNumber[i] = aint32s[i];
1168 /* Advertise a new service. A service is named locally by a UDP port
1169 * number plus a 16-bit service id. Returns (struct rx_service *) 0
1172 rx_NewService(port, serviceId, serviceName, securityObjects,
1173 nSecurityObjects, serviceProc)
1176 char *serviceName; /* Name for identification purposes (e.g. the
1177 * service name might be used for probing for
1179 struct rx_securityClass **securityObjects;
1180 int nSecurityObjects;
1181 afs_int32 (*serviceProc)();
1183 osi_socket socket = OSI_NULLSOCKET;
1184 register struct rx_service *tservice;
1190 if (serviceId == 0) {
1191 (osi_Msg "rx_NewService: service id for service %s must be non-zero.\n",
1197 (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1204 tservice = rxi_AllocService();
1207 for (i = 0; i<RX_MAX_SERVICES; i++) {
1208 register struct rx_service *service = rx_services[i];
1210 if (port == service->servicePort) {
1211 if (service->serviceId == serviceId) {
1212 /* The identical service has already been
1213 * installed; if the caller was intending to
1214 * change the security classes used by this
1215 * service, he/she loses. */
1216 (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1219 rxi_FreeService(tservice);
1222 /* Different service, same port: re-use the socket
1223 * which is bound to the same port */
1224 socket = service->socket;
1227 if (socket == OSI_NULLSOCKET) {
1228 /* If we don't already have a socket (from another
1229 * service on same port) get a new one */
1230 socket = rxi_GetUDPSocket(port);
1231 if (socket == OSI_NULLSOCKET) {
1234 rxi_FreeService(tservice);
1239 service->socket = socket;
1240 service->servicePort = port;
1241 service->serviceId = serviceId;
1242 service->serviceName = serviceName;
1243 service->nSecurityObjects = nSecurityObjects;
1244 service->securityObjects = securityObjects;
1245 service->minProcs = 0;
1246 service->maxProcs = 1;
1247 service->idleDeadTime = 60;
1248 service->connDeadTime = rx_connDeadTime;
1249 service->executeRequestProc = serviceProc;
1250 rx_services[i] = service; /* not visible until now */
1258 rxi_FreeService(tservice);
1259 (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1263 /* Generic request processing loop. This routine should be called
1264 * by the implementation dependent rx_ServerProc. If socketp is
1265 * non-null, it will be set to the file descriptor that this thread
1266 * is now listening on. If socketp is null, this routine will never
1268 void rxi_ServerProc(threadID, newcall, socketp)
1270 struct rx_call *newcall;
1271 osi_socket *socketp;
1273 register struct rx_call *call;
1274 register afs_int32 code;
1275 register struct rx_service *tservice = NULL;
1282 call = rx_GetCall(threadID, tservice, socketp);
1283 if (socketp && *socketp != OSI_NULLSOCKET) {
1284 /* We are now a listener thread */
1289 /* if server is restarting (typically smooth shutdown) then do not
1290 * allow any new calls.
1293 if ( rx_tranquil && (call != NULL) ) {
1298 MUTEX_ENTER(&call->lock);
1300 rxi_CallError(call, RX_RESTARTING);
1301 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1303 MUTEX_EXIT(&call->lock);
1309 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1310 #ifdef RX_ENABLE_LOCKS
1312 #endif /* RX_ENABLE_LOCKS */
1313 afs_termState = AFSOP_STOP_AFS;
1314 afs_osi_Wakeup(&afs_termState);
1315 #ifdef RX_ENABLE_LOCKS
1317 #endif /* RX_ENABLE_LOCKS */
1322 tservice = call->conn->service;
1324 if (tservice->beforeProc) (*tservice->beforeProc)(call);
1326 code = call->conn->service->executeRequestProc(call);
1328 if (tservice->afterProc) (*tservice->afterProc)(call, code);
1330 rx_EndCall(call, code);
1331 MUTEX_ENTER(&rx_stats_mutex);
1333 MUTEX_EXIT(&rx_stats_mutex);
1338 void rx_WakeupServerProcs()
1340 struct rx_serverQueueEntry *np, *tqp;
1345 MUTEX_ENTER(&rx_serverPool_lock);
1347 #ifdef RX_ENABLE_LOCKS
1348 if (rx_waitForPacket)
1349 CV_BROADCAST(&rx_waitForPacket->cv);
1350 #else /* RX_ENABLE_LOCKS */
1351 if (rx_waitForPacket)
1352 osi_rxWakeup(rx_waitForPacket);
1353 #endif /* RX_ENABLE_LOCKS */
1354 MUTEX_ENTER(&freeSQEList_lock);
1355 for (np = rx_FreeSQEList; np; np = tqp) {
1356 tqp = *(struct rx_serverQueueEntry **)np;
1357 #ifdef RX_ENABLE_LOCKS
1358 CV_BROADCAST(&np->cv);
1359 #else /* RX_ENABLE_LOCKS */
1361 #endif /* RX_ENABLE_LOCKS */
1363 MUTEX_EXIT(&freeSQEList_lock);
1364 for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1365 #ifdef RX_ENABLE_LOCKS
1366 CV_BROADCAST(&np->cv);
1367 #else /* RX_ENABLE_LOCKS */
1369 #endif /* RX_ENABLE_LOCKS */
1371 MUTEX_EXIT(&rx_serverPool_lock);
1377 * One thing that seems to happen is that all the server threads get
1378 * tied up on some empty or slow call, and then a whole bunch of calls
1379 * arrive at once, using up the packet pool, so now there are more
1380 * empty calls. The most critical resources here are server threads
1381 * and the free packet pool. The "doreclaim" code seems to help in
1382 * general. I think that eventually we arrive in this state: there
1383 * are lots of pending calls which do have all their packets present,
1384 * so they won't be reclaimed, are multi-packet calls, so they won't
1385 * be scheduled until later, and thus are tying up most of the free
1386 * packet pool for a very long time.
1388 * 1. schedule multi-packet calls if all the packets are present.
1389 * Probably CPU-bound operation, useful to return packets to pool.
1390 * Do what if there is a full window, but the last packet isn't here?
1391 * 3. preserve one thread which *only* runs "best" calls, otherwise
1392 * it sleeps and waits for that type of call.
1393 * 4. Don't necessarily reserve a whole window for each thread. In fact,
1394 * the current dataquota business is badly broken. The quota isn't adjusted
1395 * to reflect how many packets are presently queued for a running call.
1396 * So, when we schedule a queued call with a full window of packets queued
1397 * up for it, that *should* free up a window full of packets for other 2d-class
1398 * calls to be able to use from the packet pool. But it doesn't.
1400 * NB. Most of the time, this code doesn't run -- since idle server threads
1401 * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1402 * as a new call arrives.
1404 /* Sleep until a call arrives. Returns a pointer to the call, ready
1405 * for an rx_Read. */
1406 #ifdef RX_ENABLE_LOCKS
1408 rx_GetCall(tno, cur_service, socketp)
1410 struct rx_service *cur_service;
1411 osi_socket *socketp;
1413 struct rx_serverQueueEntry *sq;
1414 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1415 struct rx_service *service = NULL;
1418 MUTEX_ENTER(&freeSQEList_lock);
1420 if ((sq = rx_FreeSQEList)) {
1421 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1422 MUTEX_EXIT(&freeSQEList_lock);
1423 } else { /* otherwise allocate a new one and return that */
1424 MUTEX_EXIT(&freeSQEList_lock);
1425 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1426 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1427 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1430 MUTEX_ENTER(&rx_serverPool_lock);
1431 if (cur_service != NULL) {
1432 ReturnToServerPool(cur_service);
1435 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1436 register struct rx_call *tcall, *ncall;
1437 choice2 = (struct rx_call *) 0;
1438 /* Scan for eligible incoming calls. A call is not eligible
1439 * if the maximum number of calls for its service type are
1440 * already executing */
1441 /* One thread will process calls FCFS (to prevent starvation),
1442 * while the other threads may run ahead looking for calls which
1443 * have all their input data available immediately. This helps
1444 * keep threads from blocking, waiting for data from the client. */
1445 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1446 service = tcall->conn->service;
1447 if (!QuotaOK(service)) {
1450 if (!tno || !tcall->queue_item_header.next ) {
1451 /* If we're thread 0, then we'll just use
1452 * this call. If we haven't been able to find an optimal
1453 * choice, and we're at the end of the list, then use a
1454 * 2d choice if one has been identified. Otherwise... */
1455 call = (choice2 ? choice2 : tcall);
1456 service = call->conn->service;
1457 } else if (!queue_IsEmpty(&tcall->rq)) {
1458 struct rx_packet *rp;
1459 rp = queue_First(&tcall->rq, rx_packet);
1460 if (rp->header.seq == 1) {
1461 if (!meltdown_1pkt ||
1462 (rp->header.flags & RX_LAST_PACKET)) {
1464 } else if (rxi_2dchoice && !choice2 &&
1465 !(tcall->flags & RX_CALL_CLEARED) &&
1466 (tcall->rprev > rxi_HardAckRate)) {
1468 } else rxi_md2cnt++;
1474 ReturnToServerPool(service);
1481 rxi_ServerThreadSelectingCall = 1;
1482 MUTEX_EXIT(&rx_serverPool_lock);
1483 MUTEX_ENTER(&call->lock);
1484 MUTEX_ENTER(&rx_serverPool_lock);
1486 if (queue_IsEmpty(&call->rq) ||
1487 queue_First(&call->rq, rx_packet)->header.seq != 1)
1488 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1490 CLEAR_CALL_QUEUE_LOCK(call);
1492 MUTEX_EXIT(&call->lock);
1493 ReturnToServerPool(service);
1494 rxi_ServerThreadSelectingCall = 0;
1495 CV_SIGNAL(&rx_serverPool_cv);
1496 call = (struct rx_call*)0;
1499 call->flags &= (~RX_CALL_WAIT_PROC);
1500 MUTEX_ENTER(&rx_stats_mutex);
1502 MUTEX_EXIT(&rx_stats_mutex);
1503 rxi_ServerThreadSelectingCall = 0;
1504 CV_SIGNAL(&rx_serverPool_cv);
1505 MUTEX_EXIT(&rx_serverPool_lock);
1509 /* If there are no eligible incoming calls, add this process
1510 * to the idle server queue, to wait for one */
1514 *socketp = OSI_NULLSOCKET;
1516 sq->socketp = socketp;
1517 queue_Append(&rx_idleServerQueue, sq);
1518 #ifndef AFS_AIX41_ENV
1519 rx_waitForPacket = sq;
1520 #endif /* AFS_AIX41_ENV */
1522 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1524 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1525 MUTEX_EXIT(&rx_serverPool_lock);
1526 return (struct rx_call *)0;
1529 } while (!(call = sq->newcall) &&
1530 !(socketp && *socketp != OSI_NULLSOCKET));
1531 MUTEX_EXIT(&rx_serverPool_lock);
1533 MUTEX_ENTER(&call->lock);
1539 MUTEX_ENTER(&freeSQEList_lock);
1540 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1541 rx_FreeSQEList = sq;
1542 MUTEX_EXIT(&freeSQEList_lock);
1545 clock_GetTime(&call->startTime);
1546 call->state = RX_STATE_ACTIVE;
1547 call->mode = RX_MODE_RECEIVING;
1549 rxi_calltrace(RX_CALL_START, call);
1550 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1551 call->conn->service->servicePort,
1552 call->conn->service->serviceId, call));
1554 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1555 MUTEX_EXIT(&call->lock);
1557 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1562 #else /* RX_ENABLE_LOCKS */
1564 rx_GetCall(tno, cur_service, socketp)
1566 struct rx_service *cur_service;
1567 osi_socket *socketp;
1569 struct rx_serverQueueEntry *sq;
1570 register struct rx_call *call = (struct rx_call *) 0, *choice2;
1571 struct rx_service *service = NULL;
1576 MUTEX_ENTER(&freeSQEList_lock);
1578 if ((sq = rx_FreeSQEList)) {
1579 rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1580 MUTEX_EXIT(&freeSQEList_lock);
1581 } else { /* otherwise allocate a new one and return that */
1582 MUTEX_EXIT(&freeSQEList_lock);
1583 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1584 MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1585 CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1587 MUTEX_ENTER(&sq->lock);
1589 if (cur_service != NULL) {
1590 cur_service->nRequestsRunning--;
1591 if (cur_service->nRequestsRunning < cur_service->minProcs)
1595 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1596 register struct rx_call *tcall, *ncall;
1597 /* Scan for eligible incoming calls. A call is not eligible
1598 * if the maximum number of calls for its service type are
1599 * already executing */
1600 /* One thread will process calls FCFS (to prevent starvation),
1601 * while the other threads may run ahead looking for calls which
1602 * have all their input data available immediately. This helps
1603 * keep threads from blocking, waiting for data from the client. */
1604 choice2 = (struct rx_call *) 0;
1605 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1606 service = tcall->conn->service;
1607 if (QuotaOK(service)) {
1608 if (!tno || !tcall->queue_item_header.next ) {
1609 /* If we're thread 0, then we'll just use
1610 * this call. If we haven't been able to find an optimal
1611 * choice, and we're at the end of the list, then use a
1612 * 2d choice if one has been identified. Otherwise... */
1613 call = (choice2 ? choice2 : tcall);
1614 service = call->conn->service;
1615 } else if (!queue_IsEmpty(&tcall->rq)) {
1616 struct rx_packet *rp;
1617 rp = queue_First(&tcall->rq, rx_packet);
1618 if (rp->header.seq == 1
1619 && (!meltdown_1pkt ||
1620 (rp->header.flags & RX_LAST_PACKET))) {
1622 } else if (rxi_2dchoice && !choice2 &&
1623 !(tcall->flags & RX_CALL_CLEARED) &&
1624 (tcall->rprev > rxi_HardAckRate)) {
1626 } else rxi_md2cnt++;
1636 /* we can't schedule a call if there's no data!!! */
1637 /* send an ack if there's no data, if we're missing the
1638 * first packet, or we're missing something between first
1639 * and last -- there's a "hole" in the incoming data. */
1640 if (queue_IsEmpty(&call->rq) ||
1641 queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1642 call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1643 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1645 call->flags &= (~RX_CALL_WAIT_PROC);
1646 service->nRequestsRunning++;
1647 /* just started call in minProcs pool, need fewer to maintain
1649 if (service->nRequestsRunning <= service->minProcs)
1653 /* MUTEX_EXIT(&call->lock); */
1656 /* If there are no eligible incoming calls, add this process
1657 * to the idle server queue, to wait for one */
1660 *socketp = OSI_NULLSOCKET;
1662 sq->socketp = socketp;
1663 queue_Append(&rx_idleServerQueue, sq);
1667 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1670 return (struct rx_call *)0;
1673 } while (!(call = sq->newcall) &&
1674 !(socketp && *socketp != OSI_NULLSOCKET));
1676 MUTEX_EXIT(&sq->lock);
1678 MUTEX_ENTER(&freeSQEList_lock);
1679 *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1680 rx_FreeSQEList = sq;
1681 MUTEX_EXIT(&freeSQEList_lock);
1684 clock_GetTime(&call->startTime);
1685 call->state = RX_STATE_ACTIVE;
1686 call->mode = RX_MODE_RECEIVING;
1688 rxi_calltrace(RX_CALL_START, call);
1689 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1690 call->conn->service->servicePort,
1691 call->conn->service->serviceId, call));
1693 dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1701 #endif /* RX_ENABLE_LOCKS */
1705 /* Establish a procedure to be called when a packet arrives for a
1706 * call. This routine will be called at most once after each call,
1707 * and will also be called if there is an error condition on the call or
1708 * the call is complete. Used by multi rx to build a selection
1709 * function which determines which of several calls is likely to be a
1710 * good one to read from.
1711 * NOTE: the way this is currently implemented it is probably only a
1712 * good idea to (1) use it immediately after a newcall (clients only)
1713 * and (2) only use it once. Other uses currently void your warranty
1715 void rx_SetArrivalProc(call, proc, handle, arg)
1716 register struct rx_call *call;
1717 register VOID (*proc)();
1718 register VOID *handle;
1721 call->arrivalProc = proc;
1722 call->arrivalProcHandle = handle;
1723 call->arrivalProcArg = arg;
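/* Illustrative sketch (hypothetical handler; the exact argument types of the
 * arrival procedure are assumptions, multi rx being the intended consumer):
 *
 *     static VOID GotPacket(call, handle, arg)
 *         register struct rx_call *call;
 *         register VOID *handle;
 *         register int arg;
 *     {
 *         ... mark call number 'arg' in the caller's table 'handle' readable ...
 *     }
 *
 *     rx_SetArrivalProc(call, GotPacket, (VOID *)myTable, i);
 */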
1726 /* Call is finished (possibly prematurely). Return rc to the peer, if
1727 * appropriate, and return the final error code from the conversation
1730 afs_int32 rx_EndCall(call, rc)
1731 register struct rx_call *call;
1734 register struct rx_connection *conn = call->conn;
1735 register struct rx_service *service;
1736 register struct rx_packet *tp; /* Temporary packet pointer */
1737 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1741 dpf(("rx_EndCall(call %x)\n", call));
1745 MUTEX_ENTER(&call->lock);
1747 if (rc == 0 && call->error == 0) {
1748 call->abortCode = 0;
1749 call->abortCount = 0;
1752 call->arrivalProc = (VOID (*)()) 0;
1753 if (rc && call->error == 0) {
1754 rxi_CallError(call, rc);
1755 /* Send an abort message to the peer if this error code has
1756 * only just been set. If it was set previously, assume the
1757 * peer has already been sent the error code or will request it
1759 rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1761 if (conn->type == RX_SERVER_CONNECTION) {
1762 /* Make sure reply or at least dummy reply is sent */
1763 if (call->mode == RX_MODE_RECEIVING) {
1764 rxi_WriteProc(call, 0, 0);
1766 if (call->mode == RX_MODE_SENDING) {
1767 rxi_FlushWrite(call);
1769 service = conn->service;
1770 rxi_calltrace(RX_CALL_END, call);
1771 /* Call goes to hold state until reply packets are acknowledged */
1772 if (call->tfirst + call->nSoftAcked < call->tnext) {
1773 call->state = RX_STATE_HOLD;
1775 call->state = RX_STATE_DALLY;
1776 rxi_ClearTransmitQueue(call, 0);
1777 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1778 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1781 else { /* Client connection */
1783 /* Make sure server receives input packets, in the case where
1784 * no reply arguments are expected */
1785 if ((call->mode == RX_MODE_SENDING)
1786 || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1787 (void) rxi_ReadProc(call, &dummy, 1);
1789 /* We need to release the call lock since it's lower than the
1790 * conn_call_lock and we don't want to hold the conn_call_lock
1791 * over the rx_ReadProc call. The conn_call_lock needs to be held
1792 * here for the case where rx_NewCall is perusing the calls on
1793 * the connection structure. We don't want to signal until
1794 * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1795 * have checked this call, found it active and by the time it
1796 * goes to sleep, will have missed the signal.
1798 MUTEX_EXIT(&call->lock);
1799 MUTEX_ENTER(&conn->conn_call_lock);
1800 MUTEX_ENTER(&call->lock);
1801 MUTEX_ENTER(&conn->conn_data_lock);
1802 conn->flags |= RX_CONN_BUSY;
1803 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1804 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1805 MUTEX_EXIT(&conn->conn_data_lock);
1806 #ifdef RX_ENABLE_LOCKS
1807 CV_BROADCAST(&conn->conn_call_cv);
1812 #ifdef RX_ENABLE_LOCKS
1814 MUTEX_EXIT(&conn->conn_data_lock);
1816 #endif /* RX_ENABLE_LOCKS */
1817 call->state = RX_STATE_DALLY;
1819 error = call->error;
1821 /* currentPacket, nLeft, and nFree must be zeroed here, because
1822 * ResetCall cannot: ResetCall may be called at splnet(), in the
1823 * kernel version, and may interrupt the macros rx_Read or
1824 * rx_Write, which run at normal priority for efficiency. */
1825 if (call->currentPacket) {
1826 rxi_FreePacket(call->currentPacket);
1827 call->currentPacket = (struct rx_packet *) 0;
1828 call->nLeft = call->nFree = call->curlen = 0;
1831 call->nLeft = call->nFree = call->curlen = 0;
1833 /* Free any packets from the last call to ReadvProc/WritevProc */
1834 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1839 CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1840 MUTEX_EXIT(&call->lock);
1841 if (conn->type == RX_CLIENT_CONNECTION) {
1842 MUTEX_EXIT(&conn->conn_call_lock);
1843 conn->flags &= ~RX_CONN_BUSY;
1848 * Map errors to the local host's errno.h format.
1850 error = ntoh_syserr_conv(error);
1854 #if !defined(KERNEL)
1856 /* Call this routine when shutting down a server or client (especially
1857 * clients). This will allow Rx to gracefully garbage collect server
1858 * connections, and reduce the number of retries that a server might
1859 * make to a dead client.
1860 * This is not quite right, since some calls may still be ongoing and
1861 * we can't lock them to destroy them. */
1862 void rx_Finalize() {
1863 register struct rx_connection **conn_ptr, **conn_end;
1867 if (rxinit_status == 1) {
1869 return; /* Already shutdown. */
1871 rxi_DeleteCachedConnections();
1872 if (rx_connHashTable) {
1873 MUTEX_ENTER(&rx_connHashTable_lock);
1874 for (conn_ptr = &rx_connHashTable[0],
1875 conn_end = &rx_connHashTable[rx_hashTableSize];
1876 conn_ptr < conn_end; conn_ptr++) {
1877 struct rx_connection *conn, *next;
1878 for (conn = *conn_ptr; conn; conn = next) {
1880 if (conn->type == RX_CLIENT_CONNECTION) {
1881 /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1883 /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1884 #ifdef RX_ENABLE_LOCKS
1885 rxi_DestroyConnectionNoLock(conn);
1886 #else /* RX_ENABLE_LOCKS */
1887 rxi_DestroyConnection(conn);
1888 #endif /* RX_ENABLE_LOCKS */
1892 #ifdef RX_ENABLE_LOCKS
1893 while (rx_connCleanup_list) {
1894 struct rx_connection *conn;
1895 conn = rx_connCleanup_list;
1896 rx_connCleanup_list = rx_connCleanup_list->next;
1897 MUTEX_EXIT(&rx_connHashTable_lock);
1898 rxi_CleanupConnection(conn);
1899 MUTEX_ENTER(&rx_connHashTable_lock);
1901 MUTEX_EXIT(&rx_connHashTable_lock);
1902 #endif /* RX_ENABLE_LOCKS */
1911 /* if we wake up the packet waiter too often, we can get into a loop with two
1912 AllocSendPackets each waking the other up (from ReclaimPacket calls) */
1914 rxi_PacketsUnWait() {
1916 if (!rx_waitingForPackets) {
1920 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1921 return; /* still over quota */
1924 rx_waitingForPackets = 0;
1925 #ifdef RX_ENABLE_LOCKS
1926 CV_BROADCAST(&rx_waitingForPackets_cv);
1928 osi_rxWakeup(&rx_waitingForPackets);
1934 /* ------------------Internal interfaces------------------------- */
1936 /* Return this process's service structure for the
1937 * specified socket and service */
1938 struct rx_service *rxi_FindService(socket, serviceId)
1939 register osi_socket socket;
1940 register u_short serviceId;
1942 register struct rx_service **sp;
1943 for (sp = &rx_services[0]; *sp; sp++) {
1944 if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
1950 /* Allocate a call structure, for the indicated channel of the
1951 * supplied connection. The mode and state of the call must be set by
1952 * the caller. Returns the call with mutex locked. */
1953 struct rx_call *rxi_NewCall(conn, channel)
1954 register struct rx_connection *conn;
1955 register int channel;
1957 register struct rx_call *call;
1958 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1959 register struct rx_call *cp; /* Call pointer temp */
1960 register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
1961 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1963 /* Grab an existing call structure, or allocate a new one.
1964 * Existing call structures are assumed to have been left reset by
1966 MUTEX_ENTER(&rx_freeCallQueue_lock);
1968 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1970 * EXCEPT that the TQ might not yet be cleared out.
1971 * Skip over those with in-use TQs.
1974 for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1975 if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1981 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1982 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1983 call = queue_First(&rx_freeCallQueue, rx_call);
1984 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1986 MUTEX_ENTER(&rx_stats_mutex);
1987 rx_stats.nFreeCallStructs--;
1988 MUTEX_EXIT(&rx_stats_mutex);
1989 MUTEX_EXIT(&rx_freeCallQueue_lock);
1990 MUTEX_ENTER(&call->lock);
1991 CLEAR_CALL_QUEUE_LOCK(call);
1992 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1993 /* Now, if TQ wasn't cleared earlier, do it now. */
1994 if (call->flags & RX_CALL_TQ_CLEARME) {
1995 rxi_ClearTransmitQueue(call, 0);
1996 queue_Init(&call->tq);
1998 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1999 /* Bind the call to its connection structure */
2001 rxi_ResetCall(call, 1);
2004 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
2006 MUTEX_EXIT(&rx_freeCallQueue_lock);
2007 MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
2008 MUTEX_ENTER(&call->lock);
2009 CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
2010 CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
2011 CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
2013 MUTEX_ENTER(&rx_stats_mutex);
2014 rx_stats.nCallStructs++;
2015 MUTEX_EXIT(&rx_stats_mutex);
2016 /* Initialize once-only items */
2017 queue_Init(&call->tq);
2018 queue_Init(&call->rq);
2019 queue_Init(&call->iovq);
2020 /* Bind the call to its connection structure (prereq for reset) */
2022 rxi_ResetCall(call, 1);
2024 call->channel = channel;
2025 call->callNumber = &conn->callNumber[channel];
2026 /* Note that the next expected call number is retained (in
2027 * conn->callNumber[i]), even if we reallocate the call structure
2029 conn->call[channel] = call;
2030 /* if the channel's never been used (== 0), we should start at 1, otherwise
2031 the call number is valid from the last time this channel was used */
2032 if (*call->callNumber == 0) *call->callNumber = 1;
2037 /* A call has been inactive long enough that we can throw away
2038 * state, including the call structure, which is placed on the call
2040 * Call is locked upon entry.
2042 #ifdef RX_ENABLE_LOCKS
2043 void rxi_FreeCall(call, haveCTLock)
2044 int haveCTLock; /* Set if called from rxi_ReapConnections */
2045 #else /* RX_ENABLE_LOCKS */
2046 void rxi_FreeCall(call)
2047 #endif /* RX_ENABLE_LOCKS */
2048 register struct rx_call *call;
2050 register int channel = call->channel;
2051 register struct rx_connection *conn = call->conn;
2054 if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2055 (*call->callNumber)++;
2056 rxi_ResetCall(call, 0);
2057 call->conn->call[channel] = (struct rx_call *) 0;
2059 MUTEX_ENTER(&rx_freeCallQueue_lock);
2060 SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2061 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2062 /* A call may be free even though its transmit queue is still in use.
2063 * Since we search the call list from head to tail, put busy calls at
2064 * the head of the list, and idle calls at the tail.
2066 if (call->flags & RX_CALL_TQ_BUSY)
2067 queue_Prepend(&rx_freeCallQueue, call);
2069 queue_Append(&rx_freeCallQueue, call);
2070 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2071 queue_Append(&rx_freeCallQueue, call);
2072 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2073 MUTEX_ENTER(&rx_stats_mutex);
2074 rx_stats.nFreeCallStructs++;
2075 MUTEX_EXIT(&rx_stats_mutex);
2077 MUTEX_EXIT(&rx_freeCallQueue_lock);
2079 /* Destroy the connection if it was previously slated for
2080 * destruction, i.e. the Rx client code previously called
2081 * rx_DestroyConnection (client connections), or
2082 * rxi_ReapConnections called the same routine (server
2083 * connections). Only do this, however, if there are no
2084 * outstanding calls. Note that for fine grain locking, there appears
2085 * to be a deadlock in that rxi_FreeCall has a call locked and
2086 * DestroyConnectionNoLock locks each call in the conn. But note a
2087 * few lines up where we have removed this call from the conn.
2088 * If someone else destroys a connection, they either have no
2089 * call lock held or are going through this section of code.
2091 if (conn->flags & RX_CONN_DESTROY_ME) {
2092 MUTEX_ENTER(&conn->conn_data_lock);
2094 MUTEX_EXIT(&conn->conn_data_lock);
2095 #ifdef RX_ENABLE_LOCKS
2097 rxi_DestroyConnectionNoLock(conn);
2099 rxi_DestroyConnection(conn);
2100 #else /* RX_ENABLE_LOCKS */
2101 rxi_DestroyConnection(conn);
2102 #endif /* RX_ENABLE_LOCKS */
2106 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2107 char *rxi_Alloc(size)
2108 register size_t size;
2112 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2113 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2116 int glockOwner = ISAFS_GLOCK();
2120 MUTEX_ENTER(&rx_stats_mutex);
2121 rxi_Alloccnt++; rxi_Allocsize += size;
2122 MUTEX_EXIT(&rx_stats_mutex);
2123 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2124 if (size > AFS_SMALLOCSIZ) {
2125 p = (char *) osi_AllocMediumSpace(size);
2127 p = (char *) osi_AllocSmall(size, 1);
2128 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2133 p = (char *) osi_Alloc(size);
2135 if (!p) osi_Panic("rxi_Alloc error");
2140 void rxi_Free(addr, size)
2142 register size_t size;
2144 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2145 /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2148 int glockOwner = ISAFS_GLOCK();
2152 MUTEX_ENTER(&rx_stats_mutex);
2153 rxi_Alloccnt--; rxi_Allocsize -= size;
2154 MUTEX_EXIT(&rx_stats_mutex);
2155 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2156 if (size > AFS_SMALLOCSIZ)
2157 osi_FreeMediumSpace(addr);
2159 osi_FreeSmall(addr);
2160 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2165 osi_Free(addr, size);
2169 /* Find the peer process represented by the supplied (host,port)
2170 * combination. If there is no appropriate active peer structure, a
2171 * new one will be allocated and initialized
2172 * The origPeer, if set, is a pointer to a peer structure on which the
2173 * refcount will be decremented. This is used to replace the peer
2174 * structure hanging off a connection structure */
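/* Illustrative sketch (not compiled, hypothetical names, error handling and
 * locking omitted): the lookup-or-create with reference counting that
 * rxi_FindPeer performs below, minus the rx-specific initialization. */
#if 0
#include <stdlib.h>

#define NBUCKETS 256
struct peer_ent {
    unsigned int host;
    unsigned short port;
    int refCount;
    struct peer_ent *next;
};
static struct peer_ent *peer_tab[NBUCKETS];

static struct peer_ent *find_peer(unsigned int host, unsigned short port,
                                  struct peer_ent *origPeer, int create)
{
    struct peer_ent *pp;
    unsigned int ix = (host ^ port) % NBUCKETS;   /* stand-in for PEER_HASH */

    for (pp = peer_tab[ix]; pp; pp = pp->next)
        if (pp->host == host && pp->port == port)
            break;
    if (!pp && create) {
        pp = calloc(1, sizeof(*pp));
        pp->host = host;
        pp->port = port;
        pp->next = peer_tab[ix];
        peer_tab[ix] = pp;
    }
    if (pp)
        pp->refCount++;                           /* caller now holds a ref */
    if (origPeer)
        origPeer->refCount--;                     /* swap: drop the old ref */
    return pp;
}
#endif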
2175 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2176 register afs_uint32 host;
2177 register u_short port;
2178 struct rx_peer *origPeer;
2181 register struct rx_peer *pp;
2183 hashIndex = PEER_HASH(host, port);
2184 MUTEX_ENTER(&rx_peerHashTable_lock);
2185 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2186 if ((pp->host == host) && (pp->port == port)) break;
2190 pp = rxi_AllocPeer(); /* This bzero's *pp */
2191 pp->host = host; /* set here; otherwise InitPeerParams sees zero */
2193 MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2194 queue_Init(&pp->congestionQueue);
2195 queue_Init(&pp->rpcStats);
2196 pp->next = rx_peerHashTable[hashIndex];
2197 rx_peerHashTable[hashIndex] = pp;
2198 rxi_InitPeerParams(pp);
2199 MUTEX_ENTER(&rx_stats_mutex);
2200 rx_stats.nPeerStructs++;
2201 MUTEX_EXIT(&rx_stats_mutex);
2208 origPeer->refCount--;
2209 MUTEX_EXIT(&rx_peerHashTable_lock);
2214 /* Find the connection at (host, port) started at epoch, and with the
2215 * given connection id. Creates the server connection if necessary.
2216 * The type specifies whether a client connection or a server
2217 * connection is desired. In both cases, (host, port) specify the
2218 * peer's (host, port) pair. Client connections are not made
2219 * automatically by this routine. The parameter socket gives the
2220 * socket descriptor on which the packet was received. This is used,
2221 * in the case of server connections, to check that *new* connections
2222 * come via a valid (port, serviceId). Finally, the securityIndex
2223 * parameter must match the existing index for the connection. If a
2224 * server connection is created, it will be created using the supplied
2225 * index, if the index is valid for this service */
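/* Illustrative sketch (not compiled, hypothetical names): the lookup order
 * used below -- consult the one-entry rxLastConn cache first, and only on a
 * miss walk the hash chain chosen by CONN_HASH. */
#if 0
struct ent { unsigned int key; struct ent *next; };

static struct ent *last_hit;             /* one-entry cache, like rxLastConn */
static struct ent *buckets[256];

static struct ent *lookup(unsigned int key)
{
    struct ent *e;

    if (last_hit && last_hit->key == key)
        return last_hit;                 /* fast path: same conn as last time */
    for (e = buckets[key % 256]; e; e = e->next) {
        if (e->key == key) {
            last_hit = e;
            return e;
        }
    }
    return 0;                            /* miss: caller may create one */
}
#endif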
2226 struct rx_connection *
2227 rxi_FindConnection(socket, host, port, serviceId, cid,
2228 epoch, type, securityIndex)
2230 register afs_int32 host;
2231 register u_short port;
2236 u_int securityIndex;
2238 int hashindex, flag;
2239 register struct rx_connection *conn;
2240 struct rx_peer *peer;
2241 hashindex = CONN_HASH(host, port, cid, epoch, type);
2242 MUTEX_ENTER(&rx_connHashTable_lock);
2243 rxLastConn ? (conn = rxLastConn, flag = 0) :
2244 (conn = rx_connHashTable[hashindex], flag = 1);
2246 if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid)
2247 && (epoch == conn->epoch)) {
2248 register struct rx_peer *pp = conn->peer;
2249 if (securityIndex != conn->securityIndex) {
2250 /* this isn't supposed to happen, but someone could forge a packet
2251 like this, and there seems to be some CM bug that makes this
2252 happen from time to time -- in which case, the fileserver
2254 MUTEX_EXIT(&rx_connHashTable_lock);
2255 return (struct rx_connection *) 0;
2257 /* if the epoch's high-order bit is set, route (for security reasons)
2258 * only on the cid, not on the host and port fields.
2260 if (conn->epoch & 0x80000000) break;
2261 if (((type == RX_CLIENT_CONNECTION)
2262 || (pp->host == host)) && (pp->port == port))
2267 /* the connection rxLastConn that was used the last time is not the
2268 ** one we are looking for now. Hence, start searching in the hash */
2270 conn = rx_connHashTable[hashindex];
2276 struct rx_service *service;
2277 if (type == RX_CLIENT_CONNECTION) {
2278 MUTEX_EXIT(&rx_connHashTable_lock);
2279 return (struct rx_connection *) 0;
2281 service = rxi_FindService(socket, serviceId);
2282 if (!service || (securityIndex >= service->nSecurityObjects)
2283 || (service->securityObjects[securityIndex] == 0)) {
2284 MUTEX_EXIT(&rx_connHashTable_lock);
2285 return (struct rx_connection *) 0;
2287 conn = rxi_AllocConnection(); /* This bzero's the connection */
2288 MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2290 MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2292 CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2293 conn->next = rx_connHashTable[hashindex];
2294 rx_connHashTable[hashindex] = conn;
2295 peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2296 conn->type = RX_SERVER_CONNECTION;
2297 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
2298 conn->epoch = epoch;
2299 conn->cid = cid & RX_CIDMASK;
2300 /* conn->serial = conn->lastSerial = 0; */
2301 /* conn->timeout = 0; */
2302 conn->ackRate = RX_FAST_ACK_RATE;
2303 conn->service = service;
2304 conn->serviceId = serviceId;
2305 conn->securityIndex = securityIndex;
2306 conn->securityObject = service->securityObjects[securityIndex];
2307 conn->nSpecific = 0;
2308 conn->specific = NULL;
2309 rx_SetConnDeadTime(conn, service->connDeadTime);
2310 /* Notify security object of the new connection */
2311 RXS_NewConnection(conn->securityObject, conn);
2312 /* XXXX Connection timeout? */
2313 if (service->newConnProc) (*service->newConnProc)(conn);
2314 MUTEX_ENTER(&rx_stats_mutex);
2315 rx_stats.nServerConns++;
2316 MUTEX_EXIT(&rx_stats_mutex);
2320 /* Ensure that the peer structure is set up in such a way that
2321 ** replies in this connection go back to that remote interface
2322 ** from which the last packet was sent out. If this packet's
2323 ** source IP address does not match the peer struct for this conn,
2324 ** then drop the refCount on conn->peer and get a new peer structure.
2325 ** We can check the host,port field in the peer structure without the
2326 ** rx_peerHashTable_lock because the peer structure has its refCount
2327 ** incremented and the only time the host,port in the peer struct gets
2328 ** updated is when the peer structure is created.
2330 if (conn->peer->host == host )
2331 peer = conn->peer; /* no change to the peer structure */
2333 peer = rxi_FindPeer(host, port, conn->peer, 1);
2336 MUTEX_ENTER(&conn->conn_data_lock);
2339 MUTEX_EXIT(&conn->conn_data_lock);
2341 rxLastConn = conn; /* store this connection as the last conn used */
2342 MUTEX_EXIT(&rx_connHashTable_lock);
2346 /* There are two packet tracing routines available for testing and monitoring
2347 * Rx. One is called just after every packet is received and the other is
2348 * called just before every packet is sent. Received packets have had their
2349 * headers decoded, and packets to be sent have not yet had their headers
2350 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
2351 * containing the network address. Both can be modified. The return value, if
2352 * non-zero, indicates that the packet should be dropped. */
2354 int (*rx_justReceived)() = 0;
2355 int (*rx_almostSent)() = 0;
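/* Illustrative sketch (not compiled): a minimal tracer of the kind described
 * above.  It assumes, as the calling code below does, that the hook receives
 * (struct rx_packet *, struct sockaddr_in *) and that a non-zero return drops
 * the packet.  my_trace_received is a hypothetical name. */
#if 0
static int my_trace_received(struct rx_packet *p, struct sockaddr_in *addr)
{
    /* Example policy: drop everything arriving from 127.0.0.1, pass the
     * rest through untouched.  A real tracer could also log or rewrite
     * the address, since both arguments may be modified. */
    if (addr->sin_addr.s_addr == htonl(0x7f000001))
        return 1;                 /* non-zero => caller drops the packet */
    return 0;
}
/* Installation would be a plain assignment to the hook pointer above:
 *     rx_justReceived = my_trace_received;
 */
#endif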
2357 /* A packet has been received off the interface. Np is the packet, socket is
2358 * the socket number it was received from (useful in determining which service
2359 * this packet corresponds to), and (host, port) reflect the host,port of the
2360 * sender. This call returns the packet to the caller if it is finished with
2361 * it, rather than de-allocating it, just as a small performance hack */
2363 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2364 register struct rx_packet *np;
2369 struct rx_call **newcallp;
2371 register struct rx_call *call;
2372 register struct rx_connection *conn;
2374 afs_uint32 currentCallNumber;
2380 struct rx_packet *tnp;
2383 /* We don't print out the packet until now because (1) the time may not be
2384 * accurate enough until now in the lwp implementation (rx_Listener only gets
2385 * the time after the packet is read) and (2) from a protocol point of view,
2386 * this is the first time the packet has been seen */
2387 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2388 ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2389 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2390 np->header.serial, packetType, host, port, np->header.serviceId,
2391 np->header.epoch, np->header.cid, np->header.callNumber,
2392 np->header.seq, np->header.flags, np));
2395 if(np->header.type == RX_PACKET_TYPE_VERSION) {
2396 return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2399 if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2400 return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2403 /* If an input tracer function is defined, call it with the packet and
2404 * network address. Note this function may modify its arguments. */
2405 if (rx_justReceived) {
2406 struct sockaddr_in addr;
2408 addr.sin_family = AF_INET;
2409 addr.sin_port = port;
2410 addr.sin_addr.s_addr = host;
2411 #if defined(AFS_OSF_ENV) && defined(_KERNEL)
2412 addr.sin_len = sizeof(addr);
2413 #endif /* AFS_OSF_ENV */
2414 drop = (*rx_justReceived) (np, &addr);
2415 /* drop packet if return value is non-zero */
2416 if (drop) return np;
2417 port = addr.sin_port; /* in case fcn changed addr */
2418 host = addr.sin_addr.s_addr;
2422 /* If packet was not sent by the client, then *we* must be the client */
2423 type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2424 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2426 /* Find the connection (or fabricate one, if we're the server & if
2427 * necessary) associated with this packet */
2428 conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2429 np->header.cid, np->header.epoch, type,
2430 np->header.securityIndex);
2433 /* If no connection found or fabricated, just ignore the packet.
2434 * (An argument could be made for sending an abort packet for
2439 MUTEX_ENTER(&conn->conn_data_lock);
2440 if (conn->maxSerial < np->header.serial)
2441 conn->maxSerial = np->header.serial;
2442 MUTEX_EXIT(&conn->conn_data_lock);
2444 /* If the connection is in an error state, send an abort packet and ignore
2445 * the incoming packet */
2447 /* Don't respond to an abort packet--we don't want loops! */
2448 MUTEX_ENTER(&conn->conn_data_lock);
2449 if (np->header.type != RX_PACKET_TYPE_ABORT)
2450 np = rxi_SendConnectionAbort(conn, np, 1, 0);
2452 MUTEX_EXIT(&conn->conn_data_lock);
2456 /* Check for connection-only requests (i.e. not call specific). */
2457 if (np->header.callNumber == 0) {
2458 switch (np->header.type) {
2459 case RX_PACKET_TYPE_ABORT:
2460 /* What if the supplied error is zero? */
2461 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2462 MUTEX_ENTER(&conn->conn_data_lock);
2464 MUTEX_EXIT(&conn->conn_data_lock);
2466 case RX_PACKET_TYPE_CHALLENGE:
2467 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2468 MUTEX_ENTER(&conn->conn_data_lock);
2470 MUTEX_EXIT(&conn->conn_data_lock);
2472 case RX_PACKET_TYPE_RESPONSE:
2473 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2474 MUTEX_ENTER(&conn->conn_data_lock);
2476 MUTEX_EXIT(&conn->conn_data_lock);
2478 case RX_PACKET_TYPE_PARAMS:
2479 case RX_PACKET_TYPE_PARAMS+1:
2480 case RX_PACKET_TYPE_PARAMS+2:
2481 /* ignore these packet types for now */
2482 MUTEX_ENTER(&conn->conn_data_lock);
2484 MUTEX_EXIT(&conn->conn_data_lock);
2489 /* Should not reach here, unless the peer is broken: send an
2491 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2492 MUTEX_ENTER(&conn->conn_data_lock);
2493 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2495 MUTEX_EXIT(&conn->conn_data_lock);
2500 channel = np->header.cid & RX_CHANNELMASK;
2501 call = conn->call[channel];
2502 #ifdef RX_ENABLE_LOCKS
2504 MUTEX_ENTER(&call->lock);
2505 /* Test to see if call struct is still attached to conn. */
2506 if (call != conn->call[channel]) {
2508 MUTEX_EXIT(&call->lock);
2509 if (type == RX_SERVER_CONNECTION) {
2510 call = conn->call[channel];
2511 /* If we started with no call attached and there is one now,
2512 * another thread is also running this routine and has gotten
2513 * the connection channel. We should drop this packet in the tests
2514 * below. If there was a call on this connection and it's now
2515 * gone, then we'll be making a new call below.
2516 * If there was previously a call and it's now different then
2517 * the old call was freed and another thread running this routine
2518 * has created a call on this channel. One of these two threads
2519 * has a packet for the old call and the code below handles those
2523 MUTEX_ENTER(&call->lock);
2526 /* This packet can't be for this call. If the new call address is
2527 * 0 then no call is running on this channel. If there is a call
2528 * then, since this is a client connection we're getting data for,
2529 * it must be for the previous call.
2531 MUTEX_ENTER(&rx_stats_mutex);
2532 rx_stats.spuriousPacketsRead++;
2533 MUTEX_EXIT(&rx_stats_mutex);
2534 MUTEX_ENTER(&conn->conn_data_lock);
2536 MUTEX_EXIT(&conn->conn_data_lock);
2541 currentCallNumber = conn->callNumber[channel];
2543 if (type == RX_SERVER_CONNECTION) { /* We're the server */
2544 if (np->header.callNumber < currentCallNumber) {
2545 MUTEX_ENTER(&rx_stats_mutex);
2546 rx_stats.spuriousPacketsRead++;
2547 MUTEX_EXIT(&rx_stats_mutex);
2548 #ifdef RX_ENABLE_LOCKS
2550 MUTEX_EXIT(&call->lock);
2552 MUTEX_ENTER(&conn->conn_data_lock);
2554 MUTEX_EXIT(&conn->conn_data_lock);
2558 call = rxi_NewCall(conn, channel);
2559 *call->callNumber = np->header.callNumber;
2560 call->state = RX_STATE_PRECALL;
2561 clock_GetTime(&call->queueTime);
2562 hzero(call->bytesSent);
2563 hzero(call->bytesRcvd);
2564 rxi_KeepAliveOn(call);
2566 else if (np->header.callNumber != currentCallNumber) {
2567 /* Wait until the transmit queue is idle before deciding
2568 * whether to reset the current call. Chances are that the
2569 * call will be in either DALLY or HOLD state once the TQ_BUSY
2572 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2573 while ((call->state == RX_STATE_ACTIVE) &&
2574 (call->flags & RX_CALL_TQ_BUSY)) {
2575 call->flags |= RX_CALL_TQ_WAIT;
2576 #ifdef RX_ENABLE_LOCKS
2577 CV_WAIT(&call->cv_tq, &call->lock);
2578 #else /* RX_ENABLE_LOCKS */
2579 osi_rxSleep(&call->tq);
2580 #endif /* RX_ENABLE_LOCKS */
2582 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2583 /* If the new call cannot be taken right now send a busy and set
2584 * the error condition in this call, so that it terminates as
2585 * quickly as possible */
2586 if (call->state == RX_STATE_ACTIVE) {
2587 struct rx_packet *tp;
2589 rxi_CallError(call, RX_CALL_DEAD);
2590 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2591 MUTEX_EXIT(&call->lock);
2592 MUTEX_ENTER(&conn->conn_data_lock);
2594 MUTEX_EXIT(&conn->conn_data_lock);
2597 rxi_ResetCall(call, 0);
2598 *call->callNumber = np->header.callNumber;
2599 call->state = RX_STATE_PRECALL;
2600 clock_GetTime(&call->queueTime);
2601 hzero(call->bytesSent);
2602 hzero(call->bytesRcvd);
2604 * If the number of queued calls exceeds the overload
2605 * threshold then abort this call.
2607 if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2608 struct rx_packet *tp;
2610 rxi_CallError(call, rx_BusyError);
2611 tp = rxi_SendCallAbort(call, np, 1, 0);
2612 MUTEX_EXIT(&call->lock);
2613 MUTEX_ENTER(&conn->conn_data_lock);
2615 MUTEX_EXIT(&conn->conn_data_lock);
2618 rxi_KeepAliveOn(call);
2621 /* Continuing call; do nothing here. */
2623 } else { /* we're the client */
2624 /* Ignore all incoming acknowledgements for calls in DALLY state */
2625 if ( call && (call->state == RX_STATE_DALLY)
2626 && (np->header.type == RX_PACKET_TYPE_ACK)) {
2627 MUTEX_ENTER(&rx_stats_mutex);
2628 rx_stats.ignorePacketDally++;
2629 MUTEX_EXIT(&rx_stats_mutex);
2630 #ifdef RX_ENABLE_LOCKS
2632 MUTEX_EXIT(&call->lock);
2635 MUTEX_ENTER(&conn->conn_data_lock);
2637 MUTEX_EXIT(&conn->conn_data_lock);
2641 /* Ignore anything that's not relevant to the current call. If there
2642 * isn't a current call, then no packet is relevant. */
2643 if (!call || (np->header.callNumber != currentCallNumber)) {
2644 MUTEX_ENTER(&rx_stats_mutex);
2645 rx_stats.spuriousPacketsRead++;
2646 MUTEX_EXIT(&rx_stats_mutex);
2647 #ifdef RX_ENABLE_LOCKS
2649 MUTEX_EXIT(&call->lock);
2652 MUTEX_ENTER(&conn->conn_data_lock);
2654 MUTEX_EXIT(&conn->conn_data_lock);
2657 /* If the service security object index stamped in the packet does not
2658 * match the connection's security index, ignore the packet */
2659 if (np->header.securityIndex != conn->securityIndex) {
2660 #ifdef RX_ENABLE_LOCKS
2661 MUTEX_EXIT(&call->lock);
2663 MUTEX_ENTER(&conn->conn_data_lock);
2665 MUTEX_EXIT(&conn->conn_data_lock);
2669 /* If we're receiving the response, then all transmit packets are
2670 * implicitly acknowledged. Get rid of them. */
2671 if (np->header.type == RX_PACKET_TYPE_DATA) {
2672 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2673 /* XXX Hack. Because we must release the global rx lock when
2674 * sending packets (osi_NetSend) we drop all acks while we're
2675 * traversing the tq in rxi_Start sending packets out because
2676 * packets may move to the freePacketQueue as a result of being here!
2677 * So we drop these packets until we're safely out of the
2678 * traversing. Really ugly!
2679 * For fine grain RX locking, we set the acked field in the
2680 * packets and let rxi_Start remove them from the transmit queue.
2682 if (call->flags & RX_CALL_TQ_BUSY) {
2683 #ifdef RX_ENABLE_LOCKS
2684 rxi_SetAcksInTransmitQueue(call);
2687 return np; /* xmitting; drop packet */
2691 rxi_ClearTransmitQueue(call, 0);
2693 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2694 rxi_ClearTransmitQueue(call, 0);
2695 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2697 if (np->header.type == RX_PACKET_TYPE_ACK) {
2698 /* now check to see if this is an ack packet acknowledging that the
2699 * server actually *lost* some hard-acked data. If this happens we
2700 * ignore this packet, as it may indicate that the server restarted in
2701 * the middle of a call. It is also possible that this is an old ack
2702 * packet. We don't abort the connection in this case, because this
2703 * *might* just be an old ack packet. The right way to detect a server
2704 * restart in the midst of a call is to notice that the server epoch
2706 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2707 * XXX unacknowledged. I think that this is off-by-one, but
2708 * XXX I don't dare change it just yet, since it will
2709 * XXX interact badly with the server-restart detection
2710 * XXX code in receiveackpacket. */
2711 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2712 MUTEX_ENTER(&rx_stats_mutex);
2713 rx_stats.spuriousPacketsRead++;
2714 MUTEX_EXIT(&rx_stats_mutex);
2715 MUTEX_EXIT(&call->lock);
2716 MUTEX_ENTER(&conn->conn_data_lock);
2718 MUTEX_EXIT(&conn->conn_data_lock);
2722 } /* else not a data packet */
2725 osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2726 /* Set remote user defined status from packet */
2727 call->remoteStatus = np->header.userStatus;
2729 /* Note the gap between the expected next packet and the actual
2730 * packet that arrived, when the new packet has a smaller serial number
2731 * than expected. Rioses frequently reorder packets all by themselves,
2732 * so this will be quite important with very large window sizes.
2733 * Skew is checked against 0 here to avoid any dependence on the type of
2734 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
2736 * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
2737 * see CalculateRoundTripTime for an example of how to keep smoothed values.
2738 * I think using a beta of 1/8 is probably appropriate. 93.04.21
2740 MUTEX_ENTER(&conn->conn_data_lock);
2741 skew = conn->lastSerial - np->header.serial;
2742 conn->lastSerial = np->header.serial;
2743 MUTEX_EXIT(&conn->conn_data_lock);
2745 register struct rx_peer *peer;
2747 if (skew > peer->inPacketSkew) {
2748 dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2749 peer->inPacketSkew = skew;
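/* Illustrative sketch (not compiled): the smoothing the comment above suggests
 * but which the code does not yet do -- keep inPacketSkew as an exponentially
 * weighted running value with beta = 1/8, in the spirit of the RTT smoothing
 * in rxi_ComputeRoundTripTime.  Hypothetical helper name. */
#if 0
static void update_smoothed_skew(int *smoothed, int sample)
{
    /* new = old + (sample - old) / 8, i.e. beta = 1/8, integer arithmetic */
    *smoothed += (sample - *smoothed) / 8;
}
/* It might then replace the "keep the maximum" update above, e.g.
 *     update_smoothed_skew(&peer->inPacketSkew, skew);
 */
#endif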
2753 /* Now do packet type-specific processing */
2754 switch (np->header.type) {
2755 case RX_PACKET_TYPE_DATA:
2756 np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2759 case RX_PACKET_TYPE_ACK:
2760 /* Respond immediately to ack packets requesting acknowledgement
2762 if (np->header.flags & RX_REQUEST_ACK) {
2763 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2764 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2766 np = rxi_ReceiveAckPacket(call, np, 1);
2768 case RX_PACKET_TYPE_ABORT:
2769 /* An abort packet: reset the connection, passing the error up to
2771 /* What if error is zero? */
2772 rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2774 case RX_PACKET_TYPE_BUSY:
2777 case RX_PACKET_TYPE_ACKALL:
2778 /* All packets acknowledged, so we can drop all packets previously
2779 * readied for sending */
2780 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2781 /* XXX Hack. Because we can't release the global rx lock when
2782 * sending packets (osi_NetSend) we drop all ack pkts while we're
2783 * traversing the tq in rxi_Start sending packets out because
2784 * packets may move to the freePacketQueue as a result of being
2785 * here! So we drop these packets until we're safely out of the
2786 * traversing. Really ugly!
2787 * For fine grain RX locking, we set the acked field in the packets
2788 * and let rxi_Start remove the packets from the transmit queue.
2790 if (call->flags & RX_CALL_TQ_BUSY) {
2791 #ifdef RX_ENABLE_LOCKS
2792 rxi_SetAcksInTransmitQueue(call);
2794 #else /* RX_ENABLE_LOCKS */
2796 return np; /* xmitting; drop packet */
2797 #endif /* RX_ENABLE_LOCKS */
2799 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2800 rxi_ClearTransmitQueue(call, 0);
2803 /* Should not reach here, unless the peer is broken: send an abort
2805 rxi_CallError(call, RX_PROTOCOL_ERROR);
2806 np = rxi_SendCallAbort(call, np, 1, 0);
2809 /* Note when this last legitimate packet was received, for keep-alive
2810 * processing. Note, we delay getting the time until now in the hope that
2811 * the packet will be delivered to the user before any get time is required
2812 * (if not, then the time won't actually be re-evaluated here). */
2813 call->lastReceiveTime = clock_Sec();
2814 MUTEX_EXIT(&call->lock);
2815 MUTEX_ENTER(&conn->conn_data_lock);
2817 MUTEX_EXIT(&conn->conn_data_lock);
2821 /* return true if this is an "interesting" connection from the point of view
2822 of someone trying to debug the system */
2823 int rxi_IsConnInteresting(struct rx_connection *aconn)
2826 register struct rx_call *tcall;
2828 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2830 for(i=0;i<RX_MAXCALLS;i++) {
2831 tcall = aconn->call[i];
2833 if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2835 if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2843 /* if this is one of the last few packets AND it wouldn't be used by the
2844 receiving call to immediately satisfy a read request, then drop it on
2845 the floor, since accepting it might prevent a lock-holding thread from
2846 making progress in its reading. If a call has been cleared while in
2847 the precall state then ignore all subsequent packets until the call
2848 is assigned to a thread. */
2850 static TooLow(ap, acall)
2851 struct rx_call *acall;
2852 struct rx_packet *ap; {
2854 MUTEX_ENTER(&rx_stats_mutex);
2855 if (((ap->header.seq != 1) &&
2856 (acall->flags & RX_CALL_CLEARED) &&
2857 (acall->state == RX_STATE_PRECALL)) ||
2858 ((rx_nFreePackets < rxi_dataQuota+2) &&
2859 !( (ap->header.seq < acall->rnext+rx_initSendWindow)
2860 && (acall->flags & RX_CALL_READER_WAIT)))) {
2863 MUTEX_EXIT(&rx_stats_mutex);
2868 /* try to attach call, if authentication is complete */
2869 static void TryAttach(acall, socket, tnop, newcallp)
2870 register struct rx_call *acall;
2871 register osi_socket socket;
2873 register struct rx_call **newcallp; {
2874 register struct rx_connection *conn;
2876 if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2877 /* Don't attach until we have any req'd. authentication. */
2878 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2879 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2880 /* Note: this does not necessarily succeed; there
2881 may not be any proc available */
2884 rxi_ChallengeOn(acall->conn);
2889 /* A data packet has been received off the interface. This packet is
2890 * appropriate to the call (the call is in the right state, etc.). This
2891 * routine can return a packet to the caller, for re-use */
2893 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2894 port, tnop, newcallp)
2895 register struct rx_call *call;
2896 register struct rx_packet *np;
2902 struct rx_call **newcallp;
2908 afs_uint32 seq, serial, flags;
2910 struct rx_packet *tnp;
2912 MUTEX_ENTER(&rx_stats_mutex);
2913 rx_stats.dataPacketsRead++;
2914 MUTEX_EXIT(&rx_stats_mutex);
2917 /* If there are no packet buffers, drop this new packet, unless we can find
2918 * packet buffers from inactive calls */
2920 (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2921 MUTEX_ENTER(&rx_freePktQ_lock);
2922 rxi_NeedMorePackets = TRUE;
2923 MUTEX_EXIT(&rx_freePktQ_lock);
2924 MUTEX_ENTER(&rx_stats_mutex);
2925 rx_stats.noPacketBuffersOnRead++;
2926 MUTEX_EXIT(&rx_stats_mutex);
2927 call->rprev = np->header.serial;
2928 rxi_calltrace(RX_TRACE_DROP, call);
2929 dpf (("packet %x dropped on receipt - quota problems", np));
2931 rxi_ClearReceiveQueue(call);
2932 clock_GetTime(&when);
2933 clock_Add(&when, &rx_softAckDelay);
2934 if (!call->delayedAckEvent ||
2935 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2936 rxevent_Cancel(call->delayedAckEvent, call,
2937 RX_CALL_REFCOUNT_DELAY);
2938 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2939 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2942 /* we've damaged this call already, might as well do it in. */
2948 * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2949 * packet is one of several packets transmitted as a single
2950 * datagram. Do not send any soft or hard acks until all packets
2951 * in a jumbogram have been processed. Send negative acks right away.
2953 for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2954 /* tnp is non-null when there are more packets in the
2955 * current jumbo gram */
2962 seq = np->header.seq;
2963 serial = np->header.serial;
2964 flags = np->header.flags;
2966 /* If the call is in an error state, send an abort message */
2968 return rxi_SendCallAbort(call, np, istack, 0);
2970 /* The RX_JUMBO_PACKET is set in all but the last packet in each
2971 * AFS 3.5 jumbogram. */
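/* Illustrative sketch (not compiled): the jumbogram convention described
 * above, from the sender's point of view -- several fixed-size packets go
 * into one UDP datagram, with RX_JUMBO_PACKET set on every header except the
 * last so the receiver knows to keep splitting.  Hypothetical helper; the
 * receive-side splitting is done by rxi_SplitJumboPacket below. */
#if 0
static void mark_jumbogram(struct rx_packet **pkts, int npkts)
{
    int i;

    for (i = 0; i < npkts; i++) {
        if (i < npkts - 1)
            pkts[i]->header.flags |= RX_JUMBO_PACKET;    /* more follow */
        else
            pkts[i]->header.flags &= ~RX_JUMBO_PACKET;   /* last in datagram */
    }
}
#endif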
2972 if (flags & RX_JUMBO_PACKET) {
2973 tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2978 if (np->header.spare != 0) {
2979 MUTEX_ENTER(&call->conn->conn_data_lock);
2980 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2981 MUTEX_EXIT(&call->conn->conn_data_lock);
2984 /* The usual case is that this is the expected next packet */
2985 if (seq == call->rnext) {
2987 /* Check to make sure it is not a duplicate of one already queued */
2988 if (queue_IsNotEmpty(&call->rq)
2989 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2990 MUTEX_ENTER(&rx_stats_mutex);
2991 rx_stats.dupPacketsRead++;
2992 MUTEX_EXIT(&rx_stats_mutex);
2993 dpf (("packet %x dropped on receipt - duplicate", np));
2994 rxevent_Cancel(call->delayedAckEvent, call,
2995 RX_CALL_REFCOUNT_DELAY);
2996 np = rxi_SendAck(call, np, seq, serial,
2997 flags, RX_ACK_DUPLICATE, istack);
3003 /* It's the next packet. Stick it on the receive queue
3004 * for this call. Set newPackets to make sure we wake
3005 * the reader once all packets have been processed */
3006 queue_Prepend(&call->rq, np);
3008 np = NULL; /* We can't use this anymore */
3011 /* If an ack is requested then set a flag to make sure we
3012 * send an acknowledgement for this packet */
3013 if (flags & RX_REQUEST_ACK) {
3017 /* Keep track of whether we have received the last packet */
3018 if (flags & RX_LAST_PACKET) {
3019 call->flags |= RX_CALL_HAVE_LAST;
3023 /* Check whether we have all of the packets for this call */
3024 if (call->flags & RX_CALL_HAVE_LAST) {
3025 afs_uint32 tseq; /* temporary sequence number */
3026 struct rx_packet *tp; /* Temporary packet pointer */
3027 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3029 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3030 if (tseq != tp->header.seq)
3032 if (tp->header.flags & RX_LAST_PACKET) {
3033 call->flags |= RX_CALL_RECEIVE_DONE;
3040 /* Provide asynchronous notification for those who want it
3041 * (e.g. multi rx) */
3042 if (call->arrivalProc) {
3043 (*call->arrivalProc)(call, call->arrivalProcHandle,
3044 call->arrivalProcArg);
3045 call->arrivalProc = (VOID (*)()) 0;
3048 /* Update last packet received */
3051 /* If there is no server process serving this call, grab
3052 * one, if available. We only need to do this once. If a
3053 * server thread is available, this thread becomes a server
3054 * thread and the server thread becomes a listener thread. */
3056 TryAttach(call, socket, tnop, newcallp);
3059 /* This is not the expected next packet. */
3061 /* Determine whether this is a new or old packet, and if it's
3062 * a new one, whether it fits into the current receive window.
3063 * Also figure out whether the packet was delivered in sequence.
3064 * We use the prev variable to determine whether the new packet
3065 * is the successor of its immediate predecessor in the
3066 * receive queue, and the missing flag to determine whether
3067 * any of this packet's predecessors are missing. */
3069 afs_uint32 prev; /* "Previous packet" sequence number */
3070 struct rx_packet *tp; /* Temporary packet pointer */
3071 struct rx_packet *nxp; /* Next pointer, for queue_Scan */
3072 int missing; /* Are any predecessors missing? */
3074 /* If the new packet's sequence number has been sent to the
3075 * application already, then this is a duplicate */
3076 if (seq < call->rnext) {
3077 MUTEX_ENTER(&rx_stats_mutex);
3078 rx_stats.dupPacketsRead++;
3079 MUTEX_EXIT(&rx_stats_mutex);
3080 rxevent_Cancel(call->delayedAckEvent, call,
3081 RX_CALL_REFCOUNT_DELAY);
3082 np = rxi_SendAck(call, np, seq, serial,
3083 flags, RX_ACK_DUPLICATE, istack);
3089 /* If the sequence number is greater than what can be
3090 * accommodated by the current window, then send a negative
3091 * acknowledge and drop the packet */
3092 if ((call->rnext + call->rwind) <= seq) {
3093 rxevent_Cancel(call->delayedAckEvent, call,
3094 RX_CALL_REFCOUNT_DELAY);
3095 np = rxi_SendAck(call, np, seq, serial,
3096 flags, RX_ACK_EXCEEDS_WINDOW, istack);
3102 /* Look for the packet in the queue of old received packets */
3103 for (prev = call->rnext - 1, missing = 0,
3104 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3105 /*Check for duplicate packet */
3106 if (seq == tp->header.seq) {
3107 MUTEX_ENTER(&rx_stats_mutex);
3108 rx_stats.dupPacketsRead++;
3109 MUTEX_EXIT(&rx_stats_mutex);
3110 rxevent_Cancel(call->delayedAckEvent, call,
3111 RX_CALL_REFCOUNT_DELAY);
3112 np = rxi_SendAck(call, np, seq, serial,
3113 flags, RX_ACK_DUPLICATE, istack);
3118 /* If we find a higher sequence packet, break out and
3119 * insert the new packet here. */
3120 if (seq < tp->header.seq) break;
3121 /* Check for missing packet */
3122 if (tp->header.seq != prev+1) {
3126 prev = tp->header.seq;
3129 /* Keep track of whether we have received the last packet. */
3130 if (flags & RX_LAST_PACKET) {
3131 call->flags |= RX_CALL_HAVE_LAST;
3134 /* It's within the window: add it to the receive queue.
3135 * tp is left by the previous loop either pointing at the
3136 * packet before which to insert the new packet, or at the
3137 * queue head if the queue is empty or the packet should be
3139 queue_InsertBefore(tp, np);
3143 /* Check whether we have all of the packets for this call */
3144 if ((call->flags & RX_CALL_HAVE_LAST)
3145 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3146 afs_uint32 tseq; /* temporary sequence number */
3148 for (tseq = call->rnext,
3149 queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3150 if (tseq != tp->header.seq)
3152 if (tp->header.flags & RX_LAST_PACKET) {
3153 call->flags |= RX_CALL_RECEIVE_DONE;
3160 /* We need to send an ack if the packet is out of sequence,
3161 * or if an ack was requested by the peer. */
3162 if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3166 /* Acknowledge the last packet for each call */
3167 if (flags & RX_LAST_PACKET) {
3178 * If the receiver is waiting for an iovec, fill the iovec
3179 * using the data from the receive queue */
3180 if (call->flags & RX_CALL_IOVEC_WAIT) {
3181 didHardAck = rxi_FillReadVec(call, seq, serial, flags);
3182 /* the call may have been aborted */
3191 /* Wakeup the reader if any */
3192 if ((call->flags & RX_CALL_READER_WAIT) &&
3193 (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3194 (call->iovNext >= call->iovMax) ||
3195 (call->flags & RX_CALL_RECEIVE_DONE))) {
3196 call->flags &= ~RX_CALL_READER_WAIT;
3197 #ifdef RX_ENABLE_LOCKS
3198 CV_BROADCAST(&call->cv_rq);
3200 osi_rxWakeup(&call->rq);
3206 * Send an ack when requested by the peer, or once every
3207 * rxi_SoftAckRate packets until the last packet has been
3208 * received. Always send a soft ack for the last packet in
3209 * the server's reply. */
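/* Illustrative sketch (not compiled, hypothetical names): the acknowledgement
 * policy just described, as a single decision function.  The live code below
 * makes the same choices inline. */
#if 0
enum ack_action { ACK_NOW_REQUESTED, ACK_NOW_IDLE, ACK_DELAYED, ACK_NONE };

static enum ack_action choose_ack(int peerRequestedAck, int nSoftAcks,
                                  int softAckRate)
{
    if (peerRequestedAck)
        return ACK_NOW_REQUESTED;     /* RX_REQUEST_ACK: answer immediately */
    if (nSoftAcks > softAckRate)
        return ACK_NOW_IDLE;          /* enough unacknowledged packets piled up */
    if (nSoftAcks)
        return ACK_DELAYED;           /* schedule a delayed-ack event instead */
    return ACK_NONE;
}
#endif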
3211 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3212 np = rxi_SendAck(call, np, seq, serial, flags,
3213 RX_ACK_REQUESTED, istack);
3214 } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3215 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3216 np = rxi_SendAck(call, np, seq, serial, flags,
3217 RX_ACK_IDLE, istack);
3218 } else if (call->nSoftAcks) {
3219 clock_GetTime(&when);
3220 if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3221 clock_Add(&when, &rx_lastAckDelay);
3223 clock_Add(&when, &rx_softAckDelay);
3225 if (!call->delayedAckEvent ||
3226 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3227 rxevent_Cancel(call->delayedAckEvent, call,
3228 RX_CALL_REFCOUNT_DELAY);
3229 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3230 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3233 } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3234 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3241 static void rxi_ComputeRate();
3244 /* The real smarts of the whole thing. */
3245 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3246 register struct rx_call *call;
3247 struct rx_packet *np;
3250 struct rx_ackPacket *ap;
3252 register struct rx_packet *tp;
3253 register struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
3254 register struct rx_connection *conn = call->conn;
3255 struct rx_peer *peer = conn->peer;
3258 /* because there are CM's that are bogus, sending weird values for this. */
3259 afs_uint32 skew = 0;
3264 int newAckCount = 0;
3265 u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
3266 int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3268 MUTEX_ENTER(&rx_stats_mutex);
3269 rx_stats.ackPacketsRead++;
3270 MUTEX_EXIT(&rx_stats_mutex);
3271 ap = (struct rx_ackPacket *) rx_DataOf(np);
3272 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3274 return np; /* truncated ack packet */
3276 /* depends on ack packet struct */
3277 nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3278 first = ntohl(ap->firstPacket);
3279 serial = ntohl(ap->serial);
3280 /* temporarily disabled -- needs to degrade over time
3281 skew = ntohs(ap->maxSkew); */
3283 /* Ignore ack packets received out of order */
3284 if (first < call->tfirst) {
3288 if (np->header.flags & RX_SLOW_START_OK) {
3289 call->flags |= RX_CALL_SLOW_START_OK;
3295 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3296 ap->reason, ntohl(ap->previousPacket),
3297 (unsigned int) np->header.seq, (unsigned int) serial,
3298 (unsigned int) skew, ntohl(ap->firstPacket));
3301 for (offset = 0; offset < nAcks; offset++)
3302 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3308 /* if a server connection has been re-created, it doesn't remember what
3309 serial # it was up to. An ack will tell us, since the serial field
3310 contains the largest serial received by the other side */
3311 MUTEX_ENTER(&conn->conn_data_lock);
3312 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3313 conn->serial = serial+1;
3315 MUTEX_EXIT(&conn->conn_data_lock);
3317 /* Update the outgoing packet skew value to the latest value of
3318 * the peer's incoming packet skew value. The ack packet, of
3319 * course, could arrive out of order, but that won't affect things
3321 MUTEX_ENTER(&peer->peer_lock);
3322 peer->outPacketSkew = skew;
3324 /* Check for packets that no longer need to be transmitted, and
3325 * discard them. This only applies to packets positively
3326 * acknowledged as having been sent to the peer's upper level.
3327 * All other packets must be retained. So only packets with
3328 * sequence numbers < ap->firstPacket are candidates. */
3329 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3330 if (tp->header.seq >= first) break;
3331 call->tfirst = tp->header.seq + 1;
3332 if (tp->header.serial == serial) {
3333 /* Use RTT if not delayed by client. */
3334 if (ap->reason != RX_ACK_DELAY)
3335 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3337 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3340 else if (tp->firstSerial == serial) {
3341 /* Use RTT if not delayed by client. */
3342 if (ap->reason != RX_ACK_DELAY)
3343 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3345 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3348 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3349 /* XXX Hack. Because we have to release the global rx lock when sending
3350 * packets (osi_NetSend) we drop all acks while we're traversing the tq
3351 * in rxi_Start sending packets out because packets may move to the
3352 * freePacketQueue as a result of being here! So we drop these packets until
3353 * we're safely out of the traversing. Really ugly!
3354 * To make it even uglier, if we're using fine grain locking, we can
3355 * set the ack bits in the packets and have rxi_Start remove the packets
3356 * when it's done transmitting.
3358 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3361 if (call->flags & RX_CALL_TQ_BUSY) {
3362 #ifdef RX_ENABLE_LOCKS
3363 tp->flags |= RX_PKTFLAG_ACKED;
3364 call->flags |= RX_CALL_TQ_SOME_ACKED;
3365 #else /* RX_ENABLE_LOCKS */
3367 #endif /* RX_ENABLE_LOCKS */
3369 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3372 rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3377 /* Give rate detector a chance to respond to ping requests */
3378 if (ap->reason == RX_ACK_PING_RESPONSE) {
3379 rxi_ComputeRate(peer, call, 0, np, ap->reason);
3383 /* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
3385 /* Now go through explicit acks/nacks and record the results in
3386 * the waiting packets. These are packets that can't be released
3387 * yet, even with a positive acknowledge. This positive
3388 * acknowledge only means the packet has been received by the
3389 * peer, not that it will be retained long enough to be sent to
3390 * the peer's upper level. In addition, reset the transmit timers
3391 * of any missing packets (those packets that must be missing
3392 * because this packet was out of sequence) */
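/* Illustrative sketch (not compiled): how the explicit ack information is
 * laid out -- ap->acks[i] describes the packet with sequence number
 * firstPacket + i, and every entry is either RX_ACK_TYPE_ACK or
 * RX_ACK_TYPE_NACK.  Hypothetical helper mirroring the scan done below. */
#if 0
static int seq_is_acked(struct rx_ackPacket *ap, afs_uint32 first,
                        int nAcks, afs_uint32 seq)
{
    if (seq < first)
        return 1;                    /* below firstPacket: implicitly acked */
    if (seq < first + nAcks)
        return ap->acks[seq - first] == RX_ACK_TYPE_ACK;
    return 0;                        /* beyond the reported range: unknown */
}
#endif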
3394 call->nSoftAcked = 0;
3395 for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3396 /* Update round trip time if the ack was stimulated on receipt
3398 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3399 #ifdef RX_ENABLE_LOCKS
3400 if (tp->header.seq >= first) {
3401 #endif /* RX_ENABLE_LOCKS */
3402 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3403 if (tp->header.serial == serial) {
3404 /* Use RTT if not delayed by client. */
3405 if (ap->reason != RX_ACK_DELAY)
3406 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3408 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3411 else if ((tp->firstSerial == serial)) {
3412 /* Use RTT if not delayed by client. */
3413 if (ap->reason != RX_ACK_DELAY)
3414 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3416 rxi_ComputeRate(peer, call, tp, np, ap->reason);
3419 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3420 #ifdef RX_ENABLE_LOCKS
3422 #endif /* RX_ENABLE_LOCKS */
3423 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3425 /* Set the acknowledge flag per packet based on the
3426 * information in the ack packet. An acknowledged packet can
3427 * be downgraded when the server has discarded a packet it
3428 * soft-acked previously, or when an ack packet is received
3429 * out of sequence. */
3430 if (tp->header.seq < first) {
3431 /* Implicit ack information */
3432 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3435 tp->flags |= RX_PKTFLAG_ACKED;
3437 else if (tp->header.seq < first + nAcks) {
3438 /* Explicit ack information: set it in the packet appropriately */
3439 if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3440 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3442 tp->flags |= RX_PKTFLAG_ACKED;
3450 tp->flags &= ~RX_PKTFLAG_ACKED;
3455 tp->flags &= ~RX_PKTFLAG_ACKED;
3459 /* If packet isn't yet acked, and it has been transmitted at least
3460 * once, reset retransmit time using latest timeout
3461 * i.e., this should readjust the retransmit timer for all outstanding
3462 * packets... So we don't just retransmit when we should know better*/
3464 if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3465 tp->retryTime = tp->timeSent;
3466 clock_Add(&tp->retryTime, &peer->timeout);
3467 /* shift by eight because one quarter-sec ~ 256 milliseconds */
3468 clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
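/* Worked example of the backoff arithmetic above (values are illustrative
 * only): with tp->backoff == 2 quarter-second units, the added term is
 * 2 << 8 = 512 milliseconds, so the packet becomes eligible for
 * retransmission peer->timeout plus roughly half a second after timeSent. */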
3472 /* If the window has been extended by this acknowledge packet,
3473 * then wakeup a sender waiting in alloc for window space, or try
3474 * sending packets now, if he's been sitting on packets due to
3475 * lack of window space */
3476 if (call->tnext < (call->tfirst + call->twind)) {
3477 #ifdef RX_ENABLE_LOCKS
3478 CV_SIGNAL(&call->cv_twind);
3480 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3481 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3482 osi_rxWakeup(&call->twind);
3485 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3486 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3490 /* if the ack packet has a receivelen field hanging off it,
3491 * update our state */
3492 if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3495 /* If the ack packet has a "recommended" size that is less than
3496 * what I am using now, reduce my size to match */
3497 rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3498 sizeof(afs_int32), &tSize);
3499 tSize = (afs_uint32) ntohl(tSize);
3500 peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3502 /* Get the maximum packet size to send to this peer */
3503 rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3505 tSize = (afs_uint32)ntohl(tSize);
3506 tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3507 tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3509 /* sanity check - peer might have restarted with different params.
3510 * If peer says "send less", dammit, send less... Peer should never
3511 * be unable to accept packets of the size that prior AFS versions would
3512 * send without asking. */
3513 if (peer->maxMTU != tSize) {
3514 peer->maxMTU = tSize;
3515 peer->MTU = MIN(tSize, peer->MTU);
3516 call->MTU = MIN(call->MTU, tSize);
3520 if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3522 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3523 sizeof(afs_int32), &tSize);
3524 tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
3525 if (tSize < call->twind) { /* smaller than our send */
3526 call->twind = tSize; /* window, we must send less... */
3527 call->ssthresh = MIN(call->twind, call->ssthresh);
3530 /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3531 * network MTU confused with the loopback MTU. Calculate the
3532 * maximum MTU here for use in the slow start code below.
3534 maxMTU = peer->maxMTU;
3535 /* Did peer restart with older RX version? */
3536 if (peer->maxDgramPackets > 1) {
3537 peer->maxDgramPackets = 1;
3539 } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3541 rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3542 sizeof(afs_int32), &tSize);
3543 tSize = (afs_uint32) ntohl(tSize);
3545 * As of AFS 3.5 we set the send window to match the receive window.
3547 if (tSize < call->twind) {
3548 call->twind = tSize;
3549 call->ssthresh = MIN(call->twind, call->ssthresh);
3550 } else if (tSize > call->twind) {
3551 call->twind = tSize;
3555 * As of AFS 3.5, a jumbogram is more than one fixed size
3556 * packet transmitted in a single UDP datagram. If the remote
3557 * MTU is smaller than our local MTU then never send a datagram
3558 * larger than the natural MTU.
3560 rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3561 sizeof(afs_int32), &tSize);
3562 maxDgramPackets = (afs_uint32) ntohl(tSize);
3563 maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3564 maxDgramPackets = MIN(maxDgramPackets,
3565 (int)(peer->ifDgramPackets));
3566 maxDgramPackets = MIN(maxDgramPackets, tSize);
3567 if (maxDgramPackets > 1) {
3568 peer->maxDgramPackets = maxDgramPackets;
3569 call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3571 peer->maxDgramPackets = 1;
3572 call->MTU = peer->natMTU;
3574 } else if (peer->maxDgramPackets > 1) {
3575 /* Restarted with lower version of RX */
3576 peer->maxDgramPackets = 1;
3578 } else if (peer->maxDgramPackets > 1 ||
3579 peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3580 /* Restarted with lower version of RX */
3581 peer->maxMTU = OLD_MAX_PACKET_SIZE;
3582 peer->natMTU = OLD_MAX_PACKET_SIZE;
3583 peer->MTU = OLD_MAX_PACKET_SIZE;
3584 peer->maxDgramPackets = 1;
3585 peer->nDgramPackets = 1;
3587 call->MTU = OLD_MAX_PACKET_SIZE;
3592 * Calculate how many datagrams were successfully received after
3593 * the first missing packet and adjust the negative ack counter
3598 nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3599 if (call->nNacks < nNacked) {
3600 call->nNacks = nNacked;
3609 if (call->flags & RX_CALL_FAST_RECOVER) {
3611 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3613 call->flags &= ~RX_CALL_FAST_RECOVER;
3614 call->cwind = call->nextCwind;
3615 call->nextCwind = 0;
3618 call->nCwindAcks = 0;
3620 else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3621 /* Three negative acks in a row trigger congestion recovery */
3622 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3623 MUTEX_EXIT(&peer->peer_lock);
3624 if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3625 /* someone else is waiting to start recovery */
3628 call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3629 while (call->flags & RX_CALL_TQ_BUSY) {
3630 call->flags |= RX_CALL_TQ_WAIT;
3631 #ifdef RX_ENABLE_LOCKS
3632 CV_WAIT(&call->cv_tq, &call->lock);
3633 #else /* RX_ENABLE_LOCKS */
3634 osi_rxSleep(&call->tq);
3635 #endif /* RX_ENABLE_LOCKS */
3637 MUTEX_ENTER(&peer->peer_lock);
3638 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3639 call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3640 call->flags |= RX_CALL_FAST_RECOVER;
3641 call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3642 call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3644 call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3645 call->nextCwind = call->ssthresh;
3648 peer->MTU = call->MTU;
3649 peer->cwind = call->nextCwind;
3650 peer->nDgramPackets = call->nDgramPackets;
3652 call->congestSeq = peer->congestSeq;
3653 /* Reset the resend times on the packets that were nacked
3654 * so we will retransmit as soon as the window permits*/
3655 for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3657 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3658 clock_Zero(&tp->retryTime);
3660 } else if (tp->flags & RX_PKTFLAG_ACKED) {
3665 /* If cwind is smaller than ssthresh, then increase
3666 * the window one packet for each ack we receive (exponential
3668 * If cwind is greater than or equal to ssthresh then increase
3669 * the congestion window by one packet for each cwind acks we
3670 * receive (linear growth). */
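/* Illustrative sketch (not compiled, hypothetical names): the growth rule
 * just described as a standalone function -- below ssthresh grow by one
 * packet per newly acknowledged packet (slow start); at or above ssthresh
 * grow by one packet per full window of acks (congestion avoidance).  The
 * live state is kept in the call structure below. */
#if 0
static void grow_cwind(int *cwind, int *nCwindAcks, int ssthresh,
                       int newAckCount, int maxSendWindow)
{
    if (*cwind < ssthresh) {
        *cwind += newAckCount;                 /* slow start: exponential */
        if (*cwind > ssthresh)
            *cwind = ssthresh;
        *nCwindAcks = 0;
    } else {
        *nCwindAcks += newAckCount;            /* congestion avoidance */
        if (*nCwindAcks >= *cwind) {
            *nCwindAcks = 0;
            if (*cwind < maxSendWindow)
                (*cwind)++;                    /* ~one packet per window */
        }
    }
}
#endif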
3671 if (call->cwind < call->ssthresh) {
3672 call->cwind = MIN((int)call->ssthresh,
3673 (int)(call->cwind + newAckCount));
3674 call->nCwindAcks = 0;
3676 call->nCwindAcks += newAckCount;
3677 if (call->nCwindAcks >= call->cwind) {
3678 call->nCwindAcks = 0;
3679 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3683 * If we have received several acknowledgements in a row then
3684 * it is time to increase the size of our datagrams
3686 if ((int)call->nAcks > rx_nDgramThreshold) {
3687 if (peer->maxDgramPackets > 1) {
3688 if (call->nDgramPackets < peer->maxDgramPackets) {
3689 call->nDgramPackets++;
3691 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3692 } else if (call->MTU < peer->maxMTU) {
3693 call->MTU += peer->natMTU;
3694 call->MTU = MIN(call->MTU, peer->maxMTU);
3700 MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3702 /* Servers need to hold the call until all response packets have
3703 * been acknowledged. Soft acks are good enough since clients
3704 * are not allowed to clear their receive queues. */
3705 if (call->state == RX_STATE_HOLD &&
3706 call->tfirst + call->nSoftAcked >= call->tnext) {
3707 call->state = RX_STATE_DALLY;
3708 rxi_ClearTransmitQueue(call, 0);
3709 } else if (!queue_IsEmpty(&call->tq)) {
3710 rxi_Start(0, call, istack);
3715 /* Received a response to a challenge packet */
3716 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3717 register struct rx_connection *conn;
3718 register struct rx_packet *np;
3723 /* Ignore the packet if we're the client */
3724 if (conn->type == RX_CLIENT_CONNECTION) return np;
3726 /* If already authenticated, ignore the packet (it's probably a retry) */
3727 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3730 /* Otherwise, have the security object evaluate the response packet */
3731 error = RXS_CheckResponse(conn->securityObject, conn, np);
3733 /* If the response is invalid, reset the connection, sending
3734 * an abort to the peer */
3738 rxi_ConnectionError(conn, error);
3739 MUTEX_ENTER(&conn->conn_data_lock);
3740 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3741 MUTEX_EXIT(&conn->conn_data_lock);
3745 /* If the response is valid, any calls waiting to attach
3746 * servers can now do so */
3748 for (i=0; i<RX_MAXCALLS; i++) {
3749 struct rx_call *call = conn->call[i];
3751 MUTEX_ENTER(&call->lock);
3752 if (call->state == RX_STATE_PRECALL)
3753 rxi_AttachServerProc(call, -1, NULL, NULL);
3754 MUTEX_EXIT(&call->lock);
3761 /* A client has received an authentication challenge: the security
3762 * object is asked to cough up a respectable response packet to send
3763 * back to the server. The server is responsible for retrying the
3764 * challenge if it fails to get a response. */
3767 rxi_ReceiveChallengePacket(conn, np, istack)
3768 register struct rx_connection *conn;
3769 register struct rx_packet *np;
3774 /* Ignore the challenge if we're the server */
3775 if (conn->type == RX_SERVER_CONNECTION) return np;
3777 /* Ignore the challenge if the connection is otherwise idle; someone's
3778 * trying to use us as an oracle. */
3779 if (!rxi_HasActiveCalls(conn)) return np;
3781 /* Send the security object the challenge packet. It is expected to fill
3782 * in the response. */
3783 error = RXS_GetResponse(conn->securityObject, conn, np);
3785 /* If the security object is unable to return a valid response, reset the
3786 * connection and send an abort to the peer. Otherwise send the response
3787 * packet to the peer connection. */
3789 rxi_ConnectionError(conn, error);
3790 MUTEX_ENTER(&conn->conn_data_lock);
3791 np = rxi_SendConnectionAbort(conn, np, istack, 0);
3792 MUTEX_EXIT(&conn->conn_data_lock);
3795 np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3796 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3802 /* Find an available server process to service the current request in
3803 * the given call structure. If one isn't available, queue up this
3804 * call so it eventually gets one */
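/* Illustrative sketch (not compiled, hypothetical and much-simplified types):
 * the dispatch decision rxi_AttachServerProc makes below -- if the service is
 * within quota and an idle server thread exists, hand it the call; otherwise
 * put the call on a waiting queue until a thread frees up. */
#if 0
struct work   { struct work *next; int queued; };
struct worker { struct worker *next; struct work *assigned; };

static struct worker *idle_workers;      /* threads waiting for a call */
static struct work   *pending_work;      /* calls waiting for a thread */

static void attach_or_queue(struct work *w, int haveQuota)
{
    struct worker *t;

    if (haveQuota && (t = idle_workers) != NULL) {
        idle_workers = t->next;
        t->assigned = w;                 /* give the call to the idle thread */
        /* ...and wake it (CV_SIGNAL / osi_rxWakeup in rx proper) */
    } else if (!w->queued) {
        w->queued = 1;                   /* remember the call for later */
        w->next = pending_work;
        pending_work = w;
    }
}
#endif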
3806 rxi_AttachServerProc(call, socket, tnop, newcallp)
3807 register struct rx_call *call;
3808 register osi_socket socket;
3810 register struct rx_call **newcallp;
3812 register struct rx_serverQueueEntry *sq;
3813 register struct rx_service *service = call->conn->service;
3814 #ifdef RX_ENABLE_LOCKS
3815 register int haveQuota = 0;
3816 #endif /* RX_ENABLE_LOCKS */
3817 /* May already be attached */
3818 if (call->state == RX_STATE_ACTIVE) return;
3820 MUTEX_ENTER(&rx_serverPool_lock);
3821 #ifdef RX_ENABLE_LOCKS
3822 while(rxi_ServerThreadSelectingCall) {
3823 MUTEX_EXIT(&call->lock);
3824 CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3825 MUTEX_EXIT(&rx_serverPool_lock);
3826 MUTEX_ENTER(&call->lock);
3827 MUTEX_ENTER(&rx_serverPool_lock);
3828 /* Call may have been attached */
3829 if (call->state == RX_STATE_ACTIVE) return;
3832 haveQuota = QuotaOK(service);
3833 if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3834 /* If there are no processes available to service this call,
3835 * put the call on the incoming call queue (unless it's
3836 * already on the queue).
3839 ReturnToServerPool(service);
3840 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3841 call->flags |= RX_CALL_WAIT_PROC;
3842 MUTEX_ENTER(&rx_stats_mutex);
3844 MUTEX_EXIT(&rx_stats_mutex);
3845 rxi_calltrace(RX_CALL_ARRIVAL, call);
3846 SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3847 queue_Append(&rx_incomingCallQueue, call);
3850 #else /* RX_ENABLE_LOCKS */
3851 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3852 /* If there are no processes available to service this call,
3853 * put the call on the incoming call queue (unless it's
3854 * already on the queue).
3856 if (!(call->flags & RX_CALL_WAIT_PROC)) {
3857 call->flags |= RX_CALL_WAIT_PROC;
3859 rxi_calltrace(RX_CALL_ARRIVAL, call);
3860 queue_Append(&rx_incomingCallQueue, call);
3863 #endif /* RX_ENABLE_LOCKS */
3865 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3867 /* If hot threads are enabled, and both newcallp and sq->socketp
3868 * are non-null, then this thread will process the call, and the
3869      * idle server thread will start listening on this thread's socket.
3872 if (rx_enable_hot_thread && newcallp && sq->socketp) {
3875 *sq->socketp = socket;
3876 clock_GetTime(&call->startTime);
3877 CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3881 if (call->flags & RX_CALL_WAIT_PROC) {
3882 /* Conservative: I don't think this should happen */
3883 call->flags &= ~RX_CALL_WAIT_PROC;
3884 MUTEX_ENTER(&rx_stats_mutex);
3886 MUTEX_EXIT(&rx_stats_mutex);
3889 call->state = RX_STATE_ACTIVE;
3890 call->mode = RX_MODE_RECEIVING;
3891 if (call->flags & RX_CALL_CLEARED) {
3892 /* send an ack now to start the packet flow up again */
3893 call->flags &= ~RX_CALL_CLEARED;
3894 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3896 #ifdef RX_ENABLE_LOCKS
3899 service->nRequestsRunning++;
3900 if (service->nRequestsRunning <= service->minProcs)
3906 MUTEX_EXIT(&rx_serverPool_lock);
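/* For illustration only, a minimal server-side sketch (assuming the usual Rx
 * entry points rx_Init, rx_NewService and rx_StartServer; the example names
 * are hypothetical).  Threads donated to the server pool end up waiting on
 * rx_idleServerQueue, which is where rxi_AttachServerProc above hands them
 * incoming calls. */
#if 0
static afs_int32 ExampleRequestProc(struct rx_call *call)
{
    /* ... rx_Read the request, rx_Write the reply ... */
    return 0;
}

static void example_server(u_short port, u_short serviceId,
			   struct rx_securityClass **secObjs, int nSecObjs)
{
    struct rx_service *svc;

    rx_Init(port);
    svc = rx_NewService(0, serviceId, "example", secObjs, nSecObjs,
			ExampleRequestProc);
    if (svc) {
	rx_SetMinProcs(svc, 2);
	rx_SetMaxProcs(svc, 4);
	rx_StartServer(1);		/* donate this thread to the pool */
    }
}
#endif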
3909 /* Delay the sending of an acknowledge event for a short while, while
3910 * a new call is being prepared (in the case of a client) or a reply
3911 * is being prepared (in the case of a server). Rather than sending
3912 * an ack packet, an ACKALL packet is sent. */
3913 void rxi_AckAll(event, call, dummy)
3914 struct rxevent *event;
3915 register struct rx_call *call;
3918 #ifdef RX_ENABLE_LOCKS
3920 MUTEX_ENTER(&call->lock);
3921 call->delayedAckEvent = (struct rxevent *) 0;
3922 CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3924 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3925 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3927 MUTEX_EXIT(&call->lock);
3928 #else /* RX_ENABLE_LOCKS */
3929 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3930 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3931 RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3932 #endif /* RX_ENABLE_LOCKS */
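/* Note: an ACKALL packet tells the peer that every packet of this call has
 * been received, so the receiving side can clear its transmit queue for the
 * call in one step instead of waiting for individual acknowledgements. */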
3935 void rxi_SendDelayedAck(event, call, dummy)
3936 struct rxevent *event;
3937 register struct rx_call *call;
3940 #ifdef RX_ENABLE_LOCKS
3942 MUTEX_ENTER(&call->lock);
3943 if (event == call->delayedAckEvent)
3944 call->delayedAckEvent = (struct rxevent *) 0;
3945 CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3947 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3949 MUTEX_EXIT(&call->lock);
3950 #else /* RX_ENABLE_LOCKS */
3951 if (event) call->delayedAckEvent = (struct rxevent *) 0;
3952 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3953 #endif /* RX_ENABLE_LOCKS */
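/* Note: at most one delayed-ack event is outstanding per call
 * (call->delayedAckEvent); if an ack goes out for some other reason first,
 * the pending event is cancelled with rxevent_Cancel (see rxi_ResetCall). */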
3957 #ifdef RX_ENABLE_LOCKS
3958 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3959 * clearing them out.
3961 static void rxi_SetAcksInTransmitQueue(call)
3962 register struct rx_call *call;
3964 register struct rx_packet *p, *tp;
3967 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3970 p->flags |= RX_PKTFLAG_ACKED;
3974 call->flags |= RX_CALL_TQ_CLEARME;
3975 call->flags |= RX_CALL_TQ_SOME_ACKED;
3978 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3979 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3980 call->tfirst = call->tnext;
3981 call->nSoftAcked = 0;
3983 if (call->flags & RX_CALL_FAST_RECOVER) {
3984 call->flags &= ~RX_CALL_FAST_RECOVER;
3985 call->cwind = call->nextCwind;
3986 call->nextCwind = 0;
3989 CV_SIGNAL(&call->cv_twind);
3991 #endif /* RX_ENABLE_LOCKS */
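/* Note: rxi_SetAcksInTransmitQueue only flags the packets RX_PKTFLAG_ACKED;
 * setting RX_CALL_TQ_CLEARME defers the actual freeing to whichever thread
 * owns the transmit queue (rxi_Start) once RX_CALL_TQ_BUSY is dropped. */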
3993 /* Clear out the transmit queue for the current call (all packets have
3994  * been received by the peer) */
3995 void rxi_ClearTransmitQueue(call, force)
3996 register struct rx_call *call;
3999 register struct rx_packet *p, *tp;
4001 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4002 if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4004 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4007 p->flags |= RX_PKTFLAG_ACKED;
4011 call->flags |= RX_CALL_TQ_CLEARME;
4012 call->flags |= RX_CALL_TQ_SOME_ACKED;
4015 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4016 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4022 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4023 call->flags &= ~RX_CALL_TQ_CLEARME;
4025 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4027 rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4028 rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4029 call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4030 call->nSoftAcked = 0;
4032 if (call->flags & RX_CALL_FAST_RECOVER) {
4033 call->flags &= ~RX_CALL_FAST_RECOVER;
4034 call->cwind = call->nextCwind;
4037 #ifdef RX_ENABLE_LOCKS
4038 CV_SIGNAL(&call->cv_twind);
4040 osi_rxWakeup(&call->twind);
4044 void rxi_ClearReceiveQueue(call)
4045 register struct rx_call *call;
4047 register struct rx_packet *p, *tp;
4048 if (queue_IsNotEmpty(&call->rq)) {
4049 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4054 rx_packetReclaims++;
4056 call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4058 if (call->state == RX_STATE_PRECALL) {
4059 call->flags |= RX_CALL_CLEARED;
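	/* RX_CALL_CLEARED is checked again when the call is attached to a
	 * server thread (rxi_AttachServerProc above), which then sends an
	 * RX_ACK_IDLE acknowledgement to restart the packet flow. */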
4063 /* Send an abort packet for the specified call */
4064 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4065 register struct rx_call *call;
4066 struct rx_packet *packet;
4076 /* Clients should never delay abort messages */
4077 if (rx_IsClientConn(call->conn))
4080 if (call->abortCode != call->error) {
4081 call->abortCode = call->error;
4082 call->abortCount = 0;
4085 if (force || rxi_callAbortThreshhold == 0 ||
4086 call->abortCount < rxi_callAbortThreshhold) {
4087 if (call->delayedAbortEvent) {
4088 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4090 error = htonl(call->error);
4092 packet = rxi_SendSpecial(call, call->conn, packet,
4093 RX_PACKET_TYPE_ABORT, (char *)&error,
4094 sizeof(error), istack);
4095 } else if (!call->delayedAbortEvent) {
4096 clock_GetTime(&when);
4097 clock_Addmsec(&when, rxi_callAbortDelay);
4098 CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4099 call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4105 /* Send an abort packet for the specified connection. Packet is an
4106 * optional pointer to a packet that can be used to send the abort.
4107  * Once the number of abort messages reaches the threshold, an
4108 * event is scheduled to send the abort. Setting the force flag
4109 * overrides sending delayed abort messages.
4111 * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4112 * to send the abort packet.
4114 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4115 register struct rx_connection *conn;
4116 struct rx_packet *packet;
4126 /* Clients should never delay abort messages */
4127 if (rx_IsClientConn(conn))
4130 if (force || rxi_connAbortThreshhold == 0 ||
4131 conn->abortCount < rxi_connAbortThreshhold) {
4132 if (conn->delayedAbortEvent) {
4133 rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4135 error = htonl(conn->error);
4137 MUTEX_EXIT(&conn->conn_data_lock);
4138 packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4139 RX_PACKET_TYPE_ABORT, (char *)&error,
4140 sizeof(error), istack);
4141 MUTEX_ENTER(&conn->conn_data_lock);
4142 } else if (!conn->delayedAbortEvent) {
4143 clock_GetTime(&when);
4144 clock_Addmsec(&when, rxi_connAbortDelay);
4145 conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
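/* For example, with force == 0 and rxi_connAbortThreshhold == N, the first N
 * aborts on a connection are sent immediately; after that, further aborts are
 * coalesced into a single delayed event that fires rxi_connAbortDelay
 * milliseconds later. */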
4151 /* Associate an error with all of the calls owned by a connection. Called
4152 * with error non-zero. This is only for really fatal things, like
4153 * bad authentication responses. The connection itself is set in
4154 * error at this point, so that future packets received will be
4156 void rxi_ConnectionError(conn, error)
4157 register struct rx_connection *conn;
4158 register afs_int32 error;
4162 if (conn->challengeEvent)
4163 rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4164 for (i=0; i<RX_MAXCALLS; i++) {
4165 struct rx_call *call = conn->call[i];
4167 MUTEX_ENTER(&call->lock);
4168 rxi_CallError(call, error);
4169 MUTEX_EXIT(&call->lock);
4172 conn->error = error;
4173 MUTEX_ENTER(&rx_stats_mutex);
4174 rx_stats.fatalErrors++;
4175 MUTEX_EXIT(&rx_stats_mutex);
4179 void rxi_CallError(call, error)
4180 register struct rx_call *call;
4183 if (call->error) error = call->error;
4184 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4185 if (!(call->flags & RX_CALL_TQ_BUSY)) {
4186 rxi_ResetCall(call, 0);
4189 rxi_ResetCall(call, 0);
4191 call->error = error;
4192 call->mode = RX_MODE_ERROR;
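    /* With mode set to RX_MODE_ERROR, the read and write paths are expected
     * to stop moving data and hand call->error back to the application
     * (typically via rx_EndCall). */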
4195 /* Reset various fields in a call structure, and wake up waiting
4196 * processes. Some fields aren't changed: state & mode are not
4197 * touched (these must be set by the caller), and bufptr, nLeft, and
4198 * nFree are not reset, since these fields are manipulated by
4199 * unprotected macros, and may only be reset by non-interrupting code.
4202 /* this code requires that call->conn be set properly as a pre-condition. */
4203 #endif /* ADAPT_WINDOW */
4205 void rxi_ResetCall(call, newcall)
4206 register struct rx_call *call;
4207 register int newcall;
4210 register struct rx_peer *peer;
4211 struct rx_packet *packet;
4213 /* Notify anyone who is waiting for asynchronous packet arrival */
4214 if (call->arrivalProc) {
4215 (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4216 call->arrivalProc = (VOID (*)()) 0;
4219 if (call->delayedAbortEvent) {
4220 rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4221 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4223 rxi_SendCallAbort(call, packet, 0, 1);
4224 rxi_FreePacket(packet);
4229 * Update the peer with the congestion information in this call
4230 * so other calls on this connection can pick up where this call
4231 * left off. If the congestion sequence numbers don't match then
4232 * another call experienced a retransmission.
4234 peer = call->conn->peer;
4235 MUTEX_ENTER(&peer->peer_lock);
4237 if (call->congestSeq == peer->congestSeq) {
4238 peer->cwind = MAX(peer->cwind, call->cwind);
4239 peer->MTU = MAX(peer->MTU, call->MTU);
4240 peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4243 call->abortCode = 0;
4244 call->abortCount = 0;
4246 if (peer->maxDgramPackets > 1) {
4247 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4249 call->MTU = peer->MTU;
4251 call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4252 call->ssthresh = rx_maxSendWindow;
4253 call->nDgramPackets = peer->nDgramPackets;
4254 call->congestSeq = peer->congestSeq;
4255 MUTEX_EXIT(&peer->peer_lock);
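    /* Example of the congestSeq check above: if another call on this
     * connection retransmitted, it advanced peer->congestSeq, so this call's
     * now-stale cwind/MTU/nDgramPackets are not written back to the peer,
     * while the call itself is still re-initialized from the peer's current
     * values. */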
4257 flags = call->flags;
4258 rxi_ClearReceiveQueue(call);
4259 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
4260 if (call->flags & RX_CALL_TQ_BUSY) {
4261 call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4262 call->flags |= (flags & RX_CALL_TQ_WAIT);
4264 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4266 rxi_ClearTransmitQueue(call, 0);
4267 queue_Init(&call->tq);
4270 queue_Init(&call->rq);
4272 call->rwind = rx_initReceiveWindow;
4273 call->twind = rx_initSendWindow;
4274 call->nSoftAcked = 0;
4275 call->nextCwind = 0;
4278 call->nCwindAcks = 0;
4279 call->nSoftAcks = 0;
4280 call->nHardAcks = 0;
4282 call->tfirst = call->rnext = call->tnext = 1;
4284 call->lastAcked = 0;
4285 call->localStatus = call->remoteStatus = 0;
4287 if (flags & RX_CALL_READER_WAIT) {
4288 #ifdef RX_ENABLE_LOCKS
4289 CV_BROADCAST(&call->cv_rq);
4291 osi_rxWakeup(&call->rq);
4294 if (flags & RX_CALL_WAIT_PACKETS) {
4295 MUTEX_ENTER(&rx_freePktQ_lock);
4296 rxi_PacketsUnWait(); /* XXX */
4297 MUTEX_EXIT(&rx_freePktQ_lock);
4300 #ifdef RX_ENABLE_LOCKS
4301 CV_SIGNAL(&call->cv_twind);
4303 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4304 osi_rxWakeup(&call->twind);
4307 #ifdef RX_ENABLE_LOCKS
4308 /* The following ensures that we don't mess with any queue while some
4309  * other thread might also be doing so. The call_queue_lock field is
4310  * only modified under the call lock. If the call is in the process
4311  * of being removed from a queue, the call is not locked until the
4312  * queue lock is dropped and only then is the call_queue_lock field
4313 * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4314 * Note that any other routine which removes a call from a queue has to
4315  * obtain the queue lock before examining the queue and removing the call.
4317 if (call->call_queue_lock) {
4318 MUTEX_ENTER(call->call_queue_lock);
4319 if (queue_IsOnQueue(call)) {
4321 if (flags & RX_CALL_WAIT_PROC) {
4322 MUTEX_ENTER(&rx_stats_mutex);
4324 MUTEX_EXIT(&rx_stats_mutex);
4327 MUTEX_EXIT(call->call_queue_lock);
4328 CLEAR_CALL_QUEUE_LOCK(call);
4330 #else /* RX_ENABLE_LOCKS */
4331 if (queue_IsOnQueue(call)) {
4333 if (flags & RX_CALL_WAIT_PROC)
4336 #endif /* RX_ENABLE_LOCKS */
4338 rxi_KeepAliveOff(call);
4339 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4342 /* Send an acknowledge for the indicated packet (seq,serial) of the
4343 * indicated call, for the indicated reason (reason). This
4344 * acknowledge will specifically acknowledge receiving the packet, and
4345 * will also specify which other packets for this call have been
4346  * received. This routine returns to the caller the packet that was
4347  * used; the caller is responsible for freeing it or re-using it.
4348 * This acknowledgement also returns the highest sequence number
4349 * actually read out by the higher level to the sender; the sender
4350 * promises to keep around packets that have not been read by the
4351 * higher level yet (unless, of course, the sender decides to abort
4352 * the call altogether). Any of p, seq, serial, pflags, or reason may
4353 * be set to zero without ill effect. That is, if they are zero, they
4354 * will not convey any information.
4355  * NOW there is a trailer field, after the ack, where it will safely be
4356  * ignored by mundanes, which indicates the maximum-size packet this
4357  * host can swallow. */
4358 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4359 register struct rx_call *call;
4360 register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4361 int seq; /* Sequence number of the packet we are acking */
4362 int serial; /* Serial number of the packet */
4363 int pflags; /* Flags field from packet header */
4364 int reason; /* Reason an acknowledge was prompted */
4367 struct rx_ackPacket *ap;
4368 register struct rx_packet *rqp;
4369 register struct rx_packet *nxp; /* For queue_Scan */
4370 register struct rx_packet *p;
4375 * Open the receive window once a thread starts reading packets
4377 if (call->rnext > 1) {
4378 call->rwind = rx_maxReceiveWindow;
4381 call->nHardAcks = 0;
4382 call->nSoftAcks = 0;
4383 if (call->rnext > call->lastAcked)
4384 call->lastAcked = call->rnext;
4388 rx_computelen(p, p->length); /* reset length, you never know */
4389 } /* where that's been... */
4391 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4392 /* We won't send the ack, but don't panic. */
4393 return optionalPacket;
4396 templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
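    /* templ reserves room for the ack body proper (rx_AckDataSize for the
     * current receive window) plus four 32-bit trailer words; the trailer is
     * the "ignored by mundanes" extension described above, advertising among
     * other things the largest packet this host will accept. */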