rx-race-condition-cleanup-by-adding-busy-status-20010605
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include "../afs/sysincludes.h"
15 #include "../afs/afsincludes.h"
16 #ifndef UKERNEL
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #include "../afs/afs_args.h"
28 #include "../afs/afs_osi.h"
29 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
30 #include "../h/systm.h"
31 #endif
32 #ifdef RXDEBUG
33 #undef RXDEBUG      /* turn off debugging */
34 #endif /* RXDEBUG */
35 #if defined(AFS_SGI_ENV)
36 #include "../sys/debug.h"
37 #endif
38 #include "../afsint/afsint.h"
39 #ifdef  AFS_ALPHA_ENV
40 #undef kmem_alloc
41 #undef kmem_free
42 #undef mem_alloc
43 #undef mem_free
44 #undef register
45 #endif  /* AFS_ALPHA_ENV */
46 #else /* !UKERNEL */
47 #include "../afs/sysincludes.h"
48 #include "../afs/afsincludes.h"
49 #endif /* !UKERNEL */
50 #include "../afs/lock.h"
51 #include "../rx/rx_kmutex.h"
52 #include "../rx/rx_kernel.h"
53 #include "../rx/rx_clock.h"
54 #include "../rx/rx_queue.h"
55 #include "../rx/rx.h"
56 #include "../rx/rx_globals.h"
57 #include "../rx/rx_trace.h"
58 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
59 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
60 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
61 #include "../afsint/afsint.h"
62 extern afs_int32 afs_termState;
63 #ifdef AFS_AIX41_ENV
64 #include "sys/lockl.h"
65 #include "sys/lock_def.h"
66 #endif /* AFS_AIX41_ENV */
67 # include "../afsint/rxgen_consts.h"
68 #else /* KERNEL */
69 # include <afs/param.h>
70 # include <sys/types.h>
71 # include <errno.h>
72 #ifdef AFS_NT40_ENV
73 # include <stdlib.h>
74 # include <fcntl.h>
75 # include <afsutil.h>
76 #else
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <sys/stat.h>
81 # include <netinet/in.h>
82 # include <sys/time.h>
83 #endif
84 # include "rx.h"
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx_globals.h"
89 # include "rx_trace.h"
90 # include "rx_internal.h"
91 # include <afs/rxgen_consts.h>
92 #endif /* KERNEL */
93
94 #ifdef RXDEBUG
95 extern afs_uint32 LWP_ThreadId();
96 #endif /* RXDEBUG */
97
98 int (*registerProgram)() = 0;
99 int (*swapNameProgram)() = 0;
100
101 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
102 struct rx_tq_debug {
103     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
104     afs_int32 rxi_start_in_error;
105 } rx_tq_debug;
106 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
107
108 /*
109  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
110  * currently allocated within rx.  This number is used to allocate the
111  * memory required to return the statistics when queried.
112  */
113
114 static unsigned int rxi_rpc_peer_stat_cnt;
115
116 /*
117  * rxi_rpc_process_stat_cnt counts the total number of local process stat
118  * structures currently allocated within rx.  The number is used to allocate
119  * the memory required to return the statistics when queried.
120  */
121
122 static unsigned int rxi_rpc_process_stat_cnt;
123
124 #if !defined(offsetof)
125 #include <stddef.h>     /* for definition of offsetof() */
126 #endif
127
128 #ifdef AFS_PTHREAD_ENV
129 #include <assert.h>
130
131 /*
132  * Use procedural initialization of mutexes/condition variables
133  * to ease NT porting
134  */
135
136 extern pthread_mutex_t rxkad_stats_mutex;
137 extern pthread_mutex_t des_init_mutex;
138 extern pthread_mutex_t des_random_mutex;
139 extern pthread_mutex_t rx_clock_mutex;
140 extern pthread_mutex_t rxi_connCacheMutex;
141 extern pthread_mutex_t rx_event_mutex;
142 extern pthread_mutex_t osi_malloc_mutex;
143 extern pthread_mutex_t event_handler_mutex;
144 extern pthread_mutex_t listener_mutex;
145 extern pthread_mutex_t rx_if_init_mutex;
146 extern pthread_mutex_t rx_if_mutex;
147 extern pthread_mutex_t rxkad_client_uid_mutex;
148 extern pthread_mutex_t rxkad_random_mutex;
149
150 extern pthread_cond_t rx_event_handler_cond;
151 extern pthread_cond_t rx_listener_cond;
152
153 static pthread_mutex_t epoch_mutex;
154 static pthread_mutex_t rx_init_mutex;
155 static pthread_mutex_t rx_debug_mutex;
156
157 static void rxi_InitPthread(void) {
158     assert(pthread_mutex_init(&rx_clock_mutex,
159                               (const pthread_mutexattr_t*)0)==0);
160     assert(pthread_mutex_init(&rxi_connCacheMutex,
161                               (const pthread_mutexattr_t*)0)==0);
162     assert(pthread_mutex_init(&rx_init_mutex,
163                               (const pthread_mutexattr_t*)0)==0);
164     assert(pthread_mutex_init(&epoch_mutex,
165                               (const pthread_mutexattr_t*)0)==0);
166     assert(pthread_mutex_init(&rx_event_mutex,
167                               (const pthread_mutexattr_t*)0)==0);
168     assert(pthread_mutex_init(&des_init_mutex,
169                               (const pthread_mutexattr_t*)0)==0);
170     assert(pthread_mutex_init(&des_random_mutex,
171                               (const pthread_mutexattr_t*)0)==0);
172     assert(pthread_mutex_init(&osi_malloc_mutex,
173                               (const pthread_mutexattr_t*)0)==0);
174     assert(pthread_mutex_init(&event_handler_mutex,
175                               (const pthread_mutexattr_t*)0)==0);
176     assert(pthread_mutex_init(&listener_mutex,
177                               (const pthread_mutexattr_t*)0)==0);
178     assert(pthread_mutex_init(&rx_if_init_mutex,
179                               (const pthread_mutexattr_t*)0)==0);
180     assert(pthread_mutex_init(&rx_if_mutex,
181                               (const pthread_mutexattr_t*)0)==0);
182     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
183                               (const pthread_mutexattr_t*)0)==0);
184     assert(pthread_mutex_init(&rxkad_random_mutex,
185                               (const pthread_mutexattr_t*)0)==0);
186     assert(pthread_mutex_init(&rxkad_stats_mutex,
187                               (const pthread_mutexattr_t*)0)==0);
188     assert(pthread_mutex_init(&rx_debug_mutex,
189                               (const pthread_mutexattr_t*)0)==0);
190
191     assert(pthread_cond_init(&rx_event_handler_cond,
192                               (const pthread_condattr_t*)0)==0);
193     assert(pthread_cond_init(&rx_listener_cond,
194                               (const pthread_condattr_t*)0)==0);
195     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
196 }
197
198 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
199 #define INIT_PTHREAD_LOCKS \
200 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
201 /*
202  * The rx_stats_mutex mutex protects the following global variables:
203  * rxi_dataQuota
204  * rxi_minDeficit
205  * rxi_availProcs
206  * rxi_totalMin
207  * rxi_lowConnRefCount
208  * rxi_lowPeerRefCount
209  * rxi_nCalls
210  * rxi_Alloccnt
211  * rxi_Allocsize
212  * rx_nFreePackets
213  * rx_tq_debug
214  * rx_stats
215  */
216 #else
217 #define INIT_PTHREAD_LOCKS
218 #endif
219
220 extern void rxi_DeleteCachedConnections(void);
221
222
223 /* Variables for handling the minProcs implementation.  availProcs gives the
224  * number of threads available in the pool at this moment (not counting dudes
225  * executing right now).  totalMin gives the total number of procs required
226  * for handling all minProcs requests.  minDeficit is a dynamic variable
227  * tracking the # of procs required to satisfy all of the remaining minProcs
228  * demands.
229  * For fine grain locking to work, the quota check and the reservation of
230  * a server thread has to come while rxi_availProcs and rxi_minDeficit
231  * are locked. To this end, the code has been modified under #ifdef
232  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
233  * same time. A new function, ReturnToServerPool() returns the allocation.
234  * 
235  * A call can be on several queue's (but only one at a time). When
236  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
237  * that no one else is touching the queue. To this end, we store the address
238  * of the queue lock in the call structure (under the call lock) when we
239  * put the call on a queue, and we clear the call_queue_lock when the
240  * call is removed from a queue (once the call lock has been obtained).
241  * This allows rxi_ResetCall to safely synchronize with others wishing
242  * to manipulate the queue.
243  */
244
245 extern void rxi_Delay(int);
246
247 static int rxi_ServerThreadSelectingCall;
248
249 #ifdef RX_ENABLE_LOCKS
250 static afs_kmutex_t rx_rpc_stats;
251 void rxi_StartUnlocked();
252 #endif
253
254 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
255 ** pretty good that the next packet coming in is from the same connection 
256 ** as the last packet, since we're send multiple packets in a transmit window.
257 */
258 struct rx_connection *rxLastConn = 0; 
259
260 #ifdef RX_ENABLE_LOCKS
261 /* The locking hierarchy for rx fine grain locking is composed of five
262  * tiers:
263  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
264  * call->lock - locks call data fields.
265  * Most any other lock - these are all independent of each other.....
266  *      rx_freePktQ_lock
267  *      rx_freeCallQueue_lock
268  *      freeSQEList_lock
269  *      rx_connHashTable_lock
270  *      rx_serverPool_lock
271  *      rxi_keyCreate_lock
272  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
273
274  * lowest level:
275  *      peer_lock - locks peer data fields.
276  *      conn_data_lock - that more than one thread is not updating a conn data
277  *              field at the same time.
278  * Do we need a lock to protect the peer field in the conn structure?
279  *      conn->peer was previously a constant for all intents and so has no
280  *      lock protecting this field. The multihomed client delta introduced
281  *      a RX code change : change the peer field in the connection structure
282  *      to that remote inetrface from which the last packet for this
283  *      connection was sent out. This may become an issue if further changes
284  *      are made.
285  */
286 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
287 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
288 #ifdef RX_LOCKS_DB
289 /* rxdb_fileID is used to identify the lock location, along with line#. */
290 static int rxdb_fileID = RXDB_FILE_RX;
291 #endif /* RX_LOCKS_DB */
292 static void rxi_SetAcksInTransmitQueue();
293 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
294 #else /* RX_ENABLE_LOCKS */
295 #define SET_CALL_QUEUE_LOCK(C, L)
296 #define CLEAR_CALL_QUEUE_LOCK(C)
297 #endif /* RX_ENABLE_LOCKS */
298 static void rxi_DestroyConnectionNoLock();
299 void rxi_DestroyConnection();
300 void rxi_CleanupConnection();
301 struct rx_serverQueueEntry *rx_waitForPacket = 0;
302
303 /* ------------Exported Interfaces------------- */
304
305 /* This function allows rxkad to set the epoch to a suitably random number
306  * which rx_NewConnection will use in the future.  The principle purpose is to
307  * get rxnull connections to use the same epoch as the rxkad connections do, at
308  * least once the first rxkad connection is established.  This is important now
309  * that the host/port addresses aren't used in FindConnection: the uniqueness
310  * of epoch/cid matters and the start time won't do. */
311
312 #ifdef AFS_PTHREAD_ENV
313 /*
314  * This mutex protects the following global variables:
315  * rx_epoch
316  */
317
318 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
319 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
320 #else
321 #define LOCK_EPOCH
322 #define UNLOCK_EPOCH
323 #endif /* AFS_PTHREAD_ENV */
324
325 void rx_SetEpoch (epoch)
326   afs_uint32 epoch;
327 {
328     LOCK_EPOCH
329     rx_epoch = epoch;
330     UNLOCK_EPOCH
331 }
332
333 /* Initialize rx.  A port number may be mentioned, in which case this
334  * becomes the default port number for any service installed later.
335  * If 0 is provided for the port number, a random port will be chosen
336  * by the kernel.  Whether this will ever overlap anything in
337  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
338  * error. */
339 static int rxinit_status = 1;
340 #ifdef AFS_PTHREAD_ENV
341 /*
342  * This mutex protects the following global variables:
343  * rxinit_status
344  */
345
346 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
347 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
348 #else
349 #define LOCK_RX_INIT
350 #define UNLOCK_RX_INIT
351 #endif
352
353 int rx_Init(u_int port)
354 {
355 #ifdef KERNEL
356     osi_timeval_t tv;
357 #else /* KERNEL */
358     struct timeval tv;
359 #endif /* KERNEL */
360     char *htable, *ptable;
361     int tmp_status;
362
363 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
364     __djgpp_set_quiet_socket(1);
365 #endif
366
367     SPLVAR;
368
369     INIT_PTHREAD_LOCKS
370     LOCK_RX_INIT
371     if (rxinit_status == 0) {
372         tmp_status = rxinit_status;
373         UNLOCK_RX_INIT
374         return tmp_status; /* Already started; return previous error code. */
375     }
376
377 #ifdef AFS_NT40_ENV
378     if (afs_winsockInit()<0)
379         return -1;
380 #endif
381
382 #ifndef KERNEL
383     /*
384      * Initialize anything necessary to provide a non-premptive threading
385      * environment.
386      */
387     rxi_InitializeThreadSupport();
388 #endif
389
390     /* Allocate and initialize a socket for client and perhaps server
391      * connections. */
392
393     rx_socket = rxi_GetUDPSocket((u_short)port); 
394     if (rx_socket == OSI_NULLSOCKET) {
395         UNLOCK_RX_INIT
396         return RX_ADDRINUSE;
397     }
398     
399
400 #ifdef  RX_ENABLE_LOCKS
401 #ifdef RX_LOCKS_DB
402     rxdb_init();
403 #endif /* RX_LOCKS_DB */
404     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
405     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
406     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
407     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
408     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
409                MUTEX_DEFAULT,0);
410     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
411     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
412     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
413     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
414 #ifndef KERNEL
415     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
416 #endif /* !KERNEL */
417     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
418 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
419     if ( !uniprocessor )
420       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
421 #endif /* KERNEL && AFS_HPUX110_ENV */
422 #else /* RX_ENABLE_LOCKS */
423 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
424     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
425 #endif /* AFS_GLOBAL_SUNLOCK */
426 #endif /* RX_ENABLE_LOCKS */
427
428     rxi_nCalls = 0;
429     rx_connDeadTime = 12;
430     rx_tranquil     = 0;        /* reset flag */
431     bzero((char *)&rx_stats, sizeof(struct rx_stats));
432     htable = (char *)
433         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
434     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
435     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
436     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
437     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
438     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
439
440     /* Malloc up a bunch of packets & buffers */
441     rx_nFreePackets = 0;
442     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
443     queue_Init(&rx_freePacketQueue);
444     rxi_NeedMorePackets = FALSE;
445     rxi_MorePackets(rx_nPackets);
446     rx_CheckPackets();
447
448     NETPRI;
449     AFS_RXGLOCK();
450
451     clock_Init();
452
453 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
454     tv.tv_sec = clock_now.sec;
455     tv.tv_usec = clock_now.usec;
456     srand((unsigned int) tv.tv_usec);
457 #else
458     osi_GetTime(&tv);
459 #endif
460     if (port) {
461         rx_port = port;
462     } else {
463 #if defined(KERNEL) && !defined(UKERNEL)
464         /* Really, this should never happen in a real kernel */
465         rx_port = 0;
466 #else
467         struct sockaddr_in addr;
468         int addrlen = sizeof(addr);
469         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
470             rx_Finalize();
471             return -1;
472         }
473         rx_port = addr.sin_port;
474 #endif
475     }
476     rx_stats.minRtt.sec = 9999999;
477 #ifdef  KERNEL
478     rx_SetEpoch (tv.tv_sec | 0x80000000);
479 #else
480     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
481                                          * will provide a randomer value. */
482 #endif
483     MUTEX_ENTER(&rx_stats_mutex);
484     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
485     MUTEX_EXIT(&rx_stats_mutex);
486     /* *Slightly* random start time for the cid.  This is just to help
487      * out with the hashing function at the peer */
488     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
489     rx_connHashTable = (struct rx_connection **) htable;
490     rx_peerHashTable = (struct rx_peer **) ptable;
491
492     rx_lastAckDelay.sec = 0;
493     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
494     rx_hardAckDelay.sec = 0;
495     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
496     rx_softAckDelay.sec = 0;
497     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
498
499     rxevent_Init(20, rxi_ReScheduleEvents);
500
501     /* Initialize various global queues */
502     queue_Init(&rx_idleServerQueue);
503     queue_Init(&rx_incomingCallQueue);
504     queue_Init(&rx_freeCallQueue);
505
506 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
507     /* Initialize our list of usable IP addresses. */
508     rx_GetIFInfo();
509 #endif
510
511     /* Start listener process (exact function is dependent on the
512      * implementation environment--kernel or user space) */
513     rxi_StartListener();
514
515     AFS_RXGUNLOCK();
516     USERPRI;
517     tmp_status = rxinit_status = 0;
518     UNLOCK_RX_INIT
519     return tmp_status;
520 }
521
522 /* called with unincremented nRequestsRunning to see if it is OK to start
523  * a new thread in this service.  Could be "no" for two reasons: over the
524  * max quota, or would prevent others from reaching their min quota.
525  */
526 #ifdef RX_ENABLE_LOCKS
527 /* This verion of QuotaOK reserves quota if it's ok while the
528  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
529  */
530 static int QuotaOK(aservice)
531 register struct rx_service *aservice;
532 {
533     /* check if over max quota */
534     if (aservice->nRequestsRunning >= aservice->maxProcs) {
535         return 0;
536     }
537
538     /* under min quota, we're OK */
539     /* otherwise, can use only if there are enough to allow everyone
540      * to go to their min quota after this guy starts.
541      */
542     MUTEX_ENTER(&rx_stats_mutex);
543     if ((aservice->nRequestsRunning < aservice->minProcs) ||
544          (rxi_availProcs > rxi_minDeficit)) {
545         aservice->nRequestsRunning++;
546         /* just started call in minProcs pool, need fewer to maintain
547          * guarantee */
548         if (aservice->nRequestsRunning <= aservice->minProcs)
549             rxi_minDeficit--;
550         rxi_availProcs--;
551         MUTEX_EXIT(&rx_stats_mutex);
552         return 1;
553     }
554     MUTEX_EXIT(&rx_stats_mutex);
555
556     return 0;
557 }
558 static void ReturnToServerPool(aservice)
559 register struct rx_service *aservice;
560 {
561     aservice->nRequestsRunning--;
562     MUTEX_ENTER(&rx_stats_mutex);
563     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
564     rxi_availProcs++;
565     MUTEX_EXIT(&rx_stats_mutex);
566 }
567
568 #else /* RX_ENABLE_LOCKS */
569 static QuotaOK(aservice)
570 register struct rx_service *aservice; {
571     int rc=0;
572     /* under min quota, we're OK */
573     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
574
575     /* check if over max quota */
576     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
577
578     /* otherwise, can use only if there are enough to allow everyone
579      * to go to their min quota after this guy starts.
580      */
581     if (rxi_availProcs > rxi_minDeficit) rc = 1;
582     return rc;
583 }
584 #endif /* RX_ENABLE_LOCKS */
585
586 #ifndef KERNEL
587 /* Called by rx_StartServer to start up lwp's to service calls.
588    NExistingProcs gives the number of procs already existing, and which
589    therefore needn't be created. */
590 void rxi_StartServerProcs(nExistingProcs)
591     int nExistingProcs;
592 {
593     register struct rx_service *service;
594     register int i;
595     int maxdiff = 0;
596     int nProcs = 0;
597
598     /* For each service, reserve N processes, where N is the "minimum"
599        number of processes that MUST be able to execute a request in parallel,
600        at any time, for that process.  Also compute the maximum difference
601        between any service's maximum number of processes that can run
602        (i.e. the maximum number that ever will be run, and a guarantee
603        that this number will run if other services aren't running), and its
604        minimum number.  The result is the extra number of processes that
605        we need in order to provide the latter guarantee */
606     for (i=0; i<RX_MAX_SERVICES; i++) {
607         int diff;
608         service = rx_services[i];
609         if (service == (struct rx_service *) 0) break;
610         nProcs += service->minProcs;
611         diff = service->maxProcs - service->minProcs;
612         if (diff > maxdiff) maxdiff = diff;
613     }
614     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
615     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
616     for (i = 0; i<nProcs; i++) {
617         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
618     }
619 }
620 #endif /* KERNEL */
621
622 /* This routine must be called if any services are exported.  If the
623  * donateMe flag is set, the calling process is donated to the server
624  * process pool */
625 void rx_StartServer(donateMe)
626 {
627     register struct rx_service *service;
628     register int i, nProcs;
629     SPLVAR;
630     clock_NewTime();
631
632     NETPRI;
633     AFS_RXGLOCK();
634     /* Start server processes, if necessary (exact function is dependent
635      * on the implementation environment--kernel or user space).  DonateMe
636      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
637      * case, one less new proc will be created rx_StartServerProcs.
638      */
639     rxi_StartServerProcs(donateMe);
640
641     /* count up the # of threads in minProcs, and add set the min deficit to
642      * be that value, too.
643      */
644     for (i=0; i<RX_MAX_SERVICES; i++) {
645         service = rx_services[i];
646         if (service == (struct rx_service *) 0) break;
647         MUTEX_ENTER(&rx_stats_mutex);
648         rxi_totalMin += service->minProcs;
649         /* below works even if a thread is running, since minDeficit would
650          * still have been decremented and later re-incremented.
651          */
652         rxi_minDeficit += service->minProcs;
653         MUTEX_EXIT(&rx_stats_mutex);
654     }
655
656     /* Turn on reaping of idle server connections */
657     rxi_ReapConnections();
658
659     AFS_RXGUNLOCK();
660     USERPRI;
661
662     if (donateMe) {
663 #ifndef AFS_NT40_ENV
664 #ifndef KERNEL
665         int code;
666         char name[32];
667 #ifdef AFS_PTHREAD_ENV
668         pid_t pid;
669         pid = (pid_t) pthread_self();
670 #else /* AFS_PTHREAD_ENV */
671         PROCESS pid;
672         code = LWP_CurrentProcess(&pid);
673 #endif /* AFS_PTHREAD_ENV */
674
675         sprintf(name,"srv_%d", ++nProcs);
676         if (registerProgram)
677             (*registerProgram)(pid, name);
678 #endif /* KERNEL */
679 #endif /* AFS_NT40_ENV */
680         rx_ServerProc(); /* Never returns */
681     }
682     return;
683 }
684
685 /* Create a new client connection to the specified service, using the
686  * specified security object to implement the security model for this
687  * connection. */
688 struct rx_connection *
689 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
690     register afs_uint32 shost;      /* Server host */
691     u_short sport;                  /* Server port */
692     u_short sservice;               /* Server service id */
693     register struct rx_securityClass *securityObject;
694     int serviceSecurityIndex;
695 {
696     int hashindex;
697     afs_int32 cid;
698     register struct rx_connection *conn;
699
700     SPLVAR;
701
702     clock_NewTime();
703     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
704           shost, sport, sservice, securityObject, serviceSecurityIndex));
705
706     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
707      * the case of kmem_alloc? */
708     conn = rxi_AllocConnection();
709 #ifdef  RX_ENABLE_LOCKS
710     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
711     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
712     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
713 #endif
714     NETPRI;
715     AFS_RXGLOCK();
716     MUTEX_ENTER(&rx_connHashTable_lock);
717     cid = (rx_nextCid += RX_MAXCALLS);
718     conn->type = RX_CLIENT_CONNECTION;
719     conn->cid = cid;
720     conn->epoch = rx_epoch;
721     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
722     conn->serviceId = sservice;
723     conn->securityObject = securityObject;
724     /* This doesn't work in all compilers with void (they're buggy), so fake it
725      * with VOID */
726     conn->securityData = (VOID *) 0;
727     conn->securityIndex = serviceSecurityIndex;
728     rx_SetConnDeadTime(conn, rx_connDeadTime);
729     conn->ackRate = RX_FAST_ACK_RATE;
730     conn->nSpecific = 0;
731     conn->specific = NULL;
732     conn->challengeEvent = (struct rxevent *)0;
733     conn->delayedAbortEvent = (struct rxevent *)0;
734     conn->abortCount = 0;
735     conn->error = 0;
736
737     RXS_NewConnection(securityObject, conn);
738     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
739     
740     conn->refCount++; /* no lock required since only this thread knows... */
741     conn->next = rx_connHashTable[hashindex];
742     rx_connHashTable[hashindex] = conn;
743     MUTEX_ENTER(&rx_stats_mutex);
744     rx_stats.nClientConns++;
745     MUTEX_EXIT(&rx_stats_mutex);
746
747     MUTEX_EXIT(&rx_connHashTable_lock);
748     AFS_RXGUNLOCK();
749     USERPRI;
750     return conn;
751 }
752
753 void rx_SetConnDeadTime(conn, seconds)
754     register struct rx_connection *conn;
755     register int seconds;
756 {
757     /* The idea is to set the dead time to a value that allows several
758      * keepalives to be dropped without timing out the connection. */
759     conn->secondsUntilDead = MAX(seconds, 6);
760     conn->secondsUntilPing = conn->secondsUntilDead/6;
761 }
762
763 int rxi_lowPeerRefCount = 0;
764 int rxi_lowConnRefCount = 0;
765
766 /*
767  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
768  * NOTE: must not be called with rx_connHashTable_lock held.
769  */
770 void rxi_CleanupConnection(conn)
771     struct rx_connection *conn;
772 {
773     int i;
774
775     /* Notify the service exporter, if requested, that this connection
776      * is being destroyed */
777     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
778       (*conn->service->destroyConnProc)(conn);
779
780     /* Notify the security module that this connection is being destroyed */
781     RXS_DestroyConnection(conn->securityObject, conn);
782
783     /* If this is the last connection using the rx_peer struct, set its
784      * idle time to now. rxi_ReapConnections will reap it if it's still
785      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
786      */
787     MUTEX_ENTER(&rx_peerHashTable_lock);
788     if (--conn->peer->refCount <= 0) {
789         conn->peer->idleWhen = clock_Sec();
790         if (conn->peer->refCount < 0) {
791             conn->peer->refCount = 0; 
792             MUTEX_ENTER(&rx_stats_mutex);
793             rxi_lowPeerRefCount ++;
794             MUTEX_EXIT(&rx_stats_mutex);
795         }
796     }
797     MUTEX_EXIT(&rx_peerHashTable_lock);
798
799     MUTEX_ENTER(&rx_stats_mutex);
800     if (conn->type == RX_SERVER_CONNECTION)
801       rx_stats.nServerConns--;
802     else
803       rx_stats.nClientConns--;
804     MUTEX_EXIT(&rx_stats_mutex);
805
806 #ifndef KERNEL
807     if (conn->specific) {
808         for (i = 0 ; i < conn->nSpecific ; i++) {
809             if (conn->specific[i] && rxi_keyCreate_destructor[i])
810                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
811             conn->specific[i] = NULL;
812         }
813         free(conn->specific);
814     }
815     conn->specific = NULL;
816     conn->nSpecific = 0;
817 #endif /* !KERNEL */
818
819     MUTEX_DESTROY(&conn->conn_call_lock);
820     MUTEX_DESTROY(&conn->conn_data_lock);
821     CV_DESTROY(&conn->conn_call_cv);
822         
823     rxi_FreeConnection(conn);
824 }
825
826 /* Destroy the specified connection */
827 void rxi_DestroyConnection(conn)
828     register struct rx_connection *conn;
829 {
830     MUTEX_ENTER(&rx_connHashTable_lock);
831     rxi_DestroyConnectionNoLock(conn);
832     /* conn should be at the head of the cleanup list */
833     if (conn == rx_connCleanup_list) {
834         rx_connCleanup_list = rx_connCleanup_list->next;
835         MUTEX_EXIT(&rx_connHashTable_lock);
836         rxi_CleanupConnection(conn);
837     }
838 #ifdef RX_ENABLE_LOCKS
839     else {
840         MUTEX_EXIT(&rx_connHashTable_lock);
841     }
842 #endif /* RX_ENABLE_LOCKS */
843 }
844     
845 static void rxi_DestroyConnectionNoLock(conn)
846     register struct rx_connection *conn;
847 {
848     register struct rx_connection **conn_ptr;
849     register int havecalls = 0;
850     struct rx_packet *packet;
851     int i;
852     SPLVAR;
853
854     clock_NewTime();
855
856     NETPRI;
857     MUTEX_ENTER(&conn->conn_data_lock);
858     if (conn->refCount > 0)
859         conn->refCount--;
860     else {
861         MUTEX_ENTER(&rx_stats_mutex);
862         rxi_lowConnRefCount++;
863         MUTEX_EXIT(&rx_stats_mutex);
864     }
865
866     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
867         /* Busy; wait till the last guy before proceeding */
868         MUTEX_EXIT(&conn->conn_data_lock);
869         USERPRI;
870         return;
871     }
872
873     /* If the client previously called rx_NewCall, but it is still
874      * waiting, treat this as a running call, and wait to destroy the
875      * connection later when the call completes. */
876     if ((conn->type == RX_CLIENT_CONNECTION) &&
877         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
878         conn->flags |= RX_CONN_DESTROY_ME;
879         MUTEX_EXIT(&conn->conn_data_lock);
880         USERPRI;
881         return;
882     }
883     MUTEX_EXIT(&conn->conn_data_lock);
884
885     /* Check for extant references to this connection */
886     for (i = 0; i<RX_MAXCALLS; i++) {
887         register struct rx_call *call = conn->call[i];
888         if (call) {
889             havecalls = 1;
890             if (conn->type == RX_CLIENT_CONNECTION) {
891                 MUTEX_ENTER(&call->lock);
892                 if (call->delayedAckEvent) {
893                     /* Push the final acknowledgment out now--there
894                      * won't be a subsequent call to acknowledge the
895                      * last reply packets */
896                     rxevent_Cancel(call->delayedAckEvent, call,
897                                    RX_CALL_REFCOUNT_DELAY);
898                     rxi_AckAll((struct rxevent *)0, call, 0);
899                 }
900                 MUTEX_EXIT(&call->lock);
901             }
902         }
903     }
904 #ifdef RX_ENABLE_LOCKS
905     if (!havecalls) {
906         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
907             MUTEX_EXIT(&conn->conn_data_lock);
908         }
909         else {
910             /* Someone is accessing a packet right now. */
911             havecalls = 1;
912         }
913     }
914 #endif /* RX_ENABLE_LOCKS */
915
916     if (havecalls) {
917         /* Don't destroy the connection if there are any call
918          * structures still in use */
919         MUTEX_ENTER(&conn->conn_data_lock);
920         conn->flags |= RX_CONN_DESTROY_ME;
921         MUTEX_EXIT(&conn->conn_data_lock);
922         USERPRI;
923         return;
924     }
925
926     if (conn->delayedAbortEvent) {
927         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
928         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
929         if (packet) {
930             MUTEX_ENTER(&conn->conn_data_lock);
931             rxi_SendConnectionAbort(conn, packet, 0, 1);
932             MUTEX_EXIT(&conn->conn_data_lock);
933             rxi_FreePacket(packet);
934         }
935     }
936
937     /* Remove from connection hash table before proceeding */
938     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
939                                              conn->epoch, conn->type) ];
940     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
941         if (*conn_ptr == conn) {
942             *conn_ptr = conn->next;
943             break;
944         }
945     }
946     /* if the conn that we are destroying was the last connection, then we
947     * clear rxLastConn as well */
948     if ( rxLastConn == conn )
949         rxLastConn = 0;
950
951     /* Make sure the connection is completely reset before deleting it. */
952     /* get rid of pending events that could zap us later */
953     if (conn->challengeEvent) {
954         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
955     }
956  
957     /* Add the connection to the list of destroyed connections that
958      * need to be cleaned up. This is necessary to avoid deadlocks
959      * in the routines we call to inform others that this connection is
960      * being destroyed. */
961     conn->next = rx_connCleanup_list;
962     rx_connCleanup_list = conn;
963 }
964
965 /* Externally available version */
966 void rx_DestroyConnection(conn) 
967     register struct rx_connection *conn;
968 {
969     SPLVAR;
970
971     NETPRI;
972     AFS_RXGLOCK();
973     rxi_DestroyConnection (conn);
974     AFS_RXGUNLOCK();
975     USERPRI;
976 }
977
978 /* Start a new rx remote procedure call, on the specified connection.
979  * If wait is set to 1, wait for a free call channel; otherwise return
980  * 0.  Maxtime gives the maximum number of seconds this call may take,
981  * after rx_MakeCall returns.  After this time interval, a call to any
982  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
983  * For fine grain locking, we hold the conn_call_lock in order to 
984  * to ensure that we don't get signalle after we found a call in an active
985  * state and before we go to sleep.
986  */
987 struct rx_call *rx_NewCall(conn)
988     register struct rx_connection *conn;
989 {
990     register int i;
991     register struct rx_call *call;
992     struct clock queueTime;
993     SPLVAR;
994
995     clock_NewTime();
996     dpf (("rx_MakeCall(conn %x)\n", conn));
997
998     NETPRI;
999     clock_GetTime(&queueTime);
1000     AFS_RXGLOCK();
1001     MUTEX_ENTER(&conn->conn_call_lock);
1002     for (;;) {
1003         for (i=0; i<RX_MAXCALLS; i++) {
1004             call = conn->call[i];
1005             if (call) {
1006                 MUTEX_ENTER(&call->lock);
1007                 if (call->state == RX_STATE_DALLY) {
1008                     rxi_ResetCall(call, 0);
1009                     (*call->callNumber)++;
1010                     break;
1011                 }
1012                 MUTEX_EXIT(&call->lock);
1013             }
1014             else {
1015                 call = rxi_NewCall(conn, i);
1016                 MUTEX_ENTER(&call->lock);
1017                 break;
1018             }
1019         }
1020         if (i < RX_MAXCALLS) {
1021             break;
1022         }
1023         MUTEX_ENTER(&conn->conn_data_lock);
1024         conn->flags |= RX_CONN_MAKECALL_WAITING;
1025         MUTEX_EXIT(&conn->conn_data_lock);
1026 #ifdef  RX_ENABLE_LOCKS
1027         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1028 #else
1029         osi_rxSleep(conn);
1030 #endif
1031     }
1032
1033     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1034
1035     /* Client is initially in send mode */
1036     call->state = RX_STATE_ACTIVE;
1037     call->mode = RX_MODE_SENDING;
1038
1039     /* remember start time for call in case we have hard dead time limit */
1040     call->queueTime = queueTime;
1041     clock_GetTime(&call->startTime);
1042     hzero(call->bytesSent);
1043     hzero(call->bytesRcvd);
1044
1045     /* Turn on busy protocol. */
1046     rxi_KeepAliveOn(call);
1047
1048     MUTEX_EXIT(&call->lock);
1049     MUTEX_EXIT(&conn->conn_call_lock);
1050     AFS_RXGUNLOCK();
1051     USERPRI;
1052
1053 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1054     /* Now, if TQ wasn't cleared earlier, do it now. */
1055     AFS_RXGLOCK();
1056     MUTEX_ENTER(&call->lock);
1057     while (call->flags & RX_CALL_TQ_BUSY) {
1058         call->flags |= RX_CALL_TQ_WAIT;
1059 #ifdef RX_ENABLE_LOCKS
1060         CV_WAIT(&call->cv_tq, &call->lock);
1061 #else /* RX_ENABLE_LOCKS */
1062         osi_rxSleep(&call->tq);
1063 #endif /* RX_ENABLE_LOCKS */
1064     }
1065     if (call->flags & RX_CALL_TQ_CLEARME) {
1066         rxi_ClearTransmitQueue(call, 0);
1067         queue_Init(&call->tq);
1068     }
1069     MUTEX_EXIT(&call->lock);
1070     AFS_RXGUNLOCK();
1071 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1072
1073     return call;
1074 }
1075
1076 rxi_HasActiveCalls(aconn)
1077 register struct rx_connection *aconn; {
1078     register int i;
1079     register struct rx_call *tcall;
1080     SPLVAR;
1081
1082     NETPRI;
1083     for(i=0; i<RX_MAXCALLS; i++) {
1084       if (tcall = aconn->call[i]) {
1085         if ((tcall->state == RX_STATE_ACTIVE) 
1086             || (tcall->state == RX_STATE_PRECALL)) {
1087           USERPRI;
1088           return 1;
1089         }
1090       }
1091     }
1092     USERPRI;
1093     return 0;
1094 }
1095
1096 rxi_GetCallNumberVector(aconn, aint32s)
1097 register struct rx_connection *aconn;
1098 register afs_int32 *aint32s; {
1099     register int i;
1100     register struct rx_call *tcall;
1101     SPLVAR;
1102
1103     NETPRI;
1104     for(i=0; i<RX_MAXCALLS; i++) {
1105         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1106             aint32s[i] = aconn->callNumber[i]+1;
1107         else
1108             aint32s[i] = aconn->callNumber[i];
1109     }
1110     USERPRI;
1111     return 0;
1112 }
1113
1114 rxi_SetCallNumberVector(aconn, aint32s)
1115 register struct rx_connection *aconn;
1116 register afs_int32 *aint32s; {
1117     register int i;
1118     register struct rx_call *tcall;
1119     SPLVAR;
1120
1121     NETPRI;
1122     for(i=0; i<RX_MAXCALLS; i++) {
1123         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1124             aconn->callNumber[i] = aint32s[i] - 1;
1125         else
1126             aconn->callNumber[i] = aint32s[i];
1127     }
1128     USERPRI;
1129     return 0;
1130 }
1131
1132 /* Advertise a new service.  A service is named locally by a UDP port
1133  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1134  * on a failure. */
1135 struct rx_service *
1136 rx_NewService(port, serviceId, serviceName, securityObjects,
1137               nSecurityObjects, serviceProc)
1138     u_short port;
1139     u_short serviceId;
1140     char *serviceName;  /* Name for identification purposes (e.g. the
1141                          * service name might be used for probing for
1142                          * statistics) */
1143     struct rx_securityClass **securityObjects;
1144     int nSecurityObjects;
1145     afs_int32 (*serviceProc)();
1146 {    
1147     osi_socket socket = OSI_NULLSOCKET;
1148     register struct rx_service *tservice;    
1149     register int i;
1150     SPLVAR;
1151
1152     clock_NewTime();
1153
1154     if (serviceId == 0) {
1155         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1156          serviceName);
1157         return 0;
1158     }
1159     if (port == 0) {
1160         if (rx_port == 0) {
1161             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1162             return 0;
1163         }
1164         port = rx_port;
1165         socket = rx_socket;
1166     }
1167
1168     tservice = rxi_AllocService();
1169     NETPRI;
1170     AFS_RXGLOCK();
1171     for (i = 0; i<RX_MAX_SERVICES; i++) {
1172         register struct rx_service *service = rx_services[i];
1173         if (service) {
1174             if (port == service->servicePort) {
1175                 if (service->serviceId == serviceId) {
1176                     /* The identical service has already been
1177                      * installed; if the caller was intending to
1178                      * change the security classes used by this
1179                      * service, he/she loses. */
1180                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1181                     AFS_RXGUNLOCK();
1182                     USERPRI;
1183                     rxi_FreeService(tservice);
1184                     return service;
1185                 }
1186                 /* Different service, same port: re-use the socket
1187                  * which is bound to the same port */
1188                 socket = service->socket;
1189             }
1190         } else {
1191             if (socket == OSI_NULLSOCKET) {
1192                 /* If we don't already have a socket (from another
1193                  * service on same port) get a new one */
1194                 socket = rxi_GetUDPSocket(port);
1195                 if (socket == OSI_NULLSOCKET) {
1196                     AFS_RXGUNLOCK();
1197                     USERPRI;
1198                     rxi_FreeService(tservice);
1199                     return 0;
1200                 }
1201             }
1202             service = tservice;
1203             service->socket = socket;
1204             service->servicePort = port;
1205             service->serviceId = serviceId;
1206             service->serviceName = serviceName;
1207             service->nSecurityObjects = nSecurityObjects;
1208             service->securityObjects = securityObjects;
1209             service->minProcs = 0;
1210             service->maxProcs = 1;
1211             service->idleDeadTime = 60;
1212             service->connDeadTime = rx_connDeadTime;
1213             service->executeRequestProc = serviceProc;
1214             rx_services[i] = service;   /* not visible until now */
1215             AFS_RXGUNLOCK();
1216             USERPRI;
1217             return service;
1218         }
1219     }
1220     AFS_RXGUNLOCK();
1221     USERPRI;
1222     rxi_FreeService(tservice);
1223     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1224     return 0;
1225 }
1226
1227 /* Generic request processing loop. This routine should be called
1228  * by the implementation dependent rx_ServerProc. If socketp is
1229  * non-null, it will be set to the file descriptor that this thread
1230  * is now listening on. If socketp is null, this routine will never
1231  * returns. */
1232 void rxi_ServerProc(threadID, newcall, socketp)
1233 int threadID;
1234 struct rx_call *newcall;
1235 osi_socket *socketp;
1236 {
1237     register struct rx_call *call;
1238     register afs_int32 code;
1239     register struct rx_service *tservice = NULL;
1240
1241     for (;;) {
1242         if (newcall) {
1243             call = newcall;
1244             newcall = NULL;
1245         } else {
1246             call = rx_GetCall(threadID, tservice, socketp);
1247             if (socketp && *socketp != OSI_NULLSOCKET) {
1248                 /* We are now a listener thread */
1249                 return;
1250             }
1251         }
1252
1253         /* if server is restarting( typically smooth shutdown) then do not
1254          * allow any new calls.
1255          */
1256
1257         if ( rx_tranquil && (call != NULL) ) {
1258             SPLVAR;
1259
1260             NETPRI;
1261             AFS_RXGLOCK();
1262             MUTEX_ENTER(&call->lock);
1263
1264             rxi_CallError(call, RX_RESTARTING);
1265             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1266
1267             MUTEX_EXIT(&call->lock);
1268             AFS_RXGUNLOCK();
1269             USERPRI;
1270         }
1271
1272 #ifdef  KERNEL
1273         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1274 #ifdef RX_ENABLE_LOCKS
1275             AFS_GLOCK();
1276 #endif /* RX_ENABLE_LOCKS */
1277             afs_termState = AFSOP_STOP_AFS;
1278             afs_osi_Wakeup(&afs_termState);
1279 #ifdef RX_ENABLE_LOCKS
1280             AFS_GUNLOCK();
1281 #endif /* RX_ENABLE_LOCKS */
1282             return;
1283         }
1284 #endif
1285
1286         tservice = call->conn->service;
1287
1288         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1289
1290         code = call->conn->service->executeRequestProc(call);
1291
1292         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1293
1294         rx_EndCall(call, code);
1295         MUTEX_ENTER(&rx_stats_mutex);
1296         rxi_nCalls++;
1297         MUTEX_EXIT(&rx_stats_mutex);
1298     }
1299 }
1300
1301
1302 void rx_WakeupServerProcs()
1303 {
1304     struct rx_serverQueueEntry *np, *tqp;
1305     SPLVAR;
1306
1307     NETPRI;
1308     AFS_RXGLOCK();
1309     MUTEX_ENTER(&rx_serverPool_lock);
1310
1311 #ifdef RX_ENABLE_LOCKS
1312     if (rx_waitForPacket)
1313         CV_BROADCAST(&rx_waitForPacket->cv);
1314 #else /* RX_ENABLE_LOCKS */
1315     if (rx_waitForPacket)
1316         osi_rxWakeup(rx_waitForPacket);
1317 #endif /* RX_ENABLE_LOCKS */
1318     MUTEX_ENTER(&freeSQEList_lock);
1319     for (np = rx_FreeSQEList; np; np = tqp) {
1320       tqp = *(struct rx_serverQueueEntry **)np;
1321 #ifdef RX_ENABLE_LOCKS
1322       CV_BROADCAST(&np->cv);
1323 #else /* RX_ENABLE_LOCKS */
1324       osi_rxWakeup(np);
1325 #endif /* RX_ENABLE_LOCKS */
1326     }
1327     MUTEX_EXIT(&freeSQEList_lock);
1328     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1329 #ifdef RX_ENABLE_LOCKS
1330       CV_BROADCAST(&np->cv);
1331 #else /* RX_ENABLE_LOCKS */
1332       osi_rxWakeup(np);
1333 #endif /* RX_ENABLE_LOCKS */
1334     }
1335     MUTEX_EXIT(&rx_serverPool_lock);
1336     AFS_RXGUNLOCK();
1337     USERPRI;
1338 }
1339
1340 /* meltdown:
1341  * One thing that seems to happen is that all the server threads get
1342  * tied up on some empty or slow call, and then a whole bunch of calls
1343  * arrive at once, using up the packet pool, so now there are more 
1344  * empty calls.  The most critical resources here are server threads
1345  * and the free packet pool.  The "doreclaim" code seems to help in
1346  * general.  I think that eventually we arrive in this state: there
1347  * are lots of pending calls which do have all their packets present,
1348  * so they won't be reclaimed, are multi-packet calls, so they won't
1349  * be scheduled until later, and thus are tying up most of the free 
1350  * packet pool for a very long time.
1351  * future options:
1352  * 1.  schedule multi-packet calls if all the packets are present.  
1353  * Probably CPU-bound operation, useful to return packets to pool. 
1354  * Do what if there is a full window, but the last packet isn't here?
1355  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1356  * it sleeps and waits for that type of call.
1357  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1358  * the current dataquota business is badly broken.  The quota isn't adjusted
1359  * to reflect how many packets are presently queued for a running call.
1360  * So, when we schedule a queued call with a full window of packets queued
1361  * up for it, that *should* free up a window full of packets for other 2d-class
1362  * calls to be able to use from the packet pool.  But it doesn't.
1363  *
1364  * NB.  Most of the time, this code doesn't run -- since idle server threads
1365  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1366  * as a new call arrives.
1367  */
1368 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1369  * for an rx_Read. */
1370 #ifdef RX_ENABLE_LOCKS
1371 struct rx_call *
1372 rx_GetCall(tno, cur_service, socketp)
1373 int tno;
1374 struct rx_service *cur_service;
1375 osi_socket *socketp;
1376 {
1377     struct rx_serverQueueEntry *sq;
1378     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1379     struct rx_service *service;
1380     SPLVAR;
1381
1382     MUTEX_ENTER(&freeSQEList_lock);
1383
1384     if (sq = rx_FreeSQEList) {
1385         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1386         MUTEX_EXIT(&freeSQEList_lock);
1387     } else {    /* otherwise allocate a new one and return that */
1388         MUTEX_EXIT(&freeSQEList_lock);
1389         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1390         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1391         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1392     }
1393
1394     MUTEX_ENTER(&rx_serverPool_lock);
1395     if (cur_service != NULL) {
1396         ReturnToServerPool(cur_service);
1397     }
1398     while (1) {
1399         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1400             register struct rx_call *tcall, *ncall;
1401             choice2 = (struct rx_call *) 0;
1402             /* Scan for eligible incoming calls.  A call is not eligible
1403              * if the maximum number of calls for its service type are
1404              * already executing */
1405             /* One thread will process calls FCFS (to prevent starvation),
1406              * while the other threads may run ahead looking for calls which
1407              * have all their input data available immediately.  This helps 
1408              * keep threads from blocking, waiting for data from the client. */
1409             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1410               service = tcall->conn->service;
1411               if (!QuotaOK(service)) {
1412                 continue;
1413               }
1414               if (!tno || !tcall->queue_item_header.next  ) {
1415                 /* If we're thread 0, then  we'll just use 
1416                  * this call. If we haven't been able to find an optimal 
1417                  * choice, and we're at the end of the list, then use a 
1418                  * 2d choice if one has been identified.  Otherwise... */
1419                 call = (choice2 ? choice2 : tcall);
1420                 service = call->conn->service;
1421               } else if (!queue_IsEmpty(&tcall->rq)) {
1422                 struct rx_packet *rp;
1423                 rp = queue_First(&tcall->rq, rx_packet);
1424                 if (rp->header.seq == 1) {
1425                   if (!meltdown_1pkt ||             
1426                       (rp->header.flags & RX_LAST_PACKET)) {
1427                     call = tcall;
1428                   } else if (rxi_2dchoice && !choice2 &&
1429                              !(tcall->flags & RX_CALL_CLEARED) &&
1430                              (tcall->rprev > rxi_HardAckRate)) {
1431                     choice2 = tcall;
1432                   } else rxi_md2cnt++;
1433                 }
1434               }
1435               if (call)  {
1436                 break;
1437               } else {
1438                   ReturnToServerPool(service);
1439               }
1440             }
1441           }
1442
1443         if (call) {
1444             queue_Remove(call);
1445             rxi_ServerThreadSelectingCall = 1;
1446             MUTEX_EXIT(&rx_serverPool_lock);
1447             MUTEX_ENTER(&call->lock);
1448             MUTEX_ENTER(&rx_serverPool_lock);
1449
1450             if (queue_IsEmpty(&call->rq) ||
1451                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1452               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1453
1454             CLEAR_CALL_QUEUE_LOCK(call);
1455             if (call->error) {
1456                 MUTEX_EXIT(&call->lock);
1457                 ReturnToServerPool(service);
1458                 rxi_ServerThreadSelectingCall = 0;
1459                 CV_SIGNAL(&rx_serverPool_cv);
1460                 call = (struct rx_call*)0;
1461                 continue;
1462             }
1463             call->flags &= (~RX_CALL_WAIT_PROC);
1464             MUTEX_ENTER(&rx_stats_mutex);
1465             rx_nWaiting--;
1466             MUTEX_EXIT(&rx_stats_mutex);
1467             rxi_ServerThreadSelectingCall = 0;
1468             CV_SIGNAL(&rx_serverPool_cv);
1469             MUTEX_EXIT(&rx_serverPool_lock);
1470             break;
1471         }
1472         else {
1473             /* If there are no eligible incoming calls, add this process
1474              * to the idle server queue, to wait for one */
1475             sq->newcall = 0;
1476             sq->tno = tno;
1477             if (socketp) {
1478                 *socketp = OSI_NULLSOCKET;
1479             }
1480             sq->socketp = socketp;
1481             queue_Append(&rx_idleServerQueue, sq);
1482 #ifndef AFS_AIX41_ENV
1483             rx_waitForPacket = sq;
1484 #endif /* AFS_AIX41_ENV */
1485             do {
1486                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1487 #ifdef  KERNEL
1488                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1489                     MUTEX_EXIT(&rx_serverPool_lock);
1490                     return (struct rx_call *)0;
1491                 }
1492 #endif
1493             } while (!(call = sq->newcall) &&
1494                      !(socketp && *socketp != OSI_NULLSOCKET));
1495             MUTEX_EXIT(&rx_serverPool_lock);
1496             if (call) {
1497                 MUTEX_ENTER(&call->lock);
1498             }
1499             break;
1500         }
1501     }
1502
1503     MUTEX_ENTER(&freeSQEList_lock);
1504     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1505     rx_FreeSQEList = sq;
1506     MUTEX_EXIT(&freeSQEList_lock);
1507
1508     if (call) {
1509         clock_GetTime(&call->startTime);
1510         call->state = RX_STATE_ACTIVE;
1511         call->mode = RX_MODE_RECEIVING;
1512
1513         rxi_calltrace(RX_CALL_START, call);
1514         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1515              call->conn->service->servicePort, 
1516              call->conn->service->serviceId, call));
1517
1518         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1519         MUTEX_EXIT(&call->lock);
1520     } else {
1521         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1522     }
1523
1524     return call;
1525 }
1526 #else /* RX_ENABLE_LOCKS */
1527 struct rx_call *
1528 rx_GetCall(tno, cur_service, socketp)
1529   int tno;
1530   struct rx_service *cur_service;
1531   osi_socket *socketp;
1532 {
1533     struct rx_serverQueueEntry *sq;
1534     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1535     struct rx_service *service;
1536     SPLVAR;
1537
1538     NETPRI;
1539     AFS_RXGLOCK();
1540     MUTEX_ENTER(&freeSQEList_lock);
1541
1542     if (sq = rx_FreeSQEList) {
1543         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1544         MUTEX_EXIT(&freeSQEList_lock);
1545     } else {    /* otherwise allocate a new one and return that */
1546         MUTEX_EXIT(&freeSQEList_lock);
1547         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1548         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1549         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1550     }
1551     MUTEX_ENTER(&sq->lock);
1552
1553     if (cur_service != NULL) {
1554         cur_service->nRequestsRunning--;
1555         if (cur_service->nRequestsRunning < cur_service->minProcs)
1556             rxi_minDeficit++;
1557         rxi_availProcs++;
1558     }
1559     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1560         register struct rx_call *tcall, *ncall;
1561         /* Scan for eligible incoming calls.  A call is not eligible
1562          * if the maximum number of calls for its service type are
1563          * already executing */
1564         /* One thread will process calls FCFS (to prevent starvation),
1565          * while the other threads may run ahead looking for calls which
1566          * have all their input data available immediately.  This helps 
1567          * keep threads from blocking, waiting for data from the client. */
1568         choice2 = (struct rx_call *) 0;
1569         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1570           service = tcall->conn->service;
1571           if (QuotaOK(service)) {
1572              if (!tno || !tcall->queue_item_header.next  ) {
1573                  /* If we're thread 0, then  we'll just use 
1574                   * this call. If we haven't been able to find an optimal 
1575                   * choice, and we're at the end of the list, then use a 
1576                   * 2d choice if one has been identified.  Otherwise... */
1577                  call = (choice2 ? choice2 : tcall);
1578                  service = call->conn->service;
1579              } else if (!queue_IsEmpty(&tcall->rq)) {
1580                  struct rx_packet *rp;
1581                  rp = queue_First(&tcall->rq, rx_packet);
1582                  if (rp->header.seq == 1
1583                      && (!meltdown_1pkt ||
1584                          (rp->header.flags & RX_LAST_PACKET))) {
1585                      call = tcall;
1586                  } else if (rxi_2dchoice && !choice2 &&
1587                             !(tcall->flags & RX_CALL_CLEARED) &&
1588                             (tcall->rprev > rxi_HardAckRate)) {
1589                      choice2 = tcall;
1590                  } else rxi_md2cnt++;
1591              }
1592           }
1593           if (call) 
1594              break;
1595         }
1596       }
1597
1598     if (call) {
1599         queue_Remove(call);
1600         /* we can't schedule a call if there's no data!!! */
1601         /* send an ack if there's no data, if we're missing the
1602          * first packet, or we're missing something between first 
1603          * and last -- there's a "hole" in the incoming data. */
1604         if (queue_IsEmpty(&call->rq) ||
1605             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1606             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1607           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1608
1609         call->flags &= (~RX_CALL_WAIT_PROC);
1610         service->nRequestsRunning++;
1611         /* just started call in minProcs pool, need fewer to maintain
1612          * guarantee */
1613         if (service->nRequestsRunning <= service->minProcs)
1614             rxi_minDeficit--;
1615         rxi_availProcs--;
1616         rx_nWaiting--;
1617         /* MUTEX_EXIT(&call->lock); */
1618     }
1619     else {
1620         /* If there are no eligible incoming calls, add this process
1621          * to the idle server queue, to wait for one */
1622         sq->newcall = 0;
1623         if (socketp) {
1624             *socketp = OSI_NULLSOCKET;
1625         }
1626         sq->socketp = socketp;
1627         queue_Append(&rx_idleServerQueue, sq);
1628         do {
1629             osi_rxSleep(sq);
1630 #ifdef  KERNEL
1631                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1632                     AFS_RXGUNLOCK();
1633                     USERPRI;
1634                     return (struct rx_call *)0;
1635                 }
1636 #endif
1637         } while (!(call = sq->newcall) &&
1638                  !(socketp && *socketp != OSI_NULLSOCKET));
1639     }
1640     MUTEX_EXIT(&sq->lock);
1641
1642     MUTEX_ENTER(&freeSQEList_lock);
1643     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1644     rx_FreeSQEList = sq;
1645     MUTEX_EXIT(&freeSQEList_lock);
1646
1647     if (call) {
1648         clock_GetTime(&call->startTime);
1649         call->state = RX_STATE_ACTIVE;
1650         call->mode = RX_MODE_RECEIVING;
1651
1652         rxi_calltrace(RX_CALL_START, call);
1653         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1654          call->conn->service->servicePort, 
1655          call->conn->service->serviceId, call));
1656     } else {
1657         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1658     }
1659
1660     AFS_RXGUNLOCK();
1661     USERPRI;
1662
1663     return call;
1664 }
1665 #endif /* RX_ENABLE_LOCKS */
1666
1667
1668
1669 /* Establish a procedure to be called when a packet arrives for a
1670  * call.  This routine will be called at most once after each call,
1671  * and will also be called if there is an error condition on the or
1672  * the call is complete.  Used by multi rx to build a selection
1673  * function which determines which of several calls is likely to be a
1674  * good one to read from.  
1675  * NOTE: the way this is currently implemented it is probably only a
1676  * good idea to (1) use it immediately after a newcall (clients only)
1677  * and (2) only use it once.  Other uses currently void your warranty
1678  */
1679 void rx_SetArrivalProc(call, proc, handle, arg)
1680     register struct rx_call *call;
1681     register VOID (*proc)();
1682     register VOID *handle;
1683     register VOID *arg;
1684 {
1685     call->arrivalProc = proc;
1686     call->arrivalProcHandle = handle;
1687     call->arrivalProcArg = arg;
1688 }
1689
1690 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1691  * appropriate, and return the final error code from the conversation
1692  * to the caller */
1693
1694 afs_int32 rx_EndCall(call, rc)
1695     register struct rx_call *call;
1696     afs_int32 rc;
1697 {
1698     register struct rx_connection *conn = call->conn;
1699     register struct rx_service *service;
1700     register struct rx_packet *tp; /* Temporary packet pointer */
1701     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1702     afs_int32 error;
1703     SPLVAR;
1704
1705     dpf(("rx_EndCall(call %x)\n", call));
1706
1707     NETPRI;
1708     AFS_RXGLOCK();
1709     MUTEX_ENTER(&call->lock);
1710
1711     if (rc == 0 && call->error == 0) {
1712         call->abortCode = 0;
1713         call->abortCount = 0;
1714     }
1715
1716     call->arrivalProc = (VOID (*)()) 0;
1717     if (rc && call->error == 0) {
1718         rxi_CallError(call, rc);
1719         /* Send an abort message to the peer if this error code has
1720          * only just been set.  If it was set previously, assume the
1721          * peer has already been sent the error code or will request it 
1722          */
1723         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1724     }
1725     if (conn->type == RX_SERVER_CONNECTION) {
1726         /* Make sure reply or at least dummy reply is sent */
1727         if (call->mode == RX_MODE_RECEIVING) {
1728             rxi_WriteProc(call, 0, 0);
1729         }
1730         if (call->mode == RX_MODE_SENDING) {
1731             rxi_FlushWrite(call);
1732         }
1733         service = conn->service;
1734         rxi_calltrace(RX_CALL_END, call);
1735         /* Call goes to hold state until reply packets are acknowledged */
1736         if (call->tfirst + call->nSoftAcked < call->tnext) {
1737             call->state = RX_STATE_HOLD;
1738         } else {
1739             call->state = RX_STATE_DALLY;
1740             rxi_ClearTransmitQueue(call, 0);
1741             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1742             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1743         }
1744     }
1745     else { /* Client connection */
1746         char dummy;
1747         /* Make sure server receives input packets, in the case where
1748          * no reply arguments are expected */
1749         if ((call->mode == RX_MODE_SENDING)
1750          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1751             (void) rxi_ReadProc(call, &dummy, 1);
1752         }
1753         /* We need to release the call lock since it's lower than the
1754          * conn_call_lock and we don't want to hold the conn_call_lock
1755          * over the rx_ReadProc call. The conn_call_lock needs to be held
1756          * here for the case where rx_NewCall is perusing the calls on
1757          * the connection structure. We don't want to signal until
1758          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1759          * have checked this call, found it active and by the time it
1760          * goes to sleep, will have missed the signal.
1761          */
1762         MUTEX_EXIT(&call->lock);
1763         MUTEX_ENTER(&conn->conn_call_lock);
1764         MUTEX_ENTER(&call->lock);
1765         MUTEX_ENTER(&conn->conn_data_lock);
1766         conn->flags |= RX_CONN_BUSY;
1767         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1768             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1769             MUTEX_EXIT(&conn->conn_data_lock);
1770 #ifdef  RX_ENABLE_LOCKS
1771             CV_BROADCAST(&conn->conn_call_cv);
1772 #else
1773             osi_rxWakeup(conn);
1774 #endif
1775         }
1776 #ifdef RX_ENABLE_LOCKS
1777         else {
1778             MUTEX_EXIT(&conn->conn_data_lock);
1779         }
1780 #endif /* RX_ENABLE_LOCKS */
1781         call->state = RX_STATE_DALLY;
1782     }
1783     error = call->error;
1784
1785     /* currentPacket, nLeft, and NFree must be zeroed here, because
1786      * ResetCall cannot: ResetCall may be called at splnet(), in the
1787      * kernel version, and may interrupt the macros rx_Read or
1788      * rx_Write, which run at normal priority for efficiency. */
1789     if (call->currentPacket) {
1790         rxi_FreePacket(call->currentPacket);
1791         call->currentPacket = (struct rx_packet *) 0;
1792         call->nLeft = call->nFree = call->curlen = 0;
1793     }
1794     else
1795         call->nLeft = call->nFree = call->curlen = 0;
1796
1797     /* Free any packets from the last call to ReadvProc/WritevProc */
1798     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1799         queue_Remove(tp);
1800         rxi_FreePacket(tp);
1801     }
1802
1803     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1804     MUTEX_EXIT(&call->lock);
1805     if (conn->type == RX_CLIENT_CONNECTION) {
1806         MUTEX_EXIT(&conn->conn_call_lock);
1807         conn->flags &= ~RX_CONN_BUSY;
1808     }
1809     AFS_RXGUNLOCK();
1810     USERPRI;
1811     /*
1812      * Map errors to the local host's errno.h format.
1813      */
1814     error = ntoh_syserr_conv(error);
1815     return error;
1816 }
1817
1818 #if !defined(KERNEL)
1819
1820 /* Call this routine when shutting down a server or client (especially
1821  * clients).  This will allow Rx to gracefully garbage collect server
1822  * connections, and reduce the number of retries that a server might
1823  * make to a dead client.
1824  * This is not quite right, since some calls may still be ongoing and
1825  * we can't lock them to destroy them. */
1826 void rx_Finalize() {
1827     register struct rx_connection **conn_ptr, **conn_end;
1828
1829     INIT_PTHREAD_LOCKS
1830     LOCK_RX_INIT
1831     if (rxinit_status == 1) {
1832         UNLOCK_RX_INIT
1833         return; /* Already shutdown. */
1834     }
1835     rxi_DeleteCachedConnections();
1836     if (rx_connHashTable) {
1837         MUTEX_ENTER(&rx_connHashTable_lock);
1838         for (conn_ptr = &rx_connHashTable[0], 
1839              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1840              conn_ptr < conn_end; conn_ptr++) {
1841             struct rx_connection *conn, *next;
1842             for (conn = *conn_ptr; conn; conn = next) {
1843                 next = conn->next;
1844                 if (conn->type == RX_CLIENT_CONNECTION) {
1845                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1846                     conn->refCount++;
1847                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1848 #ifdef RX_ENABLE_LOCKS
1849                     rxi_DestroyConnectionNoLock(conn);
1850 #else /* RX_ENABLE_LOCKS */
1851                     rxi_DestroyConnection(conn);
1852 #endif /* RX_ENABLE_LOCKS */
1853                 }
1854             }
1855         }
1856 #ifdef RX_ENABLE_LOCKS
1857         while (rx_connCleanup_list) {
1858             struct rx_connection *conn;
1859             conn = rx_connCleanup_list;
1860             rx_connCleanup_list = rx_connCleanup_list->next;
1861             MUTEX_EXIT(&rx_connHashTable_lock);
1862             rxi_CleanupConnection(conn);
1863             MUTEX_ENTER(&rx_connHashTable_lock);
1864         }
1865         MUTEX_EXIT(&rx_connHashTable_lock);
1866 #endif /* RX_ENABLE_LOCKS */
1867     }
1868     rxi_flushtrace();
1869
1870     rxinit_status = 1;
1871     UNLOCK_RX_INIT
1872 }
1873 #endif
1874
1875 /* if we wakeup packet waiter too often, can get in loop with two
1876     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1877 void
1878 rxi_PacketsUnWait() {
1879
1880     if (!rx_waitingForPackets) {
1881         return;
1882     }
1883 #ifdef KERNEL
1884     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1885         return;                                     /* still over quota */
1886     }
1887 #endif /* KERNEL */
1888     rx_waitingForPackets = 0;
1889 #ifdef  RX_ENABLE_LOCKS
1890     CV_BROADCAST(&rx_waitingForPackets_cv);
1891 #else
1892     osi_rxWakeup(&rx_waitingForPackets);
1893 #endif
1894     return;
1895 }
1896
1897
1898 /* ------------------Internal interfaces------------------------- */
1899
1900 /* Return this process's service structure for the
1901  * specified socket and service */
1902 struct rx_service *rxi_FindService(socket, serviceId)
1903     register osi_socket socket;
1904     register u_short serviceId;
1905 {
1906     register struct rx_service **sp;    
1907     for (sp = &rx_services[0]; *sp; sp++) {
1908         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1909           return *sp;
1910     }
1911     return 0;
1912 }
1913
1914 /* Allocate a call structure, for the indicated channel of the
1915  * supplied connection.  The mode and state of the call must be set by
1916  * the caller. */
1917 struct rx_call *rxi_NewCall(conn, channel)
1918     register struct rx_connection *conn;
1919     register int channel;
1920 {
1921     register struct rx_call *call;
1922 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1923     register struct rx_call *cp;        /* Call pointer temp */
1924     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1925 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1926
1927     /* Grab an existing call structure, or allocate a new one.
1928      * Existing call structures are assumed to have been left reset by
1929      * rxi_FreeCall */
1930     MUTEX_ENTER(&rx_freeCallQueue_lock);
1931
1932 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1933     /*
1934      * EXCEPT that the TQ might not yet be cleared out.
1935      * Skip over those with in-use TQs.
1936      */
1937     call = NULL;
1938     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1939         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1940             call = cp;
1941             break;
1942         }
1943     }
1944     if (call) {
1945 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1946     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1947         call = queue_First(&rx_freeCallQueue, rx_call);
1948 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1949         queue_Remove(call);
1950         MUTEX_ENTER(&rx_stats_mutex);
1951         rx_stats.nFreeCallStructs--;
1952         MUTEX_EXIT(&rx_stats_mutex);
1953         MUTEX_EXIT(&rx_freeCallQueue_lock);
1954         MUTEX_ENTER(&call->lock);
1955         CLEAR_CALL_QUEUE_LOCK(call);
1956 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1957         /* Now, if TQ wasn't cleared earlier, do it now. */
1958         if (call->flags & RX_CALL_TQ_CLEARME) {
1959             rxi_ClearTransmitQueue(call, 0);
1960             queue_Init(&call->tq);
1961         }
1962 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1963         /* Bind the call to its connection structure */
1964         call->conn = conn;
1965         rxi_ResetCall(call, 1);
1966     }
1967     else {
1968         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1969
1970         MUTEX_EXIT(&rx_freeCallQueue_lock);
1971         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1972         MUTEX_ENTER(&call->lock);
1973         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1974         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1975         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1976
1977         MUTEX_ENTER(&rx_stats_mutex);
1978         rx_stats.nCallStructs++;
1979         MUTEX_EXIT(&rx_stats_mutex);
1980         /* Initialize once-only items */
1981         queue_Init(&call->tq);
1982         queue_Init(&call->rq);
1983         queue_Init(&call->iovq);
1984         /* Bind the call to its connection structure (prereq for reset) */
1985         call->conn = conn;
1986         rxi_ResetCall(call, 1);
1987     }
1988     call->channel = channel;
1989     call->callNumber = &conn->callNumber[channel];
1990     /* Note that the next expected call number is retained (in
1991      * conn->callNumber[i]), even if we reallocate the call structure
1992      */
1993     conn->call[channel] = call;
1994     /* if the channel's never been used (== 0), we should start at 1, otherwise
1995         the call number is valid from the last time this channel was used */
1996     if (*call->callNumber == 0) *call->callNumber = 1;
1997
1998     MUTEX_EXIT(&call->lock);
1999     return call;
2000 }
2001
2002 /* A call has been inactive long enough that so we can throw away
2003  * state, including the call structure, which is placed on the call
2004  * free list.
2005  * Call is locked upon entry.
2006  */
2007 #ifdef RX_ENABLE_LOCKS
2008 void rxi_FreeCall(call, haveCTLock)
2009     int haveCTLock; /* Set if called from rxi_ReapConnections */
2010 #else /* RX_ENABLE_LOCKS */
2011 void rxi_FreeCall(call)
2012 #endif /* RX_ENABLE_LOCKS */
2013     register struct rx_call *call;
2014 {
2015     register int channel = call->channel;
2016     register struct rx_connection *conn = call->conn;
2017
2018
2019     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2020       (*call->callNumber)++;
2021     rxi_ResetCall(call, 0);
2022     call->conn->call[channel] = (struct rx_call *) 0;
2023
2024     MUTEX_ENTER(&rx_freeCallQueue_lock);
2025     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2026 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2027     /* A call may be free even though its transmit queue is still in use.
2028      * Since we search the call list from head to tail, put busy calls at
2029      * the head of the list, and idle calls at the tail.
2030      */
2031     if (call->flags & RX_CALL_TQ_BUSY)
2032         queue_Prepend(&rx_freeCallQueue, call);
2033     else
2034         queue_Append(&rx_freeCallQueue, call);
2035 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2036     queue_Append(&rx_freeCallQueue, call);
2037 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2038     MUTEX_ENTER(&rx_stats_mutex);
2039     rx_stats.nFreeCallStructs++;
2040     MUTEX_EXIT(&rx_stats_mutex);
2041
2042     MUTEX_EXIT(&rx_freeCallQueue_lock);
2043  
2044     /* Destroy the connection if it was previously slated for
2045      * destruction, i.e. the Rx client code previously called
2046      * rx_DestroyConnection (client connections), or
2047      * rxi_ReapConnections called the same routine (server
2048      * connections).  Only do this, however, if there are no
2049      * outstanding calls. Note that for fine grain locking, there appears
2050      * to be a deadlock in that rxi_FreeCall has a call locked and
2051      * DestroyConnectionNoLock locks each call in the conn. But note a
2052      * few lines up where we have removed this call from the conn.
2053      * If someone else destroys a connection, they either have no
2054      * call lock held or are going through this section of code.
2055      */
2056     if (conn->flags & RX_CONN_DESTROY_ME) {
2057         MUTEX_ENTER(&conn->conn_data_lock);
2058         conn->refCount++;
2059         MUTEX_EXIT(&conn->conn_data_lock);
2060 #ifdef RX_ENABLE_LOCKS
2061         if (haveCTLock)
2062             rxi_DestroyConnectionNoLock(conn);
2063         else
2064             rxi_DestroyConnection(conn);
2065 #else /* RX_ENABLE_LOCKS */
2066         rxi_DestroyConnection(conn);
2067 #endif /* RX_ENABLE_LOCKS */
2068     }
2069 }
2070
2071 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2072 char *rxi_Alloc(size)
2073 register size_t size;
2074 {
2075     register char *p;
2076
2077 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2078     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2079      * implementation.
2080      */
2081     int glockOwner = ISAFS_GLOCK();
2082     if (!glockOwner)
2083         AFS_GLOCK();
2084 #endif
2085     MUTEX_ENTER(&rx_stats_mutex);
2086     rxi_Alloccnt++; rxi_Allocsize += size;
2087     MUTEX_EXIT(&rx_stats_mutex);
2088 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2089     if (size > AFS_SMALLOCSIZ) {
2090         p = (char *) osi_AllocMediumSpace(size);
2091     } else
2092         p = (char *) osi_AllocSmall(size, 1);
2093 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2094     if (!glockOwner)
2095         AFS_GUNLOCK();
2096 #endif
2097 #else
2098     p = (char *) osi_Alloc(size);
2099 #endif
2100     if (!p) osi_Panic("rxi_Alloc error");
2101     bzero(p, size);
2102     return p;
2103 }
2104
2105 void rxi_Free(addr, size)
2106 void *addr;
2107 register size_t size;
2108 {
2109 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2110     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2111      * implementation.
2112      */
2113     int glockOwner = ISAFS_GLOCK();
2114     if (!glockOwner)
2115         AFS_GLOCK();
2116 #endif
2117     MUTEX_ENTER(&rx_stats_mutex);
2118     rxi_Alloccnt--; rxi_Allocsize -= size;
2119     MUTEX_EXIT(&rx_stats_mutex);
2120 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2121     if (size > AFS_SMALLOCSIZ)
2122         osi_FreeMediumSpace(addr);
2123     else
2124         osi_FreeSmall(addr);
2125 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2126     if (!glockOwner)
2127         AFS_GUNLOCK();
2128 #endif
2129 #else
2130     osi_Free(addr, size);
2131 #endif    
2132 }
2133
2134 /* Find the peer process represented by the supplied (host,port)
2135  * combination.  If there is no appropriate active peer structure, a
2136  * new one will be allocated and initialized 
2137  * The origPeer, if set, is a pointer to a peer structure on which the
2138  * refcount will be be decremented. This is used to replace the peer
2139  * structure hanging off a connection structure */
2140 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2141     register afs_uint32 host;
2142     register u_short port;
2143     struct rx_peer *origPeer;
2144     int create;
2145 {
2146     register struct rx_peer *pp;
2147     int hashIndex;
2148     hashIndex = PEER_HASH(host, port);
2149     MUTEX_ENTER(&rx_peerHashTable_lock);
2150     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2151         if ((pp->host == host) && (pp->port == port)) break;
2152     }
2153     if (!pp) {
2154         if (create) {
2155             pp = rxi_AllocPeer(); /* This bzero's *pp */
2156             pp->host = host;      /* set here or in InitPeerParams is zero */
2157             pp->port = port;
2158             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2159             queue_Init(&pp->congestionQueue);
2160             queue_Init(&pp->rpcStats);
2161             pp->next = rx_peerHashTable[hashIndex];
2162             rx_peerHashTable[hashIndex] = pp;
2163             rxi_InitPeerParams(pp);
2164             MUTEX_ENTER(&rx_stats_mutex);
2165             rx_stats.nPeerStructs++;
2166             MUTEX_EXIT(&rx_stats_mutex);
2167         }
2168     }
2169     if (pp && create) {
2170         pp->refCount++;
2171     }
2172     if ( origPeer)
2173         origPeer->refCount--;
2174     MUTEX_EXIT(&rx_peerHashTable_lock);
2175     return pp;
2176 }
2177
2178
2179 /* Find the connection at (host, port) started at epoch, and with the
2180  * given connection id.  Creates the server connection if necessary.
2181  * The type specifies whether a client connection or a server
2182  * connection is desired.  In both cases, (host, port) specify the
2183  * peer's (host, pair) pair.  Client connections are not made
2184  * automatically by this routine.  The parameter socket gives the
2185  * socket descriptor on which the packet was received.  This is used,
2186  * in the case of server connections, to check that *new* connections
2187  * come via a valid (port, serviceId).  Finally, the securityIndex
2188  * parameter must match the existing index for the connection.  If a
2189  * server connection is created, it will be created using the supplied
2190  * index, if the index is valid for this service */
2191 struct rx_connection *
2192 rxi_FindConnection(socket, host, port, serviceId, cid, 
2193                    epoch, type, securityIndex)
2194     osi_socket socket;
2195     register afs_int32 host;
2196     register u_short port;
2197     u_short serviceId;
2198     afs_uint32 cid;
2199     afs_uint32 epoch;
2200     int type;
2201     u_int securityIndex;
2202 {
2203     int hashindex, flag;
2204     register struct rx_connection *conn;
2205     struct rx_peer *peer;
2206     hashindex = CONN_HASH(host, port, cid, epoch, type);
2207     MUTEX_ENTER(&rx_connHashTable_lock);
2208     rxLastConn ? (conn = rxLastConn, flag = 0) :
2209                  (conn = rx_connHashTable[hashindex], flag = 1);
2210     for (; conn; ) {
2211       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2212           && (epoch == conn->epoch)) {
2213         register struct rx_peer *pp = conn->peer;
2214         if (securityIndex != conn->securityIndex) {
2215             /* this isn't supposed to happen, but someone could forge a packet
2216                like this, and there seems to be some CM bug that makes this
2217                happen from time to time -- in which case, the fileserver
2218                asserts. */  
2219             MUTEX_EXIT(&rx_connHashTable_lock);
2220             return (struct rx_connection *) 0;
2221         }
2222         /* epoch's high order bits mean route for security reasons only on
2223          * the cid, not the host and port fields.
2224          */
2225         if (conn->epoch & 0x80000000) break;
2226         if (((type == RX_CLIENT_CONNECTION) 
2227              || (pp->host == host)) && (pp->port == port))
2228           break;
2229       }
2230       if ( !flag )
2231       {
2232         /* the connection rxLastConn that was used the last time is not the
2233         ** one we are looking for now. Hence, start searching in the hash */
2234         flag = 1;
2235         conn = rx_connHashTable[hashindex];
2236       }
2237       else
2238         conn = conn->next;
2239     }
2240     if (!conn) {
2241         struct rx_service *service;
2242         if (type == RX_CLIENT_CONNECTION) {
2243             MUTEX_EXIT(&rx_connHashTable_lock);
2244             return (struct rx_connection *) 0;
2245         }
2246         service = rxi_FindService(socket, serviceId);
2247         if (!service || (securityIndex >= service->nSecurityObjects) 
2248             || (service->securityObjects[securityIndex] == 0)) {
2249             MUTEX_EXIT(&rx_connHashTable_lock);
2250             return (struct rx_connection *) 0;
2251         }
2252         conn = rxi_AllocConnection(); /* This bzero's the connection */
2253         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2254                    MUTEX_DEFAULT,0);
2255         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2256                    MUTEX_DEFAULT,0);
2257         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2258         conn->next = rx_connHashTable[hashindex];
2259         rx_connHashTable[hashindex] = conn;
2260         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2261         conn->type = RX_SERVER_CONNECTION;
2262         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2263         conn->epoch = epoch;
2264         conn->cid = cid & RX_CIDMASK;
2265         /* conn->serial = conn->lastSerial = 0; */
2266         /* conn->timeout = 0; */
2267         conn->ackRate = RX_FAST_ACK_RATE;
2268         conn->service = service;
2269         conn->serviceId = serviceId;
2270         conn->securityIndex = securityIndex;
2271         conn->securityObject = service->securityObjects[securityIndex];
2272         conn->nSpecific = 0;
2273         conn->specific = NULL;
2274         rx_SetConnDeadTime(conn, service->connDeadTime);
2275         /* Notify security object of the new connection */
2276         RXS_NewConnection(conn->securityObject, conn);
2277         /* XXXX Connection timeout? */
2278         if (service->newConnProc) (*service->newConnProc)(conn);
2279         MUTEX_ENTER(&rx_stats_mutex);
2280         rx_stats.nServerConns++;
2281         MUTEX_EXIT(&rx_stats_mutex);
2282     }
2283     else
2284     {
2285     /* Ensure that the peer structure is set up in such a way that
2286     ** replies in this connection go back to that remote interface
2287     ** from which the last packet was sent out. In case, this packet's
2288     ** source IP address does not match the peer struct for this conn,
2289     ** then drop the refCount on conn->peer and get a new peer structure.
2290     ** We can check the host,port field in the peer structure without the
2291     ** rx_peerHashTable_lock because the peer structure has its refCount
2292     ** incremented and the only time the host,port in the peer struct gets
2293     ** updated is when the peer structure is created.
2294     */
2295         if (conn->peer->host == host )
2296                 peer = conn->peer; /* no change to the peer structure */
2297         else
2298                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2299     }
2300
2301     MUTEX_ENTER(&conn->conn_data_lock);
2302     conn->refCount++;
2303     conn->peer = peer;
2304     MUTEX_EXIT(&conn->conn_data_lock);
2305
2306     rxLastConn = conn;  /* store this connection as the last conn used */
2307     MUTEX_EXIT(&rx_connHashTable_lock);
2308     return conn;
2309 }
2310
2311 /* There are two packet tracing routines available for testing and monitoring
2312  * Rx.  One is called just after every packet is received and the other is
2313  * called just before every packet is sent.  Received packets, have had their
2314  * headers decoded, and packets to be sent have not yet had their headers
2315  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2316  * containing the network address.  Both can be modified.  The return value, if
2317  * non-zero, indicates that the packet should be dropped.  */
2318
2319 int (*rx_justReceived)() = 0;
2320 int (*rx_almostSent)() = 0;
2321
2322 /* A packet has been received off the interface.  Np is the packet, socket is
2323  * the socket number it was received from (useful in determining which service
2324  * this packet corresponds to), and (host, port) reflect the host,port of the
2325  * sender.  This call returns the packet to the caller if it is finished with
2326  * it, rather than de-allocating it, just as a small performance hack */
2327
2328 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2329     register struct rx_packet *np;
2330     osi_socket socket;
2331     afs_uint32 host;
2332     u_short port;
2333     int *tnop;
2334     struct rx_call **newcallp;
2335 {
2336     register struct rx_call *call;
2337     register struct rx_connection *conn;
2338     int channel;
2339     afs_uint32 currentCallNumber;
2340     int type;
2341     int skew;
2342 #ifdef RXDEBUG
2343     char *packetType;
2344 #endif
2345     struct rx_packet *tnp;
2346
2347 #ifdef RXDEBUG
2348 /* We don't print out the packet until now because (1) the time may not be
2349  * accurate enough until now in the lwp implementation (rx_Listener only gets
2350  * the time after the packet is read) and (2) from a protocol point of view,
2351  * this is the first time the packet has been seen */
2352     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2353         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2354     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2355          np->header.serial, packetType, host, port, np->header.serviceId,
2356          np->header.epoch, np->header.cid, np->header.callNumber, 
2357          np->header.seq, np->header.flags, np));
2358 #endif
2359
2360     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2361       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2362     }
2363
2364     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2365         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2366     }
2367 #ifdef RXDEBUG
2368     /* If an input tracer function is defined, call it with the packet and
2369      * network address.  Note this function may modify its arguments. */
2370     if (rx_justReceived) {
2371         struct sockaddr_in addr;
2372         int drop;
2373         addr.sin_family = AF_INET;
2374         addr.sin_port = port;
2375         addr.sin_addr.s_addr = host;
2376 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2377         addr.sin_len = sizeof(addr);
2378 #endif  /* AFS_OSF_ENV */
2379         drop = (*rx_justReceived) (np, &addr);
2380         /* drop packet if return value is non-zero */
2381         if (drop) return np;
2382         port = addr.sin_port;           /* in case fcn changed addr */
2383         host = addr.sin_addr.s_addr;
2384     }
2385 #endif
2386
2387     /* If packet was not sent by the client, then *we* must be the client */
2388     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2389         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2390
2391     /* Find the connection (or fabricate one, if we're the server & if
2392      * necessary) associated with this packet */
2393     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2394                               np->header.cid, np->header.epoch, type, 
2395                               np->header.securityIndex);
2396
2397     if (!conn) {
2398       /* If no connection found or fabricated, just ignore the packet.
2399        * (An argument could be made for sending an abort packet for
2400        * the conn) */
2401       return np;
2402     }   
2403
2404     MUTEX_ENTER(&conn->conn_data_lock);
2405     if (conn->maxSerial < np->header.serial)
2406       conn->maxSerial = np->header.serial;
2407     MUTEX_EXIT(&conn->conn_data_lock);
2408
2409     /* If the connection is in an error state, send an abort packet and ignore
2410      * the incoming packet */
2411     if (conn->error) {
2412         /* Don't respond to an abort packet--we don't want loops! */
2413         MUTEX_ENTER(&conn->conn_data_lock);
2414         if (np->header.type != RX_PACKET_TYPE_ABORT)
2415             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2416         conn->refCount--;
2417         MUTEX_EXIT(&conn->conn_data_lock);
2418         return np;
2419     }
2420
2421     /* Check for connection-only requests (i.e. not call specific). */
2422     if (np->header.callNumber == 0) {
2423         switch (np->header.type) {
2424             case RX_PACKET_TYPE_ABORT:
2425                 /* What if the supplied error is zero? */
2426                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2427                 MUTEX_ENTER(&conn->conn_data_lock);
2428                 conn->refCount--;
2429                 MUTEX_EXIT(&conn->conn_data_lock);
2430                 return np;
2431             case RX_PACKET_TYPE_CHALLENGE:
2432                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2433                 MUTEX_ENTER(&conn->conn_data_lock);
2434                 conn->refCount--;
2435                 MUTEX_EXIT(&conn->conn_data_lock);
2436                 return tnp;
2437             case RX_PACKET_TYPE_RESPONSE:
2438                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2439                 MUTEX_ENTER(&conn->conn_data_lock);
2440                 conn->refCount--;
2441                 MUTEX_EXIT(&conn->conn_data_lock);
2442                 return tnp;
2443             case RX_PACKET_TYPE_PARAMS:
2444             case RX_PACKET_TYPE_PARAMS+1:
2445             case RX_PACKET_TYPE_PARAMS+2:
2446                 /* ignore these packet types for now */
2447                 MUTEX_ENTER(&conn->conn_data_lock);
2448                 conn->refCount--;
2449                 MUTEX_EXIT(&conn->conn_data_lock);
2450                 return np;
2451
2452
2453             default:
2454                 /* Should not reach here, unless the peer is broken: send an
2455                  * abort packet */
2456                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2457                 MUTEX_ENTER(&conn->conn_data_lock);
2458                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2459                 conn->refCount--;
2460                 MUTEX_EXIT(&conn->conn_data_lock);
2461                 return tnp;
2462         }
2463     }
2464
2465     channel = np->header.cid & RX_CHANNELMASK;
2466     call = conn->call[channel];
2467 #ifdef  RX_ENABLE_LOCKS
2468     if (call)
2469         MUTEX_ENTER(&call->lock);
2470     /* Test to see if call struct is still attached to conn. */
2471     if (call != conn->call[channel]) {
2472         if (call)
2473             MUTEX_EXIT(&call->lock);
2474         if (type == RX_SERVER_CONNECTION) {
2475             call = conn->call[channel];
2476             /* If we started with no call attached and there is one now,
2477              * another thread is also running this routine and has gotten
2478              * the connection channel. We should drop this packet in the tests
2479              * below. If there was a call on this connection and it's now
2480              * gone, then we'll be making a new call below.
2481              * If there was previously a call and it's now different then
2482              * the old call was freed and another thread running this routine
2483              * has created a call on this channel. One of these two threads
2484              * has a packet for the old call and the code below handles those
2485              * cases.
2486              */
2487             if (call)
2488                 MUTEX_ENTER(&call->lock);
2489         }
2490         else {
2491             /* This packet can't be for this call. If the new call address is
2492              * 0 then no call is running on this channel. If there is a call
2493              * then, since this is a client connection we're getting data for
2494              * it must be for the previous call.
2495              */
2496             MUTEX_ENTER(&rx_stats_mutex);
2497             rx_stats.spuriousPacketsRead++;
2498             MUTEX_EXIT(&rx_stats_mutex);
2499             MUTEX_ENTER(&conn->conn_data_lock);
2500             conn->refCount--;
2501             MUTEX_EXIT(&conn->conn_data_lock);
2502             return np;
2503         }
2504     }
2505 #endif
2506     currentCallNumber = conn->callNumber[channel];
2507
2508     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2509         if (np->header.callNumber < currentCallNumber) {
2510             MUTEX_ENTER(&rx_stats_mutex);
2511             rx_stats.spuriousPacketsRead++;
2512             MUTEX_EXIT(&rx_stats_mutex);
2513 #ifdef  RX_ENABLE_LOCKS
2514             if (call)
2515                 MUTEX_EXIT(&call->lock);
2516 #endif
2517             MUTEX_ENTER(&conn->conn_data_lock);
2518             conn->refCount--;
2519             MUTEX_EXIT(&conn->conn_data_lock);
2520             return np;
2521         }
2522         if (!call) {
2523             call = rxi_NewCall(conn, channel);
2524             MUTEX_ENTER(&call->lock);
2525             *call->callNumber = np->header.callNumber;
2526             call->state = RX_STATE_PRECALL;
2527             clock_GetTime(&call->queueTime);
2528             hzero(call->bytesSent);
2529             hzero(call->bytesRcvd);
2530             rxi_KeepAliveOn(call);
2531         }
2532         else if (np->header.callNumber != currentCallNumber) {
2533             /* Wait until the transmit queue is idle before deciding
2534              * whether to reset the current call. Chances are that the
2535              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2536              * flag is cleared.
2537              */
2538 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2539             while ((call->state == RX_STATE_ACTIVE) &&
2540                    (call->flags & RX_CALL_TQ_BUSY)) {
2541                 call->flags |= RX_CALL_TQ_WAIT;
2542 #ifdef RX_ENABLE_LOCKS
2543                 CV_WAIT(&call->cv_tq, &call->lock);
2544 #else /* RX_ENABLE_LOCKS */
2545                 osi_rxSleep(&call->tq);
2546 #endif /* RX_ENABLE_LOCKS */
2547             }
2548 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2549             /* If the new call cannot be taken right now send a busy and set
2550              * the error condition in this call, so that it terminates as
2551              * quickly as possible */
2552             if (call->state == RX_STATE_ACTIVE) {
2553                 struct rx_packet *tp;
2554
2555                 rxi_CallError(call, RX_CALL_DEAD);
2556                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2557                 MUTEX_EXIT(&call->lock);
2558                 MUTEX_ENTER(&conn->conn_data_lock);
2559                 conn->refCount--;
2560                 MUTEX_EXIT(&conn->conn_data_lock);
2561                 return tp;
2562             }
2563             rxi_ResetCall(call, 0);
2564             *call->callNumber = np->header.callNumber;
2565             call->state = RX_STATE_PRECALL;
2566             clock_GetTime(&call->queueTime);
2567             hzero(call->bytesSent);
2568             hzero(call->bytesRcvd);
2569             /*
2570              * If the number of queued calls exceeds the overload
2571              * threshold then abort this call.
2572              */
2573             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2574                 struct rx_packet *tp;
2575
2576                 rxi_CallError(call, rx_BusyError);
2577                 tp = rxi_SendCallAbort(call, np, 1, 0);
2578                 MUTEX_EXIT(&call->lock);
2579                 MUTEX_ENTER(&conn->conn_data_lock);
2580                 conn->refCount--;
2581                 MUTEX_EXIT(&conn->conn_data_lock);
2582                 return tp;
2583             }
2584             rxi_KeepAliveOn(call);
2585         }
2586         else {
2587             /* Continuing call; do nothing here. */
2588         }
2589     } else { /* we're the client */
2590         /* Ignore all incoming acknowledgements for calls in DALLY state */
2591         if ( call && (call->state == RX_STATE_DALLY) 
2592          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2593             MUTEX_ENTER(&rx_stats_mutex);
2594             rx_stats.ignorePacketDally++;
2595             MUTEX_EXIT(&rx_stats_mutex);
2596 #ifdef  RX_ENABLE_LOCKS
2597             if (call) {
2598                 MUTEX_EXIT(&call->lock);
2599             }
2600 #endif
2601             MUTEX_ENTER(&conn->conn_data_lock);
2602             conn->refCount--;
2603             MUTEX_EXIT(&conn->conn_data_lock);
2604             return np;
2605         }
2606         
2607         /* Ignore anything that's not relevant to the current call.  If there
2608          * isn't a current call, then no packet is relevant. */
2609         if (!call || (np->header.callNumber != currentCallNumber)) {
2610             MUTEX_ENTER(&rx_stats_mutex);
2611             rx_stats.spuriousPacketsRead++;
2612             MUTEX_EXIT(&rx_stats_mutex);
2613 #ifdef  RX_ENABLE_LOCKS
2614             if (call) {
2615                 MUTEX_EXIT(&call->lock);
2616             }
2617 #endif
2618             MUTEX_ENTER(&conn->conn_data_lock);
2619             conn->refCount--;
2620             MUTEX_EXIT(&conn->conn_data_lock);
2621             return np;  
2622         }
2623         /* If the service security object index stamped in the packet does not
2624          * match the connection's security index, ignore the packet */
2625         if (np->header.securityIndex != conn->securityIndex) {
2626 #ifdef  RX_ENABLE_LOCKS
2627             MUTEX_EXIT(&call->lock);
2628 #endif
2629             MUTEX_ENTER(&conn->conn_data_lock);
2630             conn->refCount--;       
2631             MUTEX_EXIT(&conn->conn_data_lock);
2632             return np;
2633         }
2634
2635         /* If we're receiving the response, then all transmit packets are
2636          * implicitly acknowledged.  Get rid of them. */
2637         if (np->header.type == RX_PACKET_TYPE_DATA) {
2638 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2639             /* XXX Hack. Because we must release the global rx lock when
2640              * sending packets (osi_NetSend) we drop all acks while we're
2641              * traversing the tq in rxi_Start sending packets out because
2642              * packets may move to the freePacketQueue as result of being here!
2643              * So we drop these packets until we're safely out of the
2644              * traversing. Really ugly! 
2645              * For fine grain RX locking, we set the acked field in the
2646              * packets and let rxi_Start remove them from the transmit queue.
2647              */
2648             if (call->flags & RX_CALL_TQ_BUSY) {
2649 #ifdef  RX_ENABLE_LOCKS
2650                 rxi_SetAcksInTransmitQueue(call);
2651 #else
2652                 conn->refCount--;
2653                 return np;              /* xmitting; drop packet */
2654 #endif
2655             }
2656             else {
2657                 rxi_ClearTransmitQueue(call, 0);
2658             }
2659 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2660             rxi_ClearTransmitQueue(call, 0);
2661 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2662         } else {
2663           if (np->header.type == RX_PACKET_TYPE_ACK) {
2664         /* now check to see if this is an ack packet acknowledging that the
2665          * server actually *lost* some hard-acked data.  If this happens we
2666          * ignore this packet, as it may indicate that the server restarted in
2667          * the middle of a call.  It is also possible that this is an old ack
2668          * packet.  We don't abort the connection in this case, because this
2669          * *might* just be an old ack packet.  The right way to detect a server
2670          * restart in the midst of a call is to notice that the server epoch
2671          * changed, btw.  */
2672         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2673          * XXX unacknowledged.  I think that this is off-by-one, but
2674          * XXX I don't dare change it just yet, since it will
2675          * XXX interact badly with the server-restart detection 
2676          * XXX code in receiveackpacket.  */
2677             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2678                 MUTEX_ENTER(&rx_stats_mutex);
2679                 rx_stats.spuriousPacketsRead++;
2680                 MUTEX_EXIT(&rx_stats_mutex);
2681                 MUTEX_EXIT(&call->lock);
2682                 MUTEX_ENTER(&conn->conn_data_lock);
2683                 conn->refCount--;
2684                 MUTEX_EXIT(&conn->conn_data_lock);
2685                 return np;
2686             }
2687           }
2688         } /* else not a data packet */
2689     }
2690
2691     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2692     /* Set remote user defined status from packet */
2693     call->remoteStatus = np->header.userStatus;
2694
2695     /* Note the gap between the expected next packet and the actual
2696      * packet that arrived, when the new packet has a smaller serial number
2697      * than expected.  Rioses frequently reorder packets all by themselves,
2698      * so this will be quite important with very large window sizes.
2699      * Skew is checked against 0 here to avoid any dependence on the type of
2700      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2701      * true! 
2702      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2703      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2704      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2705      */
2706     MUTEX_ENTER(&conn->conn_data_lock);
2707     skew = conn->lastSerial - np->header.serial;
2708     conn->lastSerial = np->header.serial;
2709     MUTEX_EXIT(&conn->conn_data_lock);
2710     if (skew > 0) {
2711       register struct rx_peer *peer;
2712       peer = conn->peer;
2713       if (skew > peer->inPacketSkew) {
2714         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2715         peer->inPacketSkew = skew;
2716       }
2717     }
2718
2719     /* Now do packet type-specific processing */
2720     switch (np->header.type) {
2721         case RX_PACKET_TYPE_DATA:
2722             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2723                                        tnop, newcallp);
2724             break;
2725         case RX_PACKET_TYPE_ACK:
2726             /* Respond immediately to ack packets requesting acknowledgement
2727              * (ping packets) */
2728             if (np->header.flags & RX_REQUEST_ACK) {
2729                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2730                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2731             }
2732             np = rxi_ReceiveAckPacket(call, np, 1);
2733             break;
2734         case RX_PACKET_TYPE_ABORT:
2735             /* An abort packet: reset the connection, passing the error up to
2736              * the user */
2737             /* What if error is zero? */
2738             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2739             break;
2740         case RX_PACKET_TYPE_BUSY:
2741             /* XXXX */
2742             break;
2743         case RX_PACKET_TYPE_ACKALL:
2744             /* All packets acknowledged, so we can drop all packets previously
2745              * readied for sending */
2746 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2747             /* XXX Hack. We because we can't release the global rx lock when
2748              * sending packets (osi_NetSend) we drop all ack pkts while we're
2749              * traversing the tq in rxi_Start sending packets out because
2750              * packets may move to the freePacketQueue as result of being
2751              * here! So we drop these packets until we're safely out of the
2752              * traversing. Really ugly! 
2753              * For fine grain RX locking, we set the acked field in the packets
2754              * and let rxi_Start remove the packets from the transmit queue.
2755              */
2756             if (call->flags & RX_CALL_TQ_BUSY) {
2757 #ifdef  RX_ENABLE_LOCKS
2758                 rxi_SetAcksInTransmitQueue(call);
2759                 break;
2760 #else /* RX_ENABLE_LOCKS */
2761                 conn->refCount--;
2762                 return np;              /* xmitting; drop packet */
2763 #endif /* RX_ENABLE_LOCKS */
2764             }
2765 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2766             rxi_ClearTransmitQueue(call, 0);
2767             break;
2768         default:
2769             /* Should not reach here, unless the peer is broken: send an abort
2770              * packet */
2771             rxi_CallError(call, RX_PROTOCOL_ERROR);
2772             np = rxi_SendCallAbort(call, np, 1, 0);
2773             break;
2774     };
2775     /* Note when this last legitimate packet was received, for keep-alive
2776      * processing.  Note, we delay getting the time until now in the hope that
2777      * the packet will be delivered to the user before any get time is required
2778      * (if not, then the time won't actually be re-evaluated here). */
2779     call->lastReceiveTime = clock_Sec();
2780     MUTEX_EXIT(&call->lock);
2781     MUTEX_ENTER(&conn->conn_data_lock);
2782     conn->refCount--;
2783     MUTEX_EXIT(&conn->conn_data_lock);
2784     return np;
2785 }
2786
2787 /* return true if this is an "interesting" connection from the point of view
2788     of someone trying to debug the system */
2789 int rxi_IsConnInteresting(struct rx_connection *aconn)
2790 {
2791     register int i;
2792     register struct rx_call *tcall;
2793
2794     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2795         return 1;
2796     for(i=0;i<RX_MAXCALLS;i++) {
2797         tcall = aconn->call[i];
2798         if (tcall) {
2799             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2800                 return 1;
2801             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2802                 return 1;
2803         }
2804     }
2805     return 0;
2806 }
2807
2808 #ifdef KERNEL
2809 /* if this is one of the last few packets AND it wouldn't be used by the
2810    receiving call to immediately satisfy a read request, then drop it on
2811    the floor, since accepting it might prevent a lock-holding thread from
2812    making progress in its reading. If a call has been cleared while in
2813    the precall state then ignore all subsequent packets until the call
2814    is assigned to a thread. */
2815
2816 static TooLow(ap, acall)
2817   struct rx_call *acall;
2818   struct rx_packet *ap; {
2819     int rc=0;
2820     MUTEX_ENTER(&rx_stats_mutex);
2821     if (((ap->header.seq != 1) &&
2822          (acall->flags & RX_CALL_CLEARED) &&
2823          (acall->state == RX_STATE_PRECALL)) ||
2824         ((rx_nFreePackets < rxi_dataQuota+2) &&
2825          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2826            && (acall->flags & RX_CALL_READER_WAIT)))) {
2827         rc = 1;
2828     }
2829     MUTEX_EXIT(&rx_stats_mutex);
2830     return rc;
2831 }
2832 #endif /* KERNEL */
2833
2834 /* try to attach call, if authentication is complete */
2835 static void TryAttach(acall, socket, tnop, newcallp)
2836 register struct rx_call *acall;
2837 register osi_socket socket;
2838 register int *tnop;
2839 register struct rx_call **newcallp; {
2840     register struct rx_connection *conn;
2841     conn = acall->conn;
2842     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2843         /* Don't attach until we have any req'd. authentication. */
2844         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2845             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2846             /* Note:  this does not necessarily succeed; there
2847                may not any proc available */
2848         }
2849         else {
2850             rxi_ChallengeOn(acall->conn);
2851         }
2852     }
2853 }
2854
2855 /* A data packet has been received off the interface.  This packet is
2856  * appropriate to the call (the call is in the right state, etc.).  This
2857  * routine can return a packet to the caller, for re-use */
2858
2859 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2860                                         port, tnop, newcallp)
2861     register struct rx_call *call;
2862     register struct rx_packet *np;
2863     int istack;
2864     osi_socket socket;
2865     afs_uint32 host;
2866     u_short port;
2867     int *tnop;
2868     struct rx_call **newcallp;
2869 {
2870     int ackNeeded = 0;
2871     int newPackets = 0;
2872     int didHardAck = 0;
2873     int haveLast = 0;
2874     afs_uint32 seq, serial, flags;
2875     int isFirst;
2876     struct rx_packet *tnp;
2877     struct clock when;
2878     MUTEX_ENTER(&rx_stats_mutex);
2879     rx_stats.dataPacketsRead++;
2880     MUTEX_EXIT(&rx_stats_mutex);
2881
2882 #ifdef KERNEL
2883     /* If there are no packet buffers, drop this new packet, unless we can find
2884      * packet buffers from inactive calls */
2885     if (!call->error &&
2886         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2887         MUTEX_ENTER(&rx_freePktQ_lock);
2888         rxi_NeedMorePackets = TRUE;
2889         MUTEX_EXIT(&rx_freePktQ_lock);
2890         MUTEX_ENTER(&rx_stats_mutex);
2891         rx_stats.noPacketBuffersOnRead++;
2892         MUTEX_EXIT(&rx_stats_mutex);
2893         call->rprev = np->header.serial;
2894         rxi_calltrace(RX_TRACE_DROP, call);
2895         dpf (("packet %x dropped on receipt - quota problems", np));
2896         if (rxi_doreclaim)
2897             rxi_ClearReceiveQueue(call);
2898         clock_GetTime(&when);
2899         clock_Add(&when, &rx_softAckDelay);
2900         if (!call->delayedAckEvent ||
2901             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2902             rxevent_Cancel(call->delayedAckEvent, call,
2903                            RX_CALL_REFCOUNT_DELAY);
2904             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2905             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2906                                                  call, 0);
2907         }
2908         /* we've damaged this call already, might as well do it in. */
2909         return np;
2910     }
2911 #endif /* KERNEL */
2912
2913     /*
2914      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2915      * packet is one of several packets transmitted as a single
2916      * datagram. Do not send any soft or hard acks until all packets
2917      * in a jumbogram have been processed. Send negative acks right away.
2918      */
2919     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2920         /* tnp is non-null when there are more packets in the
2921          * current jumbo gram */
2922         if (tnp) {
2923             if (np)
2924                 rxi_FreePacket(np);
2925             np = tnp;
2926         }
2927
2928         seq = np->header.seq;
2929         serial = np->header.serial;
2930         flags = np->header.flags;
2931
2932         /* If the call is in an error state, send an abort message */
2933         if (call->error)
2934             return rxi_SendCallAbort(call, np, istack, 0);
2935
2936         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2937          * AFS 3.5 jumbogram. */
2938         if (flags & RX_JUMBO_PACKET) {
2939             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2940         } else {
2941             tnp = NULL;
2942         }
2943
2944         if (np->header.spare != 0) {
2945             MUTEX_ENTER(&call->conn->conn_data_lock);
2946             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2947             MUTEX_EXIT(&call->conn->conn_data_lock);
2948         }
2949
2950         /* The usual case is that this is the expected next packet */
2951         if (seq == call->rnext) {
2952
2953             /* Check to make sure it is not a duplicate of one already queued */
2954             if (queue_IsNotEmpty(&call->rq) 
2955                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2956                 MUTEX_ENTER(&rx_stats_mutex);
2957                 rx_stats.dupPacketsRead++;
2958                 MUTEX_EXIT(&rx_stats_mutex);
2959                 dpf (("packet %x dropped on receipt - duplicate", np));
2960                 rxevent_Cancel(call->delayedAckEvent, call,
2961                                RX_CALL_REFCOUNT_DELAY);
2962                 np = rxi_SendAck(call, np, seq, serial,
2963                                  flags, RX_ACK_DUPLICATE, istack);
2964                 ackNeeded = 0;
2965                 call->rprev = seq;
2966                 continue;
2967             }
2968
2969             /* It's the next packet. Stick it on the receive queue
2970              * for this call. Set newPackets to make sure we wake
2971              * the reader once all packets have been processed */
2972             queue_Prepend(&call->rq, np);
2973             call->nSoftAcks++;
2974             np = NULL; /* We can't use this anymore */
2975             newPackets = 1;
2976
2977             /* If an ack is requested then set a flag to make sure we
2978              * send an acknowledgement for this packet */
2979             if (flags & RX_REQUEST_ACK) {
2980                 ackNeeded = 1;
2981             }
2982
2983             /* Keep track of whether we have received the last packet */
2984             if (flags & RX_LAST_PACKET) {
2985                 call->flags |= RX_CALL_HAVE_LAST;
2986                 haveLast = 1;
2987             }
2988
2989             /* Check whether we have all of the packets for this call */
2990             if (call->flags & RX_CALL_HAVE_LAST) {
2991                 afs_uint32 tseq;                /* temporary sequence number */
2992                 struct rx_packet *tp;   /* Temporary packet pointer */
2993                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2994
2995                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2996                     if (tseq != tp->header.seq)
2997                         break;
2998                     if (tp->header.flags & RX_LAST_PACKET) {
2999                         call->flags |= RX_CALL_RECEIVE_DONE;
3000                         break;
3001                     }
3002                     tseq++;
3003                 }
3004             }
3005
3006             /* Provide asynchronous notification for those who want it
3007              * (e.g. multi rx) */
3008             if (call->arrivalProc) {
3009                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3010                                      call->arrivalProcArg);
3011                 call->arrivalProc = (VOID (*)()) 0;
3012             }
3013
3014             /* Update last packet received */
3015             call->rprev = seq;
3016
3017             /* If there is no server process serving this call, grab
3018              * one, if available. We only need to do this once. If a
3019              * server thread is available, this thread becomes a server
3020              * thread and the server thread becomes a listener thread. */
3021             if (isFirst) {
3022                 TryAttach(call, socket, tnop, newcallp);
3023             }
3024         }       
3025         /* This is not the expected next packet. */
3026         else {
3027             /* Determine whether this is a new or old packet, and if it's
3028              * a new one, whether it fits into the current receive window.
3029              * Also figure out whether the packet was delivered in sequence.
3030              * We use the prev variable to determine whether the new packet
3031              * is the successor of its immediate predecessor in the
3032              * receive queue, and the missing flag to determine whether
3033              * any of this packets predecessors are missing.  */
3034
3035             afs_uint32 prev;            /* "Previous packet" sequence number */
3036             struct rx_packet *tp;       /* Temporary packet pointer */
3037             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3038             int missing;                /* Are any predecessors missing? */
3039
3040             /* If the new packet's sequence number has been sent to the
3041              * application already, then this is a duplicate */
3042             if (seq < call->rnext) {
3043                 MUTEX_ENTER(&rx_stats_mutex);
3044                 rx_stats.dupPacketsRead++;
3045                 MUTEX_EXIT(&rx_stats_mutex);
3046                 rxevent_Cancel(call->delayedAckEvent, call,
3047                                RX_CALL_REFCOUNT_DELAY);
3048                 np = rxi_SendAck(call, np, seq, serial,
3049                                  flags, RX_ACK_DUPLICATE, istack);
3050                 ackNeeded = 0;
3051                 call->rprev = seq;
3052                 continue;
3053             }
3054
3055             /* If the sequence number is greater than what can be
3056              * accomodated by the current window, then send a negative
3057              * acknowledge and drop the packet */
3058             if ((call->rnext + call->rwind) <= seq) {
3059                 rxevent_Cancel(call->delayedAckEvent, call,
3060                                RX_CALL_REFCOUNT_DELAY);
3061                 np = rxi_SendAck(call, np, seq, serial,
3062                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3063                 ackNeeded = 0;
3064                 call->rprev = seq;
3065                 continue;
3066             }
3067
3068             /* Look for the packet in the queue of old received packets */
3069             for (prev = call->rnext - 1, missing = 0,
3070                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3071                 /*Check for duplicate packet */
3072                 if (seq == tp->header.seq) {
3073                     MUTEX_ENTER(&rx_stats_mutex);
3074                     rx_stats.dupPacketsRead++;
3075                     MUTEX_EXIT(&rx_stats_mutex);
3076                     rxevent_Cancel(call->delayedAckEvent, call,
3077                                    RX_CALL_REFCOUNT_DELAY);
3078                     np = rxi_SendAck(call, np, seq, serial, 
3079                                      flags, RX_ACK_DUPLICATE, istack);
3080                     ackNeeded = 0;
3081                     call->rprev = seq;
3082                     goto nextloop;
3083                 }
3084                 /* If we find a higher sequence packet, break out and
3085                  * insert the new packet here. */
3086                 if (seq < tp->header.seq) break;
3087                 /* Check for missing packet */
3088                 if (tp->header.seq != prev+1) {
3089                     missing = 1;
3090                 }
3091
3092                 prev = tp->header.seq;
3093             }
3094
3095             /* Keep track of whether we have received the last packet. */
3096             if (flags & RX_LAST_PACKET) {
3097                 call->flags |= RX_CALL_HAVE_LAST;
3098             }
3099
3100             /* It's within the window: add it to the the receive queue.
3101              * tp is left by the previous loop either pointing at the
3102              * packet before which to insert the new packet, or at the
3103              * queue head if the queue is empty or the packet should be
3104              * appended. */
3105             queue_InsertBefore(tp, np);
3106             call->nSoftAcks++;
3107             np = NULL;
3108
3109             /* Check whether we have all of the packets for this call */
3110             if ((call->flags & RX_CALL_HAVE_LAST)
3111              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3112                 afs_uint32 tseq;                /* temporary sequence number */
3113
3114                 for (tseq = call->rnext,
3115                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3116                     if (tseq != tp->header.seq)
3117                         break;
3118                     if (tp->header.flags & RX_LAST_PACKET) {
3119                         call->flags |= RX_CALL_RECEIVE_DONE;
3120                         break;
3121                     }
3122                     tseq++;
3123                 }
3124             }
3125
3126             /* We need to send an ack of the packet is out of sequence, 
3127              * or if an ack was requested by the peer. */
3128             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3129                 ackNeeded = 1;
3130             }
3131
3132             /* Acknowledge the last packet for each call */
3133             if (flags & RX_LAST_PACKET) {
3134                 haveLast = 1;
3135             }
3136
3137             call->rprev = seq;
3138         }
3139 nextloop:;
3140     }
3141
3142     if (newPackets) {
3143         /*
3144          * If the receiver is waiting for an iovec, fill the iovec
3145          * using the data from the receive queue */
3146         if (call->flags & RX_CALL_IOVEC_WAIT) {
3147             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3148             /* the call may have been aborted */
3149             if (call->error) {
3150                 return NULL;
3151             }
3152             if (didHardAck) {
3153                 ackNeeded = 0;
3154             }
3155         }
3156
3157         /* Wakeup the reader if any */
3158         if ((call->flags & RX_CALL_READER_WAIT) &&
3159             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3160              (call->iovNext >= call->iovMax) ||
3161              (call->flags & RX_CALL_RECEIVE_DONE))) {
3162             call->flags &= ~RX_CALL_READER_WAIT;
3163 #ifdef  RX_ENABLE_LOCKS
3164             CV_BROADCAST(&call->cv_rq);
3165 #else
3166             osi_rxWakeup(&call->rq);
3167 #endif
3168         }
3169     }
3170
3171     /*
3172      * Send an ack when requested by the peer, or once every
3173      * rxi_SoftAckRate packets until the last packet has been
3174      * received. Always send a soft ack for the last packet in
3175      * the server's reply. */
3176     if (ackNeeded) {
3177         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3178         np = rxi_SendAck(call, np, seq, serial, flags,
3179                          RX_ACK_REQUESTED, istack);
3180     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3181         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3182         np = rxi_SendAck(call, np, seq, serial, flags,
3183                          RX_ACK_IDLE, istack);
3184     } else if (call->nSoftAcks) {
3185         clock_GetTime(&when);
3186         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3187             clock_Add(&when, &rx_lastAckDelay);
3188         } else {
3189             clock_Add(&when, &rx_softAckDelay);
3190         }
3191         if (!call->delayedAckEvent ||
3192             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3193             rxevent_Cancel(call->delayedAckEvent, call,
3194                            RX_CALL_REFCOUNT_DELAY);
3195             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3196             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3197                                                  call, 0);
3198         }
3199     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3200         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3201     }
3202
3203     return np;
3204 }
3205
3206 #ifdef  ADAPT_WINDOW
3207 static void rxi_ComputeRate();
3208 #endif
3209
3210 /* The real smarts of the whole thing.  */
3211 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3212     register struct rx_call *call;
3213     struct rx_packet *np;
3214     int istack;
3215 {
3216     struct rx_ackPacket *ap;
3217     int nAcks;
3218     register struct rx_packet *tp;
3219     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3220     register struct rx_connection *conn = call->conn;
3221     struct rx_peer *peer = conn->peer;
3222     afs_uint32 first;
3223     afs_uint32 serial;
3224     /* because there are CM's that are bogus, sending weird values for this. */
3225     afs_uint32 skew = 0;
3226     int needRxStart = 0;
3227     int nbytes;
3228     int missing;
3229     int acked;
3230     int nNacked = 0;
3231     int newAckCount = 0;
3232     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3233     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3234
3235     MUTEX_ENTER(&rx_stats_mutex);
3236     rx_stats.ackPacketsRead++;
3237     MUTEX_EXIT(&rx_stats_mutex);
3238     ap = (struct rx_ackPacket *) rx_DataOf(np);
3239     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3240     if (nbytes < 0)
3241       return np;       /* truncated ack packet */
3242
3243     /* depends on ack packet struct */
3244     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3245     first = ntohl(ap->firstPacket);
3246     serial = ntohl(ap->serial);
3247     /* temporarily disabled -- needs to degrade over time 
3248        skew = ntohs(ap->maxSkew); */
3249
3250     /* Ignore ack packets received out of order */
3251     if (first < call->tfirst) {
3252         return np;
3253     }
3254
3255     if (np->header.flags & RX_SLOW_START_OK) {
3256         call->flags |= RX_CALL_SLOW_START_OK;
3257     }
3258     
3259 #ifdef RXDEBUG
3260     if (rx_Log) {
3261       fprintf( rx_Log, 
3262               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3263               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3264               skew, ntohl(ap->firstPacket));
3265         if (nAcks) {
3266             int offset;
3267             for (offset = 0; offset < nAcks; offset++) 
3268                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3269         }
3270         putc('\n', rx_Log);
3271     }
3272 #endif
3273
3274     /* if a server connection has been re-created, it doesn't remember what
3275         serial # it was up to.  An ack will tell us, since the serial field
3276         contains the largest serial received by the other side */
3277     MUTEX_ENTER(&conn->conn_data_lock);
3278     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3279         conn->serial = serial+1;
3280     }
3281     MUTEX_EXIT(&conn->conn_data_lock);
3282
3283     /* Update the outgoing packet skew value to the latest value of
3284      * the peer's incoming packet skew value.  The ack packet, of
3285      * course, could arrive out of order, but that won't affect things
3286      * much */
3287     MUTEX_ENTER(&peer->peer_lock);
3288     peer->outPacketSkew = skew;
3289
3290     /* Check for packets that no longer need to be transmitted, and
3291      * discard them.  This only applies to packets positively
3292      * acknowledged as having been sent to the peer's upper level.
3293      * All other packets must be retained.  So only packets with
3294      * sequence numbers < ap->firstPacket are candidates. */
3295     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3296         if (tp->header.seq >= first) break;
3297         call->tfirst = tp->header.seq + 1;
3298         if (tp->header.serial == serial) {
3299           /* Use RTT if not delayed by client. */
3300           if (ap->reason != RX_ACK_DELAY)
3301               rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3302 #ifdef ADAPT_WINDOW
3303           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3304 #endif
3305         }
3306         else if (tp->firstSerial == serial) {
3307             /* Use RTT if not delayed by client. */
3308             if (ap->reason != RX_ACK_DELAY)
3309                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3310 #ifdef ADAPT_WINDOW
3311           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3312 #endif
3313         }
3314 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3315     /* XXX Hack. Because we have to release the global rx lock when sending
3316      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3317      * in rxi_Start sending packets out because packets may move to the
3318      * freePacketQueue as result of being here! So we drop these packets until
3319      * we're safely out of the traversing. Really ugly! 
3320      * To make it even uglier, if we're using fine grain locking, we can
3321      * set the ack bits in the packets and have rxi_Start remove the packets
3322      * when it's done transmitting.
3323      */
3324         if (!tp->acked) {
3325             newAckCount++;
3326         }
3327         if (call->flags & RX_CALL_TQ_BUSY) {
3328 #ifdef RX_ENABLE_LOCKS
3329             tp->acked = 1;
3330             call->flags |= RX_CALL_TQ_SOME_ACKED;
3331 #else /* RX_ENABLE_LOCKS */
3332             break;
3333 #endif /* RX_ENABLE_LOCKS */
3334         } else
3335 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3336         {
3337         queue_Remove(tp);
3338         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3339         }
3340     }
3341
3342 #ifdef ADAPT_WINDOW
3343     /* Give rate detector a chance to respond to ping requests */
3344     if (ap->reason == RX_ACK_PING_RESPONSE) {
3345         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3346     }
3347 #endif
3348
3349     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3350    
3351    /* Now go through explicit acks/nacks and record the results in
3352     * the waiting packets.  These are packets that can't be released
3353     * yet, even with a positive acknowledge.  This positive
3354     * acknowledge only means the packet has been received by the
3355     * peer, not that it will be retained long enough to be sent to
3356     * the peer's upper level.  In addition, reset the transmit timers
3357     * of any missing packets (those packets that must be missing
3358     * because this packet was out of sequence) */
3359
3360     call->nSoftAcked = 0;
3361     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3362         /* Update round trip time if the ack was stimulated on receipt
3363          * of this packet */
3364 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3365 #ifdef RX_ENABLE_LOCKS
3366         if (tp->header.seq >= first) {
3367 #endif /* RX_ENABLE_LOCKS */
3368 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3369         if (tp->header.serial == serial) {
3370             /* Use RTT if not delayed by client. */
3371             if (ap->reason != RX_ACK_DELAY)
3372                 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3373 #ifdef ADAPT_WINDOW
3374           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3375 #endif
3376         }
3377         else if ((tp->firstSerial == serial)) {
3378             /* Use RTT if not delayed by client. */
3379             if (ap->reason != RX_ACK_DELAY)
3380                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3381 #ifdef ADAPT_WINDOW
3382           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3383 #endif
3384         }
3385 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3386 #ifdef RX_ENABLE_LOCKS
3387         }
3388 #endif /* RX_ENABLE_LOCKS */
3389 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3390
3391         /* Set the acknowledge flag per packet based on the
3392          * information in the ack packet. An acknowlegded packet can
3393          * be downgraded when the server has discarded a packet it
3394          * soacked previously, or when an ack packet is received
3395          * out of sequence. */
3396         if (tp->header.seq < first) {
3397             /* Implicit ack information */
3398             if (!tp->acked) {
3399                 newAckCount++;
3400             }
3401             tp->acked = 1;
3402         }
3403         else if (tp->header.seq < first + nAcks) {
3404             /* Explicit ack information:  set it in the packet appropriately */
3405             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3406                 if (!tp->acked) {
3407                     newAckCount++;
3408                     tp->acked = 1;
3409                 }
3410                 if (missing) {
3411                     nNacked++;
3412                 } else {
3413                     call->nSoftAcked++;
3414                 }
3415             } else {
3416                 tp->acked = 0;
3417                 missing = 1;
3418             }
3419         }
3420         else {
3421             tp->acked = 0;
3422             missing = 1;
3423         }
3424
3425         /* If packet isn't yet acked, and it has been transmitted at least 
3426          * once, reset retransmit time using latest timeout 
3427          * ie, this should readjust the retransmit timer for all outstanding 
3428          * packets...  So we don't just retransmit when we should know better*/
3429
3430         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3431           tp->retryTime = tp->timeSent;
3432           clock_Add(&tp->retryTime, &peer->timeout);
3433           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3434           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3435         }
3436     }
3437
3438     /* If the window has been extended by this acknowledge packet,
3439      * then wakeup a sender waiting in alloc for window space, or try
3440      * sending packets now, if he's been sitting on packets due to
3441      * lack of window space */
3442     if (call->tnext < (call->tfirst + call->twind))  {
3443 #ifdef  RX_ENABLE_LOCKS
3444         CV_SIGNAL(&call->cv_twind);
3445 #else
3446         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3447             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3448             osi_rxWakeup(&call->twind);
3449         }
3450 #endif
3451         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3452             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3453         }
3454     }
3455
3456     /* if the ack packet has a receivelen field hanging off it,
3457      * update our state */
3458     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3459       afs_uint32 tSize;
3460
3461       /* If the ack packet has a "recommended" size that is less than 
3462        * what I am using now, reduce my size to match */
3463       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3464                     sizeof(afs_int32), &tSize);
3465       tSize = (afs_uint32) ntohl(tSize);
3466       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3467
3468       /* Get the maximum packet size to send to this peer */
3469       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3470                     &tSize);
3471       tSize = (afs_uint32)ntohl(tSize);
3472       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3473       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3474
3475       /* sanity check - peer might have restarted with different params.
3476        * If peer says "send less", dammit, send less...  Peer should never 
3477        * be unable to accept packets of the size that prior AFS versions would
3478        * send without asking.  */
3479       if (peer->maxMTU != tSize) {
3480           peer->maxMTU = tSize;
3481           peer->MTU = MIN(tSize, peer->MTU);
3482           call->MTU = MIN(call->MTU, tSize);
3483           peer->congestSeq++;
3484       }
3485
3486       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3487           /* AFS 3.4a */
3488           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3489                         sizeof(afs_int32), &tSize);
3490           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3491           if (tSize < call->twind) {       /* smaller than our send */
3492               call->twind = tSize;         /* window, we must send less... */
3493               call->ssthresh = MIN(call->twind, call->ssthresh);
3494           }
3495
3496           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3497            * network MTU confused with the loopback MTU. Calculate the
3498            * maximum MTU here for use in the slow start code below.
3499            */
3500           maxMTU = peer->maxMTU;
3501           /* Did peer restart with older RX version? */
3502           if (peer->maxDgramPackets > 1) {
3503               peer->maxDgramPackets = 1;
3504           }
3505       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3506           /* AFS 3.5 */
3507           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3508                         sizeof(afs_int32), &tSize);
3509           tSize = (afs_uint32) ntohl(tSize);
3510           /*
3511            * As of AFS 3.5 we set the send window to match the receive window. 
3512            */
3513           if (tSize < call->twind) {
3514               call->twind = tSize;
3515               call->ssthresh = MIN(call->twind, call->ssthresh);
3516           } else if (tSize > call->twind) {
3517               call->twind = tSize;
3518           }
3519
3520           /*
3521            * As of AFS 3.5, a jumbogram is more than one fixed size
3522            * packet transmitted in a single UDP datagram. If the remote
3523            * MTU is smaller than our local MTU then never send a datagram
3524            * larger than the natural MTU.
3525            */
3526           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3527                         sizeof(afs_int32), &tSize);
3528           maxDgramPackets = (afs_uint32) ntohl(tSize);
3529           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3530           maxDgramPackets = MIN(maxDgramPackets,
3531                                 (int)(peer->ifDgramPackets));
3532           maxDgramPackets = MIN(maxDgramPackets, tSize);
3533           if (maxDgramPackets > 1) {
3534             peer->maxDgramPackets = maxDgramPackets;
3535             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3536           } else {
3537             peer->maxDgramPackets = 1;
3538             call->MTU = peer->natMTU;
3539           }
3540        } else if (peer->maxDgramPackets > 1) {
3541           /* Restarted with lower version of RX */
3542           peer->maxDgramPackets = 1;
3543        }
3544     } else if (peer->maxDgramPackets > 1 ||
3545                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3546         /* Restarted with lower version of RX */
3547         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3548         peer->natMTU = OLD_MAX_PACKET_SIZE;
3549         peer->MTU = OLD_MAX_PACKET_SIZE;
3550         peer->maxDgramPackets = 1;
3551         peer->nDgramPackets = 1;
3552         peer->congestSeq++;
3553         call->MTU = OLD_MAX_PACKET_SIZE;
3554     }
3555
3556     if (nNacked) {
3557         /*
3558          * Calculate how many datagrams were successfully received after
3559          * the first missing packet and adjust the negative ack counter
3560          * accordingly.
3561          */
3562         call->nAcks = 0;
3563         call->nNacks++;
3564         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3565         if (call->nNacks < nNacked) {
3566             call->nNacks = nNacked;
3567         }
3568     } else {
3569         if (newAckCount) {
3570             call->nAcks++;
3571         }
3572         call->nNacks = 0;
3573     }
3574
3575     if (call->flags & RX_CALL_FAST_RECOVER) {
3576         if (nNacked) {
3577             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3578         } else {
3579             call->flags &= ~RX_CALL_FAST_RECOVER;
3580             call->cwind = call->nextCwind;
3581             call->nextCwind = 0;
3582             call->nAcks = 0;
3583         }
3584         call->nCwindAcks = 0;
3585     }
3586     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3587         /* Three negative acks in a row trigger congestion recovery */
3588 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3589         MUTEX_EXIT(&peer->peer_lock);
3590         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3591                 /* someone else is waiting to start recovery */
3592                 return np;
3593         }
3594         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3595         while (call->flags & RX_CALL_TQ_BUSY) {
3596             call->flags |= RX_CALL_TQ_WAIT;
3597 #ifdef RX_ENABLE_LOCKS
3598             CV_WAIT(&call->cv_tq, &call->lock);
3599 #else /* RX_ENABLE_LOCKS */
3600             osi_rxSleep(&call->tq);
3601 #endif /* RX_ENABLE_LOCKS */
3602         }
3603         MUTEX_ENTER(&peer->peer_lock);
3604 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3605         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3606         call->flags |= RX_CALL_FAST_RECOVER;
3607         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3608         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3609                           rx_maxSendWindow);
3610         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3611         call->nextCwind = call->ssthresh;
3612         call->nAcks = 0;
3613         call->nNacks = 0;
3614         peer->MTU = call->MTU;
3615         peer->cwind = call->nextCwind;
3616         peer->nDgramPackets = call->nDgramPackets;
3617         peer->congestSeq++;
3618         call->congestSeq = peer->congestSeq;
3619         /* Reset the resend times on the packets that were nacked
3620          * so we will retransmit as soon as the window permits*/
3621         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3622             if (acked) {
3623                 if (!tp->acked) {
3624                     clock_Zero(&tp->retryTime);
3625                 }
3626             } else if (tp->acked) {
3627                 acked = 1;
3628             }
3629         }
3630     } else {
3631         /* If cwind is smaller than ssthresh, then increase
3632          * the window one packet for each ack we receive (exponential
3633          * growth).
3634          * If cwind is greater than or equal to ssthresh then increase
3635          * the congestion window by one packet for each cwind acks we
3636          * receive (linear growth).  */
3637         if (call->cwind < call->ssthresh) {
3638             call->cwind = MIN((int)call->ssthresh,
3639                               (int)(call->cwind + newAckCount));
3640             call->nCwindAcks = 0;
3641         } else {
3642             call->nCwindAcks += newAckCount;
3643             if (call->nCwindAcks >= call->cwind) {
3644                 call->nCwindAcks = 0;
3645                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3646             }
3647         }
3648         /*
3649          * If we have received several acknowledgements in a row then
3650          * it is time to increase the size of our datagrams
3651          */
3652         if ((int)call->nAcks > rx_nDgramThreshold) {
3653             if (peer->maxDgramPackets > 1) {
3654                 if (call->nDgramPackets < peer->maxDgramPackets) {
3655                     call->nDgramPackets++;
3656                 }
3657                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3658             } else if (call->MTU < peer->maxMTU) {
3659                 call->MTU += peer->natMTU;
3660                 call->MTU = MIN(call->MTU, peer->maxMTU);
3661             }
3662             call->nAcks = 0;
3663         }
3664     }
3665
3666     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3667
3668     /* Servers need to hold the call until all response packets have
3669      * been acknowledged. Soft acks are good enough since clients
3670      * are not allowed to clear their receive queues. */
3671     if (call->state == RX_STATE_HOLD &&
3672         call->tfirst + call->nSoftAcked >= call->tnext) {
3673         call->state = RX_STATE_DALLY;
3674         rxi_ClearTransmitQueue(call, 0);
3675     } else if (!queue_IsEmpty(&call->tq)) {
3676         rxi_Start(0, call, istack);
3677     }
3678     return np;
3679 }
3680
3681 /* Received a response to a challenge packet */