rx-warnings-and-prototyping-20010623
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include <afsconfig.h>
15 #include "../afs/sysincludes.h"
16 #include "../afs/afsincludes.h"
17 #ifndef UKERNEL
18 #include "../h/types.h"
19 #include "../h/time.h"
20 #include "../h/stat.h"
21 #ifdef  AFS_OSF_ENV
22 #include <net/net_globals.h>
23 #endif  /* AFS_OSF_ENV */
24 #ifdef AFS_LINUX20_ENV
25 #include "../h/socket.h"
26 #endif
27 #include "../netinet/in.h"
28 #include "../afs/afs_args.h"
29 #include "../afs/afs_osi.h"
30 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
31 #include "../h/systm.h"
32 #endif
33 #ifdef RXDEBUG
34 #undef RXDEBUG      /* turn off debugging */
35 #endif /* RXDEBUG */
36 #if defined(AFS_SGI_ENV)
37 #include "../sys/debug.h"
38 #endif
39 #include "../afsint/afsint.h"
40 #ifdef  AFS_ALPHA_ENV
41 #undef kmem_alloc
42 #undef kmem_free
43 #undef mem_alloc
44 #undef mem_free
45 #undef register
46 #endif  /* AFS_ALPHA_ENV */
47 #else /* !UKERNEL */
48 #include "../afs/sysincludes.h"
49 #include "../afs/afsincludes.h"
50 #endif /* !UKERNEL */
51 #include "../afs/lock.h"
52 #include "../rx/rx_kmutex.h"
53 #include "../rx/rx_kernel.h"
54 #include "../rx/rx_clock.h"
55 #include "../rx/rx_queue.h"
56 #include "../rx/rx.h"
57 #include "../rx/rx_globals.h"
58 #include "../rx/rx_trace.h"
59 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
60 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
61 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
62 #include "../afsint/afsint.h"
63 extern afs_int32 afs_termState;
64 #ifdef AFS_AIX41_ENV
65 #include "sys/lockl.h"
66 #include "sys/lock_def.h"
67 #endif /* AFS_AIX41_ENV */
68 # include "../afsint/rxgen_consts.h"
69 #else /* KERNEL */
70 # include <afs/param.h>
71 # include <afsconfig.h>
72 # include <sys/types.h>
73 # include <errno.h>
74 #ifdef AFS_NT40_ENV
75 # include <stdlib.h>
76 # include <fcntl.h>
77 # include <afsutil.h>
78 #else
79 # include <sys/socket.h>
80 # include <sys/file.h>
81 # include <netdb.h>
82 # include <sys/stat.h>
83 # include <netinet/in.h>
84 # include <sys/time.h>
85 #endif
86 #ifdef HAVE_STRING_H
87 #include <string.h>
88 #endif
89 #ifdef HAVE_STRINGS_H
90 #include <strings.h>
91 #endif
92 # include "rx.h"
93 # include "rx_user.h"
94 # include "rx_clock.h"
95 # include "rx_queue.h"
96 # include "rx_globals.h"
97 # include "rx_trace.h"
98 # include "rx_internal.h"
99 # include <afs/rxgen_consts.h>
100 #endif /* KERNEL */
101
102 int (*registerProgram)() = 0;
103 int (*swapNameProgram)() = 0;
104
105 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
106 struct rx_tq_debug {
107     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
108     afs_int32 rxi_start_in_error;
109 } rx_tq_debug;
110 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
111
112 /*
113  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
114  * currently allocated within rx.  This number is used to allocate the
115  * memory required to return the statistics when queried.
116  */
117
118 static unsigned int rxi_rpc_peer_stat_cnt;
119
120 /*
121  * rxi_rpc_process_stat_cnt counts the total number of local process stat
122  * structures currently allocated within rx.  The number is used to allocate
123  * the memory required to return the statistics when queried.
124  */
125
126 static unsigned int rxi_rpc_process_stat_cnt;
127
128 #if !defined(offsetof)
129 #include <stddef.h>     /* for definition of offsetof() */
130 #endif
131
132 #ifdef AFS_PTHREAD_ENV
133 #include <assert.h>
134
135 /*
136  * Use procedural initialization of mutexes/condition variables
137  * to ease NT porting
138  */
139
140 extern pthread_mutex_t rxkad_stats_mutex;
141 extern pthread_mutex_t des_init_mutex;
142 extern pthread_mutex_t des_random_mutex;
143 extern pthread_mutex_t rx_clock_mutex;
144 extern pthread_mutex_t rxi_connCacheMutex;
145 extern pthread_mutex_t rx_event_mutex;
146 extern pthread_mutex_t osi_malloc_mutex;
147 extern pthread_mutex_t event_handler_mutex;
148 extern pthread_mutex_t listener_mutex;
149 extern pthread_mutex_t rx_if_init_mutex;
150 extern pthread_mutex_t rx_if_mutex;
151 extern pthread_mutex_t rxkad_client_uid_mutex;
152 extern pthread_mutex_t rxkad_random_mutex;
153
154 extern pthread_cond_t rx_event_handler_cond;
155 extern pthread_cond_t rx_listener_cond;
156
157 static pthread_mutex_t epoch_mutex;
158 static pthread_mutex_t rx_init_mutex;
159 static pthread_mutex_t rx_debug_mutex;
160
161 static void rxi_InitPthread(void) {
162     assert(pthread_mutex_init(&rx_clock_mutex,
163                               (const pthread_mutexattr_t*)0)==0);
164     assert(pthread_mutex_init(&rxi_connCacheMutex,
165                               (const pthread_mutexattr_t*)0)==0);
166     assert(pthread_mutex_init(&rx_init_mutex,
167                               (const pthread_mutexattr_t*)0)==0);
168     assert(pthread_mutex_init(&epoch_mutex,
169                               (const pthread_mutexattr_t*)0)==0);
170     assert(pthread_mutex_init(&rx_event_mutex,
171                               (const pthread_mutexattr_t*)0)==0);
172     assert(pthread_mutex_init(&des_init_mutex,
173                               (const pthread_mutexattr_t*)0)==0);
174     assert(pthread_mutex_init(&des_random_mutex,
175                               (const pthread_mutexattr_t*)0)==0);
176     assert(pthread_mutex_init(&osi_malloc_mutex,
177                               (const pthread_mutexattr_t*)0)==0);
178     assert(pthread_mutex_init(&event_handler_mutex,
179                               (const pthread_mutexattr_t*)0)==0);
180     assert(pthread_mutex_init(&listener_mutex,
181                               (const pthread_mutexattr_t*)0)==0);
182     assert(pthread_mutex_init(&rx_if_init_mutex,
183                               (const pthread_mutexattr_t*)0)==0);
184     assert(pthread_mutex_init(&rx_if_mutex,
185                               (const pthread_mutexattr_t*)0)==0);
186     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
187                               (const pthread_mutexattr_t*)0)==0);
188     assert(pthread_mutex_init(&rxkad_random_mutex,
189                               (const pthread_mutexattr_t*)0)==0);
190     assert(pthread_mutex_init(&rxkad_stats_mutex,
191                               (const pthread_mutexattr_t*)0)==0);
192     assert(pthread_mutex_init(&rx_debug_mutex,
193                               (const pthread_mutexattr_t*)0)==0);
194
195     assert(pthread_cond_init(&rx_event_handler_cond,
196                               (const pthread_condattr_t*)0)==0);
197     assert(pthread_cond_init(&rx_listener_cond,
198                               (const pthread_condattr_t*)0)==0);
199     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
200 }
201
202 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
203 #define INIT_PTHREAD_LOCKS \
204 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
205 /*
206  * The rx_stats_mutex mutex protects the following global variables:
207  * rxi_dataQuota
208  * rxi_minDeficit
209  * rxi_availProcs
210  * rxi_totalMin
211  * rxi_lowConnRefCount
212  * rxi_lowPeerRefCount
213  * rxi_nCalls
214  * rxi_Alloccnt
215  * rxi_Allocsize
216  * rx_nFreePackets
217  * rx_tq_debug
218  * rx_stats
219  */
220 #else
221 #define INIT_PTHREAD_LOCKS
222 #endif
223
224
225 /* Variables for handling the minProcs implementation.  availProcs gives the
226  * number of threads available in the pool at this moment (not counting dudes
227  * executing right now).  totalMin gives the total number of procs required
228  * for handling all minProcs requests.  minDeficit is a dynamic variable
229  * tracking the # of procs required to satisfy all of the remaining minProcs
230  * demands.
231  * For fine grain locking to work, the quota check and the reservation of
232  * a server thread has to come while rxi_availProcs and rxi_minDeficit
233  * are locked. To this end, the code has been modified under #ifdef
234  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
235  * same time. A new function, ReturnToServerPool() returns the allocation.
236  * 
237  * A call can be on several queue's (but only one at a time). When
238  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
239  * that no one else is touching the queue. To this end, we store the address
240  * of the queue lock in the call structure (under the call lock) when we
241  * put the call on a queue, and we clear the call_queue_lock when the
242  * call is removed from a queue (once the call lock has been obtained).
243  * This allows rxi_ResetCall to safely synchronize with others wishing
244  * to manipulate the queue.
245  */
246
247 #ifdef RX_ENABLE_LOCKS
248 static int rxi_ServerThreadSelectingCall;
249 static afs_kmutex_t rx_rpc_stats;
250 void rxi_StartUnlocked();
251 #endif
252
253 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
254 ** pretty good that the next packet coming in is from the same connection 
255 ** as the last packet, since we're send multiple packets in a transmit window.
256 */
257 struct rx_connection *rxLastConn = 0; 
258
259 #ifdef RX_ENABLE_LOCKS
260 /* The locking hierarchy for rx fine grain locking is composed of five
261  * tiers:
262  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
263  * call->lock - locks call data fields.
264  * Most any other lock - these are all independent of each other.....
265  *      rx_freePktQ_lock
266  *      rx_freeCallQueue_lock
267  *      freeSQEList_lock
268  *      rx_connHashTable_lock
269  *      rx_serverPool_lock
270  *      rxi_keyCreate_lock
271  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
272
273  * lowest level:
274  *      peer_lock - locks peer data fields.
275  *      conn_data_lock - that more than one thread is not updating a conn data
276  *              field at the same time.
277  * Do we need a lock to protect the peer field in the conn structure?
278  *      conn->peer was previously a constant for all intents and so has no
279  *      lock protecting this field. The multihomed client delta introduced
280  *      a RX code change : change the peer field in the connection structure
281  *      to that remote inetrface from which the last packet for this
282  *      connection was sent out. This may become an issue if further changes
283  *      are made.
284  */
285 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
286 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
287 #ifdef RX_LOCKS_DB
288 /* rxdb_fileID is used to identify the lock location, along with line#. */
289 static int rxdb_fileID = RXDB_FILE_RX;
290 #endif /* RX_LOCKS_DB */
291 static void rxi_SetAcksInTransmitQueue();
292 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
293 #else /* RX_ENABLE_LOCKS */
294 #define SET_CALL_QUEUE_LOCK(C, L)
295 #define CLEAR_CALL_QUEUE_LOCK(C)
296 #endif /* RX_ENABLE_LOCKS */
297 static void rxi_DestroyConnectionNoLock();
298 struct rx_serverQueueEntry *rx_waitForPacket = 0;
299
300 /* ------------Exported Interfaces------------- */
301
302 /* This function allows rxkad to set the epoch to a suitably random number
303  * which rx_NewConnection will use in the future.  The principle purpose is to
304  * get rxnull connections to use the same epoch as the rxkad connections do, at
305  * least once the first rxkad connection is established.  This is important now
306  * that the host/port addresses aren't used in FindConnection: the uniqueness
307  * of epoch/cid matters and the start time won't do. */
308
309 #ifdef AFS_PTHREAD_ENV
310 /*
311  * This mutex protects the following global variables:
312  * rx_epoch
313  */
314
315 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
316 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
317 #else
318 #define LOCK_EPOCH
319 #define UNLOCK_EPOCH
320 #endif /* AFS_PTHREAD_ENV */
321
322 void rx_SetEpoch (epoch)
323   afs_uint32 epoch;
324 {
325     LOCK_EPOCH
326     rx_epoch = epoch;
327     UNLOCK_EPOCH
328 }
329
330 /* Initialize rx.  A port number may be mentioned, in which case this
331  * becomes the default port number for any service installed later.
332  * If 0 is provided for the port number, a random port will be chosen
333  * by the kernel.  Whether this will ever overlap anything in
334  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
335  * error. */
336 static int rxinit_status = 1;
337 #ifdef AFS_PTHREAD_ENV
338 /*
339  * This mutex protects the following global variables:
340  * rxinit_status
341  */
342
343 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
344 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
345 #else
346 #define LOCK_RX_INIT
347 #define UNLOCK_RX_INIT
348 #endif
349
350 int rx_Init(u_int port)
351 {
352 #ifdef KERNEL
353     osi_timeval_t tv;
354 #else /* KERNEL */
355     struct timeval tv;
356 #endif /* KERNEL */
357     char *htable, *ptable;
358     int tmp_status;
359
360 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
361     __djgpp_set_quiet_socket(1);
362 #endif
363
364     SPLVAR;
365
366     INIT_PTHREAD_LOCKS
367     LOCK_RX_INIT
368     if (rxinit_status == 0) {
369         tmp_status = rxinit_status;
370         UNLOCK_RX_INIT
371         return tmp_status; /* Already started; return previous error code. */
372     }
373
374 #ifdef AFS_NT40_ENV
375     if (afs_winsockInit()<0)
376         return -1;
377 #endif
378
379 #ifndef KERNEL
380     /*
381      * Initialize anything necessary to provide a non-premptive threading
382      * environment.
383      */
384     rxi_InitializeThreadSupport();
385 #endif
386
387     /* Allocate and initialize a socket for client and perhaps server
388      * connections. */
389
390     rx_socket = rxi_GetUDPSocket((u_short)port); 
391     if (rx_socket == OSI_NULLSOCKET) {
392         UNLOCK_RX_INIT
393         return RX_ADDRINUSE;
394     }
395     
396
397 #ifdef  RX_ENABLE_LOCKS
398 #ifdef RX_LOCKS_DB
399     rxdb_init();
400 #endif /* RX_LOCKS_DB */
401     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
402     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
403     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
404     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
405     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
406                MUTEX_DEFAULT,0);
407     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
408     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
409     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
410     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
411 #ifndef KERNEL
412     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
413 #endif /* !KERNEL */
414     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
415 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
416     if ( !uniprocessor )
417       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
418 #endif /* KERNEL && AFS_HPUX110_ENV */
419 #else /* RX_ENABLE_LOCKS */
420 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
421     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
422 #endif /* AFS_GLOBAL_SUNLOCK */
423 #endif /* RX_ENABLE_LOCKS */
424
425     rxi_nCalls = 0;
426     rx_connDeadTime = 12;
427     rx_tranquil     = 0;        /* reset flag */
428     bzero((char *)&rx_stats, sizeof(struct rx_stats));
429     htable = (char *)
430         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
431     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
432     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
433     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
434     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
435     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
436
437     /* Malloc up a bunch of packets & buffers */
438     rx_nFreePackets = 0;
439     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
440     queue_Init(&rx_freePacketQueue);
441     rxi_NeedMorePackets = FALSE;
442     rxi_MorePackets(rx_nPackets);
443     rx_CheckPackets();
444
445     NETPRI;
446     AFS_RXGLOCK();
447
448     clock_Init();
449
450 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
451     tv.tv_sec = clock_now.sec;
452     tv.tv_usec = clock_now.usec;
453     srand((unsigned int) tv.tv_usec);
454 #else
455     osi_GetTime(&tv);
456 #endif
457     if (port) {
458         rx_port = port;
459     } else {
460 #if defined(KERNEL) && !defined(UKERNEL)
461         /* Really, this should never happen in a real kernel */
462         rx_port = 0;
463 #else
464         struct sockaddr_in addr;
465         int addrlen = sizeof(addr);
466         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
467             rx_Finalize();
468             return -1;
469         }
470         rx_port = addr.sin_port;
471 #endif
472     }
473     rx_stats.minRtt.sec = 9999999;
474 #ifdef  KERNEL
475     rx_SetEpoch (tv.tv_sec | 0x80000000);
476 #else
477     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
478                                          * will provide a randomer value. */
479 #endif
480     MUTEX_ENTER(&rx_stats_mutex);
481     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
482     MUTEX_EXIT(&rx_stats_mutex);
483     /* *Slightly* random start time for the cid.  This is just to help
484      * out with the hashing function at the peer */
485     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
486     rx_connHashTable = (struct rx_connection **) htable;
487     rx_peerHashTable = (struct rx_peer **) ptable;
488
489     rx_lastAckDelay.sec = 0;
490     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
491     rx_hardAckDelay.sec = 0;
492     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
493     rx_softAckDelay.sec = 0;
494     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
495
496     rxevent_Init(20, rxi_ReScheduleEvents);
497
498     /* Initialize various global queues */
499     queue_Init(&rx_idleServerQueue);
500     queue_Init(&rx_incomingCallQueue);
501     queue_Init(&rx_freeCallQueue);
502
503 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
504     /* Initialize our list of usable IP addresses. */
505     rx_GetIFInfo();
506 #endif
507
508     /* Start listener process (exact function is dependent on the
509      * implementation environment--kernel or user space) */
510     rxi_StartListener();
511
512     AFS_RXGUNLOCK();
513     USERPRI;
514     tmp_status = rxinit_status = 0;
515     UNLOCK_RX_INIT
516     return tmp_status;
517 }
518
519 /* called with unincremented nRequestsRunning to see if it is OK to start
520  * a new thread in this service.  Could be "no" for two reasons: over the
521  * max quota, or would prevent others from reaching their min quota.
522  */
523 #ifdef RX_ENABLE_LOCKS
524 /* This verion of QuotaOK reserves quota if it's ok while the
525  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
526  */
527 static int QuotaOK(aservice)
528 register struct rx_service *aservice;
529 {
530     /* check if over max quota */
531     if (aservice->nRequestsRunning >= aservice->maxProcs) {
532         return 0;
533     }
534
535     /* under min quota, we're OK */
536     /* otherwise, can use only if there are enough to allow everyone
537      * to go to their min quota after this guy starts.
538      */
539     MUTEX_ENTER(&rx_stats_mutex);
540     if ((aservice->nRequestsRunning < aservice->minProcs) ||
541          (rxi_availProcs > rxi_minDeficit)) {
542         aservice->nRequestsRunning++;
543         /* just started call in minProcs pool, need fewer to maintain
544          * guarantee */
545         if (aservice->nRequestsRunning <= aservice->minProcs)
546             rxi_minDeficit--;
547         rxi_availProcs--;
548         MUTEX_EXIT(&rx_stats_mutex);
549         return 1;
550     }
551     MUTEX_EXIT(&rx_stats_mutex);
552
553     return 0;
554 }
555 static void ReturnToServerPool(aservice)
556 register struct rx_service *aservice;
557 {
558     aservice->nRequestsRunning--;
559     MUTEX_ENTER(&rx_stats_mutex);
560     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
561     rxi_availProcs++;
562     MUTEX_EXIT(&rx_stats_mutex);
563 }
564
565 #else /* RX_ENABLE_LOCKS */
566 static int QuotaOK(aservice)
567 register struct rx_service *aservice; {
568     int rc=0;
569     /* under min quota, we're OK */
570     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
571
572     /* check if over max quota */
573     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
574
575     /* otherwise, can use only if there are enough to allow everyone
576      * to go to their min quota after this guy starts.
577      */
578     if (rxi_availProcs > rxi_minDeficit) rc = 1;
579     return rc;
580 }
581 #endif /* RX_ENABLE_LOCKS */
582
583 #ifndef KERNEL
584 /* Called by rx_StartServer to start up lwp's to service calls.
585    NExistingProcs gives the number of procs already existing, and which
586    therefore needn't be created. */
587 void rxi_StartServerProcs(nExistingProcs)
588     int nExistingProcs;
589 {
590     register struct rx_service *service;
591     register int i;
592     int maxdiff = 0;
593     int nProcs = 0;
594
595     /* For each service, reserve N processes, where N is the "minimum"
596        number of processes that MUST be able to execute a request in parallel,
597        at any time, for that process.  Also compute the maximum difference
598        between any service's maximum number of processes that can run
599        (i.e. the maximum number that ever will be run, and a guarantee
600        that this number will run if other services aren't running), and its
601        minimum number.  The result is the extra number of processes that
602        we need in order to provide the latter guarantee */
603     for (i=0; i<RX_MAX_SERVICES; i++) {
604         int diff;
605         service = rx_services[i];
606         if (service == (struct rx_service *) 0) break;
607         nProcs += service->minProcs;
608         diff = service->maxProcs - service->minProcs;
609         if (diff > maxdiff) maxdiff = diff;
610     }
611     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
612     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
613     for (i = 0; i<nProcs; i++) {
614         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
615     }
616 }
617 #endif /* KERNEL */
618
619 /* This routine must be called if any services are exported.  If the
620  * donateMe flag is set, the calling process is donated to the server
621  * process pool */
622 void rx_StartServer(donateMe)
623 {
624     register struct rx_service *service;
625     register int i, nProcs=0;
626     SPLVAR;
627     clock_NewTime();
628
629     NETPRI;
630     AFS_RXGLOCK();
631     /* Start server processes, if necessary (exact function is dependent
632      * on the implementation environment--kernel or user space).  DonateMe
633      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
634      * case, one less new proc will be created rx_StartServerProcs.
635      */
636     rxi_StartServerProcs(donateMe);
637
638     /* count up the # of threads in minProcs, and add set the min deficit to
639      * be that value, too.
640      */
641     for (i=0; i<RX_MAX_SERVICES; i++) {
642         service = rx_services[i];
643         if (service == (struct rx_service *) 0) break;
644         MUTEX_ENTER(&rx_stats_mutex);
645         rxi_totalMin += service->minProcs;
646         /* below works even if a thread is running, since minDeficit would
647          * still have been decremented and later re-incremented.
648          */
649         rxi_minDeficit += service->minProcs;
650         MUTEX_EXIT(&rx_stats_mutex);
651     }
652
653     /* Turn on reaping of idle server connections */
654     rxi_ReapConnections();
655
656     AFS_RXGUNLOCK();
657     USERPRI;
658
659     if (donateMe) {
660 #ifndef AFS_NT40_ENV
661 #ifndef KERNEL
662         char name[32];
663 #ifdef AFS_PTHREAD_ENV
664         pid_t pid;
665         pid = (pid_t) pthread_self();
666 #else /* AFS_PTHREAD_ENV */
667         PROCESS pid;
668         LWP_CurrentProcess(&pid);
669 #endif /* AFS_PTHREAD_ENV */
670
671         sprintf(name,"srv_%d", ++nProcs);
672         if (registerProgram)
673             (*registerProgram)(pid, name);
674 #endif /* KERNEL */
675 #endif /* AFS_NT40_ENV */
676         rx_ServerProc(); /* Never returns */
677     }
678     return;
679 }
680
681 /* Create a new client connection to the specified service, using the
682  * specified security object to implement the security model for this
683  * connection. */
684 struct rx_connection *
685 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
686     register afs_uint32 shost;      /* Server host */
687     u_short sport;                  /* Server port */
688     u_short sservice;               /* Server service id */
689     register struct rx_securityClass *securityObject;
690     int serviceSecurityIndex;
691 {
692     int hashindex;
693     afs_int32 cid;
694     register struct rx_connection *conn;
695
696     SPLVAR;
697
698     clock_NewTime();
699     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
700           shost, sport, sservice, securityObject, serviceSecurityIndex));
701
702     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
703      * the case of kmem_alloc? */
704     conn = rxi_AllocConnection();
705 #ifdef  RX_ENABLE_LOCKS
706     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
707     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
708     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
709 #endif
710     NETPRI;
711     AFS_RXGLOCK();
712     MUTEX_ENTER(&rx_connHashTable_lock);
713     cid = (rx_nextCid += RX_MAXCALLS);
714     conn->type = RX_CLIENT_CONNECTION;
715     conn->cid = cid;
716     conn->epoch = rx_epoch;
717     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
718     conn->serviceId = sservice;
719     conn->securityObject = securityObject;
720     /* This doesn't work in all compilers with void (they're buggy), so fake it
721      * with VOID */
722     conn->securityData = (VOID *) 0;
723     conn->securityIndex = serviceSecurityIndex;
724     rx_SetConnDeadTime(conn, rx_connDeadTime);
725     conn->ackRate = RX_FAST_ACK_RATE;
726     conn->nSpecific = 0;
727     conn->specific = NULL;
728     conn->challengeEvent = (struct rxevent *)0;
729     conn->delayedAbortEvent = (struct rxevent *)0;
730     conn->abortCount = 0;
731     conn->error = 0;
732
733     RXS_NewConnection(securityObject, conn);
734     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
735     
736     conn->refCount++; /* no lock required since only this thread knows... */
737     conn->next = rx_connHashTable[hashindex];
738     rx_connHashTable[hashindex] = conn;
739     MUTEX_ENTER(&rx_stats_mutex);
740     rx_stats.nClientConns++;
741     MUTEX_EXIT(&rx_stats_mutex);
742
743     MUTEX_EXIT(&rx_connHashTable_lock);
744     AFS_RXGUNLOCK();
745     USERPRI;
746     return conn;
747 }
748
749 void rx_SetConnDeadTime(conn, seconds)
750     register struct rx_connection *conn;
751     register int seconds;
752 {
753     /* The idea is to set the dead time to a value that allows several
754      * keepalives to be dropped without timing out the connection. */
755     conn->secondsUntilDead = MAX(seconds, 6);
756     conn->secondsUntilPing = conn->secondsUntilDead/6;
757 }
758
759 int rxi_lowPeerRefCount = 0;
760 int rxi_lowConnRefCount = 0;
761
762 /*
763  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
764  * NOTE: must not be called with rx_connHashTable_lock held.
765  */
766 void rxi_CleanupConnection(conn)
767     struct rx_connection *conn;
768 {
769     int i;
770
771     /* Notify the service exporter, if requested, that this connection
772      * is being destroyed */
773     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
774       (*conn->service->destroyConnProc)(conn);
775
776     /* Notify the security module that this connection is being destroyed */
777     RXS_DestroyConnection(conn->securityObject, conn);
778
779     /* If this is the last connection using the rx_peer struct, set its
780      * idle time to now. rxi_ReapConnections will reap it if it's still
781      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
782      */
783     MUTEX_ENTER(&rx_peerHashTable_lock);
784     if (--conn->peer->refCount <= 0) {
785         conn->peer->idleWhen = clock_Sec();
786         if (conn->peer->refCount < 0) {
787             conn->peer->refCount = 0; 
788             MUTEX_ENTER(&rx_stats_mutex);
789             rxi_lowPeerRefCount ++;
790             MUTEX_EXIT(&rx_stats_mutex);
791         }
792     }
793     MUTEX_EXIT(&rx_peerHashTable_lock);
794
795     MUTEX_ENTER(&rx_stats_mutex);
796     if (conn->type == RX_SERVER_CONNECTION)
797       rx_stats.nServerConns--;
798     else
799       rx_stats.nClientConns--;
800     MUTEX_EXIT(&rx_stats_mutex);
801
802 #ifndef KERNEL
803     if (conn->specific) {
804         for (i = 0 ; i < conn->nSpecific ; i++) {
805             if (conn->specific[i] && rxi_keyCreate_destructor[i])
806                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
807             conn->specific[i] = NULL;
808         }
809         free(conn->specific);
810     }
811     conn->specific = NULL;
812     conn->nSpecific = 0;
813 #endif /* !KERNEL */
814
815     MUTEX_DESTROY(&conn->conn_call_lock);
816     MUTEX_DESTROY(&conn->conn_data_lock);
817     CV_DESTROY(&conn->conn_call_cv);
818         
819     rxi_FreeConnection(conn);
820 }
821
822 /* Destroy the specified connection */
823 void rxi_DestroyConnection(conn)
824     register struct rx_connection *conn;
825 {
826     MUTEX_ENTER(&rx_connHashTable_lock);
827     rxi_DestroyConnectionNoLock(conn);
828     /* conn should be at the head of the cleanup list */
829     if (conn == rx_connCleanup_list) {
830         rx_connCleanup_list = rx_connCleanup_list->next;
831         MUTEX_EXIT(&rx_connHashTable_lock);
832         rxi_CleanupConnection(conn);
833     }
834 #ifdef RX_ENABLE_LOCKS
835     else {
836         MUTEX_EXIT(&rx_connHashTable_lock);
837     }
838 #endif /* RX_ENABLE_LOCKS */
839 }
840     
841 static void rxi_DestroyConnectionNoLock(conn)
842     register struct rx_connection *conn;
843 {
844     register struct rx_connection **conn_ptr;
845     register int havecalls = 0;
846     struct rx_packet *packet;
847     int i;
848     SPLVAR;
849
850     clock_NewTime();
851
852     NETPRI;
853     MUTEX_ENTER(&conn->conn_data_lock);
854     if (conn->refCount > 0)
855         conn->refCount--;
856     else {
857         MUTEX_ENTER(&rx_stats_mutex);
858         rxi_lowConnRefCount++;
859         MUTEX_EXIT(&rx_stats_mutex);
860     }
861
862     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
863         /* Busy; wait till the last guy before proceeding */
864         MUTEX_EXIT(&conn->conn_data_lock);
865         USERPRI;
866         return;
867     }
868
869     /* If the client previously called rx_NewCall, but it is still
870      * waiting, treat this as a running call, and wait to destroy the
871      * connection later when the call completes. */
872     if ((conn->type == RX_CLIENT_CONNECTION) &&
873         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
874         conn->flags |= RX_CONN_DESTROY_ME;
875         MUTEX_EXIT(&conn->conn_data_lock);
876         USERPRI;
877         return;
878     }
879     MUTEX_EXIT(&conn->conn_data_lock);
880
881     /* Check for extant references to this connection */
882     for (i = 0; i<RX_MAXCALLS; i++) {
883         register struct rx_call *call = conn->call[i];
884         if (call) {
885             havecalls = 1;
886             if (conn->type == RX_CLIENT_CONNECTION) {
887                 MUTEX_ENTER(&call->lock);
888                 if (call->delayedAckEvent) {
889                     /* Push the final acknowledgment out now--there
890                      * won't be a subsequent call to acknowledge the
891                      * last reply packets */
892                     rxevent_Cancel(call->delayedAckEvent, call,
893                                    RX_CALL_REFCOUNT_DELAY);
894                     rxi_AckAll((struct rxevent *)0, call, 0);
895                 }
896                 MUTEX_EXIT(&call->lock);
897             }
898         }
899     }
900 #ifdef RX_ENABLE_LOCKS
901     if (!havecalls) {
902         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
903             MUTEX_EXIT(&conn->conn_data_lock);
904         }
905         else {
906             /* Someone is accessing a packet right now. */
907             havecalls = 1;
908         }
909     }
910 #endif /* RX_ENABLE_LOCKS */
911
912     if (havecalls) {
913         /* Don't destroy the connection if there are any call
914          * structures still in use */
915         MUTEX_ENTER(&conn->conn_data_lock);
916         conn->flags |= RX_CONN_DESTROY_ME;
917         MUTEX_EXIT(&conn->conn_data_lock);
918         USERPRI;
919         return;
920     }
921
922     if (conn->delayedAbortEvent) {
923         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
924         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
925         if (packet) {
926             MUTEX_ENTER(&conn->conn_data_lock);
927             rxi_SendConnectionAbort(conn, packet, 0, 1);
928             MUTEX_EXIT(&conn->conn_data_lock);
929             rxi_FreePacket(packet);
930         }
931     }
932
933     /* Remove from connection hash table before proceeding */
934     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
935                                              conn->epoch, conn->type) ];
936     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
937         if (*conn_ptr == conn) {
938             *conn_ptr = conn->next;
939             break;
940         }
941     }
942     /* if the conn that we are destroying was the last connection, then we
943     * clear rxLastConn as well */
944     if ( rxLastConn == conn )
945         rxLastConn = 0;
946
947     /* Make sure the connection is completely reset before deleting it. */
948     /* get rid of pending events that could zap us later */
949     if (conn->challengeEvent) {
950         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
951     }
952  
953     /* Add the connection to the list of destroyed connections that
954      * need to be cleaned up. This is necessary to avoid deadlocks
955      * in the routines we call to inform others that this connection is
956      * being destroyed. */
957     conn->next = rx_connCleanup_list;
958     rx_connCleanup_list = conn;
959 }
960
961 /* Externally available version */
962 void rx_DestroyConnection(conn) 
963     register struct rx_connection *conn;
964 {
965     SPLVAR;
966
967     NETPRI;
968     AFS_RXGLOCK();
969     rxi_DestroyConnection (conn);
970     AFS_RXGUNLOCK();
971     USERPRI;
972 }
973
974 /* Start a new rx remote procedure call, on the specified connection.
975  * If wait is set to 1, wait for a free call channel; otherwise return
976  * 0.  Maxtime gives the maximum number of seconds this call may take,
977  * after rx_MakeCall returns.  After this time interval, a call to any
978  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
979  * For fine grain locking, we hold the conn_call_lock in order to 
980  * to ensure that we don't get signalle after we found a call in an active
981  * state and before we go to sleep.
982  */
983 struct rx_call *rx_NewCall(conn)
984     register struct rx_connection *conn;
985 {
986     register int i;
987     register struct rx_call *call;
988     struct clock queueTime;
989     SPLVAR;
990
991     clock_NewTime();
992     dpf (("rx_MakeCall(conn %x)\n", conn));
993
994     NETPRI;
995     clock_GetTime(&queueTime);
996     AFS_RXGLOCK();
997     MUTEX_ENTER(&conn->conn_call_lock);
998     for (;;) {
999         for (i=0; i<RX_MAXCALLS; i++) {
1000             call = conn->call[i];
1001             if (call) {
1002                 MUTEX_ENTER(&call->lock);
1003                 if (call->state == RX_STATE_DALLY) {
1004                     rxi_ResetCall(call, 0);
1005                     (*call->callNumber)++;
1006                     break;
1007                 }
1008                 MUTEX_EXIT(&call->lock);
1009             }
1010             else {
1011                 call = rxi_NewCall(conn, i);
1012                 MUTEX_ENTER(&call->lock);
1013                 break;
1014             }
1015         }
1016         if (i < RX_MAXCALLS) {
1017             break;
1018         }
1019         MUTEX_ENTER(&conn->conn_data_lock);
1020         conn->flags |= RX_CONN_MAKECALL_WAITING;
1021         MUTEX_EXIT(&conn->conn_data_lock);
1022 #ifdef  RX_ENABLE_LOCKS
1023         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1024 #else
1025         osi_rxSleep(conn);
1026 #endif
1027     }
1028
1029     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1030
1031     /* Client is initially in send mode */
1032     call->state = RX_STATE_ACTIVE;
1033     call->mode = RX_MODE_SENDING;
1034
1035     /* remember start time for call in case we have hard dead time limit */
1036     call->queueTime = queueTime;
1037     clock_GetTime(&call->startTime);
1038     hzero(call->bytesSent);
1039     hzero(call->bytesRcvd);
1040
1041     /* Turn on busy protocol. */
1042     rxi_KeepAliveOn(call);
1043
1044     MUTEX_EXIT(&call->lock);
1045     MUTEX_EXIT(&conn->conn_call_lock);
1046     AFS_RXGUNLOCK();
1047     USERPRI;
1048
1049 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1050     /* Now, if TQ wasn't cleared earlier, do it now. */
1051     AFS_RXGLOCK();
1052     MUTEX_ENTER(&call->lock);
1053     while (call->flags & RX_CALL_TQ_BUSY) {
1054         call->flags |= RX_CALL_TQ_WAIT;
1055 #ifdef RX_ENABLE_LOCKS
1056         CV_WAIT(&call->cv_tq, &call->lock);
1057 #else /* RX_ENABLE_LOCKS */
1058         osi_rxSleep(&call->tq);
1059 #endif /* RX_ENABLE_LOCKS */
1060     }
1061     if (call->flags & RX_CALL_TQ_CLEARME) {
1062         rxi_ClearTransmitQueue(call, 0);
1063         queue_Init(&call->tq);
1064     }
1065     MUTEX_EXIT(&call->lock);
1066     AFS_RXGUNLOCK();
1067 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1068
1069     return call;
1070 }
1071
1072 int
1073 rxi_HasActiveCalls(aconn)
1074 register struct rx_connection *aconn; {
1075     register int i;
1076     register struct rx_call *tcall;
1077     SPLVAR;
1078
1079     NETPRI;
1080     for(i=0; i<RX_MAXCALLS; i++) {
1081       if ((tcall = aconn->call[i])) {
1082         if ((tcall->state == RX_STATE_ACTIVE) 
1083             || (tcall->state == RX_STATE_PRECALL)) {
1084           USERPRI;
1085           return 1;
1086         }
1087       }
1088     }
1089     USERPRI;
1090     return 0;
1091 }
1092
1093 int
1094 rxi_GetCallNumberVector(aconn, aint32s)
1095 register struct rx_connection *aconn;
1096 register afs_int32 *aint32s; {
1097     register int i;
1098     register struct rx_call *tcall;
1099     SPLVAR;
1100
1101     NETPRI;
1102     for(i=0; i<RX_MAXCALLS; i++) {
1103         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1104             aint32s[i] = aconn->callNumber[i]+1;
1105         else
1106             aint32s[i] = aconn->callNumber[i];
1107     }
1108     USERPRI;
1109     return 0;
1110 }
1111
1112 int
1113 rxi_SetCallNumberVector(aconn, aint32s)
1114 register struct rx_connection *aconn;
1115 register afs_int32 *aint32s; {
1116     register int i;
1117     register struct rx_call *tcall;
1118     SPLVAR;
1119
1120     NETPRI;
1121     for(i=0; i<RX_MAXCALLS; i++) {
1122         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1123             aconn->callNumber[i] = aint32s[i] - 1;
1124         else
1125             aconn->callNumber[i] = aint32s[i];
1126     }
1127     USERPRI;
1128     return 0;
1129 }
1130
1131 /* Advertise a new service.  A service is named locally by a UDP port
1132  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1133  * on a failure. */
1134 struct rx_service *
1135 rx_NewService(port, serviceId, serviceName, securityObjects,
1136               nSecurityObjects, serviceProc)
1137     u_short port;
1138     u_short serviceId;
1139     char *serviceName;  /* Name for identification purposes (e.g. the
1140                          * service name might be used for probing for
1141                          * statistics) */
1142     struct rx_securityClass **securityObjects;
1143     int nSecurityObjects;
1144     afs_int32 (*serviceProc)();
1145 {    
1146     osi_socket socket = OSI_NULLSOCKET;
1147     register struct rx_service *tservice;    
1148     register int i;
1149     SPLVAR;
1150
1151     clock_NewTime();
1152
1153     if (serviceId == 0) {
1154         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1155          serviceName);
1156         return 0;
1157     }
1158     if (port == 0) {
1159         if (rx_port == 0) {
1160             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1161             return 0;
1162         }
1163         port = rx_port;
1164         socket = rx_socket;
1165     }
1166
1167     tservice = rxi_AllocService();
1168     NETPRI;
1169     AFS_RXGLOCK();
1170     for (i = 0; i<RX_MAX_SERVICES; i++) {
1171         register struct rx_service *service = rx_services[i];
1172         if (service) {
1173             if (port == service->servicePort) {
1174                 if (service->serviceId == serviceId) {
1175                     /* The identical service has already been
1176                      * installed; if the caller was intending to
1177                      * change the security classes used by this
1178                      * service, he/she loses. */
1179                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1180                     AFS_RXGUNLOCK();
1181                     USERPRI;
1182                     rxi_FreeService(tservice);
1183                     return service;
1184                 }
1185                 /* Different service, same port: re-use the socket
1186                  * which is bound to the same port */
1187                 socket = service->socket;
1188             }
1189         } else {
1190             if (socket == OSI_NULLSOCKET) {
1191                 /* If we don't already have a socket (from another
1192                  * service on same port) get a new one */
1193                 socket = rxi_GetUDPSocket(port);
1194                 if (socket == OSI_NULLSOCKET) {
1195                     AFS_RXGUNLOCK();
1196                     USERPRI;
1197                     rxi_FreeService(tservice);
1198                     return 0;
1199                 }
1200             }
1201             service = tservice;
1202             service->socket = socket;
1203             service->servicePort = port;
1204             service->serviceId = serviceId;
1205             service->serviceName = serviceName;
1206             service->nSecurityObjects = nSecurityObjects;
1207             service->securityObjects = securityObjects;
1208             service->minProcs = 0;
1209             service->maxProcs = 1;
1210             service->idleDeadTime = 60;
1211             service->connDeadTime = rx_connDeadTime;
1212             service->executeRequestProc = serviceProc;
1213             rx_services[i] = service;   /* not visible until now */
1214             AFS_RXGUNLOCK();
1215             USERPRI;
1216             return service;
1217         }
1218     }
1219     AFS_RXGUNLOCK();
1220     USERPRI;
1221     rxi_FreeService(tservice);
1222     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1223     return 0;
1224 }
1225
1226 /* Generic request processing loop. This routine should be called
1227  * by the implementation dependent rx_ServerProc. If socketp is
1228  * non-null, it will be set to the file descriptor that this thread
1229  * is now listening on. If socketp is null, this routine will never
1230  * returns. */
1231 void rxi_ServerProc(threadID, newcall, socketp)
1232 int threadID;
1233 struct rx_call *newcall;
1234 osi_socket *socketp;
1235 {
1236     register struct rx_call *call;
1237     register afs_int32 code;
1238     register struct rx_service *tservice = NULL;
1239
1240     for (;;) {
1241         if (newcall) {
1242             call = newcall;
1243             newcall = NULL;
1244         } else {
1245             call = rx_GetCall(threadID, tservice, socketp);
1246             if (socketp && *socketp != OSI_NULLSOCKET) {
1247                 /* We are now a listener thread */
1248                 return;
1249             }
1250         }
1251
1252         /* if server is restarting( typically smooth shutdown) then do not
1253          * allow any new calls.
1254          */
1255
1256         if ( rx_tranquil && (call != NULL) ) {
1257             SPLVAR;
1258
1259             NETPRI;
1260             AFS_RXGLOCK();
1261             MUTEX_ENTER(&call->lock);
1262
1263             rxi_CallError(call, RX_RESTARTING);
1264             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1265
1266             MUTEX_EXIT(&call->lock);
1267             AFS_RXGUNLOCK();
1268             USERPRI;
1269         }
1270
1271 #ifdef  KERNEL
1272         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1273 #ifdef RX_ENABLE_LOCKS
1274             AFS_GLOCK();
1275 #endif /* RX_ENABLE_LOCKS */
1276             afs_termState = AFSOP_STOP_AFS;
1277             afs_osi_Wakeup(&afs_termState);
1278 #ifdef RX_ENABLE_LOCKS
1279             AFS_GUNLOCK();
1280 #endif /* RX_ENABLE_LOCKS */
1281             return;
1282         }
1283 #endif
1284
1285         tservice = call->conn->service;
1286
1287         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1288
1289         code = call->conn->service->executeRequestProc(call);
1290
1291         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1292
1293         rx_EndCall(call, code);
1294         MUTEX_ENTER(&rx_stats_mutex);
1295         rxi_nCalls++;
1296         MUTEX_EXIT(&rx_stats_mutex);
1297     }
1298 }
1299
1300
1301 void rx_WakeupServerProcs()
1302 {
1303     struct rx_serverQueueEntry *np, *tqp;
1304     SPLVAR;
1305
1306     NETPRI;
1307     AFS_RXGLOCK();
1308     MUTEX_ENTER(&rx_serverPool_lock);
1309
1310 #ifdef RX_ENABLE_LOCKS
1311     if (rx_waitForPacket)
1312         CV_BROADCAST(&rx_waitForPacket->cv);
1313 #else /* RX_ENABLE_LOCKS */
1314     if (rx_waitForPacket)
1315         osi_rxWakeup(rx_waitForPacket);
1316 #endif /* RX_ENABLE_LOCKS */
1317     MUTEX_ENTER(&freeSQEList_lock);
1318     for (np = rx_FreeSQEList; np; np = tqp) {
1319       tqp = *(struct rx_serverQueueEntry **)np;
1320 #ifdef RX_ENABLE_LOCKS
1321       CV_BROADCAST(&np->cv);
1322 #else /* RX_ENABLE_LOCKS */
1323       osi_rxWakeup(np);
1324 #endif /* RX_ENABLE_LOCKS */
1325     }
1326     MUTEX_EXIT(&freeSQEList_lock);
1327     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1328 #ifdef RX_ENABLE_LOCKS
1329       CV_BROADCAST(&np->cv);
1330 #else /* RX_ENABLE_LOCKS */
1331       osi_rxWakeup(np);
1332 #endif /* RX_ENABLE_LOCKS */
1333     }
1334     MUTEX_EXIT(&rx_serverPool_lock);
1335     AFS_RXGUNLOCK();
1336     USERPRI;
1337 }
1338
1339 /* meltdown:
1340  * One thing that seems to happen is that all the server threads get
1341  * tied up on some empty or slow call, and then a whole bunch of calls
1342  * arrive at once, using up the packet pool, so now there are more 
1343  * empty calls.  The most critical resources here are server threads
1344  * and the free packet pool.  The "doreclaim" code seems to help in
1345  * general.  I think that eventually we arrive in this state: there
1346  * are lots of pending calls which do have all their packets present,
1347  * so they won't be reclaimed, are multi-packet calls, so they won't
1348  * be scheduled until later, and thus are tying up most of the free 
1349  * packet pool for a very long time.
1350  * future options:
1351  * 1.  schedule multi-packet calls if all the packets are present.  
1352  * Probably CPU-bound operation, useful to return packets to pool. 
1353  * Do what if there is a full window, but the last packet isn't here?
1354  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1355  * it sleeps and waits for that type of call.
1356  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1357  * the current dataquota business is badly broken.  The quota isn't adjusted
1358  * to reflect how many packets are presently queued for a running call.
1359  * So, when we schedule a queued call with a full window of packets queued
1360  * up for it, that *should* free up a window full of packets for other 2d-class
1361  * calls to be able to use from the packet pool.  But it doesn't.
1362  *
1363  * NB.  Most of the time, this code doesn't run -- since idle server threads
1364  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1365  * as a new call arrives.
1366  */
1367 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1368  * for an rx_Read. */
1369 #ifdef RX_ENABLE_LOCKS
1370 struct rx_call *
1371 rx_GetCall(tno, cur_service, socketp)
1372 int tno;
1373 struct rx_service *cur_service;
1374 osi_socket *socketp;
1375 {
1376     struct rx_serverQueueEntry *sq;
1377     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1378     struct rx_service *service = NULL;
1379     SPLVAR;
1380
1381     MUTEX_ENTER(&freeSQEList_lock);
1382
1383     if ((sq = rx_FreeSQEList)) {
1384         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1385         MUTEX_EXIT(&freeSQEList_lock);
1386     } else {    /* otherwise allocate a new one and return that */
1387         MUTEX_EXIT(&freeSQEList_lock);
1388         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1389         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1390         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1391     }
1392
1393     MUTEX_ENTER(&rx_serverPool_lock);
1394     if (cur_service != NULL) {
1395         ReturnToServerPool(cur_service);
1396     }
1397     while (1) {
1398         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1399             register struct rx_call *tcall, *ncall;
1400             choice2 = (struct rx_call *) 0;
1401             /* Scan for eligible incoming calls.  A call is not eligible
1402              * if the maximum number of calls for its service type are
1403              * already executing */
1404             /* One thread will process calls FCFS (to prevent starvation),
1405              * while the other threads may run ahead looking for calls which
1406              * have all their input data available immediately.  This helps 
1407              * keep threads from blocking, waiting for data from the client. */
1408             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1409               service = tcall->conn->service;
1410               if (!QuotaOK(service)) {
1411                 continue;
1412               }
1413               if (!tno || !tcall->queue_item_header.next  ) {
1414                 /* If we're thread 0, then  we'll just use 
1415                  * this call. If we haven't been able to find an optimal 
1416                  * choice, and we're at the end of the list, then use a 
1417                  * 2d choice if one has been identified.  Otherwise... */
1418                 call = (choice2 ? choice2 : tcall);
1419                 service = call->conn->service;
1420               } else if (!queue_IsEmpty(&tcall->rq)) {
1421                 struct rx_packet *rp;
1422                 rp = queue_First(&tcall->rq, rx_packet);
1423                 if (rp->header.seq == 1) {
1424                   if (!meltdown_1pkt ||             
1425                       (rp->header.flags & RX_LAST_PACKET)) {
1426                     call = tcall;
1427                   } else if (rxi_2dchoice && !choice2 &&
1428                              !(tcall->flags & RX_CALL_CLEARED) &&
1429                              (tcall->rprev > rxi_HardAckRate)) {
1430                     choice2 = tcall;
1431                   } else rxi_md2cnt++;
1432                 }
1433               }
1434               if (call)  {
1435                 break;
1436               } else {
1437                   ReturnToServerPool(service);
1438               }
1439             }
1440           }
1441
1442         if (call) {
1443             queue_Remove(call);
1444             rxi_ServerThreadSelectingCall = 1;
1445             MUTEX_EXIT(&rx_serverPool_lock);
1446             MUTEX_ENTER(&call->lock);
1447             MUTEX_ENTER(&rx_serverPool_lock);
1448
1449             if (queue_IsEmpty(&call->rq) ||
1450                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1451               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1452
1453             CLEAR_CALL_QUEUE_LOCK(call);
1454             if (call->error) {
1455                 MUTEX_EXIT(&call->lock);
1456                 ReturnToServerPool(service);
1457                 rxi_ServerThreadSelectingCall = 0;
1458                 CV_SIGNAL(&rx_serverPool_cv);
1459                 call = (struct rx_call*)0;
1460                 continue;
1461             }
1462             call->flags &= (~RX_CALL_WAIT_PROC);
1463             MUTEX_ENTER(&rx_stats_mutex);
1464             rx_nWaiting--;
1465             MUTEX_EXIT(&rx_stats_mutex);
1466             rxi_ServerThreadSelectingCall = 0;
1467             CV_SIGNAL(&rx_serverPool_cv);
1468             MUTEX_EXIT(&rx_serverPool_lock);
1469             break;
1470         }
1471         else {
1472             /* If there are no eligible incoming calls, add this process
1473              * to the idle server queue, to wait for one */
1474             sq->newcall = 0;
1475             sq->tno = tno;
1476             if (socketp) {
1477                 *socketp = OSI_NULLSOCKET;
1478             }
1479             sq->socketp = socketp;
1480             queue_Append(&rx_idleServerQueue, sq);
1481 #ifndef AFS_AIX41_ENV
1482             rx_waitForPacket = sq;
1483 #endif /* AFS_AIX41_ENV */
1484             do {
1485                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1486 #ifdef  KERNEL
1487                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1488                     MUTEX_EXIT(&rx_serverPool_lock);
1489                     return (struct rx_call *)0;
1490                 }
1491 #endif
1492             } while (!(call = sq->newcall) &&
1493                      !(socketp && *socketp != OSI_NULLSOCKET));
1494             MUTEX_EXIT(&rx_serverPool_lock);
1495             if (call) {
1496                 MUTEX_ENTER(&call->lock);
1497             }
1498             break;
1499         }
1500     }
1501
1502     MUTEX_ENTER(&freeSQEList_lock);
1503     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1504     rx_FreeSQEList = sq;
1505     MUTEX_EXIT(&freeSQEList_lock);
1506
1507     if (call) {
1508         clock_GetTime(&call->startTime);
1509         call->state = RX_STATE_ACTIVE;
1510         call->mode = RX_MODE_RECEIVING;
1511
1512         rxi_calltrace(RX_CALL_START, call);
1513         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1514              call->conn->service->servicePort, 
1515              call->conn->service->serviceId, call));
1516
1517         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1518         MUTEX_EXIT(&call->lock);
1519     } else {
1520         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1521     }
1522
1523     return call;
1524 }
1525 #else /* RX_ENABLE_LOCKS */
1526 struct rx_call *
1527 rx_GetCall(tno, cur_service, socketp)
1528   int tno;
1529   struct rx_service *cur_service;
1530   osi_socket *socketp;
1531 {
1532     struct rx_serverQueueEntry *sq;
1533     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1534     struct rx_service *service = NULL;
1535     SPLVAR;
1536
1537     NETPRI;
1538     AFS_RXGLOCK();
1539     MUTEX_ENTER(&freeSQEList_lock);
1540
1541     if ((sq = rx_FreeSQEList)) {
1542         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1543         MUTEX_EXIT(&freeSQEList_lock);
1544     } else {    /* otherwise allocate a new one and return that */
1545         MUTEX_EXIT(&freeSQEList_lock);
1546         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1547         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1548         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1549     }
1550     MUTEX_ENTER(&sq->lock);
1551
1552     if (cur_service != NULL) {
1553         cur_service->nRequestsRunning--;
1554         if (cur_service->nRequestsRunning < cur_service->minProcs)
1555             rxi_minDeficit++;
1556         rxi_availProcs++;
1557     }
1558     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1559         register struct rx_call *tcall, *ncall;
1560         /* Scan for eligible incoming calls.  A call is not eligible
1561          * if the maximum number of calls for its service type are
1562          * already executing */
1563         /* One thread will process calls FCFS (to prevent starvation),
1564          * while the other threads may run ahead looking for calls which
1565          * have all their input data available immediately.  This helps 
1566          * keep threads from blocking, waiting for data from the client. */
1567         choice2 = (struct rx_call *) 0;
1568         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1569           service = tcall->conn->service;
1570           if (QuotaOK(service)) {
1571              if (!tno || !tcall->queue_item_header.next  ) {
1572                  /* If we're thread 0, then  we'll just use 
1573                   * this call. If we haven't been able to find an optimal 
1574                   * choice, and we're at the end of the list, then use a 
1575                   * 2d choice if one has been identified.  Otherwise... */
1576                  call = (choice2 ? choice2 : tcall);
1577                  service = call->conn->service;
1578              } else if (!queue_IsEmpty(&tcall->rq)) {
1579                  struct rx_packet *rp;
1580                  rp = queue_First(&tcall->rq, rx_packet);
1581                  if (rp->header.seq == 1
1582                      && (!meltdown_1pkt ||
1583                          (rp->header.flags & RX_LAST_PACKET))) {
1584                      call = tcall;
1585                  } else if (rxi_2dchoice && !choice2 &&
1586                             !(tcall->flags & RX_CALL_CLEARED) &&
1587                             (tcall->rprev > rxi_HardAckRate)) {
1588                      choice2 = tcall;
1589                  } else rxi_md2cnt++;
1590              }
1591           }
1592           if (call) 
1593              break;
1594         }
1595       }
1596
1597     if (call) {
1598         queue_Remove(call);
1599         /* we can't schedule a call if there's no data!!! */
1600         /* send an ack if there's no data, if we're missing the
1601          * first packet, or we're missing something between first 
1602          * and last -- there's a "hole" in the incoming data. */
1603         if (queue_IsEmpty(&call->rq) ||
1604             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1605             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1606           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1607
1608         call->flags &= (~RX_CALL_WAIT_PROC);
1609         service->nRequestsRunning++;
1610         /* just started call in minProcs pool, need fewer to maintain
1611          * guarantee */
1612         if (service->nRequestsRunning <= service->minProcs)
1613             rxi_minDeficit--;
1614         rxi_availProcs--;
1615         rx_nWaiting--;
1616         /* MUTEX_EXIT(&call->lock); */
1617     }
1618     else {
1619         /* If there are no eligible incoming calls, add this process
1620          * to the idle server queue, to wait for one */
1621         sq->newcall = 0;
1622         if (socketp) {
1623             *socketp = OSI_NULLSOCKET;
1624         }
1625         sq->socketp = socketp;
1626         queue_Append(&rx_idleServerQueue, sq);
1627         do {
1628             osi_rxSleep(sq);
1629 #ifdef  KERNEL
1630                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1631                     AFS_RXGUNLOCK();
1632                     USERPRI;
1633                     return (struct rx_call *)0;
1634                 }
1635 #endif
1636         } while (!(call = sq->newcall) &&
1637                  !(socketp && *socketp != OSI_NULLSOCKET));
1638     }
1639     MUTEX_EXIT(&sq->lock);
1640
1641     MUTEX_ENTER(&freeSQEList_lock);
1642     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1643     rx_FreeSQEList = sq;
1644     MUTEX_EXIT(&freeSQEList_lock);
1645
1646     if (call) {
1647         clock_GetTime(&call->startTime);
1648         call->state = RX_STATE_ACTIVE;
1649         call->mode = RX_MODE_RECEIVING;
1650
1651         rxi_calltrace(RX_CALL_START, call);
1652         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1653          call->conn->service->servicePort, 
1654          call->conn->service->serviceId, call));
1655     } else {
1656         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1657     }
1658
1659     AFS_RXGUNLOCK();
1660     USERPRI;
1661
1662     return call;
1663 }
1664 #endif /* RX_ENABLE_LOCKS */
1665
1666
1667
1668 /* Establish a procedure to be called when a packet arrives for a
1669  * call.  This routine will be called at most once after each call,
1670  * and will also be called if there is an error condition on the or
1671  * the call is complete.  Used by multi rx to build a selection
1672  * function which determines which of several calls is likely to be a
1673  * good one to read from.  
1674  * NOTE: the way this is currently implemented it is probably only a
1675  * good idea to (1) use it immediately after a newcall (clients only)
1676  * and (2) only use it once.  Other uses currently void your warranty
1677  */
1678 void rx_SetArrivalProc(call, proc, handle, arg)
1679     register struct rx_call *call;
1680     register VOID (*proc)();
1681     register VOID *handle;
1682     register VOID *arg;
1683 {
1684     call->arrivalProc = proc;
1685     call->arrivalProcHandle = handle;
1686     call->arrivalProcArg = arg;
1687 }
1688
1689 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1690  * appropriate, and return the final error code from the conversation
1691  * to the caller */
1692
1693 afs_int32 rx_EndCall(call, rc)
1694     register struct rx_call *call;
1695     afs_int32 rc;
1696 {
1697     register struct rx_connection *conn = call->conn;
1698     register struct rx_service *service;
1699     register struct rx_packet *tp; /* Temporary packet pointer */
1700     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1701     afs_int32 error;
1702     SPLVAR;
1703
1704     dpf(("rx_EndCall(call %x)\n", call));
1705
1706     NETPRI;
1707     AFS_RXGLOCK();
1708     MUTEX_ENTER(&call->lock);
1709
1710     if (rc == 0 && call->error == 0) {
1711         call->abortCode = 0;
1712         call->abortCount = 0;
1713     }
1714
1715     call->arrivalProc = (VOID (*)()) 0;
1716     if (rc && call->error == 0) {
1717         rxi_CallError(call, rc);
1718         /* Send an abort message to the peer if this error code has
1719          * only just been set.  If it was set previously, assume the
1720          * peer has already been sent the error code or will request it 
1721          */
1722         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1723     }
1724     if (conn->type == RX_SERVER_CONNECTION) {
1725         /* Make sure reply or at least dummy reply is sent */
1726         if (call->mode == RX_MODE_RECEIVING) {
1727             rxi_WriteProc(call, 0, 0);
1728         }
1729         if (call->mode == RX_MODE_SENDING) {
1730             rxi_FlushWrite(call);
1731         }
1732         service = conn->service;
1733         rxi_calltrace(RX_CALL_END, call);
1734         /* Call goes to hold state until reply packets are acknowledged */
1735         if (call->tfirst + call->nSoftAcked < call->tnext) {
1736             call->state = RX_STATE_HOLD;
1737         } else {
1738             call->state = RX_STATE_DALLY;
1739             rxi_ClearTransmitQueue(call, 0);
1740             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1741             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1742         }
1743     }
1744     else { /* Client connection */
1745         char dummy;
1746         /* Make sure server receives input packets, in the case where
1747          * no reply arguments are expected */
1748         if ((call->mode == RX_MODE_SENDING)
1749          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1750             (void) rxi_ReadProc(call, &dummy, 1);
1751         }
1752         /* We need to release the call lock since it's lower than the
1753          * conn_call_lock and we don't want to hold the conn_call_lock
1754          * over the rx_ReadProc call. The conn_call_lock needs to be held
1755          * here for the case where rx_NewCall is perusing the calls on
1756          * the connection structure. We don't want to signal until
1757          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1758          * have checked this call, found it active and by the time it
1759          * goes to sleep, will have missed the signal.
1760          */
1761         MUTEX_EXIT(&call->lock);
1762         MUTEX_ENTER(&conn->conn_call_lock);
1763         MUTEX_ENTER(&call->lock);
1764         MUTEX_ENTER(&conn->conn_data_lock);
1765         conn->flags |= RX_CONN_BUSY;
1766         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1767             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1768             MUTEX_EXIT(&conn->conn_data_lock);
1769 #ifdef  RX_ENABLE_LOCKS
1770             CV_BROADCAST(&conn->conn_call_cv);
1771 #else
1772             osi_rxWakeup(conn);
1773 #endif
1774         }
1775 #ifdef RX_ENABLE_LOCKS
1776         else {
1777             MUTEX_EXIT(&conn->conn_data_lock);
1778         }
1779 #endif /* RX_ENABLE_LOCKS */
1780         call->state = RX_STATE_DALLY;
1781     }
1782     error = call->error;
1783
1784     /* currentPacket, nLeft, and NFree must be zeroed here, because
1785      * ResetCall cannot: ResetCall may be called at splnet(), in the
1786      * kernel version, and may interrupt the macros rx_Read or
1787      * rx_Write, which run at normal priority for efficiency. */
1788     if (call->currentPacket) {
1789         rxi_FreePacket(call->currentPacket);
1790         call->currentPacket = (struct rx_packet *) 0;
1791         call->nLeft = call->nFree = call->curlen = 0;
1792     }
1793     else
1794         call->nLeft = call->nFree = call->curlen = 0;
1795
1796     /* Free any packets from the last call to ReadvProc/WritevProc */
1797     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1798         queue_Remove(tp);
1799         rxi_FreePacket(tp);
1800     }
1801
1802     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1803     MUTEX_EXIT(&call->lock);
1804     if (conn->type == RX_CLIENT_CONNECTION) {
1805         MUTEX_EXIT(&conn->conn_call_lock);
1806         conn->flags &= ~RX_CONN_BUSY;
1807     }
1808     AFS_RXGUNLOCK();
1809     USERPRI;
1810     /*
1811      * Map errors to the local host's errno.h format.
1812      */
1813     error = ntoh_syserr_conv(error);
1814     return error;
1815 }
1816
1817 #if !defined(KERNEL)
1818
1819 /* Call this routine when shutting down a server or client (especially
1820  * clients).  This will allow Rx to gracefully garbage collect server
1821  * connections, and reduce the number of retries that a server might
1822  * make to a dead client.
1823  * This is not quite right, since some calls may still be ongoing and
1824  * we can't lock them to destroy them. */
1825 void rx_Finalize() {
1826     register struct rx_connection **conn_ptr, **conn_end;
1827
1828     INIT_PTHREAD_LOCKS
1829     LOCK_RX_INIT
1830     if (rxinit_status == 1) {
1831         UNLOCK_RX_INIT
1832         return; /* Already shutdown. */
1833     }
1834     rxi_DeleteCachedConnections();
1835     if (rx_connHashTable) {
1836         MUTEX_ENTER(&rx_connHashTable_lock);
1837         for (conn_ptr = &rx_connHashTable[0], 
1838              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1839              conn_ptr < conn_end; conn_ptr++) {
1840             struct rx_connection *conn, *next;
1841             for (conn = *conn_ptr; conn; conn = next) {
1842                 next = conn->next;
1843                 if (conn->type == RX_CLIENT_CONNECTION) {
1844                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1845                     conn->refCount++;
1846                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1847 #ifdef RX_ENABLE_LOCKS
1848                     rxi_DestroyConnectionNoLock(conn);
1849 #else /* RX_ENABLE_LOCKS */
1850                     rxi_DestroyConnection(conn);
1851 #endif /* RX_ENABLE_LOCKS */
1852                 }
1853             }
1854         }
1855 #ifdef RX_ENABLE_LOCKS
1856         while (rx_connCleanup_list) {
1857             struct rx_connection *conn;
1858             conn = rx_connCleanup_list;
1859             rx_connCleanup_list = rx_connCleanup_list->next;
1860             MUTEX_EXIT(&rx_connHashTable_lock);
1861             rxi_CleanupConnection(conn);
1862             MUTEX_ENTER(&rx_connHashTable_lock);
1863         }
1864         MUTEX_EXIT(&rx_connHashTable_lock);
1865 #endif /* RX_ENABLE_LOCKS */
1866     }
1867     rxi_flushtrace();
1868
1869     rxinit_status = 1;
1870     UNLOCK_RX_INIT
1871 }
1872 #endif
1873
1874 /* if we wakeup packet waiter too often, can get in loop with two
1875     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1876 void
1877 rxi_PacketsUnWait() {
1878
1879     if (!rx_waitingForPackets) {
1880         return;
1881     }
1882 #ifdef KERNEL
1883     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1884         return;                                     /* still over quota */
1885     }
1886 #endif /* KERNEL */
1887     rx_waitingForPackets = 0;
1888 #ifdef  RX_ENABLE_LOCKS
1889     CV_BROADCAST(&rx_waitingForPackets_cv);
1890 #else
1891     osi_rxWakeup(&rx_waitingForPackets);
1892 #endif
1893     return;
1894 }
1895
1896
1897 /* ------------------Internal interfaces------------------------- */
1898
1899 /* Return this process's service structure for the
1900  * specified socket and service */
1901 struct rx_service *rxi_FindService(socket, serviceId)
1902     register osi_socket socket;
1903     register u_short serviceId;
1904 {
1905     register struct rx_service **sp;    
1906     for (sp = &rx_services[0]; *sp; sp++) {
1907         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1908           return *sp;
1909     }
1910     return 0;
1911 }
1912
1913 /* Allocate a call structure, for the indicated channel of the
1914  * supplied connection.  The mode and state of the call must be set by
1915  * the caller. */
1916 struct rx_call *rxi_NewCall(conn, channel)
1917     register struct rx_connection *conn;
1918     register int channel;
1919 {
1920     register struct rx_call *call;
1921 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1922     register struct rx_call *cp;        /* Call pointer temp */
1923     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1924 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1925
1926     /* Grab an existing call structure, or allocate a new one.
1927      * Existing call structures are assumed to have been left reset by
1928      * rxi_FreeCall */
1929     MUTEX_ENTER(&rx_freeCallQueue_lock);
1930
1931 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1932     /*
1933      * EXCEPT that the TQ might not yet be cleared out.
1934      * Skip over those with in-use TQs.
1935      */
1936     call = NULL;
1937     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1938         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1939             call = cp;
1940             break;
1941         }
1942     }
1943     if (call) {
1944 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1945     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1946         call = queue_First(&rx_freeCallQueue, rx_call);
1947 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1948         queue_Remove(call);
1949         MUTEX_ENTER(&rx_stats_mutex);
1950         rx_stats.nFreeCallStructs--;
1951         MUTEX_EXIT(&rx_stats_mutex);
1952         MUTEX_EXIT(&rx_freeCallQueue_lock);
1953         MUTEX_ENTER(&call->lock);
1954         CLEAR_CALL_QUEUE_LOCK(call);
1955 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1956         /* Now, if TQ wasn't cleared earlier, do it now. */
1957         if (call->flags & RX_CALL_TQ_CLEARME) {
1958             rxi_ClearTransmitQueue(call, 0);
1959             queue_Init(&call->tq);
1960         }
1961 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1962         /* Bind the call to its connection structure */
1963         call->conn = conn;
1964         rxi_ResetCall(call, 1);
1965     }
1966     else {
1967         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1968
1969         MUTEX_EXIT(&rx_freeCallQueue_lock);
1970         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1971         MUTEX_ENTER(&call->lock);
1972         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1973         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1974         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1975
1976         MUTEX_ENTER(&rx_stats_mutex);
1977         rx_stats.nCallStructs++;
1978         MUTEX_EXIT(&rx_stats_mutex);
1979         /* Initialize once-only items */
1980         queue_Init(&call->tq);
1981         queue_Init(&call->rq);
1982         queue_Init(&call->iovq);
1983         /* Bind the call to its connection structure (prereq for reset) */
1984         call->conn = conn;
1985         rxi_ResetCall(call, 1);
1986     }
1987     call->channel = channel;
1988     call->callNumber = &conn->callNumber[channel];
1989     /* Note that the next expected call number is retained (in
1990      * conn->callNumber[i]), even if we reallocate the call structure
1991      */
1992     conn->call[channel] = call;
1993     /* if the channel's never been used (== 0), we should start at 1, otherwise
1994         the call number is valid from the last time this channel was used */
1995     if (*call->callNumber == 0) *call->callNumber = 1;
1996
1997     MUTEX_EXIT(&call->lock);
1998     return call;
1999 }
2000
2001 /* A call has been inactive long enough that so we can throw away
2002  * state, including the call structure, which is placed on the call
2003  * free list.
2004  * Call is locked upon entry.
2005  */
2006 #ifdef RX_ENABLE_LOCKS
2007 void rxi_FreeCall(call, haveCTLock)
2008     int haveCTLock; /* Set if called from rxi_ReapConnections */
2009 #else /* RX_ENABLE_LOCKS */
2010 void rxi_FreeCall(call)
2011 #endif /* RX_ENABLE_LOCKS */
2012     register struct rx_call *call;
2013 {
2014     register int channel = call->channel;
2015     register struct rx_connection *conn = call->conn;
2016
2017
2018     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2019       (*call->callNumber)++;
2020     rxi_ResetCall(call, 0);
2021     call->conn->call[channel] = (struct rx_call *) 0;
2022
2023     MUTEX_ENTER(&rx_freeCallQueue_lock);
2024     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2025 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2026     /* A call may be free even though its transmit queue is still in use.
2027      * Since we search the call list from head to tail, put busy calls at
2028      * the head of the list, and idle calls at the tail.
2029      */
2030     if (call->flags & RX_CALL_TQ_BUSY)
2031         queue_Prepend(&rx_freeCallQueue, call);
2032     else
2033         queue_Append(&rx_freeCallQueue, call);
2034 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2035     queue_Append(&rx_freeCallQueue, call);
2036 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2037     MUTEX_ENTER(&rx_stats_mutex);
2038     rx_stats.nFreeCallStructs++;
2039     MUTEX_EXIT(&rx_stats_mutex);
2040
2041     MUTEX_EXIT(&rx_freeCallQueue_lock);
2042  
2043     /* Destroy the connection if it was previously slated for
2044      * destruction, i.e. the Rx client code previously called
2045      * rx_DestroyConnection (client connections), or
2046      * rxi_ReapConnections called the same routine (server
2047      * connections).  Only do this, however, if there are no
2048      * outstanding calls. Note that for fine grain locking, there appears
2049      * to be a deadlock in that rxi_FreeCall has a call locked and
2050      * DestroyConnectionNoLock locks each call in the conn. But note a
2051      * few lines up where we have removed this call from the conn.
2052      * If someone else destroys a connection, they either have no
2053      * call lock held or are going through this section of code.
2054      */
2055     if (conn->flags & RX_CONN_DESTROY_ME) {
2056         MUTEX_ENTER(&conn->conn_data_lock);
2057         conn->refCount++;
2058         MUTEX_EXIT(&conn->conn_data_lock);
2059 #ifdef RX_ENABLE_LOCKS
2060         if (haveCTLock)
2061             rxi_DestroyConnectionNoLock(conn);
2062         else
2063             rxi_DestroyConnection(conn);
2064 #else /* RX_ENABLE_LOCKS */
2065         rxi_DestroyConnection(conn);
2066 #endif /* RX_ENABLE_LOCKS */
2067     }
2068 }
2069
2070 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2071 char *rxi_Alloc(size)
2072 register size_t size;
2073 {
2074     register char *p;
2075
2076 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2077     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2078      * implementation.
2079      */
2080     int glockOwner = ISAFS_GLOCK();
2081     if (!glockOwner)
2082         AFS_GLOCK();
2083 #endif
2084     MUTEX_ENTER(&rx_stats_mutex);
2085     rxi_Alloccnt++; rxi_Allocsize += size;
2086     MUTEX_EXIT(&rx_stats_mutex);
2087 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2088     if (size > AFS_SMALLOCSIZ) {
2089         p = (char *) osi_AllocMediumSpace(size);
2090     } else
2091         p = (char *) osi_AllocSmall(size, 1);
2092 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2093     if (!glockOwner)
2094         AFS_GUNLOCK();
2095 #endif
2096 #else
2097     p = (char *) osi_Alloc(size);
2098 #endif
2099     if (!p) osi_Panic("rxi_Alloc error");
2100     bzero(p, size);
2101     return p;
2102 }
2103
2104 void rxi_Free(addr, size)
2105 void *addr;
2106 register size_t size;
2107 {
2108 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2109     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2110      * implementation.
2111      */
2112     int glockOwner = ISAFS_GLOCK();
2113     if (!glockOwner)
2114         AFS_GLOCK();
2115 #endif
2116     MUTEX_ENTER(&rx_stats_mutex);
2117     rxi_Alloccnt--; rxi_Allocsize -= size;
2118     MUTEX_EXIT(&rx_stats_mutex);
2119 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2120     if (size > AFS_SMALLOCSIZ)
2121         osi_FreeMediumSpace(addr);
2122     else
2123         osi_FreeSmall(addr);
2124 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2125     if (!glockOwner)
2126         AFS_GUNLOCK();
2127 #endif
2128 #else
2129     osi_Free(addr, size);
2130 #endif    
2131 }
2132
2133 /* Find the peer process represented by the supplied (host,port)
2134  * combination.  If there is no appropriate active peer structure, a
2135  * new one will be allocated and initialized 
2136  * The origPeer, if set, is a pointer to a peer structure on which the
2137  * refcount will be be decremented. This is used to replace the peer
2138  * structure hanging off a connection structure */
2139 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2140     register afs_uint32 host;
2141     register u_short port;
2142     struct rx_peer *origPeer;
2143     int create;
2144 {
2145     register struct rx_peer *pp;
2146     int hashIndex;
2147     hashIndex = PEER_HASH(host, port);
2148     MUTEX_ENTER(&rx_peerHashTable_lock);
2149     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2150         if ((pp->host == host) && (pp->port == port)) break;
2151     }
2152     if (!pp) {
2153         if (create) {
2154             pp = rxi_AllocPeer(); /* This bzero's *pp */
2155             pp->host = host;      /* set here or in InitPeerParams is zero */
2156             pp->port = port;
2157             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2158             queue_Init(&pp->congestionQueue);
2159             queue_Init(&pp->rpcStats);
2160             pp->next = rx_peerHashTable[hashIndex];
2161             rx_peerHashTable[hashIndex] = pp;
2162             rxi_InitPeerParams(pp);
2163             MUTEX_ENTER(&rx_stats_mutex);
2164             rx_stats.nPeerStructs++;
2165             MUTEX_EXIT(&rx_stats_mutex);
2166         }
2167     }
2168     if (pp && create) {
2169         pp->refCount++;
2170     }
2171     if ( origPeer)
2172         origPeer->refCount--;
2173     MUTEX_EXIT(&rx_peerHashTable_lock);
2174     return pp;
2175 }
2176
2177
2178 /* Find the connection at (host, port) started at epoch, and with the
2179  * given connection id.  Creates the server connection if necessary.
2180  * The type specifies whether a client connection or a server
2181  * connection is desired.  In both cases, (host, port) specify the
2182  * peer's (host, pair) pair.  Client connections are not made
2183  * automatically by this routine.  The parameter socket gives the
2184  * socket descriptor on which the packet was received.  This is used,
2185  * in the case of server connections, to check that *new* connections
2186  * come via a valid (port, serviceId).  Finally, the securityIndex
2187  * parameter must match the existing index for the connection.  If a
2188  * server connection is created, it will be created using the supplied
2189  * index, if the index is valid for this service */
2190 struct rx_connection *
2191 rxi_FindConnection(socket, host, port, serviceId, cid, 
2192                    epoch, type, securityIndex)
2193     osi_socket socket;
2194     register afs_int32 host;
2195     register u_short port;
2196     u_short serviceId;
2197     afs_uint32 cid;
2198     afs_uint32 epoch;
2199     int type;
2200     u_int securityIndex;
2201 {
2202     int hashindex, flag;
2203     register struct rx_connection *conn;
2204     struct rx_peer *peer;
2205     hashindex = CONN_HASH(host, port, cid, epoch, type);
2206     MUTEX_ENTER(&rx_connHashTable_lock);
2207     rxLastConn ? (conn = rxLastConn, flag = 0) :
2208                  (conn = rx_connHashTable[hashindex], flag = 1);
2209     for (; conn; ) {
2210       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2211           && (epoch == conn->epoch)) {
2212         register struct rx_peer *pp = conn->peer;
2213         if (securityIndex != conn->securityIndex) {
2214             /* this isn't supposed to happen, but someone could forge a packet
2215                like this, and there seems to be some CM bug that makes this
2216                happen from time to time -- in which case, the fileserver
2217                asserts. */  
2218             MUTEX_EXIT(&rx_connHashTable_lock);
2219             return (struct rx_connection *) 0;
2220         }
2221         /* epoch's high order bits mean route for security reasons only on
2222          * the cid, not the host and port fields.
2223          */
2224         if (conn->epoch & 0x80000000) break;
2225         if (((type == RX_CLIENT_CONNECTION) 
2226              || (pp->host == host)) && (pp->port == port))
2227           break;
2228       }
2229       if ( !flag )
2230       {
2231         /* the connection rxLastConn that was used the last time is not the
2232         ** one we are looking for now. Hence, start searching in the hash */
2233         flag = 1;
2234         conn = rx_connHashTable[hashindex];
2235       }
2236       else
2237         conn = conn->next;
2238     }
2239     if (!conn) {
2240         struct rx_service *service;
2241         if (type == RX_CLIENT_CONNECTION) {
2242             MUTEX_EXIT(&rx_connHashTable_lock);
2243             return (struct rx_connection *) 0;
2244         }
2245         service = rxi_FindService(socket, serviceId);
2246         if (!service || (securityIndex >= service->nSecurityObjects) 
2247             || (service->securityObjects[securityIndex] == 0)) {
2248             MUTEX_EXIT(&rx_connHashTable_lock);
2249             return (struct rx_connection *) 0;
2250         }
2251         conn = rxi_AllocConnection(); /* This bzero's the connection */
2252         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2253                    MUTEX_DEFAULT,0);
2254         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2255                    MUTEX_DEFAULT,0);
2256         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2257         conn->next = rx_connHashTable[hashindex];
2258         rx_connHashTable[hashindex] = conn;
2259         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2260         conn->type = RX_SERVER_CONNECTION;
2261         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2262         conn->epoch = epoch;
2263         conn->cid = cid & RX_CIDMASK;
2264         /* conn->serial = conn->lastSerial = 0; */
2265         /* conn->timeout = 0; */
2266         conn->ackRate = RX_FAST_ACK_RATE;
2267         conn->service = service;
2268         conn->serviceId = serviceId;
2269         conn->securityIndex = securityIndex;
2270         conn->securityObject = service->securityObjects[securityIndex];
2271         conn->nSpecific = 0;
2272         conn->specific = NULL;
2273         rx_SetConnDeadTime(conn, service->connDeadTime);
2274         /* Notify security object of the new connection */
2275         RXS_NewConnection(conn->securityObject, conn);
2276         /* XXXX Connection timeout? */
2277         if (service->newConnProc) (*service->newConnProc)(conn);
2278         MUTEX_ENTER(&rx_stats_mutex);
2279         rx_stats.nServerConns++;
2280         MUTEX_EXIT(&rx_stats_mutex);
2281     }
2282     else
2283     {
2284     /* Ensure that the peer structure is set up in such a way that
2285     ** replies in this connection go back to that remote interface
2286     ** from which the last packet was sent out. In case, this packet's
2287     ** source IP address does not match the peer struct for this conn,
2288     ** then drop the refCount on conn->peer and get a new peer structure.
2289     ** We can check the host,port field in the peer structure without the
2290     ** rx_peerHashTable_lock because the peer structure has its refCount
2291     ** incremented and the only time the host,port in the peer struct gets
2292     ** updated is when the peer structure is created.
2293     */
2294         if (conn->peer->host == host )
2295                 peer = conn->peer; /* no change to the peer structure */
2296         else
2297                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2298     }
2299
2300     MUTEX_ENTER(&conn->conn_data_lock);
2301     conn->refCount++;
2302     conn->peer = peer;
2303     MUTEX_EXIT(&conn->conn_data_lock);
2304
2305     rxLastConn = conn;  /* store this connection as the last conn used */
2306     MUTEX_EXIT(&rx_connHashTable_lock);
2307     return conn;
2308 }
2309
2310 /* There are two packet tracing routines available for testing and monitoring
2311  * Rx.  One is called just after every packet is received and the other is
2312  * called just before every packet is sent.  Received packets, have had their
2313  * headers decoded, and packets to be sent have not yet had their headers
2314  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2315  * containing the network address.  Both can be modified.  The return value, if
2316  * non-zero, indicates that the packet should be dropped.  */
2317
2318 int (*rx_justReceived)() = 0;
2319 int (*rx_almostSent)() = 0;
2320
2321 /* A packet has been received off the interface.  Np is the packet, socket is
2322  * the socket number it was received from (useful in determining which service
2323  * this packet corresponds to), and (host, port) reflect the host,port of the
2324  * sender.  This call returns the packet to the caller if it is finished with
2325  * it, rather than de-allocating it, just as a small performance hack */
2326
2327 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2328     register struct rx_packet *np;
2329     osi_socket socket;
2330     afs_uint32 host;
2331     u_short port;
2332     int *tnop;
2333     struct rx_call **newcallp;
2334 {
2335     register struct rx_call *call;
2336     register struct rx_connection *conn;
2337     int channel;
2338     afs_uint32 currentCallNumber;
2339     int type;
2340     int skew;
2341 #ifdef RXDEBUG
2342     char *packetType;
2343 #endif
2344     struct rx_packet *tnp;
2345
2346 #ifdef RXDEBUG
2347 /* We don't print out the packet until now because (1) the time may not be
2348  * accurate enough until now in the lwp implementation (rx_Listener only gets
2349  * the time after the packet is read) and (2) from a protocol point of view,
2350  * this is the first time the packet has been seen */
2351     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2352         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2353     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2354          np->header.serial, packetType, host, port, np->header.serviceId,
2355          np->header.epoch, np->header.cid, np->header.callNumber, 
2356          np->header.seq, np->header.flags, np));
2357 #endif
2358
2359     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2360       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2361     }
2362
2363     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2364         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2365     }
2366 #ifdef RXDEBUG
2367     /* If an input tracer function is defined, call it with the packet and
2368      * network address.  Note this function may modify its arguments. */
2369     if (rx_justReceived) {
2370         struct sockaddr_in addr;
2371         int drop;
2372         addr.sin_family = AF_INET;
2373         addr.sin_port = port;
2374         addr.sin_addr.s_addr = host;
2375 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2376         addr.sin_len = sizeof(addr);
2377 #endif  /* AFS_OSF_ENV */
2378         drop = (*rx_justReceived) (np, &addr);
2379         /* drop packet if return value is non-zero */
2380         if (drop) return np;
2381         port = addr.sin_port;           /* in case fcn changed addr */
2382         host = addr.sin_addr.s_addr;
2383     }
2384 #endif
2385
2386     /* If packet was not sent by the client, then *we* must be the client */
2387     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2388         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2389
2390     /* Find the connection (or fabricate one, if we're the server & if
2391      * necessary) associated with this packet */
2392     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2393                               np->header.cid, np->header.epoch, type, 
2394                               np->header.securityIndex);
2395
2396     if (!conn) {
2397       /* If no connection found or fabricated, just ignore the packet.
2398        * (An argument could be made for sending an abort packet for
2399        * the conn) */
2400       return np;
2401     }   
2402
2403     MUTEX_ENTER(&conn->conn_data_lock);
2404     if (conn->maxSerial < np->header.serial)
2405       conn->maxSerial = np->header.serial;
2406     MUTEX_EXIT(&conn->conn_data_lock);
2407
2408     /* If the connection is in an error state, send an abort packet and ignore
2409      * the incoming packet */
2410     if (conn->error) {
2411         /* Don't respond to an abort packet--we don't want loops! */
2412         MUTEX_ENTER(&conn->conn_data_lock);
2413         if (np->header.type != RX_PACKET_TYPE_ABORT)
2414             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2415         conn->refCount--;
2416         MUTEX_EXIT(&conn->conn_data_lock);
2417         return np;
2418     }
2419
2420     /* Check for connection-only requests (i.e. not call specific). */
2421     if (np->header.callNumber == 0) {
2422         switch (np->header.type) {
2423             case RX_PACKET_TYPE_ABORT:
2424                 /* What if the supplied error is zero? */
2425                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2426                 MUTEX_ENTER(&conn->conn_data_lock);
2427                 conn->refCount--;
2428                 MUTEX_EXIT(&conn->conn_data_lock);
2429                 return np;
2430             case RX_PACKET_TYPE_CHALLENGE:
2431                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2432                 MUTEX_ENTER(&conn->conn_data_lock);
2433                 conn->refCount--;
2434                 MUTEX_EXIT(&conn->conn_data_lock);
2435                 return tnp;
2436             case RX_PACKET_TYPE_RESPONSE:
2437                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2438                 MUTEX_ENTER(&conn->conn_data_lock);
2439                 conn->refCount--;
2440                 MUTEX_EXIT(&conn->conn_data_lock);
2441                 return tnp;
2442             case RX_PACKET_TYPE_PARAMS:
2443             case RX_PACKET_TYPE_PARAMS+1:
2444             case RX_PACKET_TYPE_PARAMS+2:
2445                 /* ignore these packet types for now */
2446                 MUTEX_ENTER(&conn->conn_data_lock);
2447                 conn->refCount--;
2448                 MUTEX_EXIT(&conn->conn_data_lock);
2449                 return np;
2450
2451
2452             default:
2453                 /* Should not reach here, unless the peer is broken: send an
2454                  * abort packet */
2455                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2456                 MUTEX_ENTER(&conn->conn_data_lock);
2457                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2458                 conn->refCount--;
2459                 MUTEX_EXIT(&conn->conn_data_lock);
2460                 return tnp;
2461         }
2462     }
2463
2464     channel = np->header.cid & RX_CHANNELMASK;
2465     call = conn->call[channel];
2466 #ifdef  RX_ENABLE_LOCKS
2467     if (call)
2468         MUTEX_ENTER(&call->lock);
2469     /* Test to see if call struct is still attached to conn. */
2470     if (call != conn->call[channel]) {
2471         if (call)
2472             MUTEX_EXIT(&call->lock);
2473         if (type == RX_SERVER_CONNECTION) {
2474             call = conn->call[channel];
2475             /* If we started with no call attached and there is one now,
2476              * another thread is also running this routine and has gotten
2477              * the connection channel. We should drop this packet in the tests
2478              * below. If there was a call on this connection and it's now
2479              * gone, then we'll be making a new call below.
2480              * If there was previously a call and it's now different then
2481              * the old call was freed and another thread running this routine
2482              * has created a call on this channel. One of these two threads
2483              * has a packet for the old call and the code below handles those
2484              * cases.
2485              */
2486             if (call)
2487                 MUTEX_ENTER(&call->lock);
2488         }
2489         else {
2490             /* This packet can't be for this call. If the new call address is
2491              * 0 then no call is running on this channel. If there is a call
2492              * then, since this is a client connection we're getting data for
2493              * it must be for the previous call.
2494              */
2495             MUTEX_ENTER(&rx_stats_mutex);
2496             rx_stats.spuriousPacketsRead++;
2497             MUTEX_EXIT(&rx_stats_mutex);
2498             MUTEX_ENTER(&conn->conn_data_lock);
2499             conn->refCount--;
2500             MUTEX_EXIT(&conn->conn_data_lock);
2501             return np;
2502         }
2503     }
2504 #endif
2505     currentCallNumber = conn->callNumber[channel];
2506
2507     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2508         if (np->header.callNumber < currentCallNumber) {
2509             MUTEX_ENTER(&rx_stats_mutex);
2510             rx_stats.spuriousPacketsRead++;
2511             MUTEX_EXIT(&rx_stats_mutex);
2512 #ifdef  RX_ENABLE_LOCKS
2513             if (call)
2514                 MUTEX_EXIT(&call->lock);
2515 #endif
2516             MUTEX_ENTER(&conn->conn_data_lock);
2517             conn->refCount--;
2518             MUTEX_EXIT(&conn->conn_data_lock);
2519             return np;
2520         }
2521         if (!call) {
2522             call = rxi_NewCall(conn, channel);
2523             MUTEX_ENTER(&call->lock);
2524             *call->callNumber = np->header.callNumber;
2525             call->state = RX_STATE_PRECALL;
2526             clock_GetTime(&call->queueTime);
2527             hzero(call->bytesSent);
2528             hzero(call->bytesRcvd);
2529             rxi_KeepAliveOn(call);
2530         }
2531         else if (np->header.callNumber != currentCallNumber) {
2532             /* Wait until the transmit queue is idle before deciding
2533              * whether to reset the current call. Chances are that the
2534              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2535              * flag is cleared.
2536              */
2537 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2538             while ((call->state == RX_STATE_ACTIVE) &&
2539                    (call->flags & RX_CALL_TQ_BUSY)) {
2540                 call->flags |= RX_CALL_TQ_WAIT;
2541 #ifdef RX_ENABLE_LOCKS
2542                 CV_WAIT(&call->cv_tq, &call->lock);
2543 #else /* RX_ENABLE_LOCKS */
2544                 osi_rxSleep(&call->tq);
2545 #endif /* RX_ENABLE_LOCKS */
2546             }
2547 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2548             /* If the new call cannot be taken right now send a busy and set
2549              * the error condition in this call, so that it terminates as
2550              * quickly as possible */
2551             if (call->state == RX_STATE_ACTIVE) {
2552                 struct rx_packet *tp;
2553
2554                 rxi_CallError(call, RX_CALL_DEAD);
2555                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2556                 MUTEX_EXIT(&call->lock);
2557                 MUTEX_ENTER(&conn->conn_data_lock);
2558                 conn->refCount--;
2559                 MUTEX_EXIT(&conn->conn_data_lock);
2560                 return tp;
2561             }
2562             rxi_ResetCall(call, 0);
2563             *call->callNumber = np->header.callNumber;
2564             call->state = RX_STATE_PRECALL;
2565             clock_GetTime(&call->queueTime);
2566             hzero(call->bytesSent);
2567             hzero(call->bytesRcvd);
2568             /*
2569              * If the number of queued calls exceeds the overload
2570              * threshold then abort this call.
2571              */
2572             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2573                 struct rx_packet *tp;
2574
2575                 rxi_CallError(call, rx_BusyError);
2576                 tp = rxi_SendCallAbort(call, np, 1, 0);
2577                 MUTEX_EXIT(&call->lock);
2578                 MUTEX_ENTER(&conn->conn_data_lock);
2579                 conn->refCount--;
2580                 MUTEX_EXIT(&conn->conn_data_lock);
2581                 return tp;
2582             }
2583             rxi_KeepAliveOn(call);
2584         }
2585         else {
2586             /* Continuing call; do nothing here. */
2587         }
2588     } else { /* we're the client */
2589         /* Ignore all incoming acknowledgements for calls in DALLY state */
2590         if ( call && (call->state == RX_STATE_DALLY) 
2591          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2592             MUTEX_ENTER(&rx_stats_mutex);
2593             rx_stats.ignorePacketDally++;
2594             MUTEX_EXIT(&rx_stats_mutex);
2595 #ifdef  RX_ENABLE_LOCKS
2596             if (call) {
2597                 MUTEX_EXIT(&call->lock);
2598             }
2599 #endif
2600             MUTEX_ENTER(&conn->conn_data_lock);
2601             conn->refCount--;
2602             MUTEX_EXIT(&conn->conn_data_lock);
2603             return np;
2604         }
2605         
2606         /* Ignore anything that's not relevant to the current call.  If there
2607          * isn't a current call, then no packet is relevant. */
2608         if (!call || (np->header.callNumber != currentCallNumber)) {
2609             MUTEX_ENTER(&rx_stats_mutex);
2610             rx_stats.spuriousPacketsRead++;
2611             MUTEX_EXIT(&rx_stats_mutex);
2612 #ifdef  RX_ENABLE_LOCKS
2613             if (call) {
2614                 MUTEX_EXIT(&call->lock);
2615             }
2616 #endif
2617             MUTEX_ENTER(&conn->conn_data_lock);
2618             conn->refCount--;
2619             MUTEX_EXIT(&conn->conn_data_lock);
2620             return np;  
2621         }
2622         /* If the service security object index stamped in the packet does not
2623          * match the connection's security index, ignore the packet */
2624         if (np->header.securityIndex != conn->securityIndex) {
2625 #ifdef  RX_ENABLE_LOCKS
2626             MUTEX_EXIT(&call->lock);
2627 #endif
2628             MUTEX_ENTER(&conn->conn_data_lock);
2629             conn->refCount--;       
2630             MUTEX_EXIT(&conn->conn_data_lock);
2631             return np;
2632         }
2633
2634         /* If we're receiving the response, then all transmit packets are
2635          * implicitly acknowledged.  Get rid of them. */
2636         if (np->header.type == RX_PACKET_TYPE_DATA) {
2637 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2638             /* XXX Hack. Because we must release the global rx lock when
2639              * sending packets (osi_NetSend) we drop all acks while we're
2640              * traversing the tq in rxi_Start sending packets out because
2641              * packets may move to the freePacketQueue as result of being here!
2642              * So we drop these packets until we're safely out of the
2643              * traversing. Really ugly! 
2644              * For fine grain RX locking, we set the acked field in the
2645              * packets and let rxi_Start remove them from the transmit queue.
2646              */
2647             if (call->flags & RX_CALL_TQ_BUSY) {
2648 #ifdef  RX_ENABLE_LOCKS
2649                 rxi_SetAcksInTransmitQueue(call);
2650 #else
2651                 conn->refCount--;
2652                 return np;              /* xmitting; drop packet */
2653 #endif
2654             }
2655             else {
2656                 rxi_ClearTransmitQueue(call, 0);
2657             }
2658 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2659             rxi_ClearTransmitQueue(call, 0);
2660 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2661         } else {
2662           if (np->header.type == RX_PACKET_TYPE_ACK) {
2663         /* now check to see if this is an ack packet acknowledging that the
2664          * server actually *lost* some hard-acked data.  If this happens we
2665          * ignore this packet, as it may indicate that the server restarted in
2666          * the middle of a call.  It is also possible that this is an old ack
2667          * packet.  We don't abort the connection in this case, because this
2668          * *might* just be an old ack packet.  The right way to detect a server
2669          * restart in the midst of a call is to notice that the server epoch
2670          * changed, btw.  */
2671         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2672          * XXX unacknowledged.  I think that this is off-by-one, but
2673          * XXX I don't dare change it just yet, since it will
2674          * XXX interact badly with the server-restart detection 
2675          * XXX code in receiveackpacket.  */
2676             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2677                 MUTEX_ENTER(&rx_stats_mutex);
2678                 rx_stats.spuriousPacketsRead++;
2679                 MUTEX_EXIT(&rx_stats_mutex);
2680                 MUTEX_EXIT(&call->lock);
2681                 MUTEX_ENTER(&conn->conn_data_lock);
2682                 conn->refCount--;
2683                 MUTEX_EXIT(&conn->conn_data_lock);
2684                 return np;
2685             }
2686           }
2687         } /* else not a data packet */
2688     }
2689
2690     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2691     /* Set remote user defined status from packet */
2692     call->remoteStatus = np->header.userStatus;
2693
2694     /* Note the gap between the expected next packet and the actual
2695      * packet that arrived, when the new packet has a smaller serial number
2696      * than expected.  Rioses frequently reorder packets all by themselves,
2697      * so this will be quite important with very large window sizes.
2698      * Skew is checked against 0 here to avoid any dependence on the type of
2699      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2700      * true! 
2701      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2702      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2703      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2704      */
2705     MUTEX_ENTER(&conn->conn_data_lock);
2706     skew = conn->lastSerial - np->header.serial;
2707     conn->lastSerial = np->header.serial;
2708     MUTEX_EXIT(&conn->conn_data_lock);
2709     if (skew > 0) {
2710       register struct rx_peer *peer;
2711       peer = conn->peer;
2712       if (skew > peer->inPacketSkew) {
2713         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2714         peer->inPacketSkew = skew;
2715       }
2716     }
2717
2718     /* Now do packet type-specific processing */
2719     switch (np->header.type) {
2720         case RX_PACKET_TYPE_DATA:
2721             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2722                                        tnop, newcallp);
2723             break;
2724         case RX_PACKET_TYPE_ACK:
2725             /* Respond immediately to ack packets requesting acknowledgement
2726              * (ping packets) */
2727             if (np->header.flags & RX_REQUEST_ACK) {
2728                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2729                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2730             }
2731             np = rxi_ReceiveAckPacket(call, np, 1);
2732             break;
2733         case RX_PACKET_TYPE_ABORT:
2734             /* An abort packet: reset the connection, passing the error up to
2735              * the user */
2736             /* What if error is zero? */
2737             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2738             break;
2739         case RX_PACKET_TYPE_BUSY:
2740             /* XXXX */
2741             break;
2742         case RX_PACKET_TYPE_ACKALL:
2743             /* All packets acknowledged, so we can drop all packets previously
2744              * readied for sending */
2745 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2746             /* XXX Hack. We because we can't release the global rx lock when
2747              * sending packets (osi_NetSend) we drop all ack pkts while we're
2748              * traversing the tq in rxi_Start sending packets out because
2749              * packets may move to the freePacketQueue as result of being
2750              * here! So we drop these packets until we're safely out of the
2751              * traversing. Really ugly! 
2752              * For fine grain RX locking, we set the acked field in the packets
2753              * and let rxi_Start remove the packets from the transmit queue.
2754              */
2755             if (call->flags & RX_CALL_TQ_BUSY) {
2756 #ifdef  RX_ENABLE_LOCKS
2757                 rxi_SetAcksInTransmitQueue(call);
2758                 break;
2759 #else /* RX_ENABLE_LOCKS */
2760                 conn->refCount--;
2761                 return np;              /* xmitting; drop packet */
2762 #endif /* RX_ENABLE_LOCKS */
2763             }
2764 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2765             rxi_ClearTransmitQueue(call, 0);
2766             break;
2767         default:
2768             /* Should not reach here, unless the peer is broken: send an abort
2769              * packet */
2770             rxi_CallError(call, RX_PROTOCOL_ERROR);
2771             np = rxi_SendCallAbort(call, np, 1, 0);
2772             break;
2773     };
2774     /* Note when this last legitimate packet was received, for keep-alive
2775      * processing.  Note, we delay getting the time until now in the hope that
2776      * the packet will be delivered to the user before any get time is required
2777      * (if not, then the time won't actually be re-evaluated here). */
2778     call->lastReceiveTime = clock_Sec();
2779     MUTEX_EXIT(&call->lock);
2780     MUTEX_ENTER(&conn->conn_data_lock);
2781     conn->refCount--;
2782     MUTEX_EXIT(&conn->conn_data_lock);
2783     return np;
2784 }
2785
2786 /* return true if this is an "interesting" connection from the point of view
2787     of someone trying to debug the system */
2788 int rxi_IsConnInteresting(struct rx_connection *aconn)
2789 {
2790     register int i;
2791     register struct rx_call *tcall;
2792
2793     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2794         return 1;
2795     for(i=0;i<RX_MAXCALLS;i++) {
2796         tcall = aconn->call[i];
2797         if (tcall) {
2798             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2799                 return 1;
2800             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2801                 return 1;
2802         }
2803     }
2804     return 0;
2805 }
2806
2807 #ifdef KERNEL
2808 /* if this is one of the last few packets AND it wouldn't be used by the
2809    receiving call to immediately satisfy a read request, then drop it on
2810    the floor, since accepting it might prevent a lock-holding thread from
2811    making progress in its reading. If a call has been cleared while in
2812    the precall state then ignore all subsequent packets until the call
2813    is assigned to a thread. */
2814
2815 static TooLow(ap, acall)
2816   struct rx_call *acall;
2817   struct rx_packet *ap; {
2818     int rc=0;
2819     MUTEX_ENTER(&rx_stats_mutex);
2820     if (((ap->header.seq != 1) &&
2821          (acall->flags & RX_CALL_CLEARED) &&
2822          (acall->state == RX_STATE_PRECALL)) ||
2823         ((rx_nFreePackets < rxi_dataQuota+2) &&
2824          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2825            && (acall->flags & RX_CALL_READER_WAIT)))) {
2826         rc = 1;
2827     }
2828     MUTEX_EXIT(&rx_stats_mutex);
2829     return rc;
2830 }
2831 #endif /* KERNEL */
2832
2833 /* try to attach call, if authentication is complete */
2834 static void TryAttach(acall, socket, tnop, newcallp)
2835 register struct rx_call *acall;
2836 register osi_socket socket;
2837 register int *tnop;
2838 register struct rx_call **newcallp; {
2839     register struct rx_connection *conn;
2840     conn = acall->conn;
2841     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2842         /* Don't attach until we have any req'd. authentication. */
2843         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2844             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2845             /* Note:  this does not necessarily succeed; there
2846                may not any proc available */
2847         }
2848         else {
2849             rxi_ChallengeOn(acall->conn);
2850         }
2851     }
2852 }
2853
2854 /* A data packet has been received off the interface.  This packet is
2855  * appropriate to the call (the call is in the right state, etc.).  This
2856  * routine can return a packet to the caller, for re-use */
2857
2858 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2859                                         port, tnop, newcallp)
2860     register struct rx_call *call;
2861     register struct rx_packet *np;
2862     int istack;
2863     osi_socket socket;
2864     afs_uint32 host;
2865     u_short port;
2866     int *tnop;
2867     struct rx_call **newcallp;
2868 {
2869     int ackNeeded = 0;
2870     int newPackets = 0;
2871     int didHardAck = 0;
2872     int haveLast = 0;
2873     afs_uint32 seq, serial, flags;
2874     int isFirst;
2875     struct rx_packet *tnp;
2876     struct clock when;
2877     MUTEX_ENTER(&rx_stats_mutex);
2878     rx_stats.dataPacketsRead++;
2879     MUTEX_EXIT(&rx_stats_mutex);
2880
2881 #ifdef KERNEL
2882     /* If there are no packet buffers, drop this new packet, unless we can find
2883      * packet buffers from inactive calls */
2884     if (!call->error &&
2885         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2886         MUTEX_ENTER(&rx_freePktQ_lock);
2887         rxi_NeedMorePackets = TRUE;
2888         MUTEX_EXIT(&rx_freePktQ_lock);
2889         MUTEX_ENTER(&rx_stats_mutex);
2890         rx_stats.noPacketBuffersOnRead++;
2891         MUTEX_EXIT(&rx_stats_mutex);
2892         call->rprev = np->header.serial;
2893         rxi_calltrace(RX_TRACE_DROP, call);
2894         dpf (("packet %x dropped on receipt - quota problems", np));
2895         if (rxi_doreclaim)
2896             rxi_ClearReceiveQueue(call);
2897         clock_GetTime(&when);
2898         clock_Add(&when, &rx_softAckDelay);
2899         if (!call->delayedAckEvent ||
2900             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2901             rxevent_Cancel(call->delayedAckEvent, call,
2902                            RX_CALL_REFCOUNT_DELAY);
2903             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2904             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2905                                                  call, 0);
2906         }
2907         /* we've damaged this call already, might as well do it in. */
2908         return np;
2909     }
2910 #endif /* KERNEL */
2911
2912     /*
2913      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2914      * packet is one of several packets transmitted as a single
2915      * datagram. Do not send any soft or hard acks until all packets
2916      * in a jumbogram have been processed. Send negative acks right away.
2917      */
2918     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2919         /* tnp is non-null when there are more packets in the
2920          * current jumbo gram */
2921         if (tnp) {
2922             if (np)
2923                 rxi_FreePacket(np);
2924             np = tnp;
2925         }
2926
2927         seq = np->header.seq;
2928         serial = np->header.serial;
2929         flags = np->header.flags;
2930
2931         /* If the call is in an error state, send an abort message */
2932         if (call->error)
2933             return rxi_SendCallAbort(call, np, istack, 0);
2934
2935         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2936          * AFS 3.5 jumbogram. */
2937         if (flags & RX_JUMBO_PACKET) {
2938             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2939         } else {
2940             tnp = NULL;
2941         }
2942
2943         if (np->header.spare != 0) {
2944             MUTEX_ENTER(&call->conn->conn_data_lock);
2945             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2946             MUTEX_EXIT(&call->conn->conn_data_lock);
2947         }
2948
2949         /* The usual case is that this is the expected next packet */
2950         if (seq == call->rnext) {
2951
2952             /* Check to make sure it is not a duplicate of one already queued */
2953             if (queue_IsNotEmpty(&call->rq) 
2954                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2955                 MUTEX_ENTER(&rx_stats_mutex);
2956                 rx_stats.dupPacketsRead++;
2957                 MUTEX_EXIT(&rx_stats_mutex);
2958                 dpf (("packet %x dropped on receipt - duplicate", np));
2959                 rxevent_Cancel(call->delayedAckEvent, call,
2960                                RX_CALL_REFCOUNT_DELAY);
2961                 np = rxi_SendAck(call, np, seq, serial,
2962                                  flags, RX_ACK_DUPLICATE, istack);
2963                 ackNeeded = 0;
2964                 call->rprev = seq;
2965                 continue;
2966             }
2967
2968             /* It's the next packet. Stick it on the receive queue
2969              * for this call. Set newPackets to make sure we wake
2970              * the reader once all packets have been processed */
2971             queue_Prepend(&call->rq, np);
2972             call->nSoftAcks++;
2973             np = NULL; /* We can't use this anymore */
2974             newPackets = 1;
2975
2976             /* If an ack is requested then set a flag to make sure we
2977              * send an acknowledgement for this packet */
2978             if (flags & RX_REQUEST_ACK) {
2979                 ackNeeded = 1;
2980             }
2981
2982             /* Keep track of whether we have received the last packet */
2983             if (flags & RX_LAST_PACKET) {
2984                 call->flags |= RX_CALL_HAVE_LAST;
2985                 haveLast = 1;
2986             }
2987
2988             /* Check whether we have all of the packets for this call */
2989             if (call->flags & RX_CALL_HAVE_LAST) {
2990                 afs_uint32 tseq;                /* temporary sequence number */
2991                 struct rx_packet *tp;   /* Temporary packet pointer */
2992                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2993
2994                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2995                     if (tseq != tp->header.seq)
2996                         break;
2997                     if (tp->header.flags & RX_LAST_PACKET) {
2998                         call->flags |= RX_CALL_RECEIVE_DONE;
2999                         break;
3000                     }
3001                     tseq++;
3002                 }
3003             }
3004
3005             /* Provide asynchronous notification for those who want it
3006              * (e.g. multi rx) */
3007             if (call->arrivalProc) {
3008                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3009                                      call->arrivalProcArg);
3010                 call->arrivalProc = (VOID (*)()) 0;
3011             }
3012
3013             /* Update last packet received */
3014             call->rprev = seq;
3015
3016             /* If there is no server process serving this call, grab
3017              * one, if available. We only need to do this once. If a
3018              * server thread is available, this thread becomes a server
3019              * thread and the server thread becomes a listener thread. */
3020             if (isFirst) {
3021                 TryAttach(call, socket, tnop, newcallp);
3022             }
3023         }       
3024         /* This is not the expected next packet. */
3025         else {
3026             /* Determine whether this is a new or old packet, and if it's
3027              * a new one, whether it fits into the current receive window.
3028              * Also figure out whether the packet was delivered in sequence.
3029              * We use the prev variable to determine whether the new packet
3030              * is the successor of its immediate predecessor in the
3031              * receive queue, and the missing flag to determine whether
3032              * any of this packets predecessors are missing.  */
3033
3034             afs_uint32 prev;            /* "Previous packet" sequence number */
3035             struct rx_packet *tp;       /* Temporary packet pointer */
3036             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3037             int missing;                /* Are any predecessors missing? */
3038
3039             /* If the new packet's sequence number has been sent to the
3040              * application already, then this is a duplicate */
3041             if (seq < call->rnext) {
3042                 MUTEX_ENTER(&rx_stats_mutex);
3043                 rx_stats.dupPacketsRead++;
3044                 MUTEX_EXIT(&rx_stats_mutex);
3045                 rxevent_Cancel(call->delayedAckEvent, call,
3046                                RX_CALL_REFCOUNT_DELAY);
3047                 np = rxi_SendAck(call, np, seq, serial,
3048                                  flags, RX_ACK_DUPLICATE, istack);
3049                 ackNeeded = 0;
3050                 call->rprev = seq;
3051                 continue;
3052             }
3053
3054             /* If the sequence number is greater than what can be
3055              * accomodated by the current window, then send a negative
3056              * acknowledge and drop the packet */
3057             if ((call->rnext + call->rwind) <= seq) {
3058                 rxevent_Cancel(call->delayedAckEvent, call,
3059                                RX_CALL_REFCOUNT_DELAY);
3060                 np = rxi_SendAck(call, np, seq, serial,
3061                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3062                 ackNeeded = 0;
3063                 call->rprev = seq;
3064                 continue;
3065             }
3066
3067             /* Look for the packet in the queue of old received packets */
3068             for (prev = call->rnext - 1, missing = 0,
3069                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3070                 /*Check for duplicate packet */
3071                 if (seq == tp->header.seq) {
3072                     MUTEX_ENTER(&rx_stats_mutex);
3073                     rx_stats.dupPacketsRead++;
3074                     MUTEX_EXIT(&rx_stats_mutex);
3075                     rxevent_Cancel(call->delayedAckEvent, call,
3076                                    RX_CALL_REFCOUNT_DELAY);
3077                     np = rxi_SendAck(call, np, seq, serial, 
3078                                      flags, RX_ACK_DUPLICATE, istack);
3079                     ackNeeded = 0;
3080                     call->rprev = seq;
3081                     goto nextloop;
3082                 }
3083                 /* If we find a higher sequence packet, break out and
3084                  * insert the new packet here. */
3085                 if (seq < tp->header.seq) break;
3086                 /* Check for missing packet */
3087                 if (tp->header.seq != prev+1) {
3088                     missing = 1;
3089                 }
3090
3091                 prev = tp->header.seq;
3092             }
3093
3094             /* Keep track of whether we have received the last packet. */
3095             if (flags & RX_LAST_PACKET) {
3096                 call->flags |= RX_CALL_HAVE_LAST;
3097             }
3098
3099             /* It's within the window: add it to the the receive queue.
3100              * tp is left by the previous loop either pointing at the
3101              * packet before which to insert the new packet, or at the
3102              * queue head if the queue is empty or the packet should be
3103              * appended. */
3104             queue_InsertBefore(tp, np);
3105             call->nSoftAcks++;
3106             np = NULL;
3107
3108             /* Check whether we have all of the packets for this call */
3109             if ((call->flags & RX_CALL_HAVE_LAST)
3110              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3111                 afs_uint32 tseq;                /* temporary sequence number */
3112
3113                 for (tseq = call->rnext,
3114                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3115                     if (tseq != tp->header.seq)
3116                         break;
3117                     if (tp->header.flags & RX_LAST_PACKET) {
3118                         call->flags |= RX_CALL_RECEIVE_DONE;
3119                         break;
3120                     }
3121                     tseq++;
3122                 }
3123             }
3124
3125             /* We need to send an ack of the packet is out of sequence, 
3126              * or if an ack was requested by the peer. */
3127             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3128                 ackNeeded = 1;
3129             }
3130
3131             /* Acknowledge the last packet for each call */
3132             if (flags & RX_LAST_PACKET) {
3133                 haveLast = 1;
3134             }
3135
3136             call->rprev = seq;
3137         }
3138 nextloop:;
3139     }
3140
3141     if (newPackets) {
3142         /*
3143          * If the receiver is waiting for an iovec, fill the iovec
3144          * using the data from the receive queue */
3145         if (call->flags & RX_CALL_IOVEC_WAIT) {
3146             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3147             /* the call may have been aborted */
3148             if (call->error) {
3149                 return NULL;
3150             }
3151             if (didHardAck) {
3152                 ackNeeded = 0;
3153             }
3154         }
3155
3156         /* Wakeup the reader if any */
3157         if ((call->flags & RX_CALL_READER_WAIT) &&
3158             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3159              (call->iovNext >= call->iovMax) ||
3160              (call->flags & RX_CALL_RECEIVE_DONE))) {
3161             call->flags &= ~RX_CALL_READER_WAIT;
3162 #ifdef  RX_ENABLE_LOCKS
3163             CV_BROADCAST(&call->cv_rq);
3164 #else
3165             osi_rxWakeup(&call->rq);
3166 #endif
3167         }
3168     }
3169
3170     /*
3171      * Send an ack when requested by the peer, or once every
3172      * rxi_SoftAckRate packets until the last packet has been
3173      * received. Always send a soft ack for the last packet in
3174      * the server's reply. */
3175     if (ackNeeded) {
3176         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3177         np = rxi_SendAck(call, np, seq, serial, flags,
3178                          RX_ACK_REQUESTED, istack);
3179     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3180         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3181         np = rxi_SendAck(call, np, seq, serial, flags,
3182                          RX_ACK_IDLE, istack);
3183     } else if (call->nSoftAcks) {
3184         clock_GetTime(&when);
3185         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3186             clock_Add(&when, &rx_lastAckDelay);
3187         } else {
3188             clock_Add(&when, &rx_softAckDelay);
3189         }
3190         if (!call->delayedAckEvent ||
3191             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3192             rxevent_Cancel(call->delayedAckEvent, call,
3193                            RX_CALL_REFCOUNT_DELAY);
3194             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3195             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3196                                                  call, 0);
3197         }
3198     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3199         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3200     }
3201
3202     return np;
3203 }
3204
3205 #ifdef  ADAPT_WINDOW
3206 static void rxi_ComputeRate();
3207 #endif
3208
3209 /* The real smarts of the whole thing.  */
3210 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3211     register struct rx_call *call;
3212     struct rx_packet *np;
3213     int istack;
3214 {
3215     struct rx_ackPacket *ap;
3216     int nAcks;
3217     register struct rx_packet *tp;
3218     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3219     register struct rx_connection *conn = call->conn;
3220     struct rx_peer *peer = conn->peer;
3221     afs_uint32 first;
3222     afs_uint32 serial;
3223     /* because there are CM's that are bogus, sending weird values for this. */
3224     afs_uint32 skew = 0;
3225     int nbytes;
3226     int missing;
3227     int acked;
3228     int nNacked = 0;
3229     int newAckCount = 0;
3230     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3231     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3232
3233     MUTEX_ENTER(&rx_stats_mutex);
3234     rx_stats.ackPacketsRead++;
3235     MUTEX_EXIT(&rx_stats_mutex);
3236     ap = (struct rx_ackPacket *) rx_DataOf(np);
3237     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3238     if (nbytes < 0)
3239       return np;       /* truncated ack packet */
3240
3241     /* depends on ack packet struct */
3242     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3243     first = ntohl(ap->firstPacket);
3244     serial = ntohl(ap->serial);
3245     /* temporarily disabled -- needs to degrade over time 
3246        skew = ntohs(ap->maxSkew); */
3247
3248     /* Ignore ack packets received out of order */
3249     if (first < call->tfirst) {
3250         return np;
3251     }
3252
3253     if (np->header.flags & RX_SLOW_START_OK) {
3254         call->flags |= RX_CALL_SLOW_START_OK;
3255     }
3256     
3257 #ifdef RXDEBUG
3258     if (rx_Log) {
3259       fprintf( rx_Log, 
3260               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3261               ap->reason, ntohl(ap->previousPacket), 
3262               (unsigned int) np->header.seq, (unsigned int) serial, 
3263               (unsigned int) skew, ntohl(ap->firstPacket));
3264         if (nAcks) {
3265             int offset;
3266             for (offset = 0; offset < nAcks; offset++) 
3267                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3268         }
3269         putc('\n', rx_Log);
3270     }
3271 #endif
3272
3273     /* if a server connection has been re-created, it doesn't remember what
3274         serial # it was up to.  An ack will tell us, since the serial field
3275         contains the largest serial received by the other side */
3276     MUTEX_ENTER(&conn->conn_data_lock);
3277     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3278         conn->serial = serial+1;
3279     }
3280     MUTEX_EXIT(&conn->conn_data_lock);
3281
3282     /* Update the outgoing packet skew value to the latest value of
3283      * the peer's incoming packet skew value.  The ack packet, of
3284      * course, could arrive out of order, but that won't affect things
3285      * much */
3286     MUTEX_ENTER(&peer->peer_lock);
3287     peer->outPacketSkew = skew;
3288
3289     /* Check for packets that no longer need to be transmitted, and
3290      * discard them.  This only applies to packets positively
3291      * acknowledged as having been sent to the peer's upper level.
3292      * All other packets must be retained.  So only packets with
3293      * sequence numbers < ap->firstPacket are candidates. */
3294     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3295         if (tp->header.seq >= first) break;
3296         call->tfirst = tp->header.seq + 1;
3297         if (tp->header.serial == serial) {
3298           /* Use RTT if not delayed by client. */
3299           if (ap->reason != RX_ACK_DELAY)
3300               rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3301 #ifdef ADAPT_WINDOW
3302           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3303 #endif
3304         }
3305         else if (tp->firstSerial == serial) {
3306             /* Use RTT if not delayed by client. */
3307             if (ap->reason != RX_ACK_DELAY)
3308                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3309 #ifdef ADAPT_WINDOW
3310           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3311 #endif
3312         }
3313 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3314     /* XXX Hack. Because we have to release the global rx lock when sending
3315      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3316      * in rxi_Start sending packets out because packets may move to the
3317      * freePacketQueue as result of being here! So we drop these packets until
3318      * we're safely out of the traversing. Really ugly! 
3319      * To make it even uglier, if we're using fine grain locking, we can
3320      * set the ack bits in the packets and have rxi_Start remove the packets
3321      * when it's done transmitting.
3322      */
3323         if (!tp->acked) {
3324             newAckCount++;
3325         }
3326         if (call->flags & RX_CALL_TQ_BUSY) {
3327 #ifdef RX_ENABLE_LOCKS
3328             tp->acked = 1;
3329             call->flags |= RX_CALL_TQ_SOME_ACKED;
3330 #else /* RX_ENABLE_LOCKS */
3331             break;
3332 #endif /* RX_ENABLE_LOCKS */
3333         } else
3334 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3335         {
3336         queue_Remove(tp);
3337         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3338         }
3339     }
3340
3341 #ifdef ADAPT_WINDOW
3342     /* Give rate detector a chance to respond to ping requests */
3343     if (ap->reason == RX_ACK_PING_RESPONSE) {
3344         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3345     }
3346 #endif
3347
3348     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3349    
3350    /* Now go through explicit acks/nacks and record the results in
3351     * the waiting packets.  These are packets that can't be released
3352     * yet, even with a positive acknowledge.  This positive
3353     * acknowledge only means the packet has been received by the
3354     * peer, not that it will be retained long enough to be sent to
3355     * the peer's upper level.  In addition, reset the transmit timers
3356     * of any missing packets (those packets that must be missing
3357     * because this packet was out of sequence) */
3358
3359     call->nSoftAcked = 0;
3360     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3361         /* Update round trip time if the ack was stimulated on receipt
3362          * of this packet */
3363 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3364 #ifdef RX_ENABLE_LOCKS
3365         if (tp->header.seq >= first) {
3366 #endif /* RX_ENABLE_LOCKS */
3367 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3368         if (tp->header.serial == serial) {
3369             /* Use RTT if not delayed by client. */
3370             if (ap->reason != RX_ACK_DELAY)
3371                 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3372 #ifdef ADAPT_WINDOW
3373           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3374 #endif
3375         }
3376         else if ((tp->firstSerial == serial)) {
3377             /* Use RTT if not delayed by client. */
3378             if (ap->reason != RX_ACK_DELAY)
3379                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3380 #ifdef ADAPT_WINDOW
3381           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3382 #endif
3383         }
3384 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3385 #ifdef RX_ENABLE_LOCKS
3386         }
3387 #endif /* RX_ENABLE_LOCKS */
3388 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3389
3390         /* Set the acknowledge flag per packet based on the
3391          * information in the ack packet. An acknowlegded packet can
3392          * be downgraded when the server has discarded a packet it
3393          * soacked previously, or when an ack packet is received
3394          * out of sequence. */
3395         if (tp->header.seq < first) {
3396             /* Implicit ack information */
3397             if (!tp->acked) {
3398                 newAckCount++;
3399             }
3400             tp->acked = 1;
3401         }
3402         else if (tp->header.seq < first + nAcks) {
3403             /* Explicit ack information:  set it in the packet appropriately */
3404             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3405                 if (!tp->acked) {
3406                     newAckCount++;
3407                     tp->acked = 1;
3408                 }
3409                 if (missing) {
3410                     nNacked++;
3411                 } else {
3412                     call->nSoftAcked++;
3413                 }
3414             } else {
3415                 tp->acked = 0;
3416                 missing = 1;
3417             }
3418         }
3419         else {
3420             tp->acked = 0;
3421             missing = 1;
3422         }
3423
3424         /* If packet isn't yet acked, and it has been transmitted at least 
3425          * once, reset retransmit time using latest timeout 
3426          * ie, this should readjust the retransmit timer for all outstanding 
3427          * packets...  So we don't just retransmit when we should know better*/
3428
3429         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3430           tp->retryTime = tp->timeSent;
3431           clock_Add(&tp->retryTime, &peer->timeout);
3432           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3433           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3434         }
3435     }
3436
3437     /* If the window has been extended by this acknowledge packet,
3438      * then wakeup a sender waiting in alloc for window space, or try
3439      * sending packets now, if he's been sitting on packets due to
3440      * lack of window space */
3441     if (call->tnext < (call->tfirst + call->twind))  {
3442 #ifdef  RX_ENABLE_LOCKS
3443         CV_SIGNAL(&call->cv_twind);
3444 #else
3445         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3446             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3447             osi_rxWakeup(&call->twind);
3448         }
3449 #endif
3450         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3451             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3452         }
3453     }
3454
3455     /* if the ack packet has a receivelen field hanging off it,
3456      * update our state */
3457     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3458       afs_uint32 tSize;
3459
3460       /* If the ack packet has a "recommended" size that is less than 
3461        * what I am using now, reduce my size to match */
3462       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3463                     sizeof(afs_int32), &tSize);
3464       tSize = (afs_uint32) ntohl(tSize);
3465       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3466
3467       /* Get the maximum packet size to send to this peer */
3468       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3469                     &tSize);
3470       tSize = (afs_uint32)ntohl(tSize);
3471       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3472       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3473
3474       /* sanity check - peer might have restarted with different params.
3475        * If peer says "send less", dammit, send less...  Peer should never 
3476        * be unable to accept packets of the size that prior AFS versions would
3477        * send without asking.  */
3478       if (peer->maxMTU != tSize) {
3479           peer->maxMTU = tSize;
3480           peer->MTU = MIN(tSize, peer->MTU);
3481           call->MTU = MIN(call->MTU, tSize);
3482           peer->congestSeq++;
3483       }
3484
3485       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3486           /* AFS 3.4a */
3487           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3488                         sizeof(afs_int32), &tSize);
3489           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3490           if (tSize < call->twind) {       /* smaller than our send */
3491               call->twind = tSize;         /* window, we must send less... */
3492               call->ssthresh = MIN(call->twind, call->ssthresh);
3493           }
3494
3495           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3496            * network MTU confused with the loopback MTU. Calculate the
3497            * maximum MTU here for use in the slow start code below.
3498            */
3499           maxMTU = peer->maxMTU;
3500           /* Did peer restart with older RX version? */
3501           if (peer->maxDgramPackets > 1) {
3502               peer->maxDgramPackets = 1;
3503           }
3504       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3505           /* AFS 3.5 */
3506           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3507                         sizeof(afs_int32), &tSize);
3508           tSize = (afs_uint32) ntohl(tSize);
3509           /*
3510            * As of AFS 3.5 we set the send window to match the receive window. 
3511            */
3512           if (tSize < call->twind) {
3513               call->twind = tSize;
3514               call->ssthresh = MIN(call->twind, call->ssthresh);
3515           } else if (tSize > call->twind) {
3516               call->twind = tSize;
3517           }
3518
3519           /*
3520            * As of AFS 3.5, a jumbogram is more than one fixed size
3521            * packet transmitted in a single UDP datagram. If the remote
3522            * MTU is smaller than our local MTU then never send a datagram
3523            * larger than the natural MTU.
3524            */
3525           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3526                         sizeof(afs_int32), &tSize);
3527           maxDgramPackets = (afs_uint32) ntohl(tSize);
3528           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3529           maxDgramPackets = MIN(maxDgramPackets,
3530                                 (int)(peer->ifDgramPackets));
3531           maxDgramPackets = MIN(maxDgramPackets, tSize);
3532           if (maxDgramPackets > 1) {
3533             peer->maxDgramPackets = maxDgramPackets;
3534             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3535           } else {
3536             peer->maxDgramPackets = 1;
3537             call->MTU = peer->natMTU;
3538           }
3539        } else if (peer->maxDgramPackets > 1) {
3540           /* Restarted with lower version of RX */
3541           peer->maxDgramPackets = 1;
3542        }
3543     } else if (peer->maxDgramPackets > 1 ||
3544                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3545         /* Restarted with lower version of RX */
3546         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3547         peer->natMTU = OLD_MAX_PACKET_SIZE;
3548         peer->MTU = OLD_MAX_PACKET_SIZE;
3549         peer->maxDgramPackets = 1;
3550         peer->nDgramPackets = 1;
3551         peer->congestSeq++;
3552         call->MTU = OLD_MAX_PACKET_SIZE;
3553     }
3554
3555     if (nNacked) {
3556         /*
3557          * Calculate how many datagrams were successfully received after
3558          * the first missing packet and adjust the negative ack counter
3559          * accordingly.
3560          */
3561         call->nAcks = 0;
3562         call->nNacks++;
3563         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3564         if (call->nNacks < nNacked) {
3565             call->nNacks = nNacked;
3566         }
3567     } else {
3568         if (newAckCount) {
3569             call->nAcks++;
3570         }
3571         call->nNacks = 0;
3572     }
3573
3574     if (call->flags & RX_CALL_FAST_RECOVER) {
3575         if (nNacked) {
3576             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3577         } else {
3578             call->flags &= ~RX_CALL_FAST_RECOVER;
3579             call->cwind = call->nextCwind;
3580             call->nextCwind = 0;
3581             call->nAcks = 0;
3582         }
3583         call->nCwindAcks = 0;
3584     }
3585     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3586         /* Three negative acks in a row trigger congestion recovery */
3587 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3588         MUTEX_EXIT(&peer->peer_lock);
3589         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3590                 /* someone else is waiting to start recovery */
3591                 return np;
3592         }
3593         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3594         while (call->flags & RX_CALL_TQ_BUSY) {
3595             call->flags |= RX_CALL_TQ_WAIT;
3596 #ifdef RX_ENABLE_LOCKS
3597             CV_WAIT(&call->cv_tq, &call->lock);
3598 #else /* RX_ENABLE_LOCKS */
3599             osi_rxSleep(&call->tq);
3600 #endif /* RX_ENABLE_LOCKS */
3601         }
3602         MUTEX_ENTER(&peer->peer_lock);
3603 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3604         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3605         call->flags |= RX_CALL_FAST_RECOVER;
3606         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3607         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3608                           rx_maxSendWindow);
3609         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3610         call->nextCwind = call->ssthresh;
3611         call->nAcks = 0;
3612         call->nNacks = 0;
3613         peer->MTU = call->MTU;
3614         peer->cwind = call->nextCwind;
3615         peer->nDgramPackets = call->nDgramPackets;
3616         peer->congestSeq++;
3617         call->congestSeq = peer->congestSeq;
3618         /* Reset the resend times on the packets that were nacked
3619          * so we will retransmit as soon as the window permits*/
3620         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3621             if (acked) {
3622                 if (!tp->acked) {
3623                     clock_Zero(&tp->retryTime);
3624                 }
3625             } else if (tp->acked) {
3626                 acked = 1;
3627             }
3628         }
3629     } else {
3630         /* If cwind is smaller than ssthresh, then increase
3631          * the window one packet for each ack we receive (exponential
3632          * growth).
3633          * If cwind is greater than or equal to ssthresh then increase
3634          * the congestion window by one packet for each cwind acks we
3635          * receive (linear growth).  */
3636         if (call->cwind < call->ssthresh) {
3637             call->cwind = MIN((int)call->ssthresh,
3638                               (int)(call->cwind + newAckCount));
3639             call->nCwindAcks = 0;
3640         } else {
3641             call->nCwindAcks += newAckCount;
3642             if (call->nCwindAcks >= call->cwind) {
3643                 call->nCwindAcks = 0;
3644                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3645             }
3646         }
3647         /*
3648          * If we have received several acknowledgements in a row then
3649          * it is time to increase the size of our datagrams
3650          */
3651         if ((int)call->nAcks > rx_nDgramThreshold) {
3652             if (peer->maxDgramPackets > 1) {
3653                 if (call->nDgramPackets < peer->maxDgramPackets) {
3654                     call->nDgramPackets++;
3655                 }
3656                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3657             } else if (call->MTU < peer->maxMTU) {
3658                 call->MTU += peer->natMTU;
3659                 call->MTU = MIN(call->MTU, peer->maxMTU);
3660             }
3661             call->nAcks = 0;
3662         }
3663     }
3664
3665     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3666
3667     /* Servers need to hold the call until all response packets have
3668      * been acknowledged. Soft acks are good enough since clients
3669      * are not allowed to clear their receive queues. */
3670     if (call->state == RX_STATE_HOLD &&
3671         call->tfirst + call->nSoftAcked >= call->tnext) {
3672         call->state = RX_STATE_DALLY;
3673         rxi_ClearTransmitQueue(call, 0);
3674     } else if (!queue_IsEmpty(&call->tq)) {
3675         rxi_Start(0, call, istack);
3676     }
3677     return np;
3678 }
3679
3680 /* Received a response to a challenge packet */
3681 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)