fix-rx-rtt-computation-when-delay-is-involved-20010326
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include "../afs/sysincludes.h"
15 #include "../afs/afsincludes.h"
16 #ifndef UKERNEL
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #include "../afs/afs_args.h"
28 #include "../afs/afs_osi.h"
29 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
30 #include "../h/systm.h"
31 #endif
32 #ifdef RXDEBUG
33 #undef RXDEBUG      /* turn off debugging */
34 #endif /* RXDEBUG */
35 #if defined(AFS_SGI_ENV)
36 #include "../sys/debug.h"
37 #endif
38 #include "../afsint/afsint.h"
39 #ifdef  AFS_ALPHA_ENV
40 #undef kmem_alloc
41 #undef kmem_free
42 #undef mem_alloc
43 #undef mem_free
44 #undef register
45 #endif  /* AFS_ALPHA_ENV */
46 #else /* !UKERNEL */
47 #include "../afs/sysincludes.h"
48 #include "../afs/afsincludes.h"
49 #endif /* !UKERNEL */
50 #include "../afs/lock.h"
51 #include "../rx/rx_kmutex.h"
52 #include "../rx/rx_kernel.h"
53 #include "../rx/rx_clock.h"
54 #include "../rx/rx_queue.h"
55 #include "../rx/rx.h"
56 #include "../rx/rx_globals.h"
57 #include "../rx/rx_trace.h"
58 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
59 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
60 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
61 #include "../afsint/afsint.h"
62 extern afs_int32 afs_termState;
63 #ifdef AFS_AIX41_ENV
64 #include "sys/lockl.h"
65 #include "sys/lock_def.h"
66 #endif /* AFS_AIX41_ENV */
67 # include "../afsint/rxgen_consts.h"
68 #else /* KERNEL */
69 # include <afs/param.h>
70 # include <sys/types.h>
71 # include <errno.h>
72 #ifdef AFS_NT40_ENV
73 # include <stdlib.h>
74 # include <fcntl.h>
75 # include <afsutil.h>
76 #else
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <sys/stat.h>
81 # include <netinet/in.h>
82 # include <sys/time.h>
83 #endif
84 # include "rx.h"
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx_globals.h"
89 # include "rx_trace.h"
90 # include "rx_internal.h"
91 # include <afs/rxgen_consts.h>
92 #endif /* KERNEL */
93
94 #ifdef RXDEBUG
95 extern afs_uint32 LWP_ThreadId();
96 #endif /* RXDEBUG */
97
98 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
99 struct rx_tq_debug {
100     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
101     afs_int32 rxi_start_in_error;
102 } rx_tq_debug;
103 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
104
105 /*
106  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
107  * currently allocated within rx.  This number is used to allocate the
108  * memory required to return the statistics when queried.
109  */
110
111 static unsigned int rxi_rpc_peer_stat_cnt;
112
113 /*
114  * rxi_rpc_process_stat_cnt counts the total number of local process stat
115  * structures currently allocated within rx.  The number is used to allocate
116  * the memory required to return the statistics when queried.
117  */
118
119 static unsigned int rxi_rpc_process_stat_cnt;
120
121 #if !defined(offsetof)
122 #include <stddef.h>     /* for definition of offsetof() */
123 #endif
124
125 #ifdef AFS_PTHREAD_ENV
126 #include <assert.h>
127
128 /*
129  * Use procedural initialization of mutexes/condition variables
130  * to ease NT porting
131  */
132
133 extern pthread_mutex_t rxkad_stats_mutex;
134 extern pthread_mutex_t des_init_mutex;
135 extern pthread_mutex_t des_random_mutex;
136 extern pthread_mutex_t rx_clock_mutex;
137 extern pthread_mutex_t rxi_connCacheMutex;
138 extern pthread_mutex_t rx_event_mutex;
139 extern pthread_mutex_t osi_malloc_mutex;
140 extern pthread_mutex_t event_handler_mutex;
141 extern pthread_mutex_t listener_mutex;
142 extern pthread_mutex_t rx_if_init_mutex;
143 extern pthread_mutex_t rx_if_mutex;
144 extern pthread_mutex_t rxkad_client_uid_mutex;
145 extern pthread_mutex_t rxkad_random_mutex;
146
147 extern pthread_cond_t rx_event_handler_cond;
148 extern pthread_cond_t rx_listener_cond;
149
150 static pthread_mutex_t epoch_mutex;
151 static pthread_mutex_t rx_init_mutex;
152 static pthread_mutex_t rx_debug_mutex;
153
154 static void rxi_InitPthread(void) {
155     assert(pthread_mutex_init(&rx_clock_mutex,
156                               (const pthread_mutexattr_t*)0)==0);
157     assert(pthread_mutex_init(&rxi_connCacheMutex,
158                               (const pthread_mutexattr_t*)0)==0);
159     assert(pthread_mutex_init(&rx_init_mutex,
160                               (const pthread_mutexattr_t*)0)==0);
161     assert(pthread_mutex_init(&epoch_mutex,
162                               (const pthread_mutexattr_t*)0)==0);
163     assert(pthread_mutex_init(&rx_event_mutex,
164                               (const pthread_mutexattr_t*)0)==0);
165     assert(pthread_mutex_init(&des_init_mutex,
166                               (const pthread_mutexattr_t*)0)==0);
167     assert(pthread_mutex_init(&des_random_mutex,
168                               (const pthread_mutexattr_t*)0)==0);
169     assert(pthread_mutex_init(&osi_malloc_mutex,
170                               (const pthread_mutexattr_t*)0)==0);
171     assert(pthread_mutex_init(&event_handler_mutex,
172                               (const pthread_mutexattr_t*)0)==0);
173     assert(pthread_mutex_init(&listener_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&rx_if_init_mutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&rx_if_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&rxkad_random_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183     assert(pthread_mutex_init(&rxkad_stats_mutex,
184                               (const pthread_mutexattr_t*)0)==0);
185     assert(pthread_mutex_init(&rx_debug_mutex,
186                               (const pthread_mutexattr_t*)0)==0);
187
188     assert(pthread_cond_init(&rx_event_handler_cond,
189                               (const pthread_condattr_t*)0)==0);
190     assert(pthread_cond_init(&rx_listener_cond,
191                               (const pthread_condattr_t*)0)==0);
192     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
193 }
194
195 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
196 #define INIT_PTHREAD_LOCKS \
197 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
198 /*
199  * The rx_stats_mutex mutex protects the following global variables:
200  * rxi_dataQuota
201  * rxi_minDeficit
202  * rxi_availProcs
203  * rxi_totalMin
204  * rxi_lowConnRefCount
205  * rxi_lowPeerRefCount
206  * rxi_nCalls
207  * rxi_Alloccnt
208  * rxi_Allocsize
209  * rx_nFreePackets
210  * rx_tq_debug
211  * rx_stats
212  */
213 #else
214 #define INIT_PTHREAD_LOCKS
215 #endif
216
217 extern void rxi_DeleteCachedConnections(void);
218
219
220 /* Variables for handling the minProcs implementation.  availProcs gives the
221  * number of threads available in the pool at this moment (not counting dudes
222  * executing right now).  totalMin gives the total number of procs required
223  * for handling all minProcs requests.  minDeficit is a dynamic variable
224  * tracking the # of procs required to satisfy all of the remaining minProcs
225  * demands.
226  * For fine grain locking to work, the quota check and the reservation of
227  * a server thread has to come while rxi_availProcs and rxi_minDeficit
228  * are locked. To this end, the code has been modified under #ifdef
229  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
230  * same time. A new function, ReturnToServerPool() returns the allocation.
231  * 
232  * A call can be on several queue's (but only one at a time). When
233  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
234  * that no one else is touching the queue. To this end, we store the address
235  * of the queue lock in the call structure (under the call lock) when we
236  * put the call on a queue, and we clear the call_queue_lock when the
237  * call is removed from a queue (once the call lock has been obtained).
238  * This allows rxi_ResetCall to safely synchronize with others wishing
239  * to manipulate the queue.
240  */
241
242 extern void rxi_Delay(int);
243
244 static int rxi_ServerThreadSelectingCall;
245
246 #ifdef RX_ENABLE_LOCKS
247 static afs_kmutex_t rx_rpc_stats;
248 void rxi_StartUnlocked();
249 #endif
250
251 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
252 ** pretty good that the next packet coming in is from the same connection 
253 ** as the last packet, since we're send multiple packets in a transmit window.
254 */
255 struct rx_connection *rxLastConn = 0; 
256
257 #ifdef RX_ENABLE_LOCKS
258 /* The locking hierarchy for rx fine grain locking is composed of five
259  * tiers:
260  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
261  * call->lock - locks call data fields.
262  * Most any other lock - these are all independent of each other.....
263  *      rx_freePktQ_lock
264  *      rx_freeCallQueue_lock
265  *      freeSQEList_lock
266  *      rx_connHashTable_lock
267  *      rx_serverPool_lock
268  *      rxi_keyCreate_lock
269  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
270
271  * lowest level:
272  *      peer_lock - locks peer data fields.
273  *      conn_data_lock - that more than one thread is not updating a conn data
274  *              field at the same time.
275  * Do we need a lock to protect the peer field in the conn structure?
276  *      conn->peer was previously a constant for all intents and so has no
277  *      lock protecting this field. The multihomed client delta introduced
278  *      a RX code change : change the peer field in the connection structure
279  *      to that remote inetrface from which the last packet for this
280  *      connection was sent out. This may become an issue if further changes
281  *      are made.
282  */
283 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
284 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
285 #ifdef RX_LOCKS_DB
286 /* rxdb_fileID is used to identify the lock location, along with line#. */
287 static int rxdb_fileID = RXDB_FILE_RX;
288 #endif /* RX_LOCKS_DB */
289 static void rxi_SetAcksInTransmitQueue();
290 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
291 #else /* RX_ENABLE_LOCKS */
292 #define SET_CALL_QUEUE_LOCK(C, L)
293 #define CLEAR_CALL_QUEUE_LOCK(C)
294 #endif /* RX_ENABLE_LOCKS */
295 static void rxi_DestroyConnectionNoLock();
296 void rxi_DestroyConnection();
297 void rxi_CleanupConnection();
298 struct rx_serverQueueEntry *rx_waitForPacket = 0;
299
300 /* ------------Exported Interfaces------------- */
301
302 /* This function allows rxkad to set the epoch to a suitably random number
303  * which rx_NewConnection will use in the future.  The principle purpose is to
304  * get rxnull connections to use the same epoch as the rxkad connections do, at
305  * least once the first rxkad connection is established.  This is important now
306  * that the host/port addresses aren't used in FindConnection: the uniqueness
307  * of epoch/cid matters and the start time won't do. */
308
309 #ifdef AFS_PTHREAD_ENV
310 /*
311  * This mutex protects the following global variables:
312  * rx_epoch
313  */
314
315 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
316 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
317 #else
318 #define LOCK_EPOCH
319 #define UNLOCK_EPOCH
320 #endif /* AFS_PTHREAD_ENV */
321
322 void rx_SetEpoch (epoch)
323   afs_uint32 epoch;
324 {
325     LOCK_EPOCH
326     rx_epoch = epoch;
327     UNLOCK_EPOCH
328 }
329
330 /* Initialize rx.  A port number may be mentioned, in which case this
331  * becomes the default port number for any service installed later.
332  * If 0 is provided for the port number, a random port will be chosen
333  * by the kernel.  Whether this will ever overlap anything in
334  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
335  * error. */
336 static int rxinit_status = 1;
337 #ifdef AFS_PTHREAD_ENV
338 /*
339  * This mutex protects the following global variables:
340  * rxinit_status
341  */
342
343 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
344 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
345 #else
346 #define LOCK_RX_INIT
347 #define UNLOCK_RX_INIT
348 #endif
349
350 int rx_Init(u_int port)
351 {
352 #ifdef KERNEL
353     osi_timeval_t tv;
354 #else /* KERNEL */
355     struct timeval tv;
356 #endif /* KERNEL */
357     char *htable, *ptable;
358     int tmp_status;
359
360     SPLVAR;
361
362     INIT_PTHREAD_LOCKS
363     LOCK_RX_INIT
364     if (rxinit_status == 0) {
365         tmp_status = rxinit_status;
366         UNLOCK_RX_INIT
367         return tmp_status; /* Already started; return previous error code. */
368     }
369
370 #ifdef AFS_NT40_ENV
371     if (afs_winsockInit()<0)
372         return -1;
373 #endif
374
375 #ifndef KERNEL
376     /*
377      * Initialize anything necessary to provide a non-premptive threading
378      * environment.
379      */
380     rxi_InitializeThreadSupport();
381 #endif
382
383     /* Allocate and initialize a socket for client and perhaps server
384      * connections. */
385
386     rx_socket = rxi_GetUDPSocket((u_short)port); 
387     if (rx_socket == OSI_NULLSOCKET) {
388         UNLOCK_RX_INIT
389         return RX_ADDRINUSE;
390     }
391     
392
393 #ifdef  RX_ENABLE_LOCKS
394 #ifdef RX_LOCKS_DB
395     rxdb_init();
396 #endif /* RX_LOCKS_DB */
397     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
398     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
399     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
400     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
401     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
402                MUTEX_DEFAULT,0);
403     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
404     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
405     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
406     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
407 #ifndef KERNEL
408     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
409 #endif /* !KERNEL */
410     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
411 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
412     if ( !uniprocessor )
413       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
414 #endif /* KERNEL && AFS_HPUX110_ENV */
415 #else /* RX_ENABLE_LOCKS */
416 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
417     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
418 #endif /* AFS_GLOBAL_SUNLOCK */
419 #endif /* RX_ENABLE_LOCKS */
420
421     rxi_nCalls = 0;
422     rx_connDeadTime = 12;
423     rx_tranquil     = 0;        /* reset flag */
424     bzero((char *)&rx_stats, sizeof(struct rx_stats));
425     htable = (char *)
426         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
427     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
428     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
429     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
430     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
431     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
432
433     /* Malloc up a bunch of packets & buffers */
434     rx_nFreePackets = 0;
435     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
436     queue_Init(&rx_freePacketQueue);
437     rxi_NeedMorePackets = FALSE;
438     rxi_MorePackets(rx_nPackets);
439     rx_CheckPackets();
440
441     NETPRI;
442     AFS_RXGLOCK();
443
444     clock_Init();
445
446 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
447     tv.tv_sec = clock_now.sec;
448     tv.tv_usec = clock_now.usec;
449     srand((unsigned int) tv.tv_usec);
450 #else
451     osi_GetTime(&tv);
452 #endif
453     if (port) {
454         rx_port = port;
455     } else {
456 #if defined(KERNEL) && !defined(UKERNEL)
457         /* Really, this should never happen in a real kernel */
458         rx_port = 0;
459 #else
460         struct sockaddr_in addr;
461         int addrlen = sizeof(addr);
462         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
463             rx_Finalize();
464             return -1;
465         }
466         rx_port = addr.sin_port;
467 #endif
468     }
469     rx_stats.minRtt.sec = 9999999;
470 #ifdef  KERNEL
471     rx_SetEpoch (tv.tv_sec | 0x80000000);
472 #else
473     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
474                                          * will provide a randomer value. */
475 #endif
476     MUTEX_ENTER(&rx_stats_mutex);
477     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
478     MUTEX_EXIT(&rx_stats_mutex);
479     /* *Slightly* random start time for the cid.  This is just to help
480      * out with the hashing function at the peer */
481     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
482     rx_connHashTable = (struct rx_connection **) htable;
483     rx_peerHashTable = (struct rx_peer **) ptable;
484
485     rx_lastAckDelay.sec = 0;
486     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
487     rx_hardAckDelay.sec = 0;
488     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
489     rx_softAckDelay.sec = 0;
490     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
491
492     rxevent_Init(20, rxi_ReScheduleEvents);
493
494     /* Initialize various global queues */
495     queue_Init(&rx_idleServerQueue);
496     queue_Init(&rx_incomingCallQueue);
497     queue_Init(&rx_freeCallQueue);
498
499 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
500     /* Initialize our list of usable IP addresses. */
501     rx_GetIFInfo();
502 #endif
503
504     /* Start listener process (exact function is dependent on the
505      * implementation environment--kernel or user space) */
506     rxi_StartListener();
507
508     AFS_RXGUNLOCK();
509     USERPRI;
510     tmp_status = rxinit_status = 0;
511     UNLOCK_RX_INIT
512     return tmp_status;
513 }
514
515 /* called with unincremented nRequestsRunning to see if it is OK to start
516  * a new thread in this service.  Could be "no" for two reasons: over the
517  * max quota, or would prevent others from reaching their min quota.
518  */
519 #ifdef RX_ENABLE_LOCKS
520 /* This verion of QuotaOK reserves quota if it's ok while the
521  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
522  */
523 static int QuotaOK(aservice)
524 register struct rx_service *aservice;
525 {
526     /* check if over max quota */
527     if (aservice->nRequestsRunning >= aservice->maxProcs) {
528         return 0;
529     }
530
531     /* under min quota, we're OK */
532     /* otherwise, can use only if there are enough to allow everyone
533      * to go to their min quota after this guy starts.
534      */
535     MUTEX_ENTER(&rx_stats_mutex);
536     if ((aservice->nRequestsRunning < aservice->minProcs) ||
537          (rxi_availProcs > rxi_minDeficit)) {
538         aservice->nRequestsRunning++;
539         /* just started call in minProcs pool, need fewer to maintain
540          * guarantee */
541         if (aservice->nRequestsRunning <= aservice->minProcs)
542             rxi_minDeficit--;
543         rxi_availProcs--;
544         MUTEX_EXIT(&rx_stats_mutex);
545         return 1;
546     }
547     MUTEX_EXIT(&rx_stats_mutex);
548
549     return 0;
550 }
551 static void ReturnToServerPool(aservice)
552 register struct rx_service *aservice;
553 {
554     aservice->nRequestsRunning--;
555     MUTEX_ENTER(&rx_stats_mutex);
556     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
557     rxi_availProcs++;
558     MUTEX_EXIT(&rx_stats_mutex);
559 }
560
561 #else /* RX_ENABLE_LOCKS */
562 static QuotaOK(aservice)
563 register struct rx_service *aservice; {
564     int rc=0;
565     /* under min quota, we're OK */
566     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
567
568     /* check if over max quota */
569     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
570
571     /* otherwise, can use only if there are enough to allow everyone
572      * to go to their min quota after this guy starts.
573      */
574     if (rxi_availProcs > rxi_minDeficit) rc = 1;
575     return rc;
576 }
577 #endif /* RX_ENABLE_LOCKS */
578
579 #ifndef KERNEL
580 /* Called by rx_StartServer to start up lwp's to service calls.
581    NExistingProcs gives the number of procs already existing, and which
582    therefore needn't be created. */
583 void rxi_StartServerProcs(nExistingProcs)
584     int nExistingProcs;
585 {
586     register struct rx_service *service;
587     register int i;
588     int maxdiff = 0;
589     int nProcs = 0;
590
591     /* For each service, reserve N processes, where N is the "minimum"
592        number of processes that MUST be able to execute a request in parallel,
593        at any time, for that process.  Also compute the maximum difference
594        between any service's maximum number of processes that can run
595        (i.e. the maximum number that ever will be run, and a guarantee
596        that this number will run if other services aren't running), and its
597        minimum number.  The result is the extra number of processes that
598        we need in order to provide the latter guarantee */
599     for (i=0; i<RX_MAX_SERVICES; i++) {
600         int diff;
601         service = rx_services[i];
602         if (service == (struct rx_service *) 0) break;
603         nProcs += service->minProcs;
604         diff = service->maxProcs - service->minProcs;
605         if (diff > maxdiff) maxdiff = diff;
606     }
607     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
608     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
609     for (i = 0; i<nProcs; i++) {
610         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
611     }
612 }
613 #endif /* KERNEL */
614
615 /* This routine must be called if any services are exported.  If the
616  * donateMe flag is set, the calling process is donated to the server
617  * process pool */
618 void rx_StartServer(donateMe)
619 {
620     register struct rx_service *service;
621     register int i;
622     SPLVAR;
623     clock_NewTime();
624
625     NETPRI;
626     AFS_RXGLOCK();
627     /* Start server processes, if necessary (exact function is dependent
628      * on the implementation environment--kernel or user space).  DonateMe
629      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
630      * case, one less new proc will be created rx_StartServerProcs.
631      */
632     rxi_StartServerProcs(donateMe);
633
634     /* count up the # of threads in minProcs, and add set the min deficit to
635      * be that value, too.
636      */
637     for (i=0; i<RX_MAX_SERVICES; i++) {
638         service = rx_services[i];
639         if (service == (struct rx_service *) 0) break;
640         MUTEX_ENTER(&rx_stats_mutex);
641         rxi_totalMin += service->minProcs;
642         /* below works even if a thread is running, since minDeficit would
643          * still have been decremented and later re-incremented.
644          */
645         rxi_minDeficit += service->minProcs;
646         MUTEX_EXIT(&rx_stats_mutex);
647     }
648
649     /* Turn on reaping of idle server connections */
650     rxi_ReapConnections();
651
652     AFS_RXGUNLOCK();
653     USERPRI;
654
655     if (donateMe) rx_ServerProc(); /* Never returns */
656     return;
657 }
658
659 /* Create a new client connection to the specified service, using the
660  * specified security object to implement the security model for this
661  * connection. */
662 struct rx_connection *
663 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
664     register afs_uint32 shost;      /* Server host */
665     u_short sport;                  /* Server port */
666     u_short sservice;               /* Server service id */
667     register struct rx_securityClass *securityObject;
668     int serviceSecurityIndex;
669 {
670     int hashindex;
671     afs_int32 cid;
672     register struct rx_connection *conn;
673
674     SPLVAR;
675
676     clock_NewTime();
677     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
678           shost, sport, sservice, securityObject, serviceSecurityIndex));
679
680     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
681      * the case of kmem_alloc? */
682     conn = rxi_AllocConnection();
683 #ifdef  RX_ENABLE_LOCKS
684     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
685     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
686     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
687 #endif
688     NETPRI;
689     AFS_RXGLOCK();
690     MUTEX_ENTER(&rx_connHashTable_lock);
691     cid = (rx_nextCid += RX_MAXCALLS);
692     conn->type = RX_CLIENT_CONNECTION;
693     conn->cid = cid;
694     conn->epoch = rx_epoch;
695     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
696     conn->serviceId = sservice;
697     conn->securityObject = securityObject;
698     /* This doesn't work in all compilers with void (they're buggy), so fake it
699      * with VOID */
700     conn->securityData = (VOID *) 0;
701     conn->securityIndex = serviceSecurityIndex;
702     rx_SetConnDeadTime(conn, rx_connDeadTime);
703     conn->ackRate = RX_FAST_ACK_RATE;
704     conn->nSpecific = 0;
705     conn->specific = NULL;
706     conn->challengeEvent = (struct rxevent *)0;
707     conn->delayedAbortEvent = (struct rxevent *)0;
708     conn->abortCount = 0;
709     conn->error = 0;
710
711     RXS_NewConnection(securityObject, conn);
712     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
713     
714     conn->refCount++; /* no lock required since only this thread knows... */
715     conn->next = rx_connHashTable[hashindex];
716     rx_connHashTable[hashindex] = conn;
717     MUTEX_ENTER(&rx_stats_mutex);
718     rx_stats.nClientConns++;
719     MUTEX_EXIT(&rx_stats_mutex);
720
721     MUTEX_EXIT(&rx_connHashTable_lock);
722     AFS_RXGUNLOCK();
723     USERPRI;
724     return conn;
725 }
726
727 void rx_SetConnDeadTime(conn, seconds)
728     register struct rx_connection *conn;
729     register int seconds;
730 {
731     /* The idea is to set the dead time to a value that allows several
732      * keepalives to be dropped without timing out the connection. */
733     conn->secondsUntilDead = MAX(seconds, 6);
734     conn->secondsUntilPing = conn->secondsUntilDead/6;
735 }
736
737 int rxi_lowPeerRefCount = 0;
738 int rxi_lowConnRefCount = 0;
739
740 /*
741  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
742  * NOTE: must not be called with rx_connHashTable_lock held.
743  */
744 void rxi_CleanupConnection(conn)
745     struct rx_connection *conn;
746 {
747     int i;
748
749     /* Notify the service exporter, if requested, that this connection
750      * is being destroyed */
751     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
752       (*conn->service->destroyConnProc)(conn);
753
754     /* Notify the security module that this connection is being destroyed */
755     RXS_DestroyConnection(conn->securityObject, conn);
756
757     /* If this is the last connection using the rx_peer struct, set its
758      * idle time to now. rxi_ReapConnections will reap it if it's still
759      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
760      */
761     MUTEX_ENTER(&rx_peerHashTable_lock);
762     if (--conn->peer->refCount <= 0) {
763         conn->peer->idleWhen = clock_Sec();
764         if (conn->peer->refCount < 0) {
765             conn->peer->refCount = 0; 
766             MUTEX_ENTER(&rx_stats_mutex);
767             rxi_lowPeerRefCount ++;
768             MUTEX_EXIT(&rx_stats_mutex);
769         }
770     }
771     MUTEX_EXIT(&rx_peerHashTable_lock);
772
773     MUTEX_ENTER(&rx_stats_mutex);
774     if (conn->type == RX_SERVER_CONNECTION)
775       rx_stats.nServerConns--;
776     else
777       rx_stats.nClientConns--;
778     MUTEX_EXIT(&rx_stats_mutex);
779
780 #ifndef KERNEL
781     if (conn->specific) {
782         for (i = 0 ; i < conn->nSpecific ; i++) {
783             if (conn->specific[i] && rxi_keyCreate_destructor[i])
784                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
785             conn->specific[i] = NULL;
786         }
787         free(conn->specific);
788     }
789     conn->specific = NULL;
790     conn->nSpecific = 0;
791 #endif /* !KERNEL */
792
793     MUTEX_DESTROY(&conn->conn_call_lock);
794     MUTEX_DESTROY(&conn->conn_data_lock);
795     CV_DESTROY(&conn->conn_call_cv);
796         
797     rxi_FreeConnection(conn);
798 }
799
800 /* Destroy the specified connection */
801 void rxi_DestroyConnection(conn)
802     register struct rx_connection *conn;
803 {
804     MUTEX_ENTER(&rx_connHashTable_lock);
805     rxi_DestroyConnectionNoLock(conn);
806     /* conn should be at the head of the cleanup list */
807     if (conn == rx_connCleanup_list) {
808         rx_connCleanup_list = rx_connCleanup_list->next;
809         MUTEX_EXIT(&rx_connHashTable_lock);
810         rxi_CleanupConnection(conn);
811     }
812 #ifdef RX_ENABLE_LOCKS
813     else {
814         MUTEX_EXIT(&rx_connHashTable_lock);
815     }
816 #endif /* RX_ENABLE_LOCKS */
817 }
818     
819 static void rxi_DestroyConnectionNoLock(conn)
820     register struct rx_connection *conn;
821 {
822     register struct rx_connection **conn_ptr;
823     register int havecalls = 0;
824     struct rx_packet *packet;
825     int i;
826     SPLVAR;
827
828     clock_NewTime();
829
830     NETPRI;
831     MUTEX_ENTER(&conn->conn_data_lock);
832     if (conn->refCount > 0)
833         conn->refCount--;
834     else {
835         MUTEX_ENTER(&rx_stats_mutex);
836         rxi_lowConnRefCount++;
837         MUTEX_EXIT(&rx_stats_mutex);
838     }
839
840     if (conn->refCount > 0) {
841         /* Busy; wait till the last guy before proceeding */
842         MUTEX_EXIT(&conn->conn_data_lock);
843         USERPRI;
844         return;
845     }
846
847     /* If the client previously called rx_NewCall, but it is still
848      * waiting, treat this as a running call, and wait to destroy the
849      * connection later when the call completes. */
850     if ((conn->type == RX_CLIENT_CONNECTION) &&
851         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
852         conn->flags |= RX_CONN_DESTROY_ME;
853         MUTEX_EXIT(&conn->conn_data_lock);
854         USERPRI;
855         return;
856     }
857     MUTEX_EXIT(&conn->conn_data_lock);
858
859     /* Check for extant references to this connection */
860     for (i = 0; i<RX_MAXCALLS; i++) {
861         register struct rx_call *call = conn->call[i];
862         if (call) {
863             havecalls = 1;
864             if (conn->type == RX_CLIENT_CONNECTION) {
865                 MUTEX_ENTER(&call->lock);
866                 if (call->delayedAckEvent) {
867                     /* Push the final acknowledgment out now--there
868                      * won't be a subsequent call to acknowledge the
869                      * last reply packets */
870                     rxevent_Cancel(call->delayedAckEvent, call,
871                                    RX_CALL_REFCOUNT_DELAY);
872                     rxi_AckAll((struct rxevent *)0, call, 0);
873                 }
874                 MUTEX_EXIT(&call->lock);
875             }
876         }
877     }
878 #ifdef RX_ENABLE_LOCKS
879     if (!havecalls) {
880         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
881             MUTEX_EXIT(&conn->conn_data_lock);
882         }
883         else {
884             /* Someone is accessing a packet right now. */
885             havecalls = 1;
886         }
887     }
888 #endif /* RX_ENABLE_LOCKS */
889
890     if (havecalls) {
891         /* Don't destroy the connection if there are any call
892          * structures still in use */
893         MUTEX_ENTER(&conn->conn_data_lock);
894         conn->flags |= RX_CONN_DESTROY_ME;
895         MUTEX_EXIT(&conn->conn_data_lock);
896         USERPRI;
897         return;
898     }
899
900     if (conn->delayedAbortEvent) {
901         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
902         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
903         if (packet) {
904             MUTEX_ENTER(&conn->conn_data_lock);
905             rxi_SendConnectionAbort(conn, packet, 0, 1);
906             MUTEX_EXIT(&conn->conn_data_lock);
907             rxi_FreePacket(packet);
908         }
909     }
910
911     /* Remove from connection hash table before proceeding */
912     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
913                                              conn->epoch, conn->type) ];
914     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
915         if (*conn_ptr == conn) {
916             *conn_ptr = conn->next;
917             break;
918         }
919     }
920     /* if the conn that we are destroying was the last connection, then we
921     * clear rxLastConn as well */
922     if ( rxLastConn == conn )
923         rxLastConn = 0;
924
925     /* Make sure the connection is completely reset before deleting it. */
926     /* get rid of pending events that could zap us later */
927     if (conn->challengeEvent) {
928         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
929     }
930  
931     /* Add the connection to the list of destroyed connections that
932      * need to be cleaned up. This is necessary to avoid deadlocks
933      * in the routines we call to inform others that this connection is
934      * being destroyed. */
935     conn->next = rx_connCleanup_list;
936     rx_connCleanup_list = conn;
937 }
938
939 /* Externally available version */
940 void rx_DestroyConnection(conn) 
941     register struct rx_connection *conn;
942 {
943     SPLVAR;
944
945     NETPRI;
946     AFS_RXGLOCK();
947     rxi_DestroyConnection (conn);
948     AFS_RXGUNLOCK();
949     USERPRI;
950 }
951
952 /* Start a new rx remote procedure call, on the specified connection.
953  * If wait is set to 1, wait for a free call channel; otherwise return
954  * 0.  Maxtime gives the maximum number of seconds this call may take,
955  * after rx_MakeCall returns.  After this time interval, a call to any
956  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
957  * For fine grain locking, we hold the conn_call_lock in order to 
958  * to ensure that we don't get signalle after we found a call in an active
959  * state and before we go to sleep.
960  */
961 struct rx_call *rx_NewCall(conn)
962     register struct rx_connection *conn;
963 {
964     register int i;
965     register struct rx_call *call;
966     struct clock queueTime;
967     SPLVAR;
968
969     clock_NewTime();
970     dpf (("rx_MakeCall(conn %x)\n", conn));
971
972     NETPRI;
973     clock_GetTime(&queueTime);
974     AFS_RXGLOCK();
975     MUTEX_ENTER(&conn->conn_call_lock);
976     for (;;) {
977         for (i=0; i<RX_MAXCALLS; i++) {
978             call = conn->call[i];
979             if (call) {
980                 MUTEX_ENTER(&call->lock);
981                 if (call->state == RX_STATE_DALLY) {
982                     rxi_ResetCall(call, 0);
983                     (*call->callNumber)++;
984                     break;
985                 }
986                 MUTEX_EXIT(&call->lock);
987             }
988             else {
989                 call = rxi_NewCall(conn, i);
990                 MUTEX_ENTER(&call->lock);
991                 break;
992             }
993         }
994         if (i < RX_MAXCALLS) {
995             break;
996         }
997         MUTEX_ENTER(&conn->conn_data_lock);
998         conn->flags |= RX_CONN_MAKECALL_WAITING;
999         MUTEX_EXIT(&conn->conn_data_lock);
1000 #ifdef  RX_ENABLE_LOCKS
1001         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1002 #else
1003         osi_rxSleep(conn);
1004 #endif
1005     }
1006
1007     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1008
1009     /* Client is initially in send mode */
1010     call->state = RX_STATE_ACTIVE;
1011     call->mode = RX_MODE_SENDING;
1012
1013     /* remember start time for call in case we have hard dead time limit */
1014     call->queueTime = queueTime;
1015     clock_GetTime(&call->startTime);
1016     hzero(call->bytesSent);
1017     hzero(call->bytesRcvd);
1018
1019     /* Turn on busy protocol. */
1020     rxi_KeepAliveOn(call);
1021
1022     MUTEX_EXIT(&call->lock);
1023     MUTEX_EXIT(&conn->conn_call_lock);
1024     AFS_RXGUNLOCK();
1025     USERPRI;
1026
1027 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1028     /* Now, if TQ wasn't cleared earlier, do it now. */
1029     AFS_RXGLOCK();
1030     MUTEX_ENTER(&call->lock);
1031     while (call->flags & RX_CALL_TQ_BUSY) {
1032         call->flags |= RX_CALL_TQ_WAIT;
1033 #ifdef RX_ENABLE_LOCKS
1034         CV_WAIT(&call->cv_tq, &call->lock);
1035 #else /* RX_ENABLE_LOCKS */
1036         osi_rxSleep(&call->tq);
1037 #endif /* RX_ENABLE_LOCKS */
1038     }
1039     if (call->flags & RX_CALL_TQ_CLEARME) {
1040         rxi_ClearTransmitQueue(call, 0);
1041         queue_Init(&call->tq);
1042     }
1043     MUTEX_EXIT(&call->lock);
1044     AFS_RXGUNLOCK();
1045 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1046
1047     return call;
1048 }
1049
1050 rxi_HasActiveCalls(aconn)
1051 register struct rx_connection *aconn; {
1052     register int i;
1053     register struct rx_call *tcall;
1054     SPLVAR;
1055
1056     NETPRI;
1057     for(i=0; i<RX_MAXCALLS; i++) {
1058       if (tcall = aconn->call[i]) {
1059         if ((tcall->state == RX_STATE_ACTIVE) 
1060             || (tcall->state == RX_STATE_PRECALL)) {
1061           USERPRI;
1062           return 1;
1063         }
1064       }
1065     }
1066     USERPRI;
1067     return 0;
1068 }
1069
1070 rxi_GetCallNumberVector(aconn, aint32s)
1071 register struct rx_connection *aconn;
1072 register afs_int32 *aint32s; {
1073     register int i;
1074     register struct rx_call *tcall;
1075     SPLVAR;
1076
1077     NETPRI;
1078     for(i=0; i<RX_MAXCALLS; i++) {
1079         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1080             aint32s[i] = aconn->callNumber[i]+1;
1081         else
1082             aint32s[i] = aconn->callNumber[i];
1083     }
1084     USERPRI;
1085     return 0;
1086 }
1087
1088 rxi_SetCallNumberVector(aconn, aint32s)
1089 register struct rx_connection *aconn;
1090 register afs_int32 *aint32s; {
1091     register int i;
1092     register struct rx_call *tcall;
1093     SPLVAR;
1094
1095     NETPRI;
1096     for(i=0; i<RX_MAXCALLS; i++) {
1097         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1098             aconn->callNumber[i] = aint32s[i] - 1;
1099         else
1100             aconn->callNumber[i] = aint32s[i];
1101     }
1102     USERPRI;
1103     return 0;
1104 }
1105
1106 /* Advertise a new service.  A service is named locally by a UDP port
1107  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1108  * on a failure. */
1109 struct rx_service *
1110 rx_NewService(port, serviceId, serviceName, securityObjects,
1111               nSecurityObjects, serviceProc)
1112     u_short port;
1113     u_short serviceId;
1114     char *serviceName;  /* Name for identification purposes (e.g. the
1115                          * service name might be used for probing for
1116                          * statistics) */
1117     struct rx_securityClass **securityObjects;
1118     int nSecurityObjects;
1119     afs_int32 (*serviceProc)();
1120 {    
1121     osi_socket socket = OSI_NULLSOCKET;
1122     register struct rx_service *tservice;    
1123     register int i;
1124     SPLVAR;
1125
1126     clock_NewTime();
1127
1128     if (serviceId == 0) {
1129         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1130          serviceName);
1131         return 0;
1132     }
1133     if (port == 0) {
1134         if (rx_port == 0) {
1135             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1136             return 0;
1137         }
1138         port = rx_port;
1139         socket = rx_socket;
1140     }
1141
1142     tservice = rxi_AllocService();
1143     NETPRI;
1144     AFS_RXGLOCK();
1145     for (i = 0; i<RX_MAX_SERVICES; i++) {
1146         register struct rx_service *service = rx_services[i];
1147         if (service) {
1148             if (port == service->servicePort) {
1149                 if (service->serviceId == serviceId) {
1150                     /* The identical service has already been
1151                      * installed; if the caller was intending to
1152                      * change the security classes used by this
1153                      * service, he/she loses. */
1154                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1155                     AFS_RXGUNLOCK();
1156                     USERPRI;
1157                     rxi_FreeService(tservice);
1158                     return service;
1159                 }
1160                 /* Different service, same port: re-use the socket
1161                  * which is bound to the same port */
1162                 socket = service->socket;
1163             }
1164         } else {
1165             if (socket == OSI_NULLSOCKET) {
1166                 /* If we don't already have a socket (from another
1167                  * service on same port) get a new one */
1168                 socket = rxi_GetUDPSocket(port);
1169                 if (socket == OSI_NULLSOCKET) {
1170                     AFS_RXGUNLOCK();
1171                     USERPRI;
1172                     rxi_FreeService(tservice);
1173                     return 0;
1174                 }
1175             }
1176             service = tservice;
1177             service->socket = socket;
1178             service->servicePort = port;
1179             service->serviceId = serviceId;
1180             service->serviceName = serviceName;
1181             service->nSecurityObjects = nSecurityObjects;
1182             service->securityObjects = securityObjects;
1183             service->minProcs = 0;
1184             service->maxProcs = 1;
1185             service->idleDeadTime = 60;
1186             service->connDeadTime = rx_connDeadTime;
1187             service->executeRequestProc = serviceProc;
1188             rx_services[i] = service;   /* not visible until now */
1189             AFS_RXGUNLOCK();
1190             USERPRI;
1191             return service;
1192         }
1193     }
1194     AFS_RXGUNLOCK();
1195     USERPRI;
1196     rxi_FreeService(tservice);
1197     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1198     return 0;
1199 }
1200
1201 /* Generic request processing loop. This routine should be called
1202  * by the implementation dependent rx_ServerProc. If socketp is
1203  * non-null, it will be set to the file descriptor that this thread
1204  * is now listening on. If socketp is null, this routine will never
1205  * returns. */
1206 void rxi_ServerProc(threadID, newcall, socketp)
1207 int threadID;
1208 struct rx_call *newcall;
1209 osi_socket *socketp;
1210 {
1211     register struct rx_call *call;
1212     register afs_int32 code;
1213     register struct rx_service *tservice = NULL;
1214
1215     for (;;) {
1216         if (newcall) {
1217             call = newcall;
1218             newcall = NULL;
1219         } else {
1220             call = rx_GetCall(threadID, tservice, socketp);
1221             if (socketp && *socketp != OSI_NULLSOCKET) {
1222                 /* We are now a listener thread */
1223                 return;
1224             }
1225         }
1226
1227         /* if server is restarting( typically smooth shutdown) then do not
1228          * allow any new calls.
1229          */
1230
1231         if ( rx_tranquil && (call != NULL) ) {
1232             SPLVAR;
1233
1234             NETPRI;
1235             AFS_RXGLOCK();
1236             MUTEX_ENTER(&call->lock);
1237
1238             rxi_CallError(call, RX_RESTARTING);
1239             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1240
1241             MUTEX_EXIT(&call->lock);
1242             AFS_RXGUNLOCK();
1243             USERPRI;
1244         }
1245
1246 #ifdef  KERNEL
1247         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1248 #ifdef RX_ENABLE_LOCKS
1249             AFS_GLOCK();
1250 #endif /* RX_ENABLE_LOCKS */
1251             afs_termState = AFSOP_STOP_AFS;
1252             afs_osi_Wakeup(&afs_termState);
1253 #ifdef RX_ENABLE_LOCKS
1254             AFS_GUNLOCK();
1255 #endif /* RX_ENABLE_LOCKS */
1256             return;
1257         }
1258 #endif
1259
1260         tservice = call->conn->service;
1261
1262         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1263
1264         code = call->conn->service->executeRequestProc(call);
1265
1266         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1267
1268         rx_EndCall(call, code);
1269         MUTEX_ENTER(&rx_stats_mutex);
1270         rxi_nCalls++;
1271         MUTEX_EXIT(&rx_stats_mutex);
1272     }
1273 }
1274
1275
1276 void rx_WakeupServerProcs()
1277 {
1278     struct rx_serverQueueEntry *np, *tqp;
1279     SPLVAR;
1280
1281     NETPRI;
1282     AFS_RXGLOCK();
1283     MUTEX_ENTER(&rx_serverPool_lock);
1284
1285 #ifdef RX_ENABLE_LOCKS
1286     if (rx_waitForPacket)
1287         CV_BROADCAST(&rx_waitForPacket->cv);
1288 #else /* RX_ENABLE_LOCKS */
1289     if (rx_waitForPacket)
1290         osi_rxWakeup(rx_waitForPacket);
1291 #endif /* RX_ENABLE_LOCKS */
1292     MUTEX_ENTER(&freeSQEList_lock);
1293     for (np = rx_FreeSQEList; np; np = tqp) {
1294       tqp = *(struct rx_serverQueueEntry **)np;
1295 #ifdef RX_ENABLE_LOCKS
1296       CV_BROADCAST(&np->cv);
1297 #else /* RX_ENABLE_LOCKS */
1298       osi_rxWakeup(np);
1299 #endif /* RX_ENABLE_LOCKS */
1300     }
1301     MUTEX_EXIT(&freeSQEList_lock);
1302     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1303 #ifdef RX_ENABLE_LOCKS
1304       CV_BROADCAST(&np->cv);
1305 #else /* RX_ENABLE_LOCKS */
1306       osi_rxWakeup(np);
1307 #endif /* RX_ENABLE_LOCKS */
1308     }
1309     MUTEX_EXIT(&rx_serverPool_lock);
1310     AFS_RXGUNLOCK();
1311     USERPRI;
1312 }
1313
1314 /* meltdown:
1315  * One thing that seems to happen is that all the server threads get
1316  * tied up on some empty or slow call, and then a whole bunch of calls
1317  * arrive at once, using up the packet pool, so now there are more 
1318  * empty calls.  The most critical resources here are server threads
1319  * and the free packet pool.  The "doreclaim" code seems to help in
1320  * general.  I think that eventually we arrive in this state: there
1321  * are lots of pending calls which do have all their packets present,
1322  * so they won't be reclaimed, are multi-packet calls, so they won't
1323  * be scheduled until later, and thus are tying up most of the free 
1324  * packet pool for a very long time.
1325  * future options:
1326  * 1.  schedule multi-packet calls if all the packets are present.  
1327  * Probably CPU-bound operation, useful to return packets to pool. 
1328  * Do what if there is a full window, but the last packet isn't here?
1329  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1330  * it sleeps and waits for that type of call.
1331  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1332  * the current dataquota business is badly broken.  The quota isn't adjusted
1333  * to reflect how many packets are presently queued for a running call.
1334  * So, when we schedule a queued call with a full window of packets queued
1335  * up for it, that *should* free up a window full of packets for other 2d-class
1336  * calls to be able to use from the packet pool.  But it doesn't.
1337  *
1338  * NB.  Most of the time, this code doesn't run -- since idle server threads
1339  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1340  * as a new call arrives.
1341  */
1342 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1343  * for an rx_Read. */
1344 #ifdef RX_ENABLE_LOCKS
1345 struct rx_call *
1346 rx_GetCall(tno, cur_service, socketp)
1347 int tno;
1348 struct rx_service *cur_service;
1349 osi_socket *socketp;
1350 {
1351     struct rx_serverQueueEntry *sq;
1352     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1353     struct rx_service *service;
1354     SPLVAR;
1355
1356     MUTEX_ENTER(&freeSQEList_lock);
1357
1358     if (sq = rx_FreeSQEList) {
1359         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1360         MUTEX_EXIT(&freeSQEList_lock);
1361     } else {    /* otherwise allocate a new one and return that */
1362         MUTEX_EXIT(&freeSQEList_lock);
1363         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1364         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1365         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1366     }
1367
1368     MUTEX_ENTER(&rx_serverPool_lock);
1369     if (cur_service != NULL) {
1370         ReturnToServerPool(cur_service);
1371     }
1372     while (1) {
1373         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1374             register struct rx_call *tcall, *ncall;
1375             choice2 = (struct rx_call *) 0;
1376             /* Scan for eligible incoming calls.  A call is not eligible
1377              * if the maximum number of calls for its service type are
1378              * already executing */
1379             /* One thread will process calls FCFS (to prevent starvation),
1380              * while the other threads may run ahead looking for calls which
1381              * have all their input data available immediately.  This helps 
1382              * keep threads from blocking, waiting for data from the client. */
1383             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1384               service = tcall->conn->service;
1385               if (!QuotaOK(service)) {
1386                 continue;
1387               }
1388               if (!tno || !tcall->queue_item_header.next  ) {
1389                 /* If we're thread 0, then  we'll just use 
1390                  * this call. If we haven't been able to find an optimal 
1391                  * choice, and we're at the end of the list, then use a 
1392                  * 2d choice if one has been identified.  Otherwise... */
1393                 call = (choice2 ? choice2 : tcall);
1394                 service = call->conn->service;
1395               } else if (!queue_IsEmpty(&tcall->rq)) {
1396                 struct rx_packet *rp;
1397                 rp = queue_First(&tcall->rq, rx_packet);
1398                 if (rp->header.seq == 1) {
1399                   if (!meltdown_1pkt ||             
1400                       (rp->header.flags & RX_LAST_PACKET)) {
1401                     call = tcall;
1402                   } else if (rxi_2dchoice && !choice2 &&
1403                              !(tcall->flags & RX_CALL_CLEARED) &&
1404                              (tcall->rprev > rxi_HardAckRate)) {
1405                     choice2 = tcall;
1406                   } else rxi_md2cnt++;
1407                 }
1408               }
1409               if (call)  {
1410                 break;
1411               } else {
1412                   ReturnToServerPool(service);
1413               }
1414             }
1415           }
1416
1417         if (call) {
1418             queue_Remove(call);
1419             rxi_ServerThreadSelectingCall = 1;
1420             MUTEX_EXIT(&rx_serverPool_lock);
1421             MUTEX_ENTER(&call->lock);
1422             MUTEX_ENTER(&rx_serverPool_lock);
1423
1424             if (queue_IsEmpty(&call->rq) ||
1425                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1426               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1427
1428             CLEAR_CALL_QUEUE_LOCK(call);
1429             if (call->error) {
1430                 MUTEX_EXIT(&call->lock);
1431                 ReturnToServerPool(service);
1432                 rxi_ServerThreadSelectingCall = 0;
1433                 CV_SIGNAL(&rx_serverPool_cv);
1434                 call = (struct rx_call*)0;
1435                 continue;
1436             }
1437             call->flags &= (~RX_CALL_WAIT_PROC);
1438             MUTEX_ENTER(&rx_stats_mutex);
1439             rx_nWaiting--;
1440             MUTEX_EXIT(&rx_stats_mutex);
1441             rxi_ServerThreadSelectingCall = 0;
1442             CV_SIGNAL(&rx_serverPool_cv);
1443             MUTEX_EXIT(&rx_serverPool_lock);
1444             break;
1445         }
1446         else {
1447             /* If there are no eligible incoming calls, add this process
1448              * to the idle server queue, to wait for one */
1449             sq->newcall = 0;
1450             sq->tno = tno;
1451             if (socketp) {
1452                 *socketp = OSI_NULLSOCKET;
1453             }
1454             sq->socketp = socketp;
1455             queue_Append(&rx_idleServerQueue, sq);
1456 #ifndef AFS_AIX41_ENV
1457             rx_waitForPacket = sq;
1458 #endif /* AFS_AIX41_ENV */
1459             do {
1460                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1461 #ifdef  KERNEL
1462                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1463                     MUTEX_EXIT(&rx_serverPool_lock);
1464                     return (struct rx_call *)0;
1465                 }
1466 #endif
1467             } while (!(call = sq->newcall) &&
1468                      !(socketp && *socketp != OSI_NULLSOCKET));
1469             MUTEX_EXIT(&rx_serverPool_lock);
1470             if (call) {
1471                 MUTEX_ENTER(&call->lock);
1472             }
1473             break;
1474         }
1475     }
1476
1477     MUTEX_ENTER(&freeSQEList_lock);
1478     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1479     rx_FreeSQEList = sq;
1480     MUTEX_EXIT(&freeSQEList_lock);
1481
1482     if (call) {
1483         clock_GetTime(&call->startTime);
1484         call->state = RX_STATE_ACTIVE;
1485         call->mode = RX_MODE_RECEIVING;
1486
1487         rxi_calltrace(RX_CALL_START, call);
1488         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1489              call->conn->service->servicePort, 
1490              call->conn->service->serviceId, call));
1491
1492         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1493         MUTEX_EXIT(&call->lock);
1494     } else {
1495         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1496     }
1497
1498     return call;
1499 }
1500 #else /* RX_ENABLE_LOCKS */
1501 struct rx_call *
1502 rx_GetCall(tno, cur_service, socketp)
1503   int tno;
1504   struct rx_service *cur_service;
1505   osi_socket *socketp;
1506 {
1507     struct rx_serverQueueEntry *sq;
1508     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1509     struct rx_service *service;
1510     SPLVAR;
1511
1512     NETPRI;
1513     AFS_RXGLOCK();
1514     MUTEX_ENTER(&freeSQEList_lock);
1515
1516     if (sq = rx_FreeSQEList) {
1517         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1518         MUTEX_EXIT(&freeSQEList_lock);
1519     } else {    /* otherwise allocate a new one and return that */
1520         MUTEX_EXIT(&freeSQEList_lock);
1521         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1522         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1523         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1524     }
1525     MUTEX_ENTER(&sq->lock);
1526
1527     if (cur_service != NULL) {
1528         cur_service->nRequestsRunning--;
1529         if (cur_service->nRequestsRunning < cur_service->minProcs)
1530             rxi_minDeficit++;
1531         rxi_availProcs++;
1532     }
1533     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1534         register struct rx_call *tcall, *ncall;
1535         /* Scan for eligible incoming calls.  A call is not eligible
1536          * if the maximum number of calls for its service type are
1537          * already executing */
1538         /* One thread will process calls FCFS (to prevent starvation),
1539          * while the other threads may run ahead looking for calls which
1540          * have all their input data available immediately.  This helps 
1541          * keep threads from blocking, waiting for data from the client. */
1542         choice2 = (struct rx_call *) 0;
1543         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1544           service = tcall->conn->service;
1545           if (QuotaOK(service)) {
1546              if (!tno || !tcall->queue_item_header.next  ) {
1547                  /* If we're thread 0, then  we'll just use 
1548                   * this call. If we haven't been able to find an optimal 
1549                   * choice, and we're at the end of the list, then use a 
1550                   * 2d choice if one has been identified.  Otherwise... */
1551                  call = (choice2 ? choice2 : tcall);
1552                  service = call->conn->service;
1553              } else if (!queue_IsEmpty(&tcall->rq)) {
1554                  struct rx_packet *rp;
1555                  rp = queue_First(&tcall->rq, rx_packet);
1556                  if (rp->header.seq == 1
1557                      && (!meltdown_1pkt ||
1558                          (rp->header.flags & RX_LAST_PACKET))) {
1559                      call = tcall;
1560                  } else if (rxi_2dchoice && !choice2 &&
1561                             !(tcall->flags & RX_CALL_CLEARED) &&
1562                             (tcall->rprev > rxi_HardAckRate)) {
1563                      choice2 = tcall;
1564                  } else rxi_md2cnt++;
1565              }
1566           }
1567           if (call) 
1568              break;
1569         }
1570       }
1571
1572     if (call) {
1573         queue_Remove(call);
1574         /* we can't schedule a call if there's no data!!! */
1575         /* send an ack if there's no data, if we're missing the
1576          * first packet, or we're missing something between first 
1577          * and last -- there's a "hole" in the incoming data. */
1578         if (queue_IsEmpty(&call->rq) ||
1579             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1580             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1581           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1582
1583         call->flags &= (~RX_CALL_WAIT_PROC);
1584         service->nRequestsRunning++;
1585         /* just started call in minProcs pool, need fewer to maintain
1586          * guarantee */
1587         if (service->nRequestsRunning <= service->minProcs)
1588             rxi_minDeficit--;
1589         rxi_availProcs--;
1590         rx_nWaiting--;
1591         /* MUTEX_EXIT(&call->lock); */
1592     }
1593     else {
1594         /* If there are no eligible incoming calls, add this process
1595          * to the idle server queue, to wait for one */
1596         sq->newcall = 0;
1597         if (socketp) {
1598             *socketp = OSI_NULLSOCKET;
1599         }
1600         sq->socketp = socketp;
1601         queue_Append(&rx_idleServerQueue, sq);
1602         do {
1603             osi_rxSleep(sq);
1604 #ifdef  KERNEL
1605                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1606                     AFS_RXGUNLOCK();
1607                     USERPRI;
1608                     return (struct rx_call *)0;
1609                 }
1610 #endif
1611         } while (!(call = sq->newcall) &&
1612                  !(socketp && *socketp != OSI_NULLSOCKET));
1613     }
1614     MUTEX_EXIT(&sq->lock);
1615
1616     MUTEX_ENTER(&freeSQEList_lock);
1617     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1618     rx_FreeSQEList = sq;
1619     MUTEX_EXIT(&freeSQEList_lock);
1620
1621     if (call) {
1622         clock_GetTime(&call->startTime);
1623         call->state = RX_STATE_ACTIVE;
1624         call->mode = RX_MODE_RECEIVING;
1625
1626         rxi_calltrace(RX_CALL_START, call);
1627         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1628          call->conn->service->servicePort, 
1629          call->conn->service->serviceId, call));
1630     } else {
1631         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1632     }
1633
1634     AFS_RXGUNLOCK();
1635     USERPRI;
1636
1637     return call;
1638 }
1639 #endif /* RX_ENABLE_LOCKS */
1640
1641
1642
1643 /* Establish a procedure to be called when a packet arrives for a
1644  * call.  This routine will be called at most once after each call,
1645  * and will also be called if there is an error condition on the or
1646  * the call is complete.  Used by multi rx to build a selection
1647  * function which determines which of several calls is likely to be a
1648  * good one to read from.  
1649  * NOTE: the way this is currently implemented it is probably only a
1650  * good idea to (1) use it immediately after a newcall (clients only)
1651  * and (2) only use it once.  Other uses currently void your warranty
1652  */
1653 void rx_SetArrivalProc(call, proc, handle, arg)
1654     register struct rx_call *call;
1655     register VOID (*proc)();
1656     register VOID *handle;
1657     register VOID *arg;
1658 {
1659     call->arrivalProc = proc;
1660     call->arrivalProcHandle = handle;
1661     call->arrivalProcArg = arg;
1662 }
1663
1664 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1665  * appropriate, and return the final error code from the conversation
1666  * to the caller */
1667
1668 afs_int32 rx_EndCall(call, rc)
1669     register struct rx_call *call;
1670     afs_int32 rc;
1671 {
1672     register struct rx_connection *conn = call->conn;
1673     register struct rx_service *service;
1674     register struct rx_packet *tp; /* Temporary packet pointer */
1675     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1676     afs_int32 error;
1677     SPLVAR;
1678
1679     dpf(("rx_EndCall(call %x)\n", call));
1680
1681     NETPRI;
1682     AFS_RXGLOCK();
1683     MUTEX_ENTER(&call->lock);
1684
1685     if (rc == 0 && call->error == 0) {
1686         call->abortCode = 0;
1687         call->abortCount = 0;
1688     }
1689
1690     call->arrivalProc = (VOID (*)()) 0;
1691     if (rc && call->error == 0) {
1692         rxi_CallError(call, rc);
1693         /* Send an abort message to the peer if this error code has
1694          * only just been set.  If it was set previously, assume the
1695          * peer has already been sent the error code or will request it 
1696          */
1697         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1698     }
1699     if (conn->type == RX_SERVER_CONNECTION) {
1700         /* Make sure reply or at least dummy reply is sent */
1701         if (call->mode == RX_MODE_RECEIVING) {
1702             rxi_WriteProc(call, 0, 0);
1703         }
1704         if (call->mode == RX_MODE_SENDING) {
1705             rxi_FlushWrite(call);
1706         }
1707         service = conn->service;
1708         rxi_calltrace(RX_CALL_END, call);
1709         /* Call goes to hold state until reply packets are acknowledged */
1710         if (call->tfirst + call->nSoftAcked < call->tnext) {
1711             call->state = RX_STATE_HOLD;
1712         } else {
1713             call->state = RX_STATE_DALLY;
1714             rxi_ClearTransmitQueue(call, 0);
1715             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1716             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1717         }
1718     }
1719     else { /* Client connection */
1720         char dummy;
1721         /* Make sure server receives input packets, in the case where
1722          * no reply arguments are expected */
1723         if ((call->mode == RX_MODE_SENDING)
1724          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1725             (void) rxi_ReadProc(call, &dummy, 1);
1726         }
1727         /* We need to release the call lock since it's lower than the
1728          * conn_call_lock and we don't want to hold the conn_call_lock
1729          * over the rx_ReadProc call. The conn_call_lock needs to be held
1730          * here for the case where rx_NewCall is perusing the calls on
1731          * the connection structure. We don't want to signal until
1732          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1733          * have checked this call, found it active and by the time it
1734          * goes to sleep, will have missed the signal.
1735          */
1736         MUTEX_EXIT(&call->lock);
1737         MUTEX_ENTER(&conn->conn_call_lock);
1738         MUTEX_ENTER(&call->lock);
1739         MUTEX_ENTER(&conn->conn_data_lock);
1740         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1741             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1742             MUTEX_EXIT(&conn->conn_data_lock);
1743 #ifdef  RX_ENABLE_LOCKS
1744             CV_BROADCAST(&conn->conn_call_cv);
1745 #else
1746             osi_rxWakeup(conn);
1747 #endif
1748         }
1749 #ifdef RX_ENABLE_LOCKS
1750         else {
1751             MUTEX_EXIT(&conn->conn_data_lock);
1752         }
1753 #endif /* RX_ENABLE_LOCKS */
1754         call->state = RX_STATE_DALLY;
1755     }
1756     error = call->error;
1757
1758     /* currentPacket, nLeft, and NFree must be zeroed here, because
1759      * ResetCall cannot: ResetCall may be called at splnet(), in the
1760      * kernel version, and may interrupt the macros rx_Read or
1761      * rx_Write, which run at normal priority for efficiency. */
1762     if (call->currentPacket) {
1763         rxi_FreePacket(call->currentPacket);
1764         call->currentPacket = (struct rx_packet *) 0;
1765         call->nLeft = call->nFree = call->curlen = 0;
1766     }
1767     else
1768         call->nLeft = call->nFree = call->curlen = 0;
1769
1770     /* Free any packets from the last call to ReadvProc/WritevProc */
1771     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1772         queue_Remove(tp);
1773         rxi_FreePacket(tp);
1774     }
1775
1776     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1777     MUTEX_EXIT(&call->lock);
1778     if (conn->type == RX_CLIENT_CONNECTION)
1779         MUTEX_EXIT(&conn->conn_call_lock);
1780     AFS_RXGUNLOCK();
1781     USERPRI;
1782     /*
1783      * Map errors to the local host's errno.h format.
1784      */
1785     error = ntoh_syserr_conv(error);
1786     return error;
1787 }
1788
1789 #if !defined(KERNEL)
1790
1791 /* Call this routine when shutting down a server or client (especially
1792  * clients).  This will allow Rx to gracefully garbage collect server
1793  * connections, and reduce the number of retries that a server might
1794  * make to a dead client.
1795  * This is not quite right, since some calls may still be ongoing and
1796  * we can't lock them to destroy them. */
1797 void rx_Finalize() {
1798     register struct rx_connection **conn_ptr, **conn_end;
1799
1800     INIT_PTHREAD_LOCKS
1801     LOCK_RX_INIT
1802     if (rxinit_status == 1) {
1803         UNLOCK_RX_INIT
1804         return; /* Already shutdown. */
1805     }
1806     rxi_DeleteCachedConnections();
1807     if (rx_connHashTable) {
1808         MUTEX_ENTER(&rx_connHashTable_lock);
1809         for (conn_ptr = &rx_connHashTable[0], 
1810              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1811              conn_ptr < conn_end; conn_ptr++) {
1812             struct rx_connection *conn, *next;
1813             for (conn = *conn_ptr; conn; conn = next) {
1814                 next = conn->next;
1815                 if (conn->type == RX_CLIENT_CONNECTION) {
1816                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1817                     conn->refCount++;
1818                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1819 #ifdef RX_ENABLE_LOCKS
1820                     rxi_DestroyConnectionNoLock(conn);
1821 #else /* RX_ENABLE_LOCKS */
1822                     rxi_DestroyConnection(conn);
1823 #endif /* RX_ENABLE_LOCKS */
1824                 }
1825             }
1826         }
1827 #ifdef RX_ENABLE_LOCKS
1828         while (rx_connCleanup_list) {
1829             struct rx_connection *conn;
1830             conn = rx_connCleanup_list;
1831             rx_connCleanup_list = rx_connCleanup_list->next;
1832             MUTEX_EXIT(&rx_connHashTable_lock);
1833             rxi_CleanupConnection(conn);
1834             MUTEX_ENTER(&rx_connHashTable_lock);
1835         }
1836         MUTEX_EXIT(&rx_connHashTable_lock);
1837 #endif /* RX_ENABLE_LOCKS */
1838     }
1839     rxi_flushtrace();
1840
1841     rxinit_status = 1;
1842     UNLOCK_RX_INIT
1843 }
1844 #endif
1845
1846 /* if we wakeup packet waiter too often, can get in loop with two
1847     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1848 void
1849 rxi_PacketsUnWait() {
1850
1851     if (!rx_waitingForPackets) {
1852         return;
1853     }
1854 #ifdef KERNEL
1855     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1856         return;                                     /* still over quota */
1857     }
1858 #endif /* KERNEL */
1859     rx_waitingForPackets = 0;
1860 #ifdef  RX_ENABLE_LOCKS
1861     CV_BROADCAST(&rx_waitingForPackets_cv);
1862 #else
1863     osi_rxWakeup(&rx_waitingForPackets);
1864 #endif
1865     return;
1866 }
1867
1868
1869 /* ------------------Internal interfaces------------------------- */
1870
1871 /* Return this process's service structure for the
1872  * specified socket and service */
1873 struct rx_service *rxi_FindService(socket, serviceId)
1874     register osi_socket socket;
1875     register u_short serviceId;
1876 {
1877     register struct rx_service **sp;    
1878     for (sp = &rx_services[0]; *sp; sp++) {
1879         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1880           return *sp;
1881     }
1882     return 0;
1883 }
1884
1885 /* Allocate a call structure, for the indicated channel of the
1886  * supplied connection.  The mode and state of the call must be set by
1887  * the caller. */
1888 struct rx_call *rxi_NewCall(conn, channel)
1889     register struct rx_connection *conn;
1890     register int channel;
1891 {
1892     register struct rx_call *call;
1893 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1894     register struct rx_call *cp;        /* Call pointer temp */
1895     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1896 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1897
1898     /* Grab an existing call structure, or allocate a new one.
1899      * Existing call structures are assumed to have been left reset by
1900      * rxi_FreeCall */
1901     MUTEX_ENTER(&rx_freeCallQueue_lock);
1902
1903 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1904     /*
1905      * EXCEPT that the TQ might not yet be cleared out.
1906      * Skip over those with in-use TQs.
1907      */
1908     call = NULL;
1909     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1910         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1911             call = cp;
1912             break;
1913         }
1914     }
1915     if (call) {
1916 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1917     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1918         call = queue_First(&rx_freeCallQueue, rx_call);
1919 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1920         queue_Remove(call);
1921         MUTEX_ENTER(&rx_stats_mutex);
1922         rx_stats.nFreeCallStructs--;
1923         MUTEX_EXIT(&rx_stats_mutex);
1924         MUTEX_EXIT(&rx_freeCallQueue_lock);
1925         MUTEX_ENTER(&call->lock);
1926         CLEAR_CALL_QUEUE_LOCK(call);
1927 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1928         /* Now, if TQ wasn't cleared earlier, do it now. */
1929         if (call->flags & RX_CALL_TQ_CLEARME) {
1930             rxi_ClearTransmitQueue(call, 0);
1931             queue_Init(&call->tq);
1932         }
1933 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1934         /* Bind the call to its connection structure */
1935         call->conn = conn;
1936         rxi_ResetCall(call, 1);
1937     }
1938     else {
1939         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1940
1941         MUTEX_EXIT(&rx_freeCallQueue_lock);
1942         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1943         MUTEX_ENTER(&call->lock);
1944         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1945         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1946         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1947
1948         MUTEX_ENTER(&rx_stats_mutex);
1949         rx_stats.nCallStructs++;
1950         MUTEX_EXIT(&rx_stats_mutex);
1951         /* Initialize once-only items */
1952         queue_Init(&call->tq);
1953         queue_Init(&call->rq);
1954         queue_Init(&call->iovq);
1955         /* Bind the call to its connection structure (prereq for reset) */
1956         call->conn = conn;
1957         rxi_ResetCall(call, 1);
1958     }
1959     call->channel = channel;
1960     call->callNumber = &conn->callNumber[channel];
1961     /* Note that the next expected call number is retained (in
1962      * conn->callNumber[i]), even if we reallocate the call structure
1963      */
1964     conn->call[channel] = call;
1965     /* if the channel's never been used (== 0), we should start at 1, otherwise
1966         the call number is valid from the last time this channel was used */
1967     if (*call->callNumber == 0) *call->callNumber = 1;
1968
1969     MUTEX_EXIT(&call->lock);
1970     return call;
1971 }
1972
1973 /* A call has been inactive long enough that so we can throw away
1974  * state, including the call structure, which is placed on the call
1975  * free list.
1976  * Call is locked upon entry.
1977  */
1978 #ifdef RX_ENABLE_LOCKS
1979 void rxi_FreeCall(call, haveCTLock)
1980     int haveCTLock; /* Set if called from rxi_ReapConnections */
1981 #else /* RX_ENABLE_LOCKS */
1982 void rxi_FreeCall(call)
1983 #endif /* RX_ENABLE_LOCKS */
1984     register struct rx_call *call;
1985 {
1986     register int channel = call->channel;
1987     register struct rx_connection *conn = call->conn;
1988
1989
1990     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
1991       (*call->callNumber)++;
1992     rxi_ResetCall(call, 0);
1993     call->conn->call[channel] = (struct rx_call *) 0;
1994
1995     MUTEX_ENTER(&rx_freeCallQueue_lock);
1996     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
1997 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1998     /* A call may be free even though its transmit queue is still in use.
1999      * Since we search the call list from head to tail, put busy calls at
2000      * the head of the list, and idle calls at the tail.
2001      */
2002     if (call->flags & RX_CALL_TQ_BUSY)
2003         queue_Prepend(&rx_freeCallQueue, call);
2004     else
2005         queue_Append(&rx_freeCallQueue, call);
2006 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2007     queue_Append(&rx_freeCallQueue, call);
2008 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2009     MUTEX_ENTER(&rx_stats_mutex);
2010     rx_stats.nFreeCallStructs++;
2011     MUTEX_EXIT(&rx_stats_mutex);
2012
2013     MUTEX_EXIT(&rx_freeCallQueue_lock);
2014  
2015     /* Destroy the connection if it was previously slated for
2016      * destruction, i.e. the Rx client code previously called
2017      * rx_DestroyConnection (client connections), or
2018      * rxi_ReapConnections called the same routine (server
2019      * connections).  Only do this, however, if there are no
2020      * outstanding calls. Note that for fine grain locking, there appears
2021      * to be a deadlock in that rxi_FreeCall has a call locked and
2022      * DestroyConnectionNoLock locks each call in the conn. But note a
2023      * few lines up where we have removed this call from the conn.
2024      * If someone else destroys a connection, they either have no
2025      * call lock held or are going through this section of code.
2026      */
2027     if (conn->flags & RX_CONN_DESTROY_ME) {
2028         MUTEX_ENTER(&conn->conn_data_lock);
2029         conn->refCount++;
2030         MUTEX_EXIT(&conn->conn_data_lock);
2031 #ifdef RX_ENABLE_LOCKS
2032         if (haveCTLock)
2033             rxi_DestroyConnectionNoLock(conn);
2034         else
2035             rxi_DestroyConnection(conn);
2036 #else /* RX_ENABLE_LOCKS */
2037         rxi_DestroyConnection(conn);
2038 #endif /* RX_ENABLE_LOCKS */
2039     }
2040 }
2041
2042 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2043 char *rxi_Alloc(size)
2044 register size_t size;
2045 {
2046     register char *p;
2047
2048 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2049     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2050      * implementation.
2051      */
2052     int glockOwner = ISAFS_GLOCK();
2053     if (!glockOwner)
2054         AFS_GLOCK();
2055 #endif
2056     MUTEX_ENTER(&rx_stats_mutex);
2057     rxi_Alloccnt++; rxi_Allocsize += size;
2058     MUTEX_EXIT(&rx_stats_mutex);
2059 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2060     if (size > AFS_SMALLOCSIZ) {
2061         p = (char *) osi_AllocMediumSpace(size);
2062     } else
2063         p = (char *) osi_AllocSmall(size, 1);
2064 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2065     if (!glockOwner)
2066         AFS_GUNLOCK();
2067 #endif
2068 #else
2069     p = (char *) osi_Alloc(size);
2070 #endif
2071     if (!p) osi_Panic("rxi_Alloc error");
2072     bzero(p, size);
2073     return p;
2074 }
2075
2076 void rxi_Free(addr, size)
2077 void *addr;
2078 register size_t size;
2079 {
2080 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2081     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2082      * implementation.
2083      */
2084     int glockOwner = ISAFS_GLOCK();
2085     if (!glockOwner)
2086         AFS_GLOCK();
2087 #endif
2088     MUTEX_ENTER(&rx_stats_mutex);
2089     rxi_Alloccnt--; rxi_Allocsize -= size;
2090     MUTEX_EXIT(&rx_stats_mutex);
2091 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2092     if (size > AFS_SMALLOCSIZ)
2093         osi_FreeMediumSpace(addr);
2094     else
2095         osi_FreeSmall(addr);
2096 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2097     if (!glockOwner)
2098         AFS_GUNLOCK();
2099 #endif
2100 #else
2101     osi_Free(addr, size);
2102 #endif    
2103 }
2104
2105 /* Find the peer process represented by the supplied (host,port)
2106  * combination.  If there is no appropriate active peer structure, a
2107  * new one will be allocated and initialized 
2108  * The origPeer, if set, is a pointer to a peer structure on which the
2109  * refcount will be be decremented. This is used to replace the peer
2110  * structure hanging off a connection structure */
2111 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2112     register afs_uint32 host;
2113     register u_short port;
2114     struct rx_peer *origPeer;
2115     int create;
2116 {
2117     register struct rx_peer *pp;
2118     int hashIndex;
2119     hashIndex = PEER_HASH(host, port);
2120     MUTEX_ENTER(&rx_peerHashTable_lock);
2121     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2122         if ((pp->host == host) && (pp->port == port)) break;
2123     }
2124     if (!pp) {
2125         if (create) {
2126             pp = rxi_AllocPeer(); /* This bzero's *pp */
2127             pp->host = host;      /* set here or in InitPeerParams is zero */
2128             pp->port = port;
2129             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2130             queue_Init(&pp->congestionQueue);
2131             queue_Init(&pp->rpcStats);
2132             pp->next = rx_peerHashTable[hashIndex];
2133             rx_peerHashTable[hashIndex] = pp;
2134             rxi_InitPeerParams(pp);
2135             MUTEX_ENTER(&rx_stats_mutex);
2136             rx_stats.nPeerStructs++;
2137             MUTEX_EXIT(&rx_stats_mutex);
2138         }
2139     }
2140     if (pp && create) {
2141         pp->refCount++;
2142     }
2143     if ( origPeer)
2144         origPeer->refCount--;
2145     MUTEX_EXIT(&rx_peerHashTable_lock);
2146     return pp;
2147 }
2148
2149
2150 /* Find the connection at (host, port) started at epoch, and with the
2151  * given connection id.  Creates the server connection if necessary.
2152  * The type specifies whether a client connection or a server
2153  * connection is desired.  In both cases, (host, port) specify the
2154  * peer's (host, pair) pair.  Client connections are not made
2155  * automatically by this routine.  The parameter socket gives the
2156  * socket descriptor on which the packet was received.  This is used,
2157  * in the case of server connections, to check that *new* connections
2158  * come via a valid (port, serviceId).  Finally, the securityIndex
2159  * parameter must match the existing index for the connection.  If a
2160  * server connection is created, it will be created using the supplied
2161  * index, if the index is valid for this service */
2162 struct rx_connection *
2163 rxi_FindConnection(socket, host, port, serviceId, cid, 
2164                    epoch, type, securityIndex)
2165     osi_socket socket;
2166     register afs_int32 host;
2167     register u_short port;
2168     u_short serviceId;
2169     afs_uint32 cid;
2170     afs_uint32 epoch;
2171     int type;
2172     u_int securityIndex;
2173 {
2174     int hashindex, flag;
2175     register struct rx_connection *conn;
2176     struct rx_peer *peer;
2177     hashindex = CONN_HASH(host, port, cid, epoch, type);
2178     MUTEX_ENTER(&rx_connHashTable_lock);
2179     rxLastConn ? (conn = rxLastConn, flag = 0) :
2180                  (conn = rx_connHashTable[hashindex], flag = 1);
2181     for (; conn; ) {
2182       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2183           && (epoch == conn->epoch)) {
2184         register struct rx_peer *pp = conn->peer;
2185         if (securityIndex != conn->securityIndex) {
2186             /* this isn't supposed to happen, but someone could forge a packet
2187                like this, and there seems to be some CM bug that makes this
2188                happen from time to time -- in which case, the fileserver
2189                asserts. */  
2190             MUTEX_EXIT(&rx_connHashTable_lock);
2191             return (struct rx_connection *) 0;
2192         }
2193         /* epoch's high order bits mean route for security reasons only on
2194          * the cid, not the host and port fields.
2195          */
2196         if (conn->epoch & 0x80000000) break;
2197         if (((type == RX_CLIENT_CONNECTION) 
2198              || (pp->host == host)) && (pp->port == port))
2199           break;
2200       }
2201       if ( !flag )
2202       {
2203         /* the connection rxLastConn that was used the last time is not the
2204         ** one we are looking for now. Hence, start searching in the hash */
2205         flag = 1;
2206         conn = rx_connHashTable[hashindex];
2207       }
2208       else
2209         conn = conn->next;
2210     }
2211     if (!conn) {
2212         struct rx_service *service;
2213         if (type == RX_CLIENT_CONNECTION) {
2214             MUTEX_EXIT(&rx_connHashTable_lock);
2215             return (struct rx_connection *) 0;
2216         }
2217         service = rxi_FindService(socket, serviceId);
2218         if (!service || (securityIndex >= service->nSecurityObjects) 
2219             || (service->securityObjects[securityIndex] == 0)) {
2220             MUTEX_EXIT(&rx_connHashTable_lock);
2221             return (struct rx_connection *) 0;
2222         }
2223         conn = rxi_AllocConnection(); /* This bzero's the connection */
2224         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2225                    MUTEX_DEFAULT,0);
2226         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2227                    MUTEX_DEFAULT,0);
2228         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2229         conn->next = rx_connHashTable[hashindex];
2230         rx_connHashTable[hashindex] = conn;
2231         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2232         conn->type = RX_SERVER_CONNECTION;
2233         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2234         conn->epoch = epoch;
2235         conn->cid = cid & RX_CIDMASK;
2236         /* conn->serial = conn->lastSerial = 0; */
2237         /* conn->timeout = 0; */
2238         conn->ackRate = RX_FAST_ACK_RATE;
2239         conn->service = service;
2240         conn->serviceId = serviceId;
2241         conn->securityIndex = securityIndex;
2242         conn->securityObject = service->securityObjects[securityIndex];
2243         conn->nSpecific = 0;
2244         conn->specific = NULL;
2245         rx_SetConnDeadTime(conn, service->connDeadTime);
2246         /* Notify security object of the new connection */
2247         RXS_NewConnection(conn->securityObject, conn);
2248         /* XXXX Connection timeout? */
2249         if (service->newConnProc) (*service->newConnProc)(conn);
2250         MUTEX_ENTER(&rx_stats_mutex);
2251         rx_stats.nServerConns++;
2252         MUTEX_EXIT(&rx_stats_mutex);
2253     }
2254     else
2255     {
2256     /* Ensure that the peer structure is set up in such a way that
2257     ** replies in this connection go back to that remote interface
2258     ** from which the last packet was sent out. In case, this packet's
2259     ** source IP address does not match the peer struct for this conn,
2260     ** then drop the refCount on conn->peer and get a new peer structure.
2261     ** We can check the host,port field in the peer structure without the
2262     ** rx_peerHashTable_lock because the peer structure has its refCount
2263     ** incremented and the only time the host,port in the peer struct gets
2264     ** updated is when the peer structure is created.
2265     */
2266         if (conn->peer->host == host )
2267                 peer = conn->peer; /* no change to the peer structure */
2268         else
2269                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2270     }
2271
2272     MUTEX_ENTER(&conn->conn_data_lock);
2273     conn->refCount++;
2274     conn->peer = peer;
2275     MUTEX_EXIT(&conn->conn_data_lock);
2276
2277     rxLastConn = conn;  /* store this connection as the last conn used */
2278     MUTEX_EXIT(&rx_connHashTable_lock);
2279     return conn;
2280 }
2281
2282 /* There are two packet tracing routines available for testing and monitoring
2283  * Rx.  One is called just after every packet is received and the other is
2284  * called just before every packet is sent.  Received packets, have had their
2285  * headers decoded, and packets to be sent have not yet had their headers
2286  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2287  * containing the network address.  Both can be modified.  The return value, if
2288  * non-zero, indicates that the packet should be dropped.  */
2289
2290 int (*rx_justReceived)() = 0;
2291 int (*rx_almostSent)() = 0;
2292
2293 /* A packet has been received off the interface.  Np is the packet, socket is
2294  * the socket number it was received from (useful in determining which service
2295  * this packet corresponds to), and (host, port) reflect the host,port of the
2296  * sender.  This call returns the packet to the caller if it is finished with
2297  * it, rather than de-allocating it, just as a small performance hack */
2298
2299 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2300     register struct rx_packet *np;
2301     osi_socket socket;
2302     afs_uint32 host;
2303     u_short port;
2304     int *tnop;
2305     struct rx_call **newcallp;
2306 {
2307     register struct rx_call *call;
2308     register struct rx_connection *conn;
2309     int channel;
2310     afs_uint32 currentCallNumber;
2311     int type;
2312     int skew;
2313 #ifdef RXDEBUG
2314     char *packetType;
2315 #endif
2316     struct rx_packet *tnp;
2317
2318 #ifdef RXDEBUG
2319 /* We don't print out the packet until now because (1) the time may not be
2320  * accurate enough until now in the lwp implementation (rx_Listener only gets
2321  * the time after the packet is read) and (2) from a protocol point of view,
2322  * this is the first time the packet has been seen */
2323     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2324         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2325     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2326          np->header.serial, packetType, host, port, np->header.serviceId,
2327          np->header.epoch, np->header.cid, np->header.callNumber, 
2328          np->header.seq, np->header.flags, np));
2329 #endif
2330
2331     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2332       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2333     }
2334
2335     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2336         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2337     }
2338 #ifdef RXDEBUG
2339     /* If an input tracer function is defined, call it with the packet and
2340      * network address.  Note this function may modify its arguments. */
2341     if (rx_justReceived) {
2342         struct sockaddr_in addr;
2343         int drop;
2344         addr.sin_family = AF_INET;
2345         addr.sin_port = port;
2346         addr.sin_addr.s_addr = host;
2347 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2348         addr.sin_len = sizeof(addr);
2349 #endif  /* AFS_OSF_ENV */
2350         drop = (*rx_justReceived) (np, &addr);
2351         /* drop packet if return value is non-zero */
2352         if (drop) return np;
2353         port = addr.sin_port;           /* in case fcn changed addr */
2354         host = addr.sin_addr.s_addr;
2355     }
2356 #endif
2357
2358     /* If packet was not sent by the client, then *we* must be the client */
2359     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2360         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2361
2362     /* Find the connection (or fabricate one, if we're the server & if
2363      * necessary) associated with this packet */
2364     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2365                               np->header.cid, np->header.epoch, type, 
2366                               np->header.securityIndex);
2367
2368     if (!conn) {
2369       /* If no connection found or fabricated, just ignore the packet.
2370        * (An argument could be made for sending an abort packet for
2371        * the conn) */
2372       return np;
2373     }   
2374
2375     MUTEX_ENTER(&conn->conn_data_lock);
2376     if (conn->maxSerial < np->header.serial)
2377       conn->maxSerial = np->header.serial;
2378     MUTEX_EXIT(&conn->conn_data_lock);
2379
2380     /* If the connection is in an error state, send an abort packet and ignore
2381      * the incoming packet */
2382     if (conn->error) {
2383         /* Don't respond to an abort packet--we don't want loops! */
2384         MUTEX_ENTER(&conn->conn_data_lock);
2385         if (np->header.type != RX_PACKET_TYPE_ABORT)
2386             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2387         conn->refCount--;
2388         MUTEX_EXIT(&conn->conn_data_lock);
2389         return np;
2390     }
2391
2392     /* Check for connection-only requests (i.e. not call specific). */
2393     if (np->header.callNumber == 0) {
2394         switch (np->header.type) {
2395             case RX_PACKET_TYPE_ABORT:
2396                 /* What if the supplied error is zero? */
2397                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2398                 MUTEX_ENTER(&conn->conn_data_lock);
2399                 conn->refCount--;
2400                 MUTEX_EXIT(&conn->conn_data_lock);
2401                 return np;
2402             case RX_PACKET_TYPE_CHALLENGE:
2403                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2404                 MUTEX_ENTER(&conn->conn_data_lock);
2405                 conn->refCount--;
2406                 MUTEX_EXIT(&conn->conn_data_lock);
2407                 return tnp;
2408             case RX_PACKET_TYPE_RESPONSE:
2409                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2410                 MUTEX_ENTER(&conn->conn_data_lock);
2411                 conn->refCount--;
2412                 MUTEX_EXIT(&conn->conn_data_lock);
2413                 return tnp;
2414             case RX_PACKET_TYPE_PARAMS:
2415             case RX_PACKET_TYPE_PARAMS+1:
2416             case RX_PACKET_TYPE_PARAMS+2:
2417                 /* ignore these packet types for now */
2418                 MUTEX_ENTER(&conn->conn_data_lock);
2419                 conn->refCount--;
2420                 MUTEX_EXIT(&conn->conn_data_lock);
2421                 return np;
2422
2423
2424             default:
2425                 /* Should not reach here, unless the peer is broken: send an
2426                  * abort packet */
2427                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2428                 MUTEX_ENTER(&conn->conn_data_lock);
2429                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2430                 conn->refCount--;
2431                 MUTEX_EXIT(&conn->conn_data_lock);
2432                 return tnp;
2433         }
2434     }
2435
2436     channel = np->header.cid & RX_CHANNELMASK;
2437     call = conn->call[channel];
2438 #ifdef  RX_ENABLE_LOCKS
2439     if (call)
2440         MUTEX_ENTER(&call->lock);
2441     /* Test to see if call struct is still attached to conn. */
2442     if (call != conn->call[channel]) {
2443         if (call)
2444             MUTEX_EXIT(&call->lock);
2445         if (type == RX_SERVER_CONNECTION) {
2446             call = conn->call[channel];
2447             /* If we started with no call attached and there is one now,
2448              * another thread is also running this routine and has gotten
2449              * the connection channel. We should drop this packet in the tests
2450              * below. If there was a call on this connection and it's now
2451              * gone, then we'll be making a new call below.
2452              * If there was previously a call and it's now different then
2453              * the old call was freed and another thread running this routine
2454              * has created a call on this channel. One of these two threads
2455              * has a packet for the old call and the code below handles those
2456              * cases.
2457              */
2458             if (call)
2459                 MUTEX_ENTER(&call->lock);
2460         }
2461         else {
2462             /* This packet can't be for this call. If the new call address is
2463              * 0 then no call is running on this channel. If there is a call
2464              * then, since this is a client connection we're getting data for
2465              * it must be for the previous call.
2466              */
2467             MUTEX_ENTER(&rx_stats_mutex);
2468             rx_stats.spuriousPacketsRead++;
2469             MUTEX_EXIT(&rx_stats_mutex);
2470             MUTEX_ENTER(&conn->conn_data_lock);
2471             conn->refCount--;
2472             MUTEX_EXIT(&conn->conn_data_lock);
2473             return np;
2474         }
2475     }
2476 #endif
2477     currentCallNumber = conn->callNumber[channel];
2478
2479     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2480         if (np->header.callNumber < currentCallNumber) {
2481             MUTEX_ENTER(&rx_stats_mutex);
2482             rx_stats.spuriousPacketsRead++;
2483             MUTEX_EXIT(&rx_stats_mutex);
2484 #ifdef  RX_ENABLE_LOCKS
2485             if (call)
2486                 MUTEX_EXIT(&call->lock);
2487 #endif
2488             MUTEX_ENTER(&conn->conn_data_lock);
2489             conn->refCount--;
2490             MUTEX_EXIT(&conn->conn_data_lock);
2491             return np;
2492         }
2493         if (!call) {
2494             call = rxi_NewCall(conn, channel);
2495             MUTEX_ENTER(&call->lock);
2496             *call->callNumber = np->header.callNumber;
2497             call->state = RX_STATE_PRECALL;
2498             clock_GetTime(&call->queueTime);
2499             hzero(call->bytesSent);
2500             hzero(call->bytesRcvd);
2501             rxi_KeepAliveOn(call);
2502         }
2503         else if (np->header.callNumber != currentCallNumber) {
2504             /* Wait until the transmit queue is idle before deciding
2505              * whether to reset the current call. Chances are that the
2506              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2507              * flag is cleared.
2508              */
2509 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2510             while ((call->state == RX_STATE_ACTIVE) &&
2511                    (call->flags & RX_CALL_TQ_BUSY)) {
2512                 call->flags |= RX_CALL_TQ_WAIT;
2513 #ifdef RX_ENABLE_LOCKS
2514                 CV_WAIT(&call->cv_tq, &call->lock);
2515 #else /* RX_ENABLE_LOCKS */
2516                 osi_rxSleep(&call->tq);
2517 #endif /* RX_ENABLE_LOCKS */
2518             }
2519 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2520             /* If the new call cannot be taken right now send a busy and set
2521              * the error condition in this call, so that it terminates as
2522              * quickly as possible */
2523             if (call->state == RX_STATE_ACTIVE) {
2524                 struct rx_packet *tp;
2525
2526                 rxi_CallError(call, RX_CALL_DEAD);
2527                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2528                 MUTEX_EXIT(&call->lock);
2529                 MUTEX_ENTER(&conn->conn_data_lock);
2530                 conn->refCount--;
2531                 MUTEX_EXIT(&conn->conn_data_lock);
2532                 return tp;
2533             }
2534             rxi_ResetCall(call, 0);
2535             *call->callNumber = np->header.callNumber;
2536             call->state = RX_STATE_PRECALL;
2537             clock_GetTime(&call->queueTime);
2538             hzero(call->bytesSent);
2539             hzero(call->bytesRcvd);
2540             /*
2541              * If the number of queued calls exceeds the overload
2542              * threshold then abort this call.
2543              */
2544             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2545                 struct rx_packet *tp;
2546
2547                 rxi_CallError(call, rx_BusyError);
2548                 tp = rxi_SendCallAbort(call, np, 1, 0);
2549                 MUTEX_EXIT(&call->lock);
2550                 MUTEX_ENTER(&conn->conn_data_lock);
2551                 conn->refCount--;
2552                 MUTEX_EXIT(&conn->conn_data_lock);
2553                 return tp;
2554             }
2555             rxi_KeepAliveOn(call);
2556         }
2557         else {
2558             /* Continuing call; do nothing here. */
2559         }
2560     } else { /* we're the client */
2561         /* Ignore all incoming acknowledgements for calls in DALLY state */
2562         if ( call && (call->state == RX_STATE_DALLY) 
2563          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2564             MUTEX_ENTER(&rx_stats_mutex);
2565             rx_stats.ignorePacketDally++;
2566             MUTEX_EXIT(&rx_stats_mutex);
2567 #ifdef  RX_ENABLE_LOCKS
2568             if (call) {
2569                 MUTEX_EXIT(&call->lock);
2570             }
2571 #endif
2572             MUTEX_ENTER(&conn->conn_data_lock);
2573             conn->refCount--;
2574             MUTEX_EXIT(&conn->conn_data_lock);
2575             return np;
2576         }
2577         
2578         /* Ignore anything that's not relevant to the current call.  If there
2579          * isn't a current call, then no packet is relevant. */
2580         if (!call || (np->header.callNumber != currentCallNumber)) {
2581             MUTEX_ENTER(&rx_stats_mutex);
2582             rx_stats.spuriousPacketsRead++;
2583             MUTEX_EXIT(&rx_stats_mutex);
2584 #ifdef  RX_ENABLE_LOCKS
2585             if (call) {
2586                 MUTEX_EXIT(&call->lock);
2587             }
2588 #endif
2589             MUTEX_ENTER(&conn->conn_data_lock);
2590             conn->refCount--;
2591             MUTEX_EXIT(&conn->conn_data_lock);
2592             return np;  
2593         }
2594         /* If the service security object index stamped in the packet does not
2595          * match the connection's security index, ignore the packet */
2596         if (np->header.securityIndex != conn->securityIndex) {
2597 #ifdef  RX_ENABLE_LOCKS
2598             MUTEX_EXIT(&call->lock);
2599 #endif
2600             MUTEX_ENTER(&conn->conn_data_lock);
2601             conn->refCount--;       
2602             MUTEX_EXIT(&conn->conn_data_lock);
2603             return np;
2604         }
2605
2606         /* If we're receiving the response, then all transmit packets are
2607          * implicitly acknowledged.  Get rid of them. */
2608         if (np->header.type == RX_PACKET_TYPE_DATA) {
2609 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2610             /* XXX Hack. Because we must release the global rx lock when
2611              * sending packets (osi_NetSend) we drop all acks while we're
2612              * traversing the tq in rxi_Start sending packets out because
2613              * packets may move to the freePacketQueue as result of being here!
2614              * So we drop these packets until we're safely out of the
2615              * traversing. Really ugly! 
2616              * For fine grain RX locking, we set the acked field in the
2617              * packets and let rxi_Start remove them from the transmit queue.
2618              */
2619             if (call->flags & RX_CALL_TQ_BUSY) {
2620 #ifdef  RX_ENABLE_LOCKS
2621                 rxi_SetAcksInTransmitQueue(call);
2622 #else
2623                 conn->refCount--;
2624                 return np;              /* xmitting; drop packet */
2625 #endif
2626             }
2627             else {
2628                 rxi_ClearTransmitQueue(call, 0);
2629             }
2630 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2631             rxi_ClearTransmitQueue(call, 0);
2632 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2633         } else {
2634           if (np->header.type == RX_PACKET_TYPE_ACK) {
2635         /* now check to see if this is an ack packet acknowledging that the
2636          * server actually *lost* some hard-acked data.  If this happens we
2637          * ignore this packet, as it may indicate that the server restarted in
2638          * the middle of a call.  It is also possible that this is an old ack
2639          * packet.  We don't abort the connection in this case, because this
2640          * *might* just be an old ack packet.  The right way to detect a server
2641          * restart in the midst of a call is to notice that the server epoch
2642          * changed, btw.  */
2643         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2644          * XXX unacknowledged.  I think that this is off-by-one, but
2645          * XXX I don't dare change it just yet, since it will
2646          * XXX interact badly with the server-restart detection 
2647          * XXX code in receiveackpacket.  */
2648             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2649                 MUTEX_ENTER(&rx_stats_mutex);
2650                 rx_stats.spuriousPacketsRead++;
2651                 MUTEX_EXIT(&rx_stats_mutex);
2652                 MUTEX_EXIT(&call->lock);
2653                 MUTEX_ENTER(&conn->conn_data_lock);
2654                 conn->refCount--;
2655                 MUTEX_EXIT(&conn->conn_data_lock);
2656                 return np;
2657             }
2658           }
2659         } /* else not a data packet */
2660     }
2661
2662     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2663     /* Set remote user defined status from packet */
2664     call->remoteStatus = np->header.userStatus;
2665
2666     /* Note the gap between the expected next packet and the actual
2667      * packet that arrived, when the new packet has a smaller serial number
2668      * than expected.  Rioses frequently reorder packets all by themselves,
2669      * so this will be quite important with very large window sizes.
2670      * Skew is checked against 0 here to avoid any dependence on the type of
2671      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2672      * true! 
2673      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2674      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2675      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2676      */
2677     MUTEX_ENTER(&conn->conn_data_lock);
2678     skew = conn->lastSerial - np->header.serial;
2679     conn->lastSerial = np->header.serial;
2680     MUTEX_EXIT(&conn->conn_data_lock);
2681     if (skew > 0) {
2682       register struct rx_peer *peer;
2683       peer = conn->peer;
2684       if (skew > peer->inPacketSkew) {
2685         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2686         peer->inPacketSkew = skew;
2687       }
2688     }
2689
2690     /* Now do packet type-specific processing */
2691     switch (np->header.type) {
2692         case RX_PACKET_TYPE_DATA:
2693             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2694                                        tnop, newcallp);
2695             break;
2696         case RX_PACKET_TYPE_ACK:
2697             /* Respond immediately to ack packets requesting acknowledgement
2698              * (ping packets) */
2699             if (np->header.flags & RX_REQUEST_ACK) {
2700                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2701                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2702             }
2703             np = rxi_ReceiveAckPacket(call, np, 1);
2704             break;
2705         case RX_PACKET_TYPE_ABORT:
2706             /* An abort packet: reset the connection, passing the error up to
2707              * the user */
2708             /* What if error is zero? */
2709             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2710             break;
2711         case RX_PACKET_TYPE_BUSY:
2712             /* XXXX */
2713             break;
2714         case RX_PACKET_TYPE_ACKALL:
2715             /* All packets acknowledged, so we can drop all packets previously
2716              * readied for sending */
2717 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2718             /* XXX Hack. We because we can't release the global rx lock when
2719              * sending packets (osi_NetSend) we drop all ack pkts while we're
2720              * traversing the tq in rxi_Start sending packets out because
2721              * packets may move to the freePacketQueue as result of being
2722              * here! So we drop these packets until we're safely out of the
2723              * traversing. Really ugly! 
2724              * For fine grain RX locking, we set the acked field in the packets
2725              * and let rxi_Start remove the packets from the transmit queue.
2726              */
2727             if (call->flags & RX_CALL_TQ_BUSY) {
2728 #ifdef  RX_ENABLE_LOCKS
2729                 rxi_SetAcksInTransmitQueue(call);
2730                 break;
2731 #else /* RX_ENABLE_LOCKS */
2732                 conn->refCount--;
2733                 return np;              /* xmitting; drop packet */
2734 #endif /* RX_ENABLE_LOCKS */
2735             }
2736 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2737             rxi_ClearTransmitQueue(call, 0);
2738             break;
2739         default:
2740             /* Should not reach here, unless the peer is broken: send an abort
2741              * packet */
2742             rxi_CallError(call, RX_PROTOCOL_ERROR);
2743             np = rxi_SendCallAbort(call, np, 1, 0);
2744             break;
2745     };
2746     /* Note when this last legitimate packet was received, for keep-alive
2747      * processing.  Note, we delay getting the time until now in the hope that
2748      * the packet will be delivered to the user before any get time is required
2749      * (if not, then the time won't actually be re-evaluated here). */
2750     call->lastReceiveTime = clock_Sec();
2751     MUTEX_EXIT(&call->lock);
2752     MUTEX_ENTER(&conn->conn_data_lock);
2753     conn->refCount--;
2754     MUTEX_EXIT(&conn->conn_data_lock);
2755     return np;
2756 }
2757
2758 /* return true if this is an "interesting" connection from the point of view
2759     of someone trying to debug the system */
2760 int rxi_IsConnInteresting(struct rx_connection *aconn)
2761 {
2762     register int i;
2763     register struct rx_call *tcall;
2764
2765     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2766         return 1;
2767     for(i=0;i<RX_MAXCALLS;i++) {
2768         tcall = aconn->call[i];
2769         if (tcall) {
2770             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2771                 return 1;
2772             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2773                 return 1;
2774         }
2775     }
2776     return 0;
2777 }
2778
2779 #ifdef KERNEL
2780 /* if this is one of the last few packets AND it wouldn't be used by the
2781    receiving call to immediately satisfy a read request, then drop it on
2782    the floor, since accepting it might prevent a lock-holding thread from
2783    making progress in its reading. If a call has been cleared while in
2784    the precall state then ignore all subsequent packets until the call
2785    is assigned to a thread. */
2786
2787 static TooLow(ap, acall)
2788   struct rx_call *acall;
2789   struct rx_packet *ap; {
2790     int rc=0;
2791     MUTEX_ENTER(&rx_stats_mutex);
2792     if (((ap->header.seq != 1) &&
2793          (acall->flags & RX_CALL_CLEARED) &&
2794          (acall->state == RX_STATE_PRECALL)) ||
2795         ((rx_nFreePackets < rxi_dataQuota+2) &&
2796          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2797            && (acall->flags & RX_CALL_READER_WAIT)))) {
2798         rc = 1;
2799     }
2800     MUTEX_EXIT(&rx_stats_mutex);
2801     return rc;
2802 }
2803 #endif /* KERNEL */
2804
2805 /* try to attach call, if authentication is complete */
2806 static void TryAttach(acall, socket, tnop, newcallp)
2807 register struct rx_call *acall;
2808 register osi_socket socket;
2809 register int *tnop;
2810 register struct rx_call **newcallp; {
2811     register struct rx_connection *conn;
2812     conn = acall->conn;
2813     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2814         /* Don't attach until we have any req'd. authentication. */
2815         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2816             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2817             /* Note:  this does not necessarily succeed; there
2818                may not any proc available */
2819         }
2820         else {
2821             rxi_ChallengeOn(acall->conn);
2822         }
2823     }
2824 }
2825
2826 /* A data packet has been received off the interface.  This packet is
2827  * appropriate to the call (the call is in the right state, etc.).  This
2828  * routine can return a packet to the caller, for re-use */
2829
2830 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2831                                         port, tnop, newcallp)
2832     register struct rx_call *call;
2833     register struct rx_packet *np;
2834     int istack;
2835     osi_socket socket;
2836     afs_uint32 host;
2837     u_short port;
2838     int *tnop;
2839     struct rx_call **newcallp;
2840 {
2841     int ackNeeded = 0;
2842     int newPackets = 0;
2843     int didHardAck = 0;
2844     int haveLast = 0;
2845     afs_uint32 seq, serial, flags;
2846     int isFirst;
2847     struct rx_packet *tnp;
2848     struct clock when;
2849     MUTEX_ENTER(&rx_stats_mutex);
2850     rx_stats.dataPacketsRead++;
2851     MUTEX_EXIT(&rx_stats_mutex);
2852
2853 #ifdef KERNEL
2854     /* If there are no packet buffers, drop this new packet, unless we can find
2855      * packet buffers from inactive calls */
2856     if (!call->error &&
2857         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2858         MUTEX_ENTER(&rx_freePktQ_lock);
2859         rxi_NeedMorePackets = TRUE;
2860         MUTEX_EXIT(&rx_freePktQ_lock);
2861         MUTEX_ENTER(&rx_stats_mutex);
2862         rx_stats.noPacketBuffersOnRead++;
2863         MUTEX_EXIT(&rx_stats_mutex);
2864         call->rprev = np->header.serial;
2865         rxi_calltrace(RX_TRACE_DROP, call);
2866         dpf (("packet %x dropped on receipt - quota problems", np));
2867         if (rxi_doreclaim)
2868             rxi_ClearReceiveQueue(call);
2869         clock_GetTime(&when);
2870         clock_Add(&when, &rx_softAckDelay);
2871         if (!call->delayedAckEvent ||
2872             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2873             rxevent_Cancel(call->delayedAckEvent, call,
2874                            RX_CALL_REFCOUNT_DELAY);
2875             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2876             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2877                                                  call, 0);
2878         }
2879         /* we've damaged this call already, might as well do it in. */
2880         return np;
2881     }
2882 #endif /* KERNEL */
2883
2884     /*
2885      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2886      * packet is one of several packets transmitted as a single
2887      * datagram. Do not send any soft or hard acks until all packets
2888      * in a jumbogram have been processed. Send negative acks right away.
2889      */
2890     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2891         /* tnp is non-null when there are more packets in the
2892          * current jumbo gram */
2893         if (tnp) {
2894             if (np)
2895                 rxi_FreePacket(np);
2896             np = tnp;
2897         }
2898
2899         seq = np->header.seq;
2900         serial = np->header.serial;
2901         flags = np->header.flags;
2902
2903         /* If the call is in an error state, send an abort message */
2904         if (call->error)
2905             return rxi_SendCallAbort(call, np, istack, 0);
2906
2907         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2908          * AFS 3.5 jumbogram. */
2909         if (flags & RX_JUMBO_PACKET) {
2910             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2911         } else {
2912             tnp = NULL;
2913         }
2914
2915         if (np->header.spare != 0) {
2916             MUTEX_ENTER(&call->conn->conn_data_lock);
2917             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2918             MUTEX_EXIT(&call->conn->conn_data_lock);
2919         }
2920
2921         /* The usual case is that this is the expected next packet */
2922         if (seq == call->rnext) {
2923
2924             /* Check to make sure it is not a duplicate of one already queued */
2925             if (queue_IsNotEmpty(&call->rq) 
2926                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2927                 MUTEX_ENTER(&rx_stats_mutex);
2928                 rx_stats.dupPacketsRead++;
2929                 MUTEX_EXIT(&rx_stats_mutex);
2930                 dpf (("packet %x dropped on receipt - duplicate", np));
2931                 rxevent_Cancel(call->delayedAckEvent, call,
2932                                RX_CALL_REFCOUNT_DELAY);
2933                 np = rxi_SendAck(call, np, seq, serial,
2934                                  flags, RX_ACK_DUPLICATE, istack);
2935                 ackNeeded = 0;
2936                 call->rprev = seq;
2937                 continue;
2938             }
2939
2940             /* It's the next packet. Stick it on the receive queue
2941              * for this call. Set newPackets to make sure we wake
2942              * the reader once all packets have been processed */
2943             queue_Prepend(&call->rq, np);
2944             call->nSoftAcks++;
2945             np = NULL; /* We can't use this anymore */
2946             newPackets = 1;
2947
2948             /* If an ack is requested then set a flag to make sure we
2949              * send an acknowledgement for this packet */
2950             if (flags & RX_REQUEST_ACK) {
2951                 ackNeeded = 1;
2952             }
2953
2954             /* Keep track of whether we have received the last packet */
2955             if (flags & RX_LAST_PACKET) {
2956                 call->flags |= RX_CALL_HAVE_LAST;
2957                 haveLast = 1;
2958             }
2959
2960             /* Check whether we have all of the packets for this call */
2961             if (call->flags & RX_CALL_HAVE_LAST) {
2962                 afs_uint32 tseq;                /* temporary sequence number */
2963                 struct rx_packet *tp;   /* Temporary packet pointer */
2964                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2965
2966                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2967                     if (tseq != tp->header.seq)
2968                         break;
2969                     if (tp->header.flags & RX_LAST_PACKET) {
2970                         call->flags |= RX_CALL_RECEIVE_DONE;
2971                         break;
2972                     }
2973                     tseq++;
2974                 }
2975             }
2976
2977             /* Provide asynchronous notification for those who want it
2978              * (e.g. multi rx) */
2979             if (call->arrivalProc) {
2980                 (*call->arrivalProc)(call, call->arrivalProcHandle,
2981                                      call->arrivalProcArg);
2982                 call->arrivalProc = (VOID (*)()) 0;
2983             }
2984
2985             /* Update last packet received */
2986             call->rprev = seq;
2987
2988             /* If there is no server process serving this call, grab
2989              * one, if available. We only need to do this once. If a
2990              * server thread is available, this thread becomes a server
2991              * thread and the server thread becomes a listener thread. */
2992             if (isFirst) {
2993                 TryAttach(call, socket, tnop, newcallp);
2994             }
2995         }       
2996         /* This is not the expected next packet. */
2997         else {
2998             /* Determine whether this is a new or old packet, and if it's
2999              * a new one, whether it fits into the current receive window.
3000              * Also figure out whether the packet was delivered in sequence.
3001              * We use the prev variable to determine whether the new packet
3002              * is the successor of its immediate predecessor in the
3003              * receive queue, and the missing flag to determine whether
3004              * any of this packets predecessors are missing.  */
3005
3006             afs_uint32 prev;            /* "Previous packet" sequence number */
3007             struct rx_packet *tp;       /* Temporary packet pointer */
3008             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3009             int missing;                /* Are any predecessors missing? */
3010
3011             /* If the new packet's sequence number has been sent to the
3012              * application already, then this is a duplicate */
3013             if (seq < call->rnext) {
3014                 MUTEX_ENTER(&rx_stats_mutex);
3015                 rx_stats.dupPacketsRead++;
3016                 MUTEX_EXIT(&rx_stats_mutex);
3017                 rxevent_Cancel(call->delayedAckEvent, call,
3018                                RX_CALL_REFCOUNT_DELAY);
3019                 np = rxi_SendAck(call, np, seq, serial,
3020                                  flags, RX_ACK_DUPLICATE, istack);
3021                 ackNeeded = 0;
3022                 call->rprev = seq;
3023                 continue;
3024             }
3025
3026             /* If the sequence number is greater than what can be
3027              * accomodated by the current window, then send a negative
3028              * acknowledge and drop the packet */
3029             if ((call->rnext + call->rwind) <= seq) {
3030                 rxevent_Cancel(call->delayedAckEvent, call,
3031                                RX_CALL_REFCOUNT_DELAY);
3032                 np = rxi_SendAck(call, np, seq, serial,
3033                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3034                 ackNeeded = 0;
3035                 call->rprev = seq;
3036                 continue;
3037             }
3038
3039             /* Look for the packet in the queue of old received packets */
3040             for (prev = call->rnext - 1, missing = 0,
3041                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3042                 /*Check for duplicate packet */
3043                 if (seq == tp->header.seq) {
3044                     MUTEX_ENTER(&rx_stats_mutex);
3045                     rx_stats.dupPacketsRead++;
3046                     MUTEX_EXIT(&rx_stats_mutex);
3047                     rxevent_Cancel(call->delayedAckEvent, call,
3048                                    RX_CALL_REFCOUNT_DELAY);
3049                     np = rxi_SendAck(call, np, seq, serial, 
3050                                      flags, RX_ACK_DUPLICATE, istack);
3051                     ackNeeded = 0;
3052                     call->rprev = seq;
3053                     goto nextloop;
3054                 }
3055                 /* If we find a higher sequence packet, break out and
3056                  * insert the new packet here. */
3057                 if (seq < tp->header.seq) break;
3058                 /* Check for missing packet */
3059                 if (tp->header.seq != prev+1) {
3060                     missing = 1;
3061                 }
3062
3063                 prev = tp->header.seq;
3064             }
3065
3066             /* Keep track of whether we have received the last packet. */
3067             if (flags & RX_LAST_PACKET) {
3068                 call->flags |= RX_CALL_HAVE_LAST;
3069             }
3070
3071             /* It's within the window: add it to the the receive queue.
3072              * tp is left by the previous loop either pointing at the
3073              * packet before which to insert the new packet, or at the
3074              * queue head if the queue is empty or the packet should be
3075              * appended. */
3076             queue_InsertBefore(tp, np);
3077             call->nSoftAcks++;
3078             np = NULL;
3079
3080             /* Check whether we have all of the packets for this call */
3081             if ((call->flags & RX_CALL_HAVE_LAST)
3082              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3083                 afs_uint32 tseq;                /* temporary sequence number */
3084
3085                 for (tseq = call->rnext,
3086                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3087                     if (tseq != tp->header.seq)
3088                         break;
3089                     if (tp->header.flags & RX_LAST_PACKET) {
3090                         call->flags |= RX_CALL_RECEIVE_DONE;
3091                         break;
3092                     }
3093                     tseq++;
3094                 }
3095             }
3096
3097             /* We need to send an ack of the packet is out of sequence, 
3098              * or if an ack was requested by the peer. */
3099             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3100                 ackNeeded = 1;
3101             }
3102
3103             /* Acknowledge the last packet for each call */
3104             if (flags & RX_LAST_PACKET) {
3105                 haveLast = 1;
3106             }
3107
3108             call->rprev = seq;
3109         }
3110 nextloop:;
3111     }
3112
3113     if (newPackets) {
3114         /*
3115          * If the receiver is waiting for an iovec, fill the iovec
3116          * using the data from the receive queue */
3117         if (call->flags & RX_CALL_IOVEC_WAIT) {
3118             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3119             /* the call may have been aborted */
3120             if (call->error) {
3121                 return NULL;
3122             }
3123             if (didHardAck) {
3124                 ackNeeded = 0;
3125             }
3126         }
3127
3128         /* Wakeup the reader if any */
3129         if ((call->flags & RX_CALL_READER_WAIT) &&
3130             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3131              (call->iovNext >= call->iovMax) ||
3132              (call->flags & RX_CALL_RECEIVE_DONE))) {
3133             call->flags &= ~RX_CALL_READER_WAIT;
3134 #ifdef  RX_ENABLE_LOCKS
3135             CV_BROADCAST(&call->cv_rq);
3136 #else
3137             osi_rxWakeup(&call->rq);
3138 #endif
3139         }
3140     }
3141
3142     /*
3143      * Send an ack when requested by the peer, or once every
3144      * rxi_SoftAckRate packets until the last packet has been
3145      * received. Always send a soft ack for the last packet in
3146      * the server's reply. */
3147     if (ackNeeded) {
3148         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3149         np = rxi_SendAck(call, np, seq, serial, flags,
3150                          RX_ACK_REQUESTED, istack);
3151     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3152         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3153         np = rxi_SendAck(call, np, seq, serial, flags,
3154                          RX_ACK_IDLE, istack);
3155     } else if (call->nSoftAcks) {
3156         clock_GetTime(&when);
3157         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3158             clock_Add(&when, &rx_lastAckDelay);
3159         } else {
3160             clock_Add(&when, &rx_softAckDelay);
3161         }
3162         if (!call->delayedAckEvent ||
3163             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3164             rxevent_Cancel(call->delayedAckEvent, call,
3165                            RX_CALL_REFCOUNT_DELAY);
3166             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3167             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3168                                                  call, 0);
3169         }
3170     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3171         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3172     }
3173
3174     return np;
3175 }
3176
3177 #ifdef  ADAPT_WINDOW
3178 static void rxi_ComputeRate();
3179 #endif
3180
3181 /* The real smarts of the whole thing.  */
3182 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3183     register struct rx_call *call;
3184     struct rx_packet *np;
3185     int istack;
3186 {
3187     struct rx_ackPacket *ap;
3188     int nAcks;
3189     register struct rx_packet *tp;
3190     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3191     register struct rx_connection *conn = call->conn;
3192     struct rx_peer *peer = conn->peer;
3193     afs_uint32 first;
3194     afs_uint32 serial;
3195     /* because there are CM's that are bogus, sending weird values for this. */
3196     afs_uint32 skew = 0;
3197     int needRxStart = 0;
3198     int nbytes;
3199     int missing;
3200     int acked;
3201     int nNacked = 0;
3202     int newAckCount = 0;
3203     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3204     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3205
3206     MUTEX_ENTER(&rx_stats_mutex);
3207     rx_stats.ackPacketsRead++;
3208     MUTEX_EXIT(&rx_stats_mutex);
3209     ap = (struct rx_ackPacket *) rx_DataOf(np);
3210     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3211     if (nbytes < 0)
3212       return np;       /* truncated ack packet */
3213
3214     /* depends on ack packet struct */
3215     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3216     first = ntohl(ap->firstPacket);
3217     serial = ntohl(ap->serial);
3218     /* temporarily disabled -- needs to degrade over time 
3219        skew = ntohs(ap->maxSkew); */
3220
3221     /* Ignore ack packets received out of order */
3222     if (first < call->tfirst) {
3223         return np;
3224     }
3225
3226     if (np->header.flags & RX_SLOW_START_OK) {
3227         call->flags |= RX_CALL_SLOW_START_OK;
3228     }
3229     
3230 #ifdef RXDEBUG
3231     if (rx_Log) {
3232       fprintf( rx_Log, 
3233               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3234               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3235               skew, ntohl(ap->firstPacket));
3236         if (nAcks) {
3237             int offset;
3238             for (offset = 0; offset < nAcks; offset++) 
3239                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3240         }
3241         putc('\n', rx_Log);
3242     }
3243 #endif
3244
3245     /* if a server connection has been re-created, it doesn't remember what
3246         serial # it was up to.  An ack will tell us, since the serial field
3247         contains the largest serial received by the other side */
3248     MUTEX_ENTER(&conn->conn_data_lock);
3249     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3250         conn->serial = serial+1;
3251     }
3252     MUTEX_EXIT(&conn->conn_data_lock);
3253
3254     /* Update the outgoing packet skew value to the latest value of
3255      * the peer's incoming packet skew value.  The ack packet, of
3256      * course, could arrive out of order, but that won't affect things
3257      * much */
3258     MUTEX_ENTER(&peer->peer_lock);
3259     peer->outPacketSkew = skew;
3260
3261     /* Check for packets that no longer need to be transmitted, and
3262      * discard them.  This only applies to packets positively
3263      * acknowledged as having been sent to the peer's upper level.
3264      * All other packets must be retained.  So only packets with
3265      * sequence numbers < ap->firstPacket are candidates. */
3266     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3267         if (tp->header.seq >= first) break;
3268         call->tfirst = tp->header.seq + 1;
3269         if (tp->header.serial == serial) {
3270           /* Use RTT if not delayed by client. */
3271           if (ap->reason != RX_ACK_DELAY)
3272               rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3273 #ifdef ADAPT_WINDOW
3274           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3275 #endif
3276         }
3277         else if (tp->firstSerial == serial) {
3278             /* Use RTT if not delayed by client. */
3279             if (ap->reason != RX_ACK_DELAY)
3280                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3281 #ifdef ADAPT_WINDOW
3282           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3283 #endif
3284         }
3285 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3286     /* XXX Hack. Because we have to release the global rx lock when sending
3287      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3288      * in rxi_Start sending packets out because packets may move to the
3289      * freePacketQueue as result of being here! So we drop these packets until
3290      * we're safely out of the traversing. Really ugly! 
3291      * To make it even uglier, if we're using fine grain locking, we can
3292      * set the ack bits in the packets and have rxi_Start remove the packets
3293      * when it's done transmitting.
3294      */
3295         if (!tp->acked) {
3296             newAckCount++;
3297         }
3298         if (call->flags & RX_CALL_TQ_BUSY) {
3299 #ifdef RX_ENABLE_LOCKS
3300             tp->acked = 1;
3301             call->flags |= RX_CALL_TQ_SOME_ACKED;
3302 #else /* RX_ENABLE_LOCKS */
3303             break;
3304 #endif /* RX_ENABLE_LOCKS */
3305         } else
3306 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3307         {
3308         queue_Remove(tp);
3309         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3310         }
3311     }
3312
3313 #ifdef ADAPT_WINDOW
3314     /* Give rate detector a chance to respond to ping requests */
3315     if (ap->reason == RX_ACK_PING_RESPONSE) {
3316         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3317     }
3318 #endif
3319
3320     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3321    
3322    /* Now go through explicit acks/nacks and record the results in
3323     * the waiting packets.  These are packets that can't be released
3324     * yet, even with a positive acknowledge.  This positive
3325     * acknowledge only means the packet has been received by the
3326     * peer, not that it will be retained long enough to be sent to
3327     * the peer's upper level.  In addition, reset the transmit timers
3328     * of any missing packets (those packets that must be missing
3329     * because this packet was out of sequence) */
3330
3331     call->nSoftAcked = 0;
3332     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3333         /* Update round trip time if the ack was stimulated on receipt
3334          * of this packet */
3335 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3336 #ifdef RX_ENABLE_LOCKS
3337         if (tp->header.seq >= first) {
3338 #endif /* RX_ENABLE_LOCKS */
3339 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3340         if (tp->header.serial == serial) {
3341             /* Use RTT if not delayed by client. */
3342             if (ap->reason != RX_ACK_DELAY)
3343                 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3344 #ifdef ADAPT_WINDOW
3345           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3346 #endif
3347         }
3348         else if ((tp->firstSerial == serial)) {
3349             /* Use RTT if not delayed by client. */
3350             if (ap->reason != RX_ACK_DELAY)
3351                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3352 #ifdef ADAPT_WINDOW
3353           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3354 #endif
3355         }
3356 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3357 #ifdef RX_ENABLE_LOCKS
3358         }
3359 #endif /* RX_ENABLE_LOCKS */
3360 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3361
3362         /* Set the acknowledge flag per packet based on the
3363          * information in the ack packet. An acknowlegded packet can
3364          * be downgraded when the server has discarded a packet it
3365          * soacked previously, or when an ack packet is received
3366          * out of sequence. */
3367         if (tp->header.seq < first) {
3368             /* Implicit ack information */
3369             if (!tp->acked) {
3370                 newAckCount++;
3371             }
3372             tp->acked = 1;
3373         }
3374         else if (tp->header.seq < first + nAcks) {
3375             /* Explicit ack information:  set it in the packet appropriately */
3376             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3377                 if (!tp->acked) {
3378                     newAckCount++;
3379                     tp->acked = 1;
3380                 }
3381                 if (missing) {
3382                     nNacked++;
3383                 } else {
3384                     call->nSoftAcked++;
3385                 }
3386             } else {
3387                 tp->acked = 0;
3388                 missing = 1;
3389             }
3390         }
3391         else {
3392             tp->acked = 0;
3393             missing = 1;
3394         }
3395
3396         /* If packet isn't yet acked, and it has been transmitted at least 
3397          * once, reset retransmit time using latest timeout 
3398          * ie, this should readjust the retransmit timer for all outstanding 
3399          * packets...  So we don't just retransmit when we should know better*/
3400
3401         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3402           tp->retryTime = tp->timeSent;
3403           clock_Add(&tp->retryTime, &peer->timeout);
3404           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3405           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3406         }
3407     }
3408
3409     /* If the window has been extended by this acknowledge packet,
3410      * then wakeup a sender waiting in alloc for window space, or try
3411      * sending packets now, if he's been sitting on packets due to
3412      * lack of window space */
3413     if (call->tnext < (call->tfirst + call->twind))  {
3414 #ifdef  RX_ENABLE_LOCKS
3415         CV_SIGNAL(&call->cv_twind);
3416 #else
3417         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3418             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3419             osi_rxWakeup(&call->twind);
3420         }
3421 #endif
3422         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3423             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3424         }
3425     }
3426
3427     /* if the ack packet has a receivelen field hanging off it,
3428      * update our state */
3429     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3430       afs_uint32 tSize;
3431
3432       /* If the ack packet has a "recommended" size that is less than 
3433        * what I am using now, reduce my size to match */
3434       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3435                     sizeof(afs_int32), &tSize);
3436       tSize = (afs_uint32) ntohl(tSize);
3437       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3438
3439       /* Get the maximum packet size to send to this peer */
3440       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3441                     &tSize);
3442       tSize = (afs_uint32)ntohl(tSize);
3443       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3444       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3445
3446       /* sanity check - peer might have restarted with different params.
3447        * If peer says "send less", dammit, send less...  Peer should never 
3448        * be unable to accept packets of the size that prior AFS versions would
3449        * send without asking.  */
3450       if (peer->maxMTU != tSize) {
3451           peer->maxMTU = tSize;
3452           peer->MTU = MIN(tSize, peer->MTU);
3453           call->MTU = MIN(call->MTU, tSize);
3454           peer->congestSeq++;
3455       }
3456
3457       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3458           /* AFS 3.4a */
3459           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3460                         sizeof(afs_int32), &tSize);
3461           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3462           if (tSize < call->twind) {       /* smaller than our send */
3463               call->twind = tSize;         /* window, we must send less... */
3464               call->ssthresh = MIN(call->twind, call->ssthresh);
3465           }
3466
3467           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3468            * network MTU confused with the loopback MTU. Calculate the
3469            * maximum MTU here for use in the slow start code below.
3470            */
3471           maxMTU = peer->maxMTU;
3472           /* Did peer restart with older RX version? */
3473           if (peer->maxDgramPackets > 1) {
3474               peer->maxDgramPackets = 1;
3475           }
3476       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3477           /* AFS 3.5 */
3478           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3479                         sizeof(afs_int32), &tSize);
3480           tSize = (afs_uint32) ntohl(tSize);
3481           /*
3482            * As of AFS 3.5 we set the send window to match the receive window. 
3483            */
3484           if (tSize < call->twind) {
3485               call->twind = tSize;
3486               call->ssthresh = MIN(call->twind, call->ssthresh);
3487           } else if (tSize > call->twind) {
3488               call->twind = tSize;
3489           }
3490
3491           /*
3492            * As of AFS 3.5, a jumbogram is more than one fixed size
3493            * packet transmitted in a single UDP datagram. If the remote
3494            * MTU is smaller than our local MTU then never send a datagram
3495            * larger than the natural MTU.
3496            */
3497           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3498                         sizeof(afs_int32), &tSize);
3499           maxDgramPackets = (afs_uint32) ntohl(tSize);
3500           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3501           maxDgramPackets = MIN(maxDgramPackets,
3502                                 (int)(peer->ifDgramPackets));
3503           maxDgramPackets = MIN(maxDgramPackets, tSize);
3504           if (maxDgramPackets > 1) {
3505             peer->maxDgramPackets = maxDgramPackets;
3506             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3507           } else {
3508             peer->maxDgramPackets = 1;
3509             call->MTU = peer->natMTU;
3510           }
3511        } else if (peer->maxDgramPackets > 1) {
3512           /* Restarted with lower version of RX */
3513           peer->maxDgramPackets = 1;
3514        }
3515     } else if (peer->maxDgramPackets > 1 ||
3516                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3517         /* Restarted with lower version of RX */
3518         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3519         peer->natMTU = OLD_MAX_PACKET_SIZE;
3520         peer->MTU = OLD_MAX_PACKET_SIZE;
3521         peer->maxDgramPackets = 1;
3522         peer->nDgramPackets = 1;
3523         peer->congestSeq++;
3524         call->MTU = OLD_MAX_PACKET_SIZE;
3525     }
3526
3527     if (nNacked) {
3528         /*
3529          * Calculate how many datagrams were successfully received after
3530          * the first missing packet and adjust the negative ack counter
3531          * accordingly.
3532          */
3533         call->nAcks = 0;
3534         call->nNacks++;
3535         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3536         if (call->nNacks < nNacked) {
3537             call->nNacks = nNacked;
3538         }
3539     } else {
3540         if (newAckCount) {
3541             call->nAcks++;
3542         }
3543         call->nNacks = 0;
3544     }
3545
3546     if (call->flags & RX_CALL_FAST_RECOVER) {
3547         if (nNacked) {
3548             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3549         } else {
3550             call->flags &= ~RX_CALL_FAST_RECOVER;
3551             call->cwind = call->nextCwind;
3552             call->nextCwind = 0;
3553             call->nAcks = 0;
3554         }
3555         call->nCwindAcks = 0;
3556     }
3557     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3558         /* Three negative acks in a row trigger congestion recovery */
3559 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3560         MUTEX_EXIT(&peer->peer_lock);
3561         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3562                 /* someone else is waiting to start recovery */
3563                 return np;
3564         }
3565         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3566         while (call->flags & RX_CALL_TQ_BUSY) {
3567             call->flags |= RX_CALL_TQ_WAIT;
3568 #ifdef RX_ENABLE_LOCKS
3569             CV_WAIT(&call->cv_tq, &call->lock);
3570 #else /* RX_ENABLE_LOCKS */
3571             osi_rxSleep(&call->tq);
3572 #endif /* RX_ENABLE_LOCKS */
3573         }
3574         MUTEX_ENTER(&peer->peer_lock);
3575 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3576         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3577         call->flags |= RX_CALL_FAST_RECOVER;
3578         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3579         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3580                           rx_maxSendWindow);
3581         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3582         call->nextCwind = call->ssthresh;
3583         call->nAcks = 0;
3584         call->nNacks = 0;
3585         peer->MTU = call->MTU;
3586         peer->cwind = call->nextCwind;
3587         peer->nDgramPackets = call->nDgramPackets;
3588         peer->congestSeq++;
3589         call->congestSeq = peer->congestSeq;
3590         /* Reset the resend times on the packets that were nacked
3591          * so we will retransmit as soon as the window permits*/
3592         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3593             if (acked) {
3594                 if (!tp->acked) {
3595                     clock_Zero(&tp->retryTime);
3596                 }
3597             } else if (tp->acked) {
3598                 acked = 1;
3599             }
3600         }
3601     } else {
3602         /* If cwind is smaller than ssthresh, then increase
3603          * the window one packet for each ack we receive (exponential
3604          * growth).
3605          * If cwind is greater than or equal to ssthresh then increase
3606          * the congestion window by one packet for each cwind acks we
3607          * receive (linear growth).  */
3608         if (call->cwind < call->ssthresh) {
3609             call->cwind = MIN((int)call->ssthresh,
3610                               (int)(call->cwind + newAckCount));
3611             call->nCwindAcks = 0;
3612         } else {
3613             call->nCwindAcks += newAckCount;
3614             if (call->nCwindAcks >= call->cwind) {
3615                 call->nCwindAcks = 0;
3616                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3617             }
3618         }
3619         /*
3620          * If we have received several acknowledgements in a row then
3621          * it is time to increase the size of our datagrams
3622          */
3623         if ((int)call->nAcks > rx_nDgramThreshold) {
3624             if (peer->maxDgramPackets > 1) {
3625                 if (call->nDgramPackets < peer->maxDgramPackets) {
3626                     call->nDgramPackets++;
3627                 }
3628                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3629             } else if (call->MTU < peer->maxMTU) {
3630                 call->MTU += peer->natMTU;
3631                 call->MTU = MIN(call->MTU, peer->maxMTU);
3632             }
3633             call->nAcks = 0;
3634         }
3635     }
3636
3637     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3638
3639     /* Servers need to hold the call until all response packets have
3640      * been acknowledged. Soft acks are good enough since clients
3641      * are not allowed to clear their receive queues. */
3642     if (call->state == RX_STATE_HOLD &&
3643         call->tfirst + call->nSoftAcked >= call->tnext) {
3644         call->state = RX_STATE_DALLY;
3645         rxi_ClearTransmitQueue(call, 0);
3646     } else if (!queue_IsEmpty(&call->tq)) {
3647         rxi_Start(0, call, istack);
3648     }
3649     return np;
3650 }
3651
3652 /* Received a response to a challenge packet */
3653 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3654     register struct rx_connection *conn;
3655     register struct rx_packet *np;
3656     int istack;
3657 {
3658     int error;
3659
3660     /* Ignore the packet if we're the client */
3661     if (conn->type == RX_CLIENT_CONNECTION) return np;
3662
3663     /* If already authenticated, ignore the packet (it's probably a retry) */
3664     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3665         return np;
3666
3667     /* Otherwise, have the security object evaluate the response packet */
3668     error = RXS_CheckResponse(conn->securityObject, conn, np);
3669     if (error) {
3670         /* If the response is invalid, reset the connection, sending
3671          * an abort to the peer */
3672 #ifndef KERNEL
3673         rxi_Delay(1);
3674 #endif
3675         rxi_ConnectionError(conn, error);
3676         MUTEX_ENTER(&conn->conn_data_lock);
3677         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3678         MUTEX_EXIT(&conn->conn_data_lock);
3679         return np;
3680     }
3681     else {
3682         /* If the response is valid, any calls waiting to attach
3683          * servers can now do so */
3684         int i;
3685         for (i=0; i<RX_MAXCALLS; i++) {
3686             struct rx_call *call = conn->call[i];
3687             if (call) {
3688                 MUTEX_ENTER(&call->lock);
3689                  if (call->state == RX_STATE_PRECALL)
3690                      rxi_AttachServerProc(call, -1, NULL, NULL);
3691                 MUTEX_EXIT(&call->lock);
3692             }
3693         }
3694     }
3695     return np;
3696 }
3697
3698 /* A client has received an authentication challenge: the security
3699  * object is asked to cough up a respectable response packet to send
3700  * back to the server.  The server is responsible for retrying the
3701  * challenge if it fails to get a response. */
3702
3703 struct rx_packet *
3704 rxi_ReceiveChallengePacket(conn, np, istack)
3705     register struct rx_connection *conn;
3706     register struct rx_packet *np;
3707     int istack;
3708 {
3709     int error;
3710
3711     /* Ignore the challenge if we're the server */
3712     if (conn->type == RX_SERVER_CONNECTION) return np;
3713
3714     /* Ignore the challenge if the connection is otherwise idle; someone's
3715      * trying to use us as an oracle. */
3716     if (!rxi_HasActiveCalls(conn)) return np;
3717
3718     /* Send the security object the challenge packet.  It is expected to fill
3719      * in the response. */
3720     error = RXS_GetResponse(conn->securityObject, conn, np);
3721
3722     /* If the security object is unable to return a valid response, reset the
3723      * connection and send an abort to the peer.  Otherwise send the response
3724      * packet to the peer connection. */
3725     if (error) {
3726         rxi_ConnectionError(conn, error);
3727         MUTEX_ENTER(&conn->conn_data_lock);
3728         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3729         MUTEX_EXIT(&conn->conn_data_lock);
3730     }
3731     else {
3732         np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3733                              RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3734     }
3735     return np;
3736 }
3737
3738
3739 /* Find an available server process to service the current request in
3740  * the given call structure.  If one isn't available, queue up this
3741  * call so it eventually gets one */
3742 void 
3743 rxi_AttachServerProc(call, socket, tnop, newcallp)
3744 register struct rx_call *call;
3745 register osi_socket socket;
3746 register int *tnop;
3747 register struct rx_call **newcallp;
3748 {
3749     register struct rx_serverQueueEntry *sq;
3750     register struct rx_service *service = call->conn->service;
3751 #ifdef RX_ENABLE_LOCKS
3752     register int haveQuota = 0;
3753 #endif /* RX_ENABLE_LOCKS */
3754     /* May already be attached */
3755     if (call->state == RX_STATE_ACTIVE) return;
3756
3757     MUTEX_ENTER(&rx_serverPool_lock);
3758 #ifdef RX_ENABLE_LOCKS
3759     while(rxi_ServerThreadSelectingCall) {
3760         MUTEX_EXIT(&call->lock);
3761         CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3762         MUTEX_EXIT(&rx_serverPool_lock);
3763         MUTEX_ENTER(&call->lock);
3764         MUTEX_ENTER(&rx_serverPool_lock);
3765         /* Call may have been attached */
3766         if (call->state == RX_STATE_ACTIVE) return;
3767     }
3768
3769     haveQuota = QuotaOK(service);
3770     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3771         /* If there are no processes available to service this call,
3772          * put the call on the incoming call queue (unless it's
3773          * already on the queue).
3774          */
3775         if (haveQuota)
3776             ReturnToServerPool(service);
3777         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3778             call->flags |= RX_CALL_WAIT_PROC;
3779             MUTEX_ENTER(&rx_stats_mutex);
3780             rx_nWaiting++;
3781             MUTEX_EXIT(&rx_stats_mutex);
3782             rxi_calltrace(RX_CALL_ARRIVAL, call);
3783             SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3784             queue_Append(&rx_incomingCallQueue, call);
3785         }
3786     }
3787 #else /* RX_ENABLE_LOCKS */
3788     if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3789         /* If there are no processes available to service this call,
3790          * put the call on the incoming call queue (unless it's
3791          * already on the queue).
3792          */
3793         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3794             call->flags |= RX_CALL_WAIT_PROC;
3795             rx_nWaiting++;
3796             rxi_calltrace(RX_CALL_ARRIVAL, call);
3797             queue_Append(&rx_incomingCallQueue, call);
3798         }
3799     }
3800 #endif /* RX_ENABLE_LOCKS */
3801     else {
3802         sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3803
3804         /* If hot threads are enabled, and both newcallp and sq->socketp
3805          * are non-null, then this thread will process the call, and the
3806          * idle server thread will start listening on this threads socket.
3807          */
3808         queue_Remove(sq);
3809         if (rx_enable_hot_thread && newcallp && sq->socketp) {
3810             *newcallp = call;
3811             *tnop = sq->tno;
3812             *sq->socketp = socket;
3813             clock_GetTime(&call->startTime);
3814             CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3815         } else {
3816             sq->newcall = call;
3817         }
3818         if (call->flags & RX_CALL_WAIT_PROC) {
3819             /* Conservative:  I don't think this should happen */
3820             call->flags &= ~RX_CALL_WAIT_PROC;
3821             MUTEX_ENTER(&rx_stats_mutex);
3822             rx_nWaiting--;
3823             MUTEX_EXIT(&rx_stats_mutex);
3824             queue_Remove(call);
3825         }
3826         call->state = RX_STATE_ACTIVE;
3827         call->mode = RX_MODE_RECEIVING;
3828         if (call->flags & RX_CALL_CLEARED) {
3829             /* send an ack now to start the packet flow up again */
3830             call->flags &= ~RX_CALL_CLEARED;
3831             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3832         }
3833 #ifdef  RX_ENABLE_LOCKS
3834         CV_SIGNAL(&sq->cv);
3835 #else
3836         service->nRequestsRunning++;
3837         if (service->nRequestsRunning <= service->minProcs)
3838           rxi_minDeficit--;
3839         rxi_availProcs--;
3840         osi_rxWakeup(sq);
3841 #endif
3842     }
3843     MUTEX_EXIT(&rx_serverPool_lock);
3844 }
3845
3846 /* Delay the sending of an acknowledge event for a short while, while
3847  * a new call is being prepared (in the case of a client) or a reply
3848  * is being prepared (in the case of a server).  Rather than sending
3849  * an ack packet, an ACKALL packet is sent. */
3850 void rxi_AckAll(event, call, dummy)
3851 struct rxevent *event;
3852 register struct rx_call *call;
3853 char *dummy;
3854 {
3855 #ifdef RX_ENABLE_LOCKS
3856     if (event) {
3857         MUTEX_ENTER(&call->lock);
3858         call->delayedAckEvent = (struct rxevent *) 0;
3859         CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3860     }
3861     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3862                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3863     if (event)
3864         MUTEX_EXIT(&call->lock);
3865 #else /* RX_ENABLE_LOCKS */
3866     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3867     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3868                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3869 #endif /* RX_ENABLE_LOCKS */
3870 }
3871
3872 void rxi_SendDelayedAck(event, call, dummy)     
3873 struct rxevent *event;
3874 register struct rx_call *call;
3875 char *dummy;
3876 {
3877 #ifdef RX_ENABLE_LOCKS
3878     if (event) {
3879         MUTEX_ENTER(&call->lock);
3880         if (event == call->delayedAckEvent)
3881             call->delayedAckEvent = (struct rxevent *) 0;
3882         CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3883     }
3884     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3885     if (event)
3886         MUTEX_EXIT(&call->lock);
3887 #else /* RX_ENABLE_LOCKS */
3888     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3889     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3890 #endif /* RX_ENABLE_LOCKS */
3891 }
3892
3893
3894 #ifdef RX_ENABLE_LOCKS
3895 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3896  * clearing them out.
3897  */
3898 static void rxi_SetAcksInTransmitQueue(call)
3899       register struct rx_call *call;
3900 {
3901     register struct rx_packet *p, *tp;
3902     int someAcked = 0;
3903
3904      for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3905          if (!p) 
3906              break;
3907          p->acked = 1;
3908          someAcked = 1;
3909      }
3910      if (someAcked) {
3911          call->flags |= RX_CALL_TQ_CLEARME;
3912          call->flags |= RX_CALL_TQ_SOME_ACKED;
3913      }
3914
3915      rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3916      rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3917      call->tfirst = call->tnext;
3918      call->nSoftAcked = 0;
3919
3920      if (call->flags & RX_CALL_FAST_RECOVER) {
3921         call->flags &= ~RX_CALL_FAST_RECOVER;
3922         call->cwind = call->nextCwind;
3923         call->nextCwind = 0;
3924      }
3925
3926      CV_SIGNAL(&call->cv_twind);
3927 }
3928 #endif /* RX_ENABLE_LOCKS */
3929
3930 /* Clear out the transmit queue for the current call (all packets have
3931  * been received by peer) */
3932 void rxi_ClearTransmitQueue(call, force)
3933     register struct rx_call *call;
3934     register int force;
3935 {
3936     register struct rx_packet *p, *tp;
3937
3938 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3939     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3940         int someAcked = 0;
3941         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3942           if (!p) 
3943              break;
3944           p->acked = 1;
3945           someAcked = 1;
3946         }
3947         if (someAcked) {
3948             call->flags |= RX_CALL_TQ_CLEARME;
3949             call->flags |= RX_CALL_TQ_SOME_ACKED;
3950         }
3951     } else {
3952 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3953         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3954             if (!p) 
3955                 break;
3956             queue_Remove(p);
3957             rxi_FreePacket(p);
3958         }
3959 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3960         call->flags &= ~RX_CALL_TQ_CLEARME;
3961     }
3962 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3963
3964     rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3965     rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3966     call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3967     call->nSoftAcked = 0;
3968
3969     if (call->flags & RX_CALL_FAST_RECOVER) {
3970         call->flags &= ~RX_CALL_FAST_RECOVER;
3971         call->cwind = call->nextCwind;
3972     }
3973
3974 #ifdef  RX_ENABLE_LOCKS
3975     CV_SIGNAL(&call->cv_twind);
3976 #else
3977     osi_rxWakeup(&call->twind);
3978 #endif
3979 }
3980
3981 void rxi_ClearReceiveQueue(call)
3982     register struct rx_call *call;
3983 {
3984     register struct rx_packet *p, *tp;
3985     if (queue_IsNotEmpty(&call->rq)) {
3986       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
3987         if (!p)
3988           break;
3989         queue_Remove(p);
3990         rxi_FreePacket(p);
3991         rx_packetReclaims++;
3992       }
3993       call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
3994     }
3995     if (call->state == RX_STATE_PRECALL) {
3996         call->flags |= RX_CALL_CLEARED;
3997     }
3998 }
3999
4000 /* Send an abort packet for the specified call */
4001 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4002     register struct rx_call *call;
4003     struct rx_packet *packet;
4004     int istack;
4005     int force;
4006 {
4007   afs_int32 error;
4008   struct clock when;
4009
4010   if (!call->error)
4011     return packet;
4012
4013   /* Clients should never delay abort messages */
4014   if (rx_IsClientConn(call->conn))
4015     force = 1;
4016
4017   if (call->abortCode != call->error) {
4018     call->abortCode = call->error;
4019     call->abortCount = 0;
4020   }
4021
4022   if (force || rxi_callAbortThreshhold == 0 ||
4023       call->abortCount < rxi_callAbortThreshhold) {
4024     if (call->delayedAbortEvent) {
4025         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4026     }
4027     error = htonl(call->error);
4028     call->abortCount++;
4029     packet = rxi_SendSpecial(call, call->conn, packet,
4030                              RX_PACKET_TYPE_ABORT, (char *)&error,
4031                              sizeof(error), istack);
4032   } else if (!call->delayedAbortEvent) {
4033     clock_GetTime(&when);
4034     clock_Addmsec(&when, rxi_callAbortDelay);
4035     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4036     call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4037                                            call, 0);
4038   }
4039   return packet;
4040 }
4041
4042 /* Send an abort packet for the specified connection.  Packet is an
4043  * optional pointer to a packet that can be used to send the abort.
4044  * Once the number of abort messages reaches the threshhold, an
4045  * event is scheduled to send the abort. Setting the force flag
4046  * overrides sending delayed abort messages.
4047  *
4048  * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4049  *       to send the abort packet.
4050  */
4051 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4052     register struct rx_connection *conn;
4053     struct rx_packet *packet;
4054     int istack;
4055     int force;
4056 {
4057   afs_int32 error;
4058   struct clock when;
4059
4060   if (!conn->error)
4061     return packet;
4062
4063   /* Clients should never delay abort messages */
4064   if (rx_IsClientConn(conn))
4065     force = 1;
4066
4067   if (force || rxi_connAbortThreshhold == 0 ||
4068       conn->abortCount < rxi_connAbortThreshhold) {
4069     if (conn->delayedAbortEvent) {
4070         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4071     }
4072     error = htonl(conn->error);
4073     conn->abortCount++;
4074     MUTEX_EXIT(&conn->conn_data_lock);
4075     packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4076                              RX_PACKET_TYPE_ABORT, (char *)&error,
4077                              sizeof(error), istack);
4078     MUTEX_ENTER(&conn->conn_data_lock);
4079   } else if (!conn->delayedAbortEvent) {
4080     clock_GetTime(&when);
4081     clock_Addmsec(&when, rxi_connAbortDelay);
4082     conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4083                                            conn, 0);
4084   }
4085   return packet;
4086 }
4087
4088 /* Associate an error all of the calls owned by a connection.  Called
4089  * with error non-zero.  This is only for really fatal things, like
4090  * bad authentication responses.  The connection itself is set in
4091  * error at this point, so that future packets received will be
4092  * rejected. */
4093 void rxi_ConnectionError(conn, error)
4094     register struct rx_connection *conn;
4095     register afs_int32 error;
4096 {
4097     if (error) {
4098         register int i;
4099         if (conn->challengeEvent)
4100             rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4101         for (i=0; i<RX_MAXCALLS; i++) {
4102             struct rx_call *call = conn->call[i];
4103             if (call) {
4104                 MUTEX_ENTER(&call->lock);
4105                 rxi_CallError(call, error);
4106                 MUTEX_EXIT(&call->lock);
4107             }
4108         }
4109         conn->error = error;
4110         MUTEX_ENTER(&rx_stats_mutex);
4111         rx_stats.fatalErrors++;
4112         MUTEX_EXIT(&rx_stats_mutex);
4113     }
4114 }
4115
4116 void rxi_CallError(call, error)
4117     register struct rx_call *call;
4118     afs_int32 error;
4119 {
4120     if (call->error) error = call->error;
4121 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4122     if (!(call->flags & RX_CALL_TQ_BUSY)) {
4123         rxi_ResetCall(call, 0);
4124     }
4125 #else
4126         rxi_ResetCall(call, 0);
4127 #endif
4128     call->error = error;
4129     call->mode = RX_MODE_ERROR;
4130 }
4131
4132 /* Reset various fields in a call structure, and wakeup waiting
4133  * processes.  Some fields aren't changed: state & mode are not
4134  * touched (these must be set by the caller), and bufptr, nLeft, and
4135  * nFree are not reset, since these fields are manipulated by
4136  * unprotected macros, and may only be reset by non-interrupting code.
4137  */
4138 #ifdef ADAPT_WINDOW
4139 /* this code requires that call->conn be set properly as a pre-condition. */
4140 #endif /* ADAPT_WINDOW */
4141
4142 void rxi_ResetCall(call, newcall)
4143     register struct rx_call *call;
4144     register int newcall;
4145 {
4146     register int flags;
4147     register struct rx_peer *peer;
4148     struct rx_packet *packet;
4149
4150     /* Notify anyone who is waiting for asynchronous packet arrival */
4151     if (call->arrivalProc) {
4152         (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4153         call->arrivalProc = (VOID (*)()) 0;
4154     }
4155
4156     if (call->delayedAbortEvent) {
4157         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4158         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4159         if (packet) {
4160             rxi_SendCallAbort(call, packet, 0, 1);
4161             rxi_FreePacket(packet);
4162         }
4163     }
4164
4165     /*
4166      * Update the peer with the congestion information in this call
4167      * so other calls on this connection can pick up where this call
4168      * left off. If the congestion sequence numbers don't match then
4169      * another call experienced a retransmission.
4170      */
4171     peer = call->conn->peer;
4172     MUTEX_ENTER(&peer->peer_lock);
4173     if (!newcall) {
4174         if (call->congestSeq == peer->congestSeq) {
4175             peer->cwind = MAX(peer->cwind, call->cwind);
4176             peer->MTU = MAX(peer->MTU, call->MTU);
4177             peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4178         }
4179     } else {
4180         call->abortCode = 0;
4181         call->abortCount = 0;
4182     }
4183     if (peer->maxDgramPackets > 1) {
4184         call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4185     } else {
4186         call->MTU = peer->MTU;
4187     }
4188     call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4189     call->ssthresh = rx_maxSendWindow;
4190     call->nDgramPackets = peer->nDgramPackets;
4191     call->congestSeq = peer->congestSeq;
4192     MUTEX_EXIT(&peer->peer_lock);
4193
4194     flags = call->flags;
4195     rxi_ClearReceiveQueue(call);
4196 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4197     if (call->flags & RX_CALL_TQ_BUSY) {
4198         call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4199         call->flags |= (flags & RX_CALL_TQ_WAIT);
4200     } else
4201 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4202     {
4203         rxi_ClearTransmitQueue(call, 0);
4204         queue_Init(&call->tq);
4205         call->flags = 0;
4206     }
4207     queue_Init(&call->rq);
4208     call->error = 0;
4209     call->rwind = rx_initReceiveWindow; 
4210     call->twind = rx_initSendWindow; 
4211     call->nSoftAcked = 0;
4212     call->nextCwind = 0;
4213     call->nAcks = 0;
4214     call->nNacks = 0;
4215     call->nCwindAcks = 0;
4216     call->nSoftAcks = 0;
4217     call->nHardAcks = 0;
4218
4219     call->tfirst = call->rnext = call->tnext = 1;
4220     call->rprev = 0;
4221     call->lastAcked = 0;
4222     call->localStatus = call->remoteStatus = 0;
4223
4224     if (flags & RX_CALL_READER_WAIT)  {
4225 #ifdef  RX_ENABLE_LOCKS
4226         CV_BROADCAST(&call->cv_rq);
4227 #else
4228         osi_rxWakeup(&call->rq);
4229 #endif
4230     }
4231     if (flags & RX_CALL_WAIT_PACKETS) {
4232         MUTEX_ENTER(&rx_freePktQ_lock);
4233         rxi_PacketsUnWait();            /* XXX */
4234         MUTEX_EXIT(&rx_freePktQ_lock);
4235     }
4236
4237 #ifdef  RX_ENABLE_LOCKS
4238     CV_SIGNAL(&call->cv_twind);
4239 #else
4240     if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4241         osi_rxWakeup(&call->twind);
4242 #endif
4243
4244 #ifdef RX_ENABLE_LOCKS
4245     /* The following ensures that we don't mess with any queue while some
4246      * other thread might also be doing so. The call_queue_lock field is
4247      * is only modified under the call lock. If the call is in the process
4248      * of being removed from a queue, the call is not locked until the
4249      * the queue lock is dropped and only then is the call_queue_lock field
4250      * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4251      * Note that any other routine which removes a call from a queue has to
4252      * obtain the queue lock before examing the queue and removing the call.
4253      */
4254     if (call->call_queue_lock) {
4255         MUTEX_ENTER(call->call_queue_lock);
4256         if (queue_IsOnQueue(call)) {
4257             queue_Remove(call);
4258             if (flags & RX_CALL_WAIT_PROC) {
4259                 MUTEX_ENTER(&rx_stats_mutex);
4260                 rx_nWaiting--;
4261                 MUTEX_EXIT(&rx_stats_mutex);
4262             }
4263         }
4264         MUTEX_EXIT(call->call_queue_lock);
4265         CLEAR_CALL_QUEUE_LOCK(call);
4266     }
4267 #else /* RX_ENABLE_LOCKS */
4268     if (queue_IsOnQueue(call)) {
4269       queue_Remove(call);
4270       if (flags & RX_CALL_WAIT_PROC)
4271         rx_nWaiting--;
4272     }
4273 #endif /* RX_ENABLE_LOCKS */
4274
4275     rxi_KeepAliveOff(call);
4276     rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4277 }
4278
4279 /* Send an acknowledge for the indicated packet (seq,serial) of the
4280  * indicated call, for the indicated reason (reason).  This
4281  * acknowledge will specifically acknowledge receiving the packet, and
4282  * will also specify which other packets for this call have been
4283  * received.  This routine returns the packet that was used to the
4284  * caller.  The caller is responsible for freeing it or re-using it.
4285  * This acknowledgement also returns the highest sequence number
4286  * actually read out by the higher level to the sender; the sender
4287  * promises to keep around packets that have not been read by the
4288  * higher level yet (unless, of course, the sender decides to abort
4289  * the call altogether).  Any of p, seq, serial, pflags, or reason may
4290  * be set to zero without ill effect.  That is, if they are zero, they
4291  * will not convey any information.  
4292  * NOW there is a trailer field, after the ack where it will safely be
4293  * ignored by mundanes, which indicates the maximum size packet this 
4294  * host can swallow.  */  
4295 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4296     register struct rx_call *call;
4297     register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4298     int seq;                    /* Sequence number of the packet we are acking */
4299     int serial;                 /* Serial number of the packet */
4300     int pflags;                 /* Flags field from packet header */
4301     int reason;                 /* Reason an acknowledge was prompted */
4302     int istack;
4303 {
4304     struct rx_ackPacket *ap;
4305     register struct rx_packet *rqp;
4306     register struct rx_packet *nxp;  /* For queue_Scan */
4307     register struct rx_packet *p;
4308     u_char offset;
4309     afs_int32 templ;
4310
4311     /*
4312      * Open the receive window once a thread starts reading packets
4313      */
4314     if (call->rnext > 1) {
4315         call->rwind = rx_maxReceiveWindow;
4316     }
4317
4318     call->nHardAcks = 0;
4319     call->nSoftAcks = 0;
4320     if (call->rnext > call->lastAcked)
4321       call->lastAcked = call->rnext;
4322     p = optionalPacket;
4323
4324     if (p) {
4325       rx_computelen(p, p->length);  /* reset length, you never know */
4326     }                               /* where that's been...         */
4327     else
4328       if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4329           /* We won't send the ack, but don't panic. */
4330           return optionalPacket;
4331       }
4332
4333     templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4334     if (templ > 0) {
4335       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4336           if (!optionalPacket) rxi_FreePacket(p);
4337           return optionalPacket;
4338       }
4339       templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32); 
4340       if (rx_Contiguous(p)<templ) {
4341           if (!optionalPacket) rxi_FreePacket(p);
4342           return optionalPacket;
4343       }
4344     }    /* MTUXXX failing to send an ack is very serious.  We should */
4345          /* try as hard as possible to send even a partial ack; it's */
4346          /* better than nothing. */
4347
4348     ap = (struct rx_ackPacket *) rx_DataOf(p);
4349     ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4350     ap->reason = reason;
4351
4352     /* The skew computation used to be bogus, I think it's better now. */
4353     /* We should start paying attention to skew.    XXX  */
4354     ap->serial = htonl(call->conn->maxSerial);
4355     ap->maxSkew = 0;    /* used to be peer->inPacketSkew */
4356
4357     ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4358     ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4359
4360     /* No fear of running out of ack packet here because there can only be at most
4361      * one window full of unacknowledged packets.  The window size must be constrained 
4362      * to be less than the maximum ack size, of course.  Also, an ack should always
4363      * fit into a single packet -- it should not ever be fragmented.  */
4364     for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4365         if (!rqp || !call->rq.next 
4366         &nb