ace254d08559948bf16062573ba94e24abac93f4
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include "../afs/sysincludes.h"
15 #include "../afs/afsincludes.h"
16 #ifndef UKERNEL
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #include "../afs/afs_args.h"
28 #include "../afs/afs_osi.h"
29 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
30 #include "../h/systm.h"
31 #endif
32 #ifdef RXDEBUG
33 #undef RXDEBUG      /* turn off debugging */
34 #endif /* RXDEBUG */
35 #if defined(AFS_SGI_ENV)
36 #include "../sys/debug.h"
37 #endif
38 #include "../afsint/afsint.h"
39 #ifdef  AFS_ALPHA_ENV
40 #undef kmem_alloc
41 #undef kmem_free
42 #undef mem_alloc
43 #undef mem_free
44 #undef register
45 #endif  /* AFS_ALPHA_ENV */
46 #else /* !UKERNEL */
47 #include "../afs/sysincludes.h"
48 #include "../afs/afsincludes.h"
49 #endif /* !UKERNEL */
50 #include "../afs/lock.h"
51 #include "../rx/rx_kmutex.h"
52 #include "../rx/rx_kernel.h"
53 #include "../rx/rx_clock.h"
54 #include "../rx/rx_queue.h"
55 #include "../rx/rx.h"
56 #include "../rx/rx_globals.h"
57 #include "../rx/rx_trace.h"
58 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
59 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
60 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
61 #include "../afsint/afsint.h"
62 extern afs_int32 afs_termState;
63 #ifdef AFS_AIX41_ENV
64 #include "sys/lockl.h"
65 #include "sys/lock_def.h"
66 #endif /* AFS_AIX41_ENV */
67 # include "../afsint/rxgen_consts.h"
68 #else /* KERNEL */
69 # include <afs/param.h>
70 # include <sys/types.h>
71 # include <errno.h>
72 #ifdef AFS_NT40_ENV
73 # include <stdlib.h>
74 # include <fcntl.h>
75 # include <afsutil.h>
76 #else
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <sys/stat.h>
81 # include <netinet/in.h>
82 # include <sys/time.h>
83 #endif
84 # include "rx.h"
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx_globals.h"
89 # include "rx_trace.h"
90 # include "rx_internal.h"
91 # include <afs/rxgen_consts.h>
92 #endif /* KERNEL */
93
94 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
95 struct rx_tq_debug {
96     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
97     afs_int32 rxi_start_in_error;
98 } rx_tq_debug;
99 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
100
101 /*
102  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
103  * currently allocated within rx.  This number is used to allocate the
104  * memory required to return the statistics when queried.
105  */
106
107 static unsigned int rxi_rpc_peer_stat_cnt;
108
109 /*
110  * rxi_rpc_process_stat_cnt counts the total number of local process stat
111  * structures currently allocated within rx.  The number is used to allocate
112  * the memory required to return the statistics when queried.
113  */
114
115 static unsigned int rxi_rpc_process_stat_cnt;
116
117 #if !defined(offsetof)
118 #include <stddef.h>     /* for definition of offsetof() */
119 #endif
120
121 #ifdef AFS_PTHREAD_ENV
122 #include <assert.h>
123
124 /*
125  * Use procedural initialization of mutexes/condition variables
126  * to ease NT porting
127  */
128
129 extern pthread_mutex_t rxkad_stats_mutex;
130 extern pthread_mutex_t des_init_mutex;
131 extern pthread_mutex_t des_random_mutex;
132 extern pthread_mutex_t rx_clock_mutex;
133 extern pthread_mutex_t rxi_connCacheMutex;
134 extern pthread_mutex_t rx_event_mutex;
135 extern pthread_mutex_t osi_malloc_mutex;
136 extern pthread_mutex_t event_handler_mutex;
137 extern pthread_mutex_t listener_mutex;
138 extern pthread_mutex_t rx_if_init_mutex;
139 extern pthread_mutex_t rx_if_mutex;
140 extern pthread_mutex_t rxkad_client_uid_mutex;
141 extern pthread_mutex_t rxkad_random_mutex;
142
143 extern pthread_cond_t rx_event_handler_cond;
144 extern pthread_cond_t rx_listener_cond;
145
146 static pthread_mutex_t epoch_mutex;
147 static pthread_mutex_t rx_init_mutex;
148 static pthread_mutex_t rx_debug_mutex;
149
150 static void rxi_InitPthread(void) {
151     assert(pthread_mutex_init(&rx_clock_mutex,
152                               (const pthread_mutexattr_t*)0)==0);
153     assert(pthread_mutex_init(&rxi_connCacheMutex,
154                               (const pthread_mutexattr_t*)0)==0);
155     assert(pthread_mutex_init(&rx_init_mutex,
156                               (const pthread_mutexattr_t*)0)==0);
157     assert(pthread_mutex_init(&epoch_mutex,
158                               (const pthread_mutexattr_t*)0)==0);
159     assert(pthread_mutex_init(&rx_event_mutex,
160                               (const pthread_mutexattr_t*)0)==0);
161     assert(pthread_mutex_init(&des_init_mutex,
162                               (const pthread_mutexattr_t*)0)==0);
163     assert(pthread_mutex_init(&des_random_mutex,
164                               (const pthread_mutexattr_t*)0)==0);
165     assert(pthread_mutex_init(&osi_malloc_mutex,
166                               (const pthread_mutexattr_t*)0)==0);
167     assert(pthread_mutex_init(&event_handler_mutex,
168                               (const pthread_mutexattr_t*)0)==0);
169     assert(pthread_mutex_init(&listener_mutex,
170                               (const pthread_mutexattr_t*)0)==0);
171     assert(pthread_mutex_init(&rx_if_init_mutex,
172                               (const pthread_mutexattr_t*)0)==0);
173     assert(pthread_mutex_init(&rx_if_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&rxkad_random_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&rxkad_stats_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&rx_debug_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183
184     assert(pthread_cond_init(&rx_event_handler_cond,
185                               (const pthread_condattr_t*)0)==0);
186     assert(pthread_cond_init(&rx_listener_cond,
187                               (const pthread_condattr_t*)0)==0);
188     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
189 }
190
191 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
192 #define INIT_PTHREAD_LOCKS \
193 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
194 /*
195  * The rx_stats_mutex mutex protects the following global variables:
196  * rxi_dataQuota
197  * rxi_minDeficit
198  * rxi_availProcs
199  * rxi_totalMin
200  * rxi_lowConnRefCount
201  * rxi_lowPeerRefCount
202  * rxi_nCalls
203  * rxi_Alloccnt
204  * rxi_Allocsize
205  * rx_nFreePackets
206  * rx_tq_debug
207  * rx_stats
208  */
209 #else
210 #define INIT_PTHREAD_LOCKS
211 #endif
212
213 extern void rxi_DeleteCachedConnections(void);
214
215
216 /* Variables for handling the minProcs implementation.  availProcs gives the
217  * number of threads available in the pool at this moment (not counting dudes
218  * executing right now).  totalMin gives the total number of procs required
219  * for handling all minProcs requests.  minDeficit is a dynamic variable
220  * tracking the # of procs required to satisfy all of the remaining minProcs
221  * demands.
222  * For fine grain locking to work, the quota check and the reservation of
223  * a server thread has to come while rxi_availProcs and rxi_minDeficit
224  * are locked. To this end, the code has been modified under #ifdef
225  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
226  * same time. A new function, ReturnToServerPool() returns the allocation.
227  * 
228  * A call can be on several queue's (but only one at a time). When
229  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
230  * that no one else is touching the queue. To this end, we store the address
231  * of the queue lock in the call structure (under the call lock) when we
232  * put the call on a queue, and we clear the call_queue_lock when the
233  * call is removed from a queue (once the call lock has been obtained).
234  * This allows rxi_ResetCall to safely synchronize with others wishing
235  * to manipulate the queue.
236  */
237
238 extern void rxi_Delay(int);
239
240 static int rxi_ServerThreadSelectingCall;
241
242 #ifdef RX_ENABLE_LOCKS
243 static afs_kmutex_t rx_rpc_stats;
244 void rxi_StartUnlocked();
245 #endif
246
247 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
248 ** pretty good that the next packet coming in is from the same connection 
249 ** as the last packet, since we're send multiple packets in a transmit window.
250 */
251 struct rx_connection *rxLastConn; 
252
253 #ifdef RX_ENABLE_LOCKS
254 /* The locking hierarchy for rx fine grain locking is composed of five
255  * tiers:
256  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
257  * call->lock - locks call data fields.
258  * Most any other lock - these are all independent of each other.....
259  *      rx_freePktQ_lock
260  *      rx_freeCallQueue_lock
261  *      freeSQEList_lock
262  *      rx_connHashTable_lock
263  *      rx_serverPool_lock
264  *      rxi_keyCreate_lock
265  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
266
267  * lowest level:
268  *      peer_lock - locks peer data fields.
269  *      conn_data_lock - that more than one thread is not updating a conn data
270  *              field at the same time.
271  * Do we need a lock to protect the peer field in the conn structure?
272  *      conn->peer was previously a constant for all intents and so has no
273  *      lock protecting this field. The multihomed client delta introduced
274  *      a RX code change : change the peer field in the connection structure
275  *      to that remote inetrface from which the last packet for this
276  *      connection was sent out. This may become an issue if further changes
277  *      are made.
278  */
279 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
280 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
281 #ifdef RX_LOCKS_DB
282 /* rxdb_fileID is used to identify the lock location, along with line#. */
283 static int rxdb_fileID = RXDB_FILE_RX;
284 #endif /* RX_LOCKS_DB */
285 static void rxi_SetAcksInTransmitQueue();
286 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
287 #else /* RX_ENABLE_LOCKS */
288 #define SET_CALL_QUEUE_LOCK(C, L)
289 #define CLEAR_CALL_QUEUE_LOCK(C)
290 #endif /* RX_ENABLE_LOCKS */
291 static void rxi_DestroyConnectionNoLock();
292 void rxi_DestroyConnection();
293 void rxi_CleanupConnection();
294 struct rx_serverQueueEntry *rx_waitForPacket = 0;
295
296 /* ------------Exported Interfaces------------- */
297
298 /* This function allows rxkad to set the epoch to a suitably random number
299  * which rx_NewConnection will use in the future.  The principle purpose is to
300  * get rxnull connections to use the same epoch as the rxkad connections do, at
301  * least once the first rxkad connection is established.  This is important now
302  * that the host/port addresses aren't used in FindConnection: the uniqueness
303  * of epoch/cid matters and the start time won't do. */
304
305 #ifdef AFS_PTHREAD_ENV
306 /*
307  * This mutex protects the following global variables:
308  * rx_epoch
309  */
310
311 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
312 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
313 #else
314 #define LOCK_EPOCH
315 #define UNLOCK_EPOCH
316 #endif /* AFS_PTHREAD_ENV */
317
318 void rx_SetEpoch (epoch)
319   afs_uint32 epoch;
320 {
321     LOCK_EPOCH
322     rx_epoch = epoch;
323     UNLOCK_EPOCH
324 }
325
326 /* Initialize rx.  A port number may be mentioned, in which case this
327  * becomes the default port number for any service installed later.
328  * If 0 is provided for the port number, a random port will be chosen
329  * by the kernel.  Whether this will ever overlap anything in
330  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
331  * error. */
332 static int rxinit_status = 1;
333 #ifdef AFS_PTHREAD_ENV
334 /*
335  * This mutex protects the following global variables:
336  * rxinit_status
337  */
338
339 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
340 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
341 #else
342 #define LOCK_RX_INIT
343 #define UNLOCK_RX_INIT
344 #endif
345
346 int rx_Init(u_int port)
347 {
348 #ifdef KERNEL
349     osi_timeval_t tv;
350 #else /* KERNEL */
351     struct timeval tv;
352 #endif /* KERNEL */
353     char *htable, *ptable;
354     int tmp_status;
355
356     SPLVAR;
357
358     INIT_PTHREAD_LOCKS
359     LOCK_RX_INIT
360     if (rxinit_status == 0) {
361         tmp_status = rxinit_status;
362         UNLOCK_RX_INIT
363         return tmp_status; /* Already started; return previous error code. */
364     }
365
366 #ifdef AFS_NT40_ENV
367     if (afs_winsockInit()<0)
368         return -1;
369 #endif
370
371 #ifndef KERNEL
372     /*
373      * Initialize anything necessary to provide a non-premptive threading
374      * environment.
375      */
376     rxi_InitializeThreadSupport();
377 #endif
378
379     /* Allocate and initialize a socket for client and perhaps server
380      * connections. */
381
382     rx_socket = rxi_GetUDPSocket((u_short)port); 
383     if (rx_socket == OSI_NULLSOCKET) {
384         UNLOCK_RX_INIT
385         return RX_ADDRINUSE;
386     }
387     
388
389 #ifdef  RX_ENABLE_LOCKS
390 #ifdef RX_LOCKS_DB
391     rxdb_init();
392 #endif /* RX_LOCKS_DB */
393     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
394     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
395     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
396     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
397     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
398                MUTEX_DEFAULT,0);
399     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
400     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
401     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
402     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
403 #ifndef KERNEL
404     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
405 #endif /* !KERNEL */
406     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
407 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
408     if ( !uniprocessor )
409       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
410 #endif /* KERNEL && AFS_HPUX110_ENV */
411 #else /* RX_ENABLE_LOCKS */
412 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
413     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
414 #endif /* AFS_GLOBAL_SUNLOCK */
415 #endif /* RX_ENABLE_LOCKS */
416
417     rxi_nCalls = 0;
418     rx_connDeadTime = 12;
419     rx_tranquil     = 0;        /* reset flag */
420     bzero((char *)&rx_stats, sizeof(struct rx_stats));
421     htable = (char *)
422         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
423     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
424     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
425     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
426     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
427     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
428
429     /* Malloc up a bunch of packets & buffers */
430     rx_nFreePackets = 0;
431     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
432     queue_Init(&rx_freePacketQueue);
433     rxi_NeedMorePackets = FALSE;
434     rxi_MorePackets(rx_nPackets);
435     rx_CheckPackets();
436
437     NETPRI;
438     AFS_RXGLOCK();
439
440     clock_Init();
441
442 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
443     tv.tv_sec = clock_now.sec;
444     tv.tv_usec = clock_now.usec;
445     srand((unsigned int) tv.tv_usec);
446 #else
447     osi_GetTime(&tv);
448 #endif
449     /* *Slightly* random start time for the cid.  This is just to help
450      * out with the hashing function at the peer */
451     rx_port = port;
452     rx_stats.minRtt.sec = 9999999;
453 #ifdef  KERNEL
454     rx_SetEpoch (tv.tv_sec | 0x80000000);
455 #else
456     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
457                                          * will provide a randomer value. */
458 #endif
459     MUTEX_ENTER(&rx_stats_mutex);
460     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
461     MUTEX_EXIT(&rx_stats_mutex);
462     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
463     rx_connHashTable = (struct rx_connection **) htable;
464     rx_peerHashTable = (struct rx_peer **) ptable;
465
466     rx_lastAckDelay.sec = 0;
467     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
468     rx_hardAckDelay.sec = 0;
469     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
470     rx_softAckDelay.sec = 0;
471     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
472
473     rxevent_Init(20, rxi_ReScheduleEvents);
474
475     /* Initialize various global queues */
476     queue_Init(&rx_idleServerQueue);
477     queue_Init(&rx_incomingCallQueue);
478     queue_Init(&rx_freeCallQueue);
479
480 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
481     /* Initialize our list of usable IP addresses. */
482     rx_GetIFInfo();
483 #endif
484
485     /* Start listener process (exact function is dependent on the
486      * implementation environment--kernel or user space) */
487     rxi_StartListener();
488
489     AFS_RXGUNLOCK();
490     USERPRI;
491     tmp_status = rxinit_status = 0;
492     UNLOCK_RX_INIT
493     return tmp_status;
494 }
495
496 /* called with unincremented nRequestsRunning to see if it is OK to start
497  * a new thread in this service.  Could be "no" for two reasons: over the
498  * max quota, or would prevent others from reaching their min quota.
499  */
500 #ifdef RX_ENABLE_LOCKS
501 /* This verion of QuotaOK reserves quota if it's ok while the
502  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
503  */
504 static int QuotaOK(aservice)
505 register struct rx_service *aservice;
506 {
507     /* check if over max quota */
508     if (aservice->nRequestsRunning >= aservice->maxProcs) {
509         return 0;
510     }
511
512     /* under min quota, we're OK */
513     /* otherwise, can use only if there are enough to allow everyone
514      * to go to their min quota after this guy starts.
515      */
516     MUTEX_ENTER(&rx_stats_mutex);
517     if ((aservice->nRequestsRunning < aservice->minProcs) ||
518          (rxi_availProcs > rxi_minDeficit)) {
519         aservice->nRequestsRunning++;
520         /* just started call in minProcs pool, need fewer to maintain
521          * guarantee */
522         if (aservice->nRequestsRunning <= aservice->minProcs)
523             rxi_minDeficit--;
524         rxi_availProcs--;
525         MUTEX_EXIT(&rx_stats_mutex);
526         return 1;
527     }
528     MUTEX_EXIT(&rx_stats_mutex);
529
530     return 0;
531 }
532 static void ReturnToServerPool(aservice)
533 register struct rx_service *aservice;
534 {
535     aservice->nRequestsRunning--;
536     MUTEX_ENTER(&rx_stats_mutex);
537     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
538     rxi_availProcs++;
539     MUTEX_EXIT(&rx_stats_mutex);
540 }
541
542 #else /* RX_ENABLE_LOCKS */
543 static QuotaOK(aservice)
544 register struct rx_service *aservice; {
545     int rc=0;
546     /* under min quota, we're OK */
547     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
548
549     /* check if over max quota */
550     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
551
552     /* otherwise, can use only if there are enough to allow everyone
553      * to go to their min quota after this guy starts.
554      */
555     if (rxi_availProcs > rxi_minDeficit) rc = 1;
556     return rc;
557 }
558 #endif /* RX_ENABLE_LOCKS */
559
560 #ifndef KERNEL
561 /* Called by rx_StartServer to start up lwp's to service calls.
562    NExistingProcs gives the number of procs already existing, and which
563    therefore needn't be created. */
564 void rxi_StartServerProcs(nExistingProcs)
565     int nExistingProcs;
566 {
567     register struct rx_service *service;
568     register int i;
569     int maxdiff = 0;
570     int nProcs = 0;
571
572     /* For each service, reserve N processes, where N is the "minimum"
573        number of processes that MUST be able to execute a request in parallel,
574        at any time, for that process.  Also compute the maximum difference
575        between any service's maximum number of processes that can run
576        (i.e. the maximum number that ever will be run, and a guarantee
577        that this number will run if other services aren't running), and its
578        minimum number.  The result is the extra number of processes that
579        we need in order to provide the latter guarantee */
580     for (i=0; i<RX_MAX_SERVICES; i++) {
581         int diff;
582         service = rx_services[i];
583         if (service == (struct rx_service *) 0) break;
584         nProcs += service->minProcs;
585         diff = service->maxProcs - service->minProcs;
586         if (diff > maxdiff) maxdiff = diff;
587     }
588     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
589     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
590     for (i = 0; i<nProcs; i++) {
591         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
592     }
593 }
594 #endif /* KERNEL */
595
596 /* This routine must be called if any services are exported.  If the
597  * donateMe flag is set, the calling process is donated to the server
598  * process pool */
599 void rx_StartServer(donateMe)
600 {
601     register struct rx_service *service;
602     register int i;
603     SPLVAR;
604     clock_NewTime();
605
606     NETPRI;
607     AFS_RXGLOCK();
608     /* Start server processes, if necessary (exact function is dependent
609      * on the implementation environment--kernel or user space).  DonateMe
610      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
611      * case, one less new proc will be created rx_StartServerProcs.
612      */
613     rxi_StartServerProcs(donateMe);
614
615     /* count up the # of threads in minProcs, and add set the min deficit to
616      * be that value, too.
617      */
618     for (i=0; i<RX_MAX_SERVICES; i++) {
619         service = rx_services[i];
620         if (service == (struct rx_service *) 0) break;
621         MUTEX_ENTER(&rx_stats_mutex);
622         rxi_totalMin += service->minProcs;
623         /* below works even if a thread is running, since minDeficit would
624          * still have been decremented and later re-incremented.
625          */
626         rxi_minDeficit += service->minProcs;
627         MUTEX_EXIT(&rx_stats_mutex);
628     }
629
630     /* Turn on reaping of idle server connections */
631     rxi_ReapConnections();
632
633     AFS_RXGUNLOCK();
634     USERPRI;
635
636     if (donateMe) rx_ServerProc(); /* Never returns */
637     return;
638 }
639
640 /* Create a new client connection to the specified service, using the
641  * specified security object to implement the security model for this
642  * connection. */
643 struct rx_connection *
644 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
645     register afs_uint32 shost;      /* Server host */
646     u_short sport;                  /* Server port */
647     u_short sservice;               /* Server service id */
648     register struct rx_securityClass *securityObject;
649     int serviceSecurityIndex;
650 {
651     int hashindex;
652     afs_int32 cid;
653     register struct rx_connection *conn;
654
655     SPLVAR;
656
657     clock_NewTime();
658     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
659           shost, sport, sservice, securityObject, serviceSecurityIndex));
660
661     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
662      * the case of kmem_alloc? */
663     conn = rxi_AllocConnection();
664 #ifdef  RX_ENABLE_LOCKS
665     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
666     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
667     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
668 #endif
669     NETPRI;
670     AFS_RXGLOCK();
671     MUTEX_ENTER(&rx_connHashTable_lock);
672     cid = (rx_nextCid += RX_MAXCALLS);
673     conn->type = RX_CLIENT_CONNECTION;
674     conn->cid = cid;
675     conn->epoch = rx_epoch;
676     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
677     conn->serviceId = sservice;
678     conn->securityObject = securityObject;
679     /* This doesn't work in all compilers with void (they're buggy), so fake it
680      * with VOID */
681     conn->securityData = (VOID *) 0;
682     conn->securityIndex = serviceSecurityIndex;
683     rx_SetConnDeadTime(conn, rx_connDeadTime);
684     conn->ackRate = RX_FAST_ACK_RATE;
685     conn->nSpecific = 0;
686     conn->specific = NULL;
687     conn->challengeEvent = (struct rxevent *)0;
688     conn->delayedAbortEvent = (struct rxevent *)0;
689     conn->abortCount = 0;
690     conn->error = 0;
691
692     RXS_NewConnection(securityObject, conn);
693     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
694     
695     conn->refCount++; /* no lock required since only this thread knows... */
696     conn->next = rx_connHashTable[hashindex];
697     rx_connHashTable[hashindex] = conn;
698     MUTEX_ENTER(&rx_stats_mutex);
699     rx_stats.nClientConns++;
700     MUTEX_EXIT(&rx_stats_mutex);
701
702     MUTEX_EXIT(&rx_connHashTable_lock);
703     AFS_RXGUNLOCK();
704     USERPRI;
705     return conn;
706 }
707
708 void rx_SetConnDeadTime(conn, seconds)
709     register struct rx_connection *conn;
710     register int seconds;
711 {
712     /* The idea is to set the dead time to a value that allows several
713      * keepalives to be dropped without timing out the connection. */
714     conn->secondsUntilDead = MAX(seconds, 6);
715     conn->secondsUntilPing = conn->secondsUntilDead/6;
716 }
717
718 int rxi_lowPeerRefCount = 0;
719 int rxi_lowConnRefCount = 0;
720
721 /*
722  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
723  * NOTE: must not be called with rx_connHashTable_lock held.
724  */
725 void rxi_CleanupConnection(conn)
726     struct rx_connection *conn;
727 {
728     int i;
729
730     /* Notify the service exporter, if requested, that this connection
731      * is being destroyed */
732     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
733       (*conn->service->destroyConnProc)(conn);
734
735     /* Notify the security module that this connection is being destroyed */
736     RXS_DestroyConnection(conn->securityObject, conn);
737
738     /* If this is the last connection using the rx_peer struct, set its
739      * idle time to now. rxi_ReapConnections will reap it if it's still
740      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
741      */
742     MUTEX_ENTER(&rx_peerHashTable_lock);
743     if (--conn->peer->refCount <= 0) {
744         conn->peer->idleWhen = clock_Sec();
745         if (conn->peer->refCount < 0) {
746             conn->peer->refCount = 0; 
747             MUTEX_ENTER(&rx_stats_mutex);
748             rxi_lowPeerRefCount ++;
749             MUTEX_EXIT(&rx_stats_mutex);
750         }
751     }
752     MUTEX_EXIT(&rx_peerHashTable_lock);
753
754     MUTEX_ENTER(&rx_stats_mutex);
755     if (conn->type == RX_SERVER_CONNECTION)
756       rx_stats.nServerConns--;
757     else
758       rx_stats.nClientConns--;
759     MUTEX_EXIT(&rx_stats_mutex);
760
761 #ifndef KERNEL
762     if (conn->specific) {
763         for (i = 0 ; i < conn->nSpecific ; i++) {
764             if (conn->specific[i] && rxi_keyCreate_destructor[i])
765                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
766             conn->specific[i] = NULL;
767         }
768         free(conn->specific);
769     }
770     conn->specific = NULL;
771     conn->nSpecific = 0;
772 #endif /* !KERNEL */
773
774     MUTEX_DESTROY(&conn->conn_call_lock);
775     MUTEX_DESTROY(&conn->conn_data_lock);
776     CV_DESTROY(&conn->conn_call_cv);
777         
778     rxi_FreeConnection(conn);
779 }
780
781 /* Destroy the specified connection */
782 void rxi_DestroyConnection(conn)
783     register struct rx_connection *conn;
784 {
785     MUTEX_ENTER(&rx_connHashTable_lock);
786     rxi_DestroyConnectionNoLock(conn);
787     /* conn should be at the head of the cleanup list */
788     if (conn == rx_connCleanup_list) {
789         rx_connCleanup_list = rx_connCleanup_list->next;
790         MUTEX_EXIT(&rx_connHashTable_lock);
791         rxi_CleanupConnection(conn);
792     }
793 #ifdef RX_ENABLE_LOCKS
794     else {
795         MUTEX_EXIT(&rx_connHashTable_lock);
796     }
797 #endif /* RX_ENABLE_LOCKS */
798 }
799     
800 static void rxi_DestroyConnectionNoLock(conn)
801     register struct rx_connection *conn;
802 {
803     register struct rx_connection **conn_ptr;
804     register int havecalls = 0;
805     struct rx_packet *packet;
806     int i;
807     SPLVAR;
808
809     clock_NewTime();
810
811     NETPRI;
812     MUTEX_ENTER(&conn->conn_data_lock);
813     if (conn->refCount > 0)
814         conn->refCount--;
815     else {
816         MUTEX_ENTER(&rx_stats_mutex);
817         rxi_lowConnRefCount++;
818         MUTEX_EXIT(&rx_stats_mutex);
819     }
820
821     if (conn->refCount > 0) {
822         /* Busy; wait till the last guy before proceeding */
823         MUTEX_EXIT(&conn->conn_data_lock);
824         USERPRI;
825         return;
826     }
827
828     /* If the client previously called rx_NewCall, but it is still
829      * waiting, treat this as a running call, and wait to destroy the
830      * connection later when the call completes. */
831     if ((conn->type == RX_CLIENT_CONNECTION) &&
832         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
833         conn->flags |= RX_CONN_DESTROY_ME;
834         MUTEX_EXIT(&conn->conn_data_lock);
835         USERPRI;
836         return;
837     }
838     MUTEX_EXIT(&conn->conn_data_lock);
839
840     /* Check for extant references to this connection */
841     for (i = 0; i<RX_MAXCALLS; i++) {
842         register struct rx_call *call = conn->call[i];
843         if (call) {
844             havecalls = 1;
845             if (conn->type == RX_CLIENT_CONNECTION) {
846                 MUTEX_ENTER(&call->lock);
847                 if (call->delayedAckEvent) {
848                     /* Push the final acknowledgment out now--there
849                      * won't be a subsequent call to acknowledge the
850                      * last reply packets */
851                     rxevent_Cancel(call->delayedAckEvent, call,
852                                    RX_CALL_REFCOUNT_DELAY);
853                     rxi_AckAll((struct rxevent *)0, call, 0);
854                 }
855                 MUTEX_EXIT(&call->lock);
856             }
857         }
858     }
859 #ifdef RX_ENABLE_LOCKS
860     if (!havecalls) {
861         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
862             MUTEX_EXIT(&conn->conn_data_lock);
863         }
864         else {
865             /* Someone is accessing a packet right now. */
866             havecalls = 1;
867         }
868     }
869 #endif /* RX_ENABLE_LOCKS */
870
871     if (havecalls) {
872         /* Don't destroy the connection if there are any call
873          * structures still in use */
874         MUTEX_ENTER(&conn->conn_data_lock);
875         conn->flags |= RX_CONN_DESTROY_ME;
876         MUTEX_EXIT(&conn->conn_data_lock);
877         USERPRI;
878         return;
879     }
880
881     if (conn->delayedAbortEvent) {
882         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
883         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
884         if (packet) {
885             MUTEX_ENTER(&conn->conn_data_lock);
886             rxi_SendConnectionAbort(conn, packet, 0, 1);
887             MUTEX_EXIT(&conn->conn_data_lock);
888             rxi_FreePacket(packet);
889         }
890     }
891
892     /* Remove from connection hash table before proceeding */
893     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
894                                              conn->epoch, conn->type) ];
895     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
896         if (*conn_ptr == conn) {
897             *conn_ptr = conn->next;
898             break;
899         }
900     }
901     /* if the conn that we are destroying was the last connection, then we
902     * clear rxLastConn as well */
903     if ( rxLastConn == conn )
904         rxLastConn = 0;
905
906     /* Make sure the connection is completely reset before deleting it. */
907     /* get rid of pending events that could zap us later */
908     if (conn->challengeEvent) {
909         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
910     }
911  
912     /* Add the connection to the list of destroyed connections that
913      * need to be cleaned up. This is necessary to avoid deadlocks
914      * in the routines we call to inform others that this connection is
915      * being destroyed. */
916     conn->next = rx_connCleanup_list;
917     rx_connCleanup_list = conn;
918 }
919
920 /* Externally available version */
921 void rx_DestroyConnection(conn) 
922     register struct rx_connection *conn;
923 {
924     SPLVAR;
925
926     NETPRI;
927     AFS_RXGLOCK();
928     rxi_DestroyConnection (conn);
929     AFS_RXGUNLOCK();
930     USERPRI;
931 }
932
933 /* Start a new rx remote procedure call, on the specified connection.
934  * If wait is set to 1, wait for a free call channel; otherwise return
935  * 0.  Maxtime gives the maximum number of seconds this call may take,
936  * after rx_MakeCall returns.  After this time interval, a call to any
937  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
938  * For fine grain locking, we hold the conn_call_lock in order to 
939  * to ensure that we don't get signalle after we found a call in an active
940  * state and before we go to sleep.
941  */
942 struct rx_call *rx_NewCall(conn)
943     register struct rx_connection *conn;
944 {
945     register int i;
946     register struct rx_call *call;
947     struct clock queueTime;
948     SPLVAR;
949
950     clock_NewTime();
951     dpf (("rx_MakeCall(conn %x)\n", conn));
952
953     NETPRI;
954     clock_GetTime(&queueTime);
955     AFS_RXGLOCK();
956     MUTEX_ENTER(&conn->conn_call_lock);
957     for (;;) {
958         for (i=0; i<RX_MAXCALLS; i++) {
959             call = conn->call[i];
960             if (call) {
961                 MUTEX_ENTER(&call->lock);
962                 if (call->state == RX_STATE_DALLY) {
963                     rxi_ResetCall(call, 0);
964                     (*call->callNumber)++;
965                     break;
966                 }
967                 MUTEX_EXIT(&call->lock);
968             }
969             else {
970                 call = rxi_NewCall(conn, i);
971                 MUTEX_ENTER(&call->lock);
972                 break;
973             }
974         }
975         if (i < RX_MAXCALLS) {
976             break;
977         }
978         MUTEX_ENTER(&conn->conn_data_lock);
979         conn->flags |= RX_CONN_MAKECALL_WAITING;
980         MUTEX_EXIT(&conn->conn_data_lock);
981 #ifdef  RX_ENABLE_LOCKS
982         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
983 #else
984         osi_rxSleep(conn);
985 #endif
986     }
987
988     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
989
990     /* Client is initially in send mode */
991     call->state = RX_STATE_ACTIVE;
992     call->mode = RX_MODE_SENDING;
993
994     /* remember start time for call in case we have hard dead time limit */
995     call->queueTime = queueTime;
996     clock_GetTime(&call->startTime);
997     hzero(call->bytesSent);
998     hzero(call->bytesRcvd);
999
1000     /* Turn on busy protocol. */
1001     rxi_KeepAliveOn(call);
1002
1003     MUTEX_EXIT(&call->lock);
1004     MUTEX_EXIT(&conn->conn_call_lock);
1005     AFS_RXGUNLOCK();
1006     USERPRI;
1007
1008 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1009     /* Now, if TQ wasn't cleared earlier, do it now. */
1010     AFS_RXGLOCK();
1011     MUTEX_ENTER(&call->lock);
1012     while (call->flags & RX_CALL_TQ_BUSY) {
1013         call->flags |= RX_CALL_TQ_WAIT;
1014 #ifdef RX_ENABLE_LOCKS
1015         CV_WAIT(&call->cv_tq, &call->lock);
1016 #else /* RX_ENABLE_LOCKS */
1017         osi_rxSleep(&call->tq);
1018 #endif /* RX_ENABLE_LOCKS */
1019     }
1020     if (call->flags & RX_CALL_TQ_CLEARME) {
1021         rxi_ClearTransmitQueue(call, 0);
1022         queue_Init(&call->tq);
1023     }
1024     MUTEX_EXIT(&call->lock);
1025     AFS_RXGUNLOCK();
1026 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1027
1028     return call;
1029 }
1030
1031 rxi_HasActiveCalls(aconn)
1032 register struct rx_connection *aconn; {
1033     register int i;
1034     register struct rx_call *tcall;
1035     SPLVAR;
1036
1037     NETPRI;
1038     for(i=0; i<RX_MAXCALLS; i++) {
1039       if (tcall = aconn->call[i]) {
1040         if ((tcall->state == RX_STATE_ACTIVE) 
1041             || (tcall->state == RX_STATE_PRECALL)) {
1042           USERPRI;
1043           return 1;
1044         }
1045       }
1046     }
1047     USERPRI;
1048     return 0;
1049 }
1050
1051 rxi_GetCallNumberVector(aconn, aint32s)
1052 register struct rx_connection *aconn;
1053 register afs_int32 *aint32s; {
1054     register int i;
1055     register struct rx_call *tcall;
1056     SPLVAR;
1057
1058     NETPRI;
1059     for(i=0; i<RX_MAXCALLS; i++) {
1060         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1061             aint32s[i] = aconn->callNumber[i]+1;
1062         else
1063             aint32s[i] = aconn->callNumber[i];
1064     }
1065     USERPRI;
1066     return 0;
1067 }
1068
1069 rxi_SetCallNumberVector(aconn, aint32s)
1070 register struct rx_connection *aconn;
1071 register afs_int32 *aint32s; {
1072     register int i;
1073     register struct rx_call *tcall;
1074     SPLVAR;
1075
1076     NETPRI;
1077     for(i=0; i<RX_MAXCALLS; i++) {
1078         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1079             aconn->callNumber[i] = aint32s[i] - 1;
1080         else
1081             aconn->callNumber[i] = aint32s[i];
1082     }
1083     USERPRI;
1084     return 0;
1085 }
1086
1087 /* Advertise a new service.  A service is named locally by a UDP port
1088  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1089  * on a failure. */
1090 struct rx_service *
1091 rx_NewService(port, serviceId, serviceName, securityObjects,
1092               nSecurityObjects, serviceProc)
1093     u_short port;
1094     u_short serviceId;
1095     char *serviceName;  /* Name for identification purposes (e.g. the
1096                          * service name might be used for probing for
1097                          * statistics) */
1098     struct rx_securityClass **securityObjects;
1099     int nSecurityObjects;
1100     afs_int32 (*serviceProc)();
1101 {    
1102     osi_socket socket = OSI_NULLSOCKET;
1103     register struct rx_service *tservice;    
1104     register int i;
1105     SPLVAR;
1106
1107     clock_NewTime();
1108
1109     if (serviceId == 0) {
1110         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1111          serviceName);
1112         return 0;
1113     }
1114     if (port == 0) {
1115         if (rx_port == 0) {
1116             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1117             return 0;
1118         }
1119         port = rx_port;
1120         socket = rx_socket;
1121     }
1122
1123     tservice = rxi_AllocService();
1124     NETPRI;
1125     AFS_RXGLOCK();
1126     for (i = 0; i<RX_MAX_SERVICES; i++) {
1127         register struct rx_service *service = rx_services[i];
1128         if (service) {
1129             if (port == service->servicePort) {
1130                 if (service->serviceId == serviceId) {
1131                     /* The identical service has already been
1132                      * installed; if the caller was intending to
1133                      * change the security classes used by this
1134                      * service, he/she loses. */
1135                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1136                     AFS_RXGUNLOCK();
1137                     USERPRI;
1138                     rxi_FreeService(tservice);
1139                     return service;
1140                 }
1141                 /* Different service, same port: re-use the socket
1142                  * which is bound to the same port */
1143                 socket = service->socket;
1144             }
1145         } else {
1146             if (socket == OSI_NULLSOCKET) {
1147                 /* If we don't already have a socket (from another
1148                  * service on same port) get a new one */
1149                 socket = rxi_GetUDPSocket(port);
1150                 if (socket == OSI_NULLSOCKET) {
1151                     AFS_RXGUNLOCK();
1152                     USERPRI;
1153                     rxi_FreeService(tservice);
1154                     return 0;
1155                 }
1156             }
1157             service = tservice;
1158             service->socket = socket;
1159             service->servicePort = port;
1160             service->serviceId = serviceId;
1161             service->serviceName = serviceName;
1162             service->nSecurityObjects = nSecurityObjects;
1163             service->securityObjects = securityObjects;
1164             service->minProcs = 0;
1165             service->maxProcs = 1;
1166             service->idleDeadTime = 60;
1167             service->connDeadTime = rx_connDeadTime;
1168             service->executeRequestProc = serviceProc;
1169             rx_services[i] = service;   /* not visible until now */
1170             AFS_RXGUNLOCK();
1171             USERPRI;
1172             return service;
1173         }
1174     }
1175     AFS_RXGUNLOCK();
1176     USERPRI;
1177     rxi_FreeService(tservice);
1178     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1179     return 0;
1180 }
1181
1182 /* Generic request processing loop. This routine should be called
1183  * by the implementation dependent rx_ServerProc. If socketp is
1184  * non-null, it will be set to the file descriptor that this thread
1185  * is now listening on. If socketp is null, this routine will never
1186  * returns. */
1187 void rxi_ServerProc(threadID, newcall, socketp)
1188 int threadID;
1189 struct rx_call *newcall;
1190 osi_socket *socketp;
1191 {
1192     register struct rx_call *call;
1193     register afs_int32 code;
1194     register struct rx_service *tservice = NULL;
1195
1196     for (;;) {
1197         if (newcall) {
1198             call = newcall;
1199             newcall = NULL;
1200         } else {
1201             call = rx_GetCall(threadID, tservice, socketp);
1202             if (socketp && *socketp != OSI_NULLSOCKET) {
1203                 /* We are now a listener thread */
1204                 return;
1205             }
1206         }
1207
1208         /* if server is restarting( typically smooth shutdown) then do not
1209          * allow any new calls.
1210          */
1211
1212         if ( rx_tranquil && (call != NULL) ) {
1213             SPLVAR;
1214
1215             NETPRI;
1216             AFS_RXGLOCK();
1217             MUTEX_ENTER(&call->lock);
1218
1219             rxi_CallError(call, RX_RESTARTING);
1220             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1221
1222             MUTEX_EXIT(&call->lock);
1223             AFS_RXGUNLOCK();
1224             USERPRI;
1225         }
1226
1227 #ifdef  KERNEL
1228         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1229 #ifdef RX_ENABLE_LOCKS
1230             AFS_GLOCK();
1231 #endif /* RX_ENABLE_LOCKS */
1232             afs_termState = AFSOP_STOP_AFS;
1233             afs_osi_Wakeup(&afs_termState);
1234 #ifdef RX_ENABLE_LOCKS
1235             AFS_GUNLOCK();
1236 #endif /* RX_ENABLE_LOCKS */
1237             return;
1238         }
1239 #endif
1240
1241         tservice = call->conn->service;
1242
1243         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1244
1245         code = call->conn->service->executeRequestProc(call);
1246
1247         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1248
1249         rx_EndCall(call, code);
1250         MUTEX_ENTER(&rx_stats_mutex);
1251         rxi_nCalls++;
1252         MUTEX_EXIT(&rx_stats_mutex);
1253     }
1254 }
1255
1256
1257 void rx_WakeupServerProcs()
1258 {
1259     struct rx_serverQueueEntry *np, *tqp;
1260     SPLVAR;
1261
1262     NETPRI;
1263     AFS_RXGLOCK();
1264     MUTEX_ENTER(&rx_serverPool_lock);
1265
1266 #ifdef RX_ENABLE_LOCKS
1267     if (rx_waitForPacket)
1268         CV_BROADCAST(&rx_waitForPacket->cv);
1269 #else /* RX_ENABLE_LOCKS */
1270     if (rx_waitForPacket)
1271         osi_rxWakeup(rx_waitForPacket);
1272 #endif /* RX_ENABLE_LOCKS */
1273     MUTEX_ENTER(&freeSQEList_lock);
1274     for (np = rx_FreeSQEList; np; np = tqp) {
1275       tqp = *(struct rx_serverQueueEntry **)np;
1276 #ifdef RX_ENABLE_LOCKS
1277       CV_BROADCAST(&np->cv);
1278 #else /* RX_ENABLE_LOCKS */
1279       osi_rxWakeup(np);
1280 #endif /* RX_ENABLE_LOCKS */
1281     }
1282     MUTEX_EXIT(&freeSQEList_lock);
1283     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1284 #ifdef RX_ENABLE_LOCKS
1285       CV_BROADCAST(&np->cv);
1286 #else /* RX_ENABLE_LOCKS */
1287       osi_rxWakeup(np);
1288 #endif /* RX_ENABLE_LOCKS */
1289     }
1290     MUTEX_EXIT(&rx_serverPool_lock);
1291     AFS_RXGUNLOCK();
1292     USERPRI;
1293 }
1294
1295 /* meltdown:
1296  * One thing that seems to happen is that all the server threads get
1297  * tied up on some empty or slow call, and then a whole bunch of calls
1298  * arrive at once, using up the packet pool, so now there are more 
1299  * empty calls.  The most critical resources here are server threads
1300  * and the free packet pool.  The "doreclaim" code seems to help in
1301  * general.  I think that eventually we arrive in this state: there
1302  * are lots of pending calls which do have all their packets present,
1303  * so they won't be reclaimed, are multi-packet calls, so they won't
1304  * be scheduled until later, and thus are tying up most of the free 
1305  * packet pool for a very long time.
1306  * future options:
1307  * 1.  schedule multi-packet calls if all the packets are present.  
1308  * Probably CPU-bound operation, useful to return packets to pool. 
1309  * Do what if there is a full window, but the last packet isn't here?
1310  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1311  * it sleeps and waits for that type of call.
1312  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1313  * the current dataquota business is badly broken.  The quota isn't adjusted
1314  * to reflect how many packets are presently queued for a running call.
1315  * So, when we schedule a queued call with a full window of packets queued
1316  * up for it, that *should* free up a window full of packets for other 2d-class
1317  * calls to be able to use from the packet pool.  But it doesn't.
1318  *
1319  * NB.  Most of the time, this code doesn't run -- since idle server threads
1320  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1321  * as a new call arrives.
1322  */
1323 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1324  * for an rx_Read. */
1325 #ifdef RX_ENABLE_LOCKS
1326 struct rx_call *
1327 rx_GetCall(tno, cur_service, socketp)
1328 int tno;
1329 struct rx_service *cur_service;
1330 osi_socket *socketp;
1331 {
1332     struct rx_serverQueueEntry *sq;
1333     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1334     struct rx_service *service;
1335     SPLVAR;
1336
1337     MUTEX_ENTER(&freeSQEList_lock);
1338
1339     if (sq = rx_FreeSQEList) {
1340         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1341         MUTEX_EXIT(&freeSQEList_lock);
1342     } else {    /* otherwise allocate a new one and return that */
1343         MUTEX_EXIT(&freeSQEList_lock);
1344         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1345         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1346         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1347     }
1348
1349     MUTEX_ENTER(&rx_serverPool_lock);
1350     if (cur_service != NULL) {
1351         ReturnToServerPool(cur_service);
1352     }
1353     while (1) {
1354         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1355             register struct rx_call *tcall, *ncall;
1356             choice2 = (struct rx_call *) 0;
1357             /* Scan for eligible incoming calls.  A call is not eligible
1358              * if the maximum number of calls for its service type are
1359              * already executing */
1360             /* One thread will process calls FCFS (to prevent starvation),
1361              * while the other threads may run ahead looking for calls which
1362              * have all their input data available immediately.  This helps 
1363              * keep threads from blocking, waiting for data from the client. */
1364             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1365               service = tcall->conn->service;
1366               if (!QuotaOK(service)) {
1367                 continue;
1368               }
1369               if (!tno || !tcall->queue_item_header.next  ) {
1370                 /* If we're thread 0, then  we'll just use 
1371                  * this call. If we haven't been able to find an optimal 
1372                  * choice, and we're at the end of the list, then use a 
1373                  * 2d choice if one has been identified.  Otherwise... */
1374                 call = (choice2 ? choice2 : tcall);
1375                 service = call->conn->service;
1376               } else if (!queue_IsEmpty(&tcall->rq)) {
1377                 struct rx_packet *rp;
1378                 rp = queue_First(&tcall->rq, rx_packet);
1379                 if (rp->header.seq == 1) {
1380                   if (!meltdown_1pkt ||             
1381                       (rp->header.flags & RX_LAST_PACKET)) {
1382                     call = tcall;
1383                   } else if (rxi_2dchoice && !choice2 &&
1384                              !(tcall->flags & RX_CALL_CLEARED) &&
1385                              (tcall->rprev > rxi_HardAckRate)) {
1386                     choice2 = tcall;
1387                   } else rxi_md2cnt++;
1388                 }
1389               }
1390               if (call)  {
1391                 break;
1392               } else {
1393                   ReturnToServerPool(service);
1394               }
1395             }
1396           }
1397
1398         if (call) {
1399             queue_Remove(call);
1400             rxi_ServerThreadSelectingCall = 1;
1401             MUTEX_EXIT(&rx_serverPool_lock);
1402             MUTEX_ENTER(&call->lock);
1403             MUTEX_ENTER(&rx_serverPool_lock);
1404
1405             if (queue_IsEmpty(&call->rq) ||
1406                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1407               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1408
1409             CLEAR_CALL_QUEUE_LOCK(call);
1410             if (call->error) {
1411                 MUTEX_EXIT(&call->lock);
1412                 ReturnToServerPool(service);
1413                 rxi_ServerThreadSelectingCall = 0;
1414                 CV_SIGNAL(&rx_serverPool_cv);
1415                 call = (struct rx_call*)0;
1416                 continue;
1417             }
1418             call->flags &= (~RX_CALL_WAIT_PROC);
1419             MUTEX_ENTER(&rx_stats_mutex);
1420             rx_nWaiting--;
1421             MUTEX_EXIT(&rx_stats_mutex);
1422             rxi_ServerThreadSelectingCall = 0;
1423             CV_SIGNAL(&rx_serverPool_cv);
1424             MUTEX_EXIT(&rx_serverPool_lock);
1425             break;
1426         }
1427         else {
1428             /* If there are no eligible incoming calls, add this process
1429              * to the idle server queue, to wait for one */
1430             sq->newcall = 0;
1431             sq->tno = tno;
1432             if (socketp) {
1433                 *socketp = OSI_NULLSOCKET;
1434             }
1435             sq->socketp = socketp;
1436             queue_Append(&rx_idleServerQueue, sq);
1437 #ifndef AFS_AIX41_ENV
1438             rx_waitForPacket = sq;
1439 #endif /* AFS_AIX41_ENV */
1440             do {
1441                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1442 #ifdef  KERNEL
1443                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1444                     MUTEX_EXIT(&rx_serverPool_lock);
1445                     return (struct rx_call *)0;
1446                 }
1447 #endif
1448             } while (!(call = sq->newcall) &&
1449                      !(socketp && *socketp != OSI_NULLSOCKET));
1450             MUTEX_EXIT(&rx_serverPool_lock);
1451             if (call) {
1452                 MUTEX_ENTER(&call->lock);
1453             }
1454             break;
1455         }
1456     }
1457
1458     MUTEX_ENTER(&freeSQEList_lock);
1459     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1460     rx_FreeSQEList = sq;
1461     MUTEX_EXIT(&freeSQEList_lock);
1462
1463     if (call) {
1464         clock_GetTime(&call->startTime);
1465         call->state = RX_STATE_ACTIVE;
1466         call->mode = RX_MODE_RECEIVING;
1467
1468         rxi_calltrace(RX_CALL_START, call);
1469         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1470              call->conn->service->servicePort, 
1471              call->conn->service->serviceId, call));
1472
1473         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1474         MUTEX_EXIT(&call->lock);
1475     } else {
1476         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1477     }
1478
1479     return call;
1480 }
1481 #else /* RX_ENABLE_LOCKS */
1482 struct rx_call *
1483 rx_GetCall(tno, cur_service, socketp)
1484   int tno;
1485   struct rx_service *cur_service;
1486   osi_socket *socketp;
1487 {
1488     struct rx_serverQueueEntry *sq;
1489     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1490     struct rx_service *service;
1491     SPLVAR;
1492
1493     NETPRI;
1494     AFS_RXGLOCK();
1495     MUTEX_ENTER(&freeSQEList_lock);
1496
1497     if (sq = rx_FreeSQEList) {
1498         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1499         MUTEX_EXIT(&freeSQEList_lock);
1500     } else {    /* otherwise allocate a new one and return that */
1501         MUTEX_EXIT(&freeSQEList_lock);
1502         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1503         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1504         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1505     }
1506     MUTEX_ENTER(&sq->lock);
1507
1508     if (cur_service != NULL) {
1509         cur_service->nRequestsRunning--;
1510         if (cur_service->nRequestsRunning < cur_service->minProcs)
1511             rxi_minDeficit++;
1512         rxi_availProcs++;
1513     }
1514     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1515         register struct rx_call *tcall, *ncall;
1516         /* Scan for eligible incoming calls.  A call is not eligible
1517          * if the maximum number of calls for its service type are
1518          * already executing */
1519         /* One thread will process calls FCFS (to prevent starvation),
1520          * while the other threads may run ahead looking for calls which
1521          * have all their input data available immediately.  This helps 
1522          * keep threads from blocking, waiting for data from the client. */
1523         choice2 = (struct rx_call *) 0;
1524         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1525           service = tcall->conn->service;
1526           if (QuotaOK(service)) {
1527              if (!tno || !tcall->queue_item_header.next  ) {
1528                  /* If we're thread 0, then  we'll just use 
1529                   * this call. If we haven't been able to find an optimal 
1530                   * choice, and we're at the end of the list, then use a 
1531                   * 2d choice if one has been identified.  Otherwise... */
1532                  call = (choice2 ? choice2 : tcall);
1533                  service = call->conn->service;
1534              } else if (!queue_IsEmpty(&tcall->rq)) {
1535                  struct rx_packet *rp;
1536                  rp = queue_First(&tcall->rq, rx_packet);
1537                  if (rp->header.seq == 1
1538                      && (!meltdown_1pkt ||
1539                          (rp->header.flags & RX_LAST_PACKET))) {
1540                      call = tcall;
1541                  } else if (rxi_2dchoice && !choice2 &&
1542                             !(tcall->flags & RX_CALL_CLEARED) &&
1543                             (tcall->rprev > rxi_HardAckRate)) {
1544                      choice2 = tcall;
1545                  } else rxi_md2cnt++;
1546              }
1547           }
1548           if (call) 
1549              break;
1550         }
1551       }
1552
1553     if (call) {
1554         queue_Remove(call);
1555         /* we can't schedule a call if there's no data!!! */
1556         /* send an ack if there's no data, if we're missing the
1557          * first packet, or we're missing something between first 
1558          * and last -- there's a "hole" in the incoming data. */
1559         if (queue_IsEmpty(&call->rq) ||
1560             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1561             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1562           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1563
1564         call->flags &= (~RX_CALL_WAIT_PROC);
1565         service->nRequestsRunning++;
1566         /* just started call in minProcs pool, need fewer to maintain
1567          * guarantee */
1568         if (service->nRequestsRunning <= service->minProcs)
1569             rxi_minDeficit--;
1570         rxi_availProcs--;
1571         rx_nWaiting--;
1572         /* MUTEX_EXIT(&call->lock); */
1573     }
1574     else {
1575         /* If there are no eligible incoming calls, add this process
1576          * to the idle server queue, to wait for one */
1577         sq->newcall = 0;
1578         if (socketp) {
1579             *socketp = OSI_NULLSOCKET;
1580         }
1581         sq->socketp = socketp;
1582         queue_Append(&rx_idleServerQueue, sq);
1583         do {
1584             osi_rxSleep(sq);
1585 #ifdef  KERNEL
1586                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1587                     AFS_RXGUNLOCK();
1588                     USERPRI;
1589                     return (struct rx_call *)0;
1590                 }
1591 #endif
1592         } while (!(call = sq->newcall) &&
1593                  !(socketp && *socketp != OSI_NULLSOCKET));
1594     }
1595     MUTEX_EXIT(&sq->lock);
1596
1597     MUTEX_ENTER(&freeSQEList_lock);
1598     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1599     rx_FreeSQEList = sq;
1600     MUTEX_EXIT(&freeSQEList_lock);
1601
1602     if (call) {
1603         clock_GetTime(&call->startTime);
1604         call->state = RX_STATE_ACTIVE;
1605         call->mode = RX_MODE_RECEIVING;
1606
1607         rxi_calltrace(RX_CALL_START, call);
1608         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1609          call->conn->service->servicePort, 
1610          call->conn->service->serviceId, call));
1611     } else {
1612         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1613     }
1614
1615     AFS_RXGUNLOCK();
1616     USERPRI;
1617
1618     return call;
1619 }
1620 #endif /* RX_ENABLE_LOCKS */
1621
1622
1623
1624 /* Establish a procedure to be called when a packet arrives for a
1625  * call.  This routine will be called at most once after each call,
1626  * and will also be called if there is an error condition on the or
1627  * the call is complete.  Used by multi rx to build a selection
1628  * function which determines which of several calls is likely to be a
1629  * good one to read from.  
1630  * NOTE: the way this is currently implemented it is probably only a
1631  * good idea to (1) use it immediately after a newcall (clients only)
1632  * and (2) only use it once.  Other uses currently void your warranty
1633  */
1634 void rx_SetArrivalProc(call, proc, handle, arg)
1635     register struct rx_call *call;
1636     register VOID (*proc)();
1637     register VOID *handle;
1638     register VOID *arg;
1639 {
1640     call->arrivalProc = proc;
1641     call->arrivalProcHandle = handle;
1642     call->arrivalProcArg = arg;
1643 }
1644
1645 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1646  * appropriate, and return the final error code from the conversation
1647  * to the caller */
1648
1649 afs_int32 rx_EndCall(call, rc)
1650     register struct rx_call *call;
1651     afs_int32 rc;
1652 {
1653     register struct rx_connection *conn = call->conn;
1654     register struct rx_service *service;
1655     register struct rx_packet *tp; /* Temporary packet pointer */
1656     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1657     afs_int32 error;
1658     SPLVAR;
1659
1660     dpf(("rx_EndCall(call %x)\n", call));
1661
1662     NETPRI;
1663     AFS_RXGLOCK();
1664     MUTEX_ENTER(&call->lock);
1665
1666     if (rc == 0 && call->error == 0) {
1667         call->abortCode = 0;
1668         call->abortCount = 0;
1669     }
1670
1671     call->arrivalProc = (VOID (*)()) 0;
1672     if (rc && call->error == 0) {
1673         rxi_CallError(call, rc);
1674         /* Send an abort message to the peer if this error code has
1675          * only just been set.  If it was set previously, assume the
1676          * peer has already been sent the error code or will request it 
1677          */
1678         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1679     }
1680     if (conn->type == RX_SERVER_CONNECTION) {
1681         /* Make sure reply or at least dummy reply is sent */
1682         if (call->mode == RX_MODE_RECEIVING) {
1683             rxi_WriteProc(call, 0, 0);
1684         }
1685         if (call->mode == RX_MODE_SENDING) {
1686             rxi_FlushWrite(call);
1687         }
1688         service = conn->service;
1689         rxi_calltrace(RX_CALL_END, call);
1690         /* Call goes to hold state until reply packets are acknowledged */
1691         if (call->tfirst + call->nSoftAcked < call->tnext) {
1692             call->state = RX_STATE_HOLD;
1693         } else {
1694             call->state = RX_STATE_DALLY;
1695             rxi_ClearTransmitQueue(call, 0);
1696             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1697             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1698         }
1699     }
1700     else { /* Client connection */
1701         char dummy;
1702         /* Make sure server receives input packets, in the case where
1703          * no reply arguments are expected */
1704         if ((call->mode == RX_MODE_SENDING)
1705          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1706             (void) rxi_ReadProc(call, &dummy, 1);
1707         }
1708         /* We need to release the call lock since it's lower than the
1709          * conn_call_lock and we don't want to hold the conn_call_lock
1710          * over the rx_ReadProc call. The conn_call_lock needs to be held
1711          * here for the case where rx_NewCall is perusing the calls on
1712          * the connection structure. We don't want to signal until
1713          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1714          * have checked this call, found it active and by the time it
1715          * goes to sleep, will have missed the signal.
1716          */
1717         MUTEX_EXIT(&call->lock);
1718         MUTEX_ENTER(&conn->conn_call_lock);
1719         MUTEX_ENTER(&call->lock);
1720         MUTEX_ENTER(&conn->conn_data_lock);
1721         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1722             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1723             MUTEX_EXIT(&conn->conn_data_lock);
1724 #ifdef  RX_ENABLE_LOCKS
1725             CV_BROADCAST(&conn->conn_call_cv);
1726 #else
1727             osi_rxWakeup(conn);
1728 #endif
1729         }
1730 #ifdef RX_ENABLE_LOCKS
1731         else {
1732             MUTEX_EXIT(&conn->conn_data_lock);
1733         }
1734 #endif /* RX_ENABLE_LOCKS */
1735         call->state = RX_STATE_DALLY;
1736     }
1737     error = call->error;
1738
1739     /* currentPacket, nLeft, and NFree must be zeroed here, because
1740      * ResetCall cannot: ResetCall may be called at splnet(), in the
1741      * kernel version, and may interrupt the macros rx_Read or
1742      * rx_Write, which run at normal priority for efficiency. */
1743     if (call->currentPacket) {
1744         rxi_FreePacket(call->currentPacket);
1745         call->currentPacket = (struct rx_packet *) 0;
1746         call->nLeft = call->nFree = call->curlen = 0;
1747     }
1748     else
1749         call->nLeft = call->nFree = call->curlen = 0;
1750
1751     /* Free any packets from the last call to ReadvProc/WritevProc */
1752     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1753         queue_Remove(tp);
1754         rxi_FreePacket(tp);
1755     }
1756
1757     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1758     MUTEX_EXIT(&call->lock);
1759     if (conn->type == RX_CLIENT_CONNECTION)
1760         MUTEX_EXIT(&conn->conn_call_lock);
1761     AFS_RXGUNLOCK();
1762     USERPRI;
1763     /*
1764      * Map errors to the local host's errno.h format.
1765      */
1766     error = ntoh_syserr_conv(error);
1767     return error;
1768 }
1769
1770 #if !defined(KERNEL)
1771
1772 /* Call this routine when shutting down a server or client (especially
1773  * clients).  This will allow Rx to gracefully garbage collect server
1774  * connections, and reduce the number of retries that a server might
1775  * make to a dead client.
1776  * This is not quite right, since some calls may still be ongoing and
1777  * we can't lock them to destroy them. */
1778 void rx_Finalize() {
1779     register struct rx_connection **conn_ptr, **conn_end;
1780
1781     INIT_PTHREAD_LOCKS
1782     LOCK_RX_INIT
1783     if (rxinit_status == 1) {
1784         UNLOCK_RX_INIT
1785         return; /* Already shutdown. */
1786     }
1787     rxi_DeleteCachedConnections();
1788     if (rx_connHashTable) {
1789         MUTEX_ENTER(&rx_connHashTable_lock);
1790         for (conn_ptr = &rx_connHashTable[0], 
1791              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1792              conn_ptr < conn_end; conn_ptr++) {
1793             struct rx_connection *conn, *next;
1794             for (conn = *conn_ptr; conn; conn = next) {
1795                 next = conn->next;
1796                 if (conn->type == RX_CLIENT_CONNECTION) {
1797                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1798                     conn->refCount++;
1799                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1800 #ifdef RX_ENABLE_LOCKS
1801                     rxi_DestroyConnectionNoLock(conn);
1802 #else /* RX_ENABLE_LOCKS */
1803                     rxi_DestroyConnection(conn);
1804 #endif /* RX_ENABLE_LOCKS */
1805                 }
1806             }
1807         }
1808 #ifdef RX_ENABLE_LOCKS
1809         while (rx_connCleanup_list) {
1810             struct rx_connection *conn;
1811             conn = rx_connCleanup_list;
1812             rx_connCleanup_list = rx_connCleanup_list->next;
1813             MUTEX_EXIT(&rx_connHashTable_lock);
1814             rxi_CleanupConnection(conn);
1815             MUTEX_ENTER(&rx_connHashTable_lock);
1816         }
1817         MUTEX_EXIT(&rx_connHashTable_lock);
1818 #endif /* RX_ENABLE_LOCKS */
1819     }
1820     rxi_flushtrace();
1821
1822     rxinit_status = 1;
1823     UNLOCK_RX_INIT
1824 }
1825 #endif
1826
1827 /* if we wakeup packet waiter too often, can get in loop with two
1828     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1829 void
1830 rxi_PacketsUnWait() {
1831
1832     if (!rx_waitingForPackets) {
1833         return;
1834     }
1835 #ifdef KERNEL
1836     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1837         return;                                     /* still over quota */
1838     }
1839 #endif /* KERNEL */
1840     rx_waitingForPackets = 0;
1841 #ifdef  RX_ENABLE_LOCKS
1842     CV_BROADCAST(&rx_waitingForPackets_cv);
1843 #else
1844     osi_rxWakeup(&rx_waitingForPackets);
1845 #endif
1846     return;
1847 }
1848
1849
1850 /* ------------------Internal interfaces------------------------- */
1851
1852 /* Return this process's service structure for the
1853  * specified socket and service */
1854 struct rx_service *rxi_FindService(socket, serviceId)
1855     register osi_socket socket;
1856     register u_short serviceId;
1857 {
1858     register struct rx_service **sp;    
1859     for (sp = &rx_services[0]; *sp; sp++) {
1860         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1861           return *sp;
1862     }
1863     return 0;
1864 }
1865
1866 /* Allocate a call structure, for the indicated channel of the
1867  * supplied connection.  The mode and state of the call must be set by
1868  * the caller. */
1869 struct rx_call *rxi_NewCall(conn, channel)
1870     register struct rx_connection *conn;
1871     register int channel;
1872 {
1873     register struct rx_call *call;
1874 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1875     register struct rx_call *cp;        /* Call pointer temp */
1876     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1877 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1878
1879     /* Grab an existing call structure, or allocate a new one.
1880      * Existing call structures are assumed to have been left reset by
1881      * rxi_FreeCall */
1882     MUTEX_ENTER(&rx_freeCallQueue_lock);
1883
1884 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1885     /*
1886      * EXCEPT that the TQ might not yet be cleared out.
1887      * Skip over those with in-use TQs.
1888      */
1889     call = NULL;
1890     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1891         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1892             call = cp;
1893             break;
1894         }
1895     }
1896     if (call) {
1897 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1898     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1899         call = queue_First(&rx_freeCallQueue, rx_call);
1900 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1901         queue_Remove(call);
1902         MUTEX_ENTER(&rx_stats_mutex);
1903         rx_stats.nFreeCallStructs--;
1904         MUTEX_EXIT(&rx_stats_mutex);
1905         MUTEX_EXIT(&rx_freeCallQueue_lock);
1906         MUTEX_ENTER(&call->lock);
1907         CLEAR_CALL_QUEUE_LOCK(call);
1908 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1909         /* Now, if TQ wasn't cleared earlier, do it now. */
1910         if (call->flags & RX_CALL_TQ_CLEARME) {
1911             rxi_ClearTransmitQueue(call, 0);
1912             queue_Init(&call->tq);
1913         }
1914 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1915         /* Bind the call to its connection structure */
1916         call->conn = conn;
1917         rxi_ResetCall(call, 1);
1918     }
1919     else {
1920         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1921
1922         MUTEX_EXIT(&rx_freeCallQueue_lock);
1923         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1924         MUTEX_ENTER(&call->lock);
1925         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1926         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1927         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1928
1929         MUTEX_ENTER(&rx_stats_mutex);
1930         rx_stats.nCallStructs++;
1931         MUTEX_EXIT(&rx_stats_mutex);
1932         /* Initialize once-only items */
1933         queue_Init(&call->tq);
1934         queue_Init(&call->rq);
1935         queue_Init(&call->iovq);
1936         /* Bind the call to its connection structure (prereq for reset) */
1937         call->conn = conn;
1938         rxi_ResetCall(call, 1);
1939     }
1940     call->channel = channel;
1941     call->callNumber = &conn->callNumber[channel];
1942     /* Note that the next expected call number is retained (in
1943      * conn->callNumber[i]), even if we reallocate the call structure
1944      */
1945     conn->call[channel] = call;
1946     /* if the channel's never been used (== 0), we should start at 1, otherwise
1947         the call number is valid from the last time this channel was used */
1948     if (*call->callNumber == 0) *call->callNumber = 1;
1949
1950     MUTEX_EXIT(&call->lock);
1951     return call;
1952 }
1953
1954 /* A call has been inactive long enough that so we can throw away
1955  * state, including the call structure, which is placed on the call
1956  * free list.
1957  * Call is locked upon entry.
1958  */
1959 #ifdef RX_ENABLE_LOCKS
1960 void rxi_FreeCall(call, haveCTLock)
1961     int haveCTLock; /* Set if called from rxi_ReapConnections */
1962 #else /* RX_ENABLE_LOCKS */
1963 void rxi_FreeCall(call)
1964 #endif /* RX_ENABLE_LOCKS */
1965     register struct rx_call *call;
1966 {
1967     register int channel = call->channel;
1968     register struct rx_connection *conn = call->conn;
1969
1970
1971     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
1972       (*call->callNumber)++;
1973     rxi_ResetCall(call, 0);
1974     call->conn->call[channel] = (struct rx_call *) 0;
1975
1976     MUTEX_ENTER(&rx_freeCallQueue_lock);
1977     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
1978 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1979     /* A call may be free even though its transmit queue is still in use.
1980      * Since we search the call list from head to tail, put busy calls at
1981      * the head of the list, and idle calls at the tail.
1982      */
1983     if (call->flags & RX_CALL_TQ_BUSY)
1984         queue_Prepend(&rx_freeCallQueue, call);
1985     else
1986         queue_Append(&rx_freeCallQueue, call);
1987 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1988     queue_Append(&rx_freeCallQueue, call);
1989 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1990     MUTEX_ENTER(&rx_stats_mutex);
1991     rx_stats.nFreeCallStructs++;
1992     MUTEX_EXIT(&rx_stats_mutex);
1993
1994     MUTEX_EXIT(&rx_freeCallQueue_lock);
1995  
1996     /* Destroy the connection if it was previously slated for
1997      * destruction, i.e. the Rx client code previously called
1998      * rx_DestroyConnection (client connections), or
1999      * rxi_ReapConnections called the same routine (server
2000      * connections).  Only do this, however, if there are no
2001      * outstanding calls. Note that for fine grain locking, there appears
2002      * to be a deadlock in that rxi_FreeCall has a call locked and
2003      * DestroyConnectionNoLock locks each call in the conn. But note a
2004      * few lines up where we have removed this call from the conn.
2005      * If someone else destroys a connection, they either have no
2006      * call lock held or are going through this section of code.
2007      */
2008     if (conn->flags & RX_CONN_DESTROY_ME) {
2009         MUTEX_ENTER(&conn->conn_data_lock);
2010         conn->refCount++;
2011         MUTEX_EXIT(&conn->conn_data_lock);
2012 #ifdef RX_ENABLE_LOCKS
2013         if (haveCTLock)
2014             rxi_DestroyConnectionNoLock(conn);
2015         else
2016             rxi_DestroyConnection(conn);
2017 #else /* RX_ENABLE_LOCKS */
2018         rxi_DestroyConnection(conn);
2019 #endif /* RX_ENABLE_LOCKS */
2020     }
2021 }
2022
2023 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2024 char *rxi_Alloc(size)
2025 register size_t size;
2026 {
2027     register char *p;
2028
2029 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2030     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2031      * implementation.
2032      */
2033     int glockOwner = ISAFS_GLOCK();
2034     if (!glockOwner)
2035         AFS_GLOCK();
2036 #endif
2037     MUTEX_ENTER(&rx_stats_mutex);
2038     rxi_Alloccnt++; rxi_Allocsize += size;
2039     MUTEX_EXIT(&rx_stats_mutex);
2040 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2041     if (size > AFS_SMALLOCSIZ) {
2042         p = (char *) osi_AllocMediumSpace(size);
2043     } else
2044         p = (char *) osi_AllocSmall(size, 1);
2045 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2046     if (!glockOwner)
2047         AFS_GUNLOCK();
2048 #endif
2049 #else
2050     p = (char *) osi_Alloc(size);
2051 #endif
2052     if (!p) osi_Panic("rxi_Alloc error");
2053     bzero(p, size);
2054     return p;
2055 }
2056
2057 void rxi_Free(addr, size)
2058 void *addr;
2059 register size_t size;
2060 {
2061 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2062     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2063      * implementation.
2064      */
2065     int glockOwner = ISAFS_GLOCK();
2066     if (!glockOwner)
2067         AFS_GLOCK();
2068 #endif
2069     MUTEX_ENTER(&rx_stats_mutex);
2070     rxi_Alloccnt--; rxi_Allocsize -= size;
2071     MUTEX_EXIT(&rx_stats_mutex);
2072 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2073     if (size > AFS_SMALLOCSIZ)
2074         osi_FreeMediumSpace(addr);
2075     else
2076         osi_FreeSmall(addr);
2077 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2078     if (!glockOwner)
2079         AFS_GUNLOCK();
2080 #endif
2081 #else
2082     osi_Free(addr, size);
2083 #endif    
2084 }
2085
2086 /* Find the peer process represented by the supplied (host,port)
2087  * combination.  If there is no appropriate active peer structure, a
2088  * new one will be allocated and initialized 
2089  * The origPeer, if set, is a pointer to a peer structure on which the
2090  * refcount will be be decremented. This is used to replace the peer
2091  * structure hanging off a connection structure */
2092 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2093     register afs_uint32 host;
2094     register u_short port;
2095     struct rx_peer *origPeer;
2096     int create;
2097 {
2098     register struct rx_peer *pp;
2099     int hashIndex;
2100     hashIndex = PEER_HASH(host, port);
2101     MUTEX_ENTER(&rx_peerHashTable_lock);
2102     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2103         if ((pp->host == host) && (pp->port == port)) break;
2104     }
2105     if (!pp) {
2106         if (create) {
2107             pp = rxi_AllocPeer(); /* This bzero's *pp */
2108             pp->host = host;      /* set here or in InitPeerParams is zero */
2109             pp->port = port;
2110             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2111             queue_Init(&pp->congestionQueue);
2112             queue_Init(&pp->rpcStats);
2113             pp->next = rx_peerHashTable[hashIndex];
2114             rx_peerHashTable[hashIndex] = pp;
2115             rxi_InitPeerParams(pp);
2116             MUTEX_ENTER(&rx_stats_mutex);
2117             rx_stats.nPeerStructs++;
2118             MUTEX_EXIT(&rx_stats_mutex);
2119         }
2120     }
2121     if (pp && create) {
2122         pp->refCount++;
2123     }
2124     if ( origPeer)
2125         origPeer->refCount--;
2126     MUTEX_EXIT(&rx_peerHashTable_lock);
2127     return pp;
2128 }
2129
2130
2131 /* Find the connection at (host, port) started at epoch, and with the
2132  * given connection id.  Creates the server connection if necessary.
2133  * The type specifies whether a client connection or a server
2134  * connection is desired.  In both cases, (host, port) specify the
2135  * peer's (host, pair) pair.  Client connections are not made
2136  * automatically by this routine.  The parameter socket gives the
2137  * socket descriptor on which the packet was received.  This is used,
2138  * in the case of server connections, to check that *new* connections
2139  * come via a valid (port, serviceId).  Finally, the securityIndex
2140  * parameter must match the existing index for the connection.  If a
2141  * server connection is created, it will be created using the supplied
2142  * index, if the index is valid for this service */
2143 struct rx_connection *
2144 rxi_FindConnection(socket, host, port, serviceId, cid, 
2145                    epoch, type, securityIndex)
2146     osi_socket socket;
2147     register afs_int32 host;
2148     register u_short port;
2149     u_short serviceId;
2150     afs_uint32 cid;
2151     afs_uint32 epoch;
2152     int type;
2153     u_int securityIndex;
2154 {
2155     int hashindex, flag;
2156     register struct rx_connection *conn;
2157     struct rx_peer *peer;
2158     hashindex = CONN_HASH(host, port, cid, epoch, type);
2159     MUTEX_ENTER(&rx_connHashTable_lock);
2160     rxLastConn ? (conn = rxLastConn, flag = 0) :
2161                  (conn = rx_connHashTable[hashindex], flag = 1);
2162     for (; conn; ) {
2163       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2164           && (epoch == conn->epoch)) {
2165         register struct rx_peer *pp = conn->peer;
2166         if (securityIndex != conn->securityIndex) {
2167             /* this isn't supposed to happen, but someone could forge a packet
2168                like this, and there seems to be some CM bug that makes this
2169                happen from time to time -- in which case, the fileserver
2170                asserts. */  
2171             MUTEX_EXIT(&rx_connHashTable_lock);
2172             return (struct rx_connection *) 0;
2173         }
2174         /* epoch's high order bits mean route for security reasons only on
2175          * the cid, not the host and port fields.
2176          */
2177         if (conn->epoch & 0x80000000) break;
2178         if (((type == RX_CLIENT_CONNECTION) 
2179              || (pp->host == host)) && (pp->port == port))
2180           break;
2181       }
2182       if ( !flag )
2183       {
2184         /* the connection rxLastConn that was used the last time is not the
2185         ** one we are looking for now. Hence, start searching in the hash */
2186         flag = 1;
2187         conn = rx_connHashTable[hashindex];
2188       }
2189       else
2190         conn = conn->next;
2191     }
2192     if (!conn) {
2193         struct rx_service *service;
2194         if (type == RX_CLIENT_CONNECTION) {
2195             MUTEX_EXIT(&rx_connHashTable_lock);
2196             return (struct rx_connection *) 0;
2197         }
2198         service = rxi_FindService(socket, serviceId);
2199         if (!service || (securityIndex >= service->nSecurityObjects) 
2200             || (service->securityObjects[securityIndex] == 0)) {
2201             MUTEX_EXIT(&rx_connHashTable_lock);
2202             return (struct rx_connection *) 0;
2203         }
2204         conn = rxi_AllocConnection(); /* This bzero's the connection */
2205         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2206                    MUTEX_DEFAULT,0);
2207         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2208                    MUTEX_DEFAULT,0);
2209         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2210         conn->next = rx_connHashTable[hashindex];
2211         rx_connHashTable[hashindex] = conn;
2212         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2213         conn->type = RX_SERVER_CONNECTION;
2214         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2215         conn->epoch = epoch;
2216         conn->cid = cid & RX_CIDMASK;
2217         /* conn->serial = conn->lastSerial = 0; */
2218         /* conn->timeout = 0; */
2219         conn->ackRate = RX_FAST_ACK_RATE;
2220         conn->service = service;
2221         conn->serviceId = serviceId;
2222         conn->securityIndex = securityIndex;
2223         conn->securityObject = service->securityObjects[securityIndex];
2224         conn->nSpecific = 0;
2225         conn->specific = NULL;
2226         rx_SetConnDeadTime(conn, service->connDeadTime);
2227         /* Notify security object of the new connection */
2228         RXS_NewConnection(conn->securityObject, conn);
2229         /* XXXX Connection timeout? */
2230         if (service->newConnProc) (*service->newConnProc)(conn);
2231         MUTEX_ENTER(&rx_stats_mutex);
2232         rx_stats.nServerConns++;
2233         MUTEX_EXIT(&rx_stats_mutex);
2234     }
2235     else
2236     {
2237     /* Ensure that the peer structure is set up in such a way that
2238     ** replies in this connection go back to that remote interface
2239     ** from which the last packet was sent out. In case, this packet's
2240     ** source IP address does not match the peer struct for this conn,
2241     ** then drop the refCount on conn->peer and get a new peer structure.
2242     ** We can check the host,port field in the peer structure without the
2243     ** rx_peerHashTable_lock because the peer structure has its refCount
2244     ** incremented and the only time the host,port in the peer struct gets
2245     ** updated is when the peer structure is created.
2246     */
2247         if (conn->peer->host == host )
2248                 peer = conn->peer; /* no change to the peer structure */
2249         else
2250                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2251     }
2252
2253     MUTEX_ENTER(&conn->conn_data_lock);
2254     conn->refCount++;
2255     conn->peer = peer;
2256     MUTEX_EXIT(&conn->conn_data_lock);
2257
2258     rxLastConn = conn;  /* store this connection as the last conn used */
2259     MUTEX_EXIT(&rx_connHashTable_lock);
2260     return conn;
2261 }
2262
2263 /* There are two packet tracing routines available for testing and monitoring
2264  * Rx.  One is called just after every packet is received and the other is
2265  * called just before every packet is sent.  Received packets, have had their
2266  * headers decoded, and packets to be sent have not yet had their headers
2267  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2268  * containing the network address.  Both can be modified.  The return value, if
2269  * non-zero, indicates that the packet should be dropped.  */
2270
2271 int (*rx_justReceived)() = 0;
2272 int (*rx_almostSent)() = 0;
2273
2274 /* A packet has been received off the interface.  Np is the packet, socket is
2275  * the socket number it was received from (useful in determining which service
2276  * this packet corresponds to), and (host, port) reflect the host,port of the
2277  * sender.  This call returns the packet to the caller if it is finished with
2278  * it, rather than de-allocating it, just as a small performance hack */
2279
2280 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2281     register struct rx_packet *np;
2282     osi_socket socket;
2283     afs_uint32 host;
2284     u_short port;
2285     int *tnop;
2286     struct rx_call **newcallp;
2287 {
2288     register struct rx_call *call;
2289     register struct rx_connection *conn;
2290     int channel;
2291     afs_uint32 currentCallNumber;
2292     int type;
2293     int skew;
2294 #ifdef RXDEBUG
2295     char *packetType;
2296 #endif
2297     struct rx_packet *tnp;
2298
2299 #ifdef RXDEBUG
2300 /* We don't print out the packet until now because (1) the time may not be
2301  * accurate enough until now in the lwp implementation (rx_Listener only gets
2302  * the time after the packet is read) and (2) from a protocol point of view,
2303  * this is the first time the packet has been seen */
2304     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2305         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2306     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2307          np->header.serial, packetType, host, port, np->header.serviceId,
2308          np->header.epoch, np->header.cid, np->header.callNumber, 
2309          np->header.seq, np->header.flags, np));
2310 #endif
2311
2312     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2313       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2314     }
2315
2316     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2317         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2318     }
2319 #ifdef RXDEBUG
2320     /* If an input tracer function is defined, call it with the packet and
2321      * network address.  Note this function may modify its arguments. */
2322     if (rx_justReceived) {
2323         struct sockaddr_in addr;
2324         int drop;
2325         addr.sin_family = AF_INET;
2326         addr.sin_port = port;
2327         addr.sin_addr.s_addr = host;
2328 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2329         addr.sin_len = sizeof(addr);
2330 #endif  /* AFS_OSF_ENV */
2331         drop = (*rx_justReceived) (np, &addr);
2332         /* drop packet if return value is non-zero */
2333         if (drop) return np;
2334         port = addr.sin_port;           /* in case fcn changed addr */
2335         host = addr.sin_addr.s_addr;
2336     }
2337 #endif
2338
2339     /* If packet was not sent by the client, then *we* must be the client */
2340     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2341         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2342
2343     /* Find the connection (or fabricate one, if we're the server & if
2344      * necessary) associated with this packet */
2345     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2346                               np->header.cid, np->header.epoch, type, 
2347                               np->header.securityIndex);
2348
2349     if (!conn) {
2350       /* If no connection found or fabricated, just ignore the packet.
2351        * (An argument could be made for sending an abort packet for
2352        * the conn) */
2353       return np;
2354     }   
2355
2356     MUTEX_ENTER(&conn->conn_data_lock);
2357     if (conn->maxSerial < np->header.serial)
2358       conn->maxSerial = np->header.serial;
2359     MUTEX_EXIT(&conn->conn_data_lock);
2360
2361     /* If the connection is in an error state, send an abort packet and ignore
2362      * the incoming packet */
2363     if (conn->error) {
2364         /* Don't respond to an abort packet--we don't want loops! */
2365         MUTEX_ENTER(&conn->conn_data_lock);
2366         if (np->header.type != RX_PACKET_TYPE_ABORT)
2367             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2368         conn->refCount--;
2369         MUTEX_EXIT(&conn->conn_data_lock);
2370         return np;
2371     }
2372
2373     /* Check for connection-only requests (i.e. not call specific). */
2374     if (np->header.callNumber == 0) {
2375         switch (np->header.type) {
2376             case RX_PACKET_TYPE_ABORT:
2377                 /* What if the supplied error is zero? */
2378                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2379                 MUTEX_ENTER(&conn->conn_data_lock);
2380                 conn->refCount--;
2381                 MUTEX_EXIT(&conn->conn_data_lock);
2382                 return np;
2383             case RX_PACKET_TYPE_CHALLENGE:
2384                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2385                 MUTEX_ENTER(&conn->conn_data_lock);
2386                 conn->refCount--;
2387                 MUTEX_EXIT(&conn->conn_data_lock);
2388                 return tnp;
2389             case RX_PACKET_TYPE_RESPONSE:
2390                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2391                 MUTEX_ENTER(&conn->conn_data_lock);
2392                 conn->refCount--;
2393                 MUTEX_EXIT(&conn->conn_data_lock);
2394                 return tnp;
2395             case RX_PACKET_TYPE_PARAMS:
2396             case RX_PACKET_TYPE_PARAMS+1:
2397             case RX_PACKET_TYPE_PARAMS+2:
2398                 /* ignore these packet types for now */
2399                 MUTEX_ENTER(&conn->conn_data_lock);
2400                 conn->refCount--;
2401                 MUTEX_EXIT(&conn->conn_data_lock);
2402                 return np;
2403
2404
2405             default:
2406                 /* Should not reach here, unless the peer is broken: send an
2407                  * abort packet */
2408                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2409                 MUTEX_ENTER(&conn->conn_data_lock);
2410                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2411                 conn->refCount--;
2412                 MUTEX_EXIT(&conn->conn_data_lock);
2413                 return tnp;
2414         }
2415     }
2416
2417     channel = np->header.cid & RX_CHANNELMASK;
2418     call = conn->call[channel];
2419 #ifdef  RX_ENABLE_LOCKS
2420     if (call)
2421         MUTEX_ENTER(&call->lock);
2422     /* Test to see if call struct is still attached to conn. */
2423     if (call != conn->call[channel]) {
2424         if (call)
2425             MUTEX_EXIT(&call->lock);
2426         if (type == RX_SERVER_CONNECTION) {
2427             call = conn->call[channel];
2428             /* If we started with no call attached and there is one now,
2429              * another thread is also running this routine and has gotten
2430              * the connection channel. We should drop this packet in the tests
2431              * below. If there was a call on this connection and it's now
2432              * gone, then we'll be making a new call below.
2433              * If there was previously a call and it's now different then
2434              * the old call was freed and another thread running this routine
2435              * has created a call on this channel. One of these two threads
2436              * has a packet for the old call and the code below handles those
2437              * cases.
2438              */
2439             if (call)
2440                 MUTEX_ENTER(&call->lock);
2441         }
2442         else {
2443             /* This packet can't be for this call. If the new call address is
2444              * 0 then no call is running on this channel. If there is a call
2445              * then, since this is a client connection we're getting data for
2446              * it must be for the previous call.
2447              */
2448             MUTEX_ENTER(&rx_stats_mutex);
2449             rx_stats.spuriousPacketsRead++;
2450             MUTEX_EXIT(&rx_stats_mutex);
2451             MUTEX_ENTER(&conn->conn_data_lock);
2452             conn->refCount--;
2453             MUTEX_EXIT(&conn->conn_data_lock);
2454             return np;
2455         }
2456     }
2457 #endif
2458     currentCallNumber = conn->callNumber[channel];
2459
2460     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2461         if (np->header.callNumber < currentCallNumber) {
2462             MUTEX_ENTER(&rx_stats_mutex);
2463             rx_stats.spuriousPacketsRead++;
2464             MUTEX_EXIT(&rx_stats_mutex);
2465 #ifdef  RX_ENABLE_LOCKS
2466             if (call)
2467                 MUTEX_EXIT(&call->lock);
2468 #endif
2469             MUTEX_ENTER(&conn->conn_data_lock);
2470             conn->refCount--;
2471             MUTEX_EXIT(&conn->conn_data_lock);
2472             return np;
2473         }
2474         if (!call) {
2475             call = rxi_NewCall(conn, channel);
2476             MUTEX_ENTER(&call->lock);
2477             *call->callNumber = np->header.callNumber;
2478             call->state = RX_STATE_PRECALL;
2479             clock_GetTime(&call->queueTime);
2480             hzero(call->bytesSent);
2481             hzero(call->bytesRcvd);
2482             rxi_KeepAliveOn(call);
2483         }
2484         else if (np->header.callNumber != currentCallNumber) {
2485             /* Wait until the transmit queue is idle before deciding
2486              * whether to reset the current call. Chances are that the
2487              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2488              * flag is cleared.
2489              */
2490 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2491             while ((call->state == RX_STATE_ACTIVE) &&
2492                    (call->flags & RX_CALL_TQ_BUSY)) {
2493                 call->flags |= RX_CALL_TQ_WAIT;
2494 #ifdef RX_ENABLE_LOCKS
2495                 CV_WAIT(&call->cv_tq, &call->lock);
2496 #else /* RX_ENABLE_LOCKS */
2497                 osi_rxSleep(&call->tq);
2498 #endif /* RX_ENABLE_LOCKS */
2499             }
2500 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2501             /* If the new call cannot be taken right now send a busy and set
2502              * the error condition in this call, so that it terminates as
2503              * quickly as possible */
2504             if (call->state == RX_STATE_ACTIVE) {
2505                 struct rx_packet *tp;
2506
2507                 rxi_CallError(call, RX_CALL_DEAD);
2508                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2509                 MUTEX_EXIT(&call->lock);
2510                 MUTEX_ENTER(&conn->conn_data_lock);
2511                 conn->refCount--;
2512                 MUTEX_EXIT(&conn->conn_data_lock);
2513                 return tp;
2514             }
2515             rxi_ResetCall(call, 0);
2516             *call->callNumber = np->header.callNumber;
2517             call->state = RX_STATE_PRECALL;
2518             clock_GetTime(&call->queueTime);
2519             hzero(call->bytesSent);
2520             hzero(call->bytesRcvd);
2521             /*
2522              * If the number of queued calls exceeds the overload
2523              * threshold then abort this call.
2524              */
2525             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2526                 struct rx_packet *tp;
2527
2528                 rxi_CallError(call, rx_BusyError);
2529                 tp = rxi_SendCallAbort(call, np, 1, 0);
2530                 MUTEX_EXIT(&call->lock);
2531                 MUTEX_ENTER(&conn->conn_data_lock);
2532                 conn->refCount--;
2533                 MUTEX_EXIT(&conn->conn_data_lock);
2534                 return tp;
2535             }
2536             rxi_KeepAliveOn(call);
2537         }
2538         else {
2539             /* Continuing call; do nothing here. */
2540         }
2541     } else { /* we're the client */
2542         /* Ignore all incoming acknowledgements for calls in DALLY state */
2543         if ( call && (call->state == RX_STATE_DALLY) 
2544          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2545             MUTEX_ENTER(&rx_stats_mutex);
2546             rx_stats.ignorePacketDally++;
2547             MUTEX_EXIT(&rx_stats_mutex);
2548 #ifdef  RX_ENABLE_LOCKS
2549             if (call) {
2550                 MUTEX_EXIT(&call->lock);
2551             }
2552 #endif
2553             MUTEX_ENTER(&conn->conn_data_lock);
2554             conn->refCount--;
2555             MUTEX_EXIT(&conn->conn_data_lock);
2556             return np;
2557         }
2558         
2559         /* Ignore anything that's not relevant to the current call.  If there
2560          * isn't a current call, then no packet is relevant. */
2561         if (!call || (np->header.callNumber != currentCallNumber)) {
2562             MUTEX_ENTER(&rx_stats_mutex);
2563             rx_stats.spuriousPacketsRead++;
2564             MUTEX_EXIT(&rx_stats_mutex);
2565 #ifdef  RX_ENABLE_LOCKS
2566             if (call) {
2567                 MUTEX_EXIT(&call->lock);
2568             }
2569 #endif
2570             MUTEX_ENTER(&conn->conn_data_lock);
2571             conn->refCount--;
2572             MUTEX_EXIT(&conn->conn_data_lock);
2573             return np;  
2574         }
2575         /* If the service security object index stamped in the packet does not
2576          * match the connection's security index, ignore the packet */
2577         if (np->header.securityIndex != conn->securityIndex) {
2578 #ifdef  RX_ENABLE_LOCKS
2579             MUTEX_EXIT(&call->lock);
2580 #endif
2581             MUTEX_ENTER(&conn->conn_data_lock);
2582             conn->refCount--;       
2583             MUTEX_EXIT(&conn->conn_data_lock);
2584             return np;
2585         }
2586
2587         /* If we're receiving the response, then all transmit packets are
2588          * implicitly acknowledged.  Get rid of them. */
2589         if (np->header.type == RX_PACKET_TYPE_DATA) {
2590 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2591             /* XXX Hack. Because we must release the global rx lock when
2592              * sending packets (osi_NetSend) we drop all acks while we're
2593              * traversing the tq in rxi_Start sending packets out because
2594              * packets may move to the freePacketQueue as result of being here!
2595              * So we drop these packets until we're safely out of the
2596              * traversing. Really ugly! 
2597              * For fine grain RX locking, we set the acked field in the
2598              * packets and let rxi_Start remove them from the transmit queue.
2599              */
2600             if (call->flags & RX_CALL_TQ_BUSY) {
2601 #ifdef  RX_ENABLE_LOCKS
2602                 rxi_SetAcksInTransmitQueue(call);
2603 #else
2604                 conn->refCount--;
2605                 return np;              /* xmitting; drop packet */
2606 #endif
2607             }
2608             else {
2609                 rxi_ClearTransmitQueue(call, 0);
2610             }
2611 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2612             rxi_ClearTransmitQueue(call, 0);
2613 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2614         } else {
2615           if (np->header.type == RX_PACKET_TYPE_ACK) {
2616         /* now check to see if this is an ack packet acknowledging that the
2617          * server actually *lost* some hard-acked data.  If this happens we
2618          * ignore this packet, as it may indicate that the server restarted in
2619          * the middle of a call.  It is also possible that this is an old ack
2620          * packet.  We don't abort the connection in this case, because this
2621          * *might* just be an old ack packet.  The right way to detect a server
2622          * restart in the midst of a call is to notice that the server epoch
2623          * changed, btw.  */
2624         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2625          * XXX unacknowledged.  I think that this is off-by-one, but
2626          * XXX I don't dare change it just yet, since it will
2627          * XXX interact badly with the server-restart detection 
2628          * XXX code in receiveackpacket.  */
2629             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2630                 MUTEX_ENTER(&rx_stats_mutex);
2631                 rx_stats.spuriousPacketsRead++;
2632                 MUTEX_EXIT(&rx_stats_mutex);
2633                 MUTEX_EXIT(&call->lock);
2634                 MUTEX_ENTER(&conn->conn_data_lock);
2635                 conn->refCount--;
2636                 MUTEX_EXIT(&conn->conn_data_lock);
2637                 return np;
2638             }
2639           }
2640         } /* else not a data packet */
2641     }
2642
2643     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2644     /* Set remote user defined status from packet */
2645     call->remoteStatus = np->header.userStatus;
2646
2647     /* Note the gap between the expected next packet and the actual
2648      * packet that arrived, when the new packet has a smaller serial number
2649      * than expected.  Rioses frequently reorder packets all by themselves,
2650      * so this will be quite important with very large window sizes.
2651      * Skew is checked against 0 here to avoid any dependence on the type of
2652      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2653      * true! 
2654      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2655      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2656      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2657      */
2658     MUTEX_ENTER(&conn->conn_data_lock);
2659     skew = conn->lastSerial - np->header.serial;
2660     conn->lastSerial = np->header.serial;
2661     MUTEX_EXIT(&conn->conn_data_lock);
2662     if (skew > 0) {
2663       register struct rx_peer *peer;
2664       peer = conn->peer;
2665       if (skew > peer->inPacketSkew) {
2666         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2667         peer->inPacketSkew = skew;
2668       }
2669     }
2670
2671     /* Now do packet type-specific processing */
2672     switch (np->header.type) {
2673         case RX_PACKET_TYPE_DATA:
2674             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2675                                        tnop, newcallp);
2676             break;
2677         case RX_PACKET_TYPE_ACK:
2678             /* Respond immediately to ack packets requesting acknowledgement
2679              * (ping packets) */
2680             if (np->header.flags & RX_REQUEST_ACK) {
2681                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2682                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2683             }
2684             np = rxi_ReceiveAckPacket(call, np, 1);
2685             break;
2686         case RX_PACKET_TYPE_ABORT:
2687             /* An abort packet: reset the connection, passing the error up to
2688              * the user */
2689             /* What if error is zero? */
2690             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2691             break;
2692         case RX_PACKET_TYPE_BUSY:
2693             /* XXXX */
2694             break;
2695         case RX_PACKET_TYPE_ACKALL:
2696             /* All packets acknowledged, so we can drop all packets previously
2697              * readied for sending */
2698 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2699             /* XXX Hack. We because we can't release the global rx lock when
2700              * sending packets (osi_NetSend) we drop all ack pkts while we're
2701              * traversing the tq in rxi_Start sending packets out because
2702              * packets may move to the freePacketQueue as result of being
2703              * here! So we drop these packets until we're safely out of the
2704              * traversing. Really ugly! 
2705              * For fine grain RX locking, we set the acked field in the packets
2706              * and let rxi_Start remove the packets from the transmit queue.
2707              */
2708             if (call->flags & RX_CALL_TQ_BUSY) {
2709 #ifdef  RX_ENABLE_LOCKS
2710                 rxi_SetAcksInTransmitQueue(call);
2711                 break;
2712 #else /* RX_ENABLE_LOCKS */
2713                 conn->refCount--;
2714                 return np;              /* xmitting; drop packet */
2715 #endif /* RX_ENABLE_LOCKS */
2716             }
2717 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2718             rxi_ClearTransmitQueue(call, 0);
2719             break;
2720         default:
2721             /* Should not reach here, unless the peer is broken: send an abort
2722              * packet */
2723             rxi_CallError(call, RX_PROTOCOL_ERROR);
2724             np = rxi_SendCallAbort(call, np, 1, 0);
2725             break;
2726     };
2727     /* Note when this last legitimate packet was received, for keep-alive
2728      * processing.  Note, we delay getting the time until now in the hope that
2729      * the packet will be delivered to the user before any get time is required
2730      * (if not, then the time won't actually be re-evaluated here). */
2731     call->lastReceiveTime = clock_Sec();
2732     MUTEX_EXIT(&call->lock);
2733     MUTEX_ENTER(&conn->conn_data_lock);
2734     conn->refCount--;
2735     MUTEX_EXIT(&conn->conn_data_lock);
2736     return np;
2737 }
2738
2739 /* return true if this is an "interesting" connection from the point of view
2740     of someone trying to debug the system */
2741 int rxi_IsConnInteresting(struct rx_connection *aconn)
2742 {
2743     register int i;
2744     register struct rx_call *tcall;
2745
2746     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2747         return 1;
2748     for(i=0;i<RX_MAXCALLS;i++) {
2749         tcall = aconn->call[i];
2750         if (tcall) {
2751             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2752                 return 1;
2753             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2754                 return 1;
2755         }
2756     }
2757     return 0;
2758 }
2759
2760 #ifdef KERNEL
2761 /* if this is one of the last few packets AND it wouldn't be used by the
2762    receiving call to immediately satisfy a read request, then drop it on
2763    the floor, since accepting it might prevent a lock-holding thread from
2764    making progress in its reading. If a call has been cleared while in
2765    the precall state then ignore all subsequent packets until the call
2766    is assigned to a thread. */
2767
2768 static TooLow(ap, acall)
2769   struct rx_call *acall;
2770   struct rx_packet *ap; {
2771     int rc=0;
2772     MUTEX_ENTER(&rx_stats_mutex);
2773     if (((ap->header.seq != 1) &&
2774          (acall->flags & RX_CALL_CLEARED) &&
2775          (acall->state == RX_STATE_PRECALL)) ||
2776         ((rx_nFreePackets < rxi_dataQuota+2) &&
2777          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2778            && (acall->flags & RX_CALL_READER_WAIT)))) {
2779         rc = 1;
2780     }
2781     MUTEX_EXIT(&rx_stats_mutex);
2782     return rc;
2783 }
2784 #endif /* KERNEL */
2785
2786 /* try to attach call, if authentication is complete */
2787 static void TryAttach(acall, socket, tnop, newcallp)
2788 register struct rx_call *acall;
2789 register osi_socket socket;
2790 register int *tnop;
2791 register struct rx_call **newcallp; {
2792     register struct rx_connection *conn;
2793     conn = acall->conn;
2794     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2795         /* Don't attach until we have any req'd. authentication. */
2796         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2797             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2798             /* Note:  this does not necessarily succeed; there
2799                may not any proc available */
2800         }
2801         else {
2802             rxi_ChallengeOn(acall->conn);
2803         }
2804     }
2805 }
2806
2807 /* A data packet has been received off the interface.  This packet is
2808  * appropriate to the call (the call is in the right state, etc.).  This
2809  * routine can return a packet to the caller, for re-use */
2810
2811 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2812                                         port, tnop, newcallp)
2813     register struct rx_call *call;
2814     register struct rx_packet *np;
2815     int istack;
2816     osi_socket socket;
2817     afs_uint32 host;
2818     u_short port;
2819     int *tnop;
2820     struct rx_call **newcallp;
2821 {
2822     int ackNeeded = 0;
2823     int newPackets = 0;
2824     int didHardAck = 0;
2825     int haveLast = 0;
2826     afs_uint32 seq, serial, flags;
2827     int isFirst;
2828     struct rx_packet *tnp;
2829     struct clock when;
2830     MUTEX_ENTER(&rx_stats_mutex);
2831     rx_stats.dataPacketsRead++;
2832     MUTEX_EXIT(&rx_stats_mutex);
2833
2834 #ifdef KERNEL
2835     /* If there are no packet buffers, drop this new packet, unless we can find
2836      * packet buffers from inactive calls */
2837     if (!call->error &&
2838         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2839         MUTEX_ENTER(&rx_freePktQ_lock);
2840         rxi_NeedMorePackets = TRUE;
2841         MUTEX_EXIT(&rx_freePktQ_lock);
2842         MUTEX_ENTER(&rx_stats_mutex);
2843         rx_stats.noPacketBuffersOnRead++;
2844         MUTEX_EXIT(&rx_stats_mutex);
2845         call->rprev = np->header.serial;
2846         rxi_calltrace(RX_TRACE_DROP, call);
2847         dpf (("packet %x dropped on receipt - quota problems", np));
2848         if (rxi_doreclaim)
2849             rxi_ClearReceiveQueue(call);
2850         clock_GetTime(&when);
2851         clock_Add(&when, &rx_softAckDelay);
2852         if (!call->delayedAckEvent ||
2853             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2854             rxevent_Cancel(call->delayedAckEvent, call,
2855                            RX_CALL_REFCOUNT_DELAY);
2856             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2857             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2858                                                  call, 0);
2859         }
2860         /* we've damaged this call already, might as well do it in. */
2861         return np;
2862     }
2863 #endif /* KERNEL */
2864
2865     /*
2866      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2867      * packet is one of several packets transmitted as a single
2868      * datagram. Do not send any soft or hard acks until all packets
2869      * in a jumbogram have been processed. Send negative acks right away.
2870      */
2871     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2872         /* tnp is non-null when there are more packets in the
2873          * current jumbo gram */
2874         if (tnp) {
2875             if (np)
2876                 rxi_FreePacket(np);
2877             np = tnp;
2878         }
2879
2880         seq = np->header.seq;
2881         serial = np->header.serial;
2882         flags = np->header.flags;
2883
2884         /* If the call is in an error state, send an abort message */
2885         if (call->error)
2886             return rxi_SendCallAbort(call, np, istack, 0);
2887
2888         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2889          * AFS 3.5 jumbogram. */
2890         if (flags & RX_JUMBO_PACKET) {
2891             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2892         } else {
2893             tnp = NULL;
2894         }
2895
2896         if (np->header.spare != 0) {
2897             MUTEX_ENTER(&call->conn->conn_data_lock);
2898             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2899             MUTEX_EXIT(&call->conn->conn_data_lock);
2900         }
2901
2902         /* The usual case is that this is the expected next packet */
2903         if (seq == call->rnext) {
2904
2905             /* Check to make sure it is not a duplicate of one already queued */
2906             if (queue_IsNotEmpty(&call->rq) 
2907                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2908                 MUTEX_ENTER(&rx_stats_mutex);
2909                 rx_stats.dupPacketsRead++;
2910                 MUTEX_EXIT(&rx_stats_mutex);
2911                 dpf (("packet %x dropped on receipt - duplicate", np));
2912                 rxevent_Cancel(call->delayedAckEvent, call,
2913                                RX_CALL_REFCOUNT_DELAY);
2914                 np = rxi_SendAck(call, np, seq, serial,
2915                                  flags, RX_ACK_DUPLICATE, istack);
2916                 ackNeeded = 0;
2917                 call->rprev = seq;
2918                 continue;
2919             }
2920
2921             /* It's the next packet. Stick it on the receive queue
2922              * for this call. Set newPackets to make sure we wake
2923              * the reader once all packets have been processed */
2924             queue_Prepend(&call->rq, np);
2925             call->nSoftAcks++;
2926             np = NULL; /* We can't use this anymore */
2927             newPackets = 1;
2928
2929             /* If an ack is requested then set a flag to make sure we
2930              * send an acknowledgement for this packet */
2931             if (flags & RX_REQUEST_ACK) {
2932                 ackNeeded = 1;
2933             }
2934
2935             /* Keep track of whether we have received the last packet */
2936             if (flags & RX_LAST_PACKET) {
2937                 call->flags |= RX_CALL_HAVE_LAST;
2938                 haveLast = 1;
2939             }
2940
2941             /* Check whether we have all of the packets for this call */
2942             if (call->flags & RX_CALL_HAVE_LAST) {
2943                 afs_uint32 tseq;                /* temporary sequence number */
2944                 struct rx_packet *tp;   /* Temporary packet pointer */
2945                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2946
2947                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2948                     if (tseq != tp->header.seq)
2949                         break;
2950                     if (tp->header.flags & RX_LAST_PACKET) {
2951                         call->flags |= RX_CALL_RECEIVE_DONE;
2952                         break;
2953                     }
2954                     tseq++;
2955                 }
2956             }
2957
2958             /* Provide asynchronous notification for those who want it
2959              * (e.g. multi rx) */
2960             if (call->arrivalProc) {
2961                 (*call->arrivalProc)(call, call->arrivalProcHandle,
2962                                      call->arrivalProcArg);
2963                 call->arrivalProc = (VOID (*)()) 0;
2964             }
2965
2966             /* Update last packet received */
2967             call->rprev = seq;
2968
2969             /* If there is no server process serving this call, grab
2970              * one, if available. We only need to do this once. If a
2971              * server thread is available, this thread becomes a server
2972              * thread and the server thread becomes a listener thread. */
2973             if (isFirst) {
2974                 TryAttach(call, socket, tnop, newcallp);
2975             }
2976         }       
2977         /* This is not the expected next packet. */
2978         else {
2979             /* Determine whether this is a new or old packet, and if it's
2980              * a new one, whether it fits into the current receive window.
2981              * Also figure out whether the packet was delivered in sequence.
2982              * We use the prev variable to determine whether the new packet
2983              * is the successor of its immediate predecessor in the
2984              * receive queue, and the missing flag to determine whether
2985              * any of this packets predecessors are missing.  */
2986
2987             afs_uint32 prev;            /* "Previous packet" sequence number */
2988             struct rx_packet *tp;       /* Temporary packet pointer */
2989             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
2990             int missing;                /* Are any predecessors missing? */
2991
2992             /* If the new packet's sequence number has been sent to the
2993              * application already, then this is a duplicate */
2994             if (seq < call->rnext) {
2995                 MUTEX_ENTER(&rx_stats_mutex);
2996                 rx_stats.dupPacketsRead++;
2997                 MUTEX_EXIT(&rx_stats_mutex);
2998                 rxevent_Cancel(call->delayedAckEvent, call,
2999                                RX_CALL_REFCOUNT_DELAY);
3000                 np = rxi_SendAck(call, np, seq, serial,
3001                                  flags, RX_ACK_DUPLICATE, istack);
3002                 ackNeeded = 0;
3003                 call->rprev = seq;
3004                 continue;
3005             }
3006
3007             /* If the sequence number is greater than what can be
3008              * accomodated by the current window, then send a negative
3009              * acknowledge and drop the packet */
3010             if ((call->rnext + call->rwind) <= seq) {
3011                 rxevent_Cancel(call->delayedAckEvent, call,
3012                                RX_CALL_REFCOUNT_DELAY);
3013                 np = rxi_SendAck(call, np, seq, serial,
3014                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3015                 ackNeeded = 0;
3016                 call->rprev = seq;
3017                 continue;
3018             }
3019
3020             /* Look for the packet in the queue of old received packets */
3021             for (prev = call->rnext - 1, missing = 0,
3022                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3023                 /*Check for duplicate packet */
3024                 if (seq == tp->header.seq) {
3025                     MUTEX_ENTER(&rx_stats_mutex);
3026                     rx_stats.dupPacketsRead++;
3027                     MUTEX_EXIT(&rx_stats_mutex);
3028                     rxevent_Cancel(call->delayedAckEvent, call,
3029                                    RX_CALL_REFCOUNT_DELAY);
3030                     np = rxi_SendAck(call, np, seq, serial, 
3031                                      flags, RX_ACK_DUPLICATE, istack);
3032                     ackNeeded = 0;
3033                     call->rprev = seq;
3034                     goto nextloop;
3035                 }
3036                 /* If we find a higher sequence packet, break out and
3037                  * insert the new packet here. */
3038                 if (seq < tp->header.seq) break;
3039                 /* Check for missing packet */
3040                 if (tp->header.seq != prev+1) {
3041                     missing = 1;
3042                 }
3043
3044                 prev = tp->header.seq;
3045             }
3046
3047             /* Keep track of whether we have received the last packet. */
3048             if (flags & RX_LAST_PACKET) {
3049                 call->flags |= RX_CALL_HAVE_LAST;
3050             }
3051
3052             /* It's within the window: add it to the the receive queue.
3053              * tp is left by the previous loop either pointing at the
3054              * packet before which to insert the new packet, or at the
3055              * queue head if the queue is empty or the packet should be
3056              * appended. */
3057             queue_InsertBefore(tp, np);
3058             call->nSoftAcks++;
3059             np = NULL;
3060
3061             /* Check whether we have all of the packets for this call */
3062             if ((call->flags & RX_CALL_HAVE_LAST)
3063              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3064                 afs_uint32 tseq;                /* temporary sequence number */
3065
3066                 for (tseq = call->rnext,
3067                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3068                     if (tseq != tp->header.seq)
3069                         break;
3070                     if (tp->header.flags & RX_LAST_PACKET) {
3071                         call->flags |= RX_CALL_RECEIVE_DONE;
3072                         break;
3073                     }
3074                     tseq++;
3075                 }
3076             }
3077
3078             /* We need to send an ack of the packet is out of sequence, 
3079              * or if an ack was requested by the peer. */
3080             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3081                 ackNeeded = 1;
3082             }
3083
3084             /* Acknowledge the last packet for each call */
3085             if (flags & RX_LAST_PACKET) {
3086                 haveLast = 1;
3087             }
3088
3089             call->rprev = seq;
3090         }
3091 nextloop:;
3092     }
3093
3094     if (newPackets) {
3095         /*
3096          * If the receiver is waiting for an iovec, fill the iovec
3097          * using the data from the receive queue */
3098         if (call->flags & RX_CALL_IOVEC_WAIT) {
3099             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3100             /* the call may have been aborted */
3101             if (call->error) {
3102                 return NULL;
3103             }
3104             if (didHardAck) {
3105                 ackNeeded = 0;
3106             }
3107         }
3108
3109         /* Wakeup the reader if any */
3110         if ((call->flags & RX_CALL_READER_WAIT) &&
3111             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3112              (call->iovNext >= call->iovMax) ||
3113              (call->flags & RX_CALL_RECEIVE_DONE))) {
3114             call->flags &= ~RX_CALL_READER_WAIT;
3115 #ifdef  RX_ENABLE_LOCKS
3116             CV_BROADCAST(&call->cv_rq);
3117 #else
3118             osi_rxWakeup(&call->rq);
3119 #endif
3120         }
3121     }
3122
3123     /*
3124      * Send an ack when requested by the peer, or once every
3125      * rxi_SoftAckRate packets until the last packet has been
3126      * received. Always send a soft ack for the last packet in
3127      * the server's reply. */
3128     if (ackNeeded) {
3129         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3130         np = rxi_SendAck(call, np, seq, serial, flags,
3131                          RX_ACK_REQUESTED, istack);
3132     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3133         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3134         np = rxi_SendAck(call, np, seq, serial, flags,
3135                          RX_ACK_DELAY, istack);
3136     } else if (call->nSoftAcks) {
3137         clock_GetTime(&when);
3138         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3139             clock_Add(&when, &rx_lastAckDelay);
3140         } else {
3141             clock_Add(&when, &rx_softAckDelay);
3142         }
3143         if (!call->delayedAckEvent ||
3144             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3145             rxevent_Cancel(call->delayedAckEvent, call,
3146                            RX_CALL_REFCOUNT_DELAY);
3147             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3148             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3149                                                  call, 0);
3150         }
3151     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3152         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3153     }
3154
3155     return np;
3156 }
3157
3158 #ifdef  ADAPT_WINDOW
3159 static void rxi_ComputeRate();
3160 #endif
3161
3162 /* The real smarts of the whole thing.  */
3163 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3164     register struct rx_call *call;
3165     struct rx_packet *np;
3166     int istack;
3167 {
3168     struct rx_ackPacket *ap;
3169     int nAcks;
3170     register struct rx_packet *tp;
3171     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3172     register struct rx_connection *conn = call->conn;
3173     struct rx_peer *peer = conn->peer;
3174     afs_uint32 first;
3175     afs_uint32 serial;
3176     /* because there are CM's that are bogus, sending weird values for this. */
3177     afs_uint32 skew = 0;
3178     int needRxStart = 0;
3179     int nbytes;
3180     int missing;
3181     int acked;
3182     int nNacked = 0;
3183     int newAckCount = 0;
3184     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3185     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3186
3187     MUTEX_ENTER(&rx_stats_mutex);
3188     rx_stats.ackPacketsRead++;
3189     MUTEX_EXIT(&rx_stats_mutex);
3190     ap = (struct rx_ackPacket *) rx_DataOf(np);
3191     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3192     if (nbytes < 0)
3193       return np;       /* truncated ack packet */
3194
3195     /* depends on ack packet struct */
3196     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3197     first = ntohl(ap->firstPacket);
3198     serial = ntohl(ap->serial);
3199     /* temporarily disabled -- needs to degrade over time 
3200        skew = ntohs(ap->maxSkew); */
3201
3202     /* Ignore ack packets received out of order */
3203     if (first < call->tfirst) {
3204         return np;
3205     }
3206
3207     if (np->header.flags & RX_SLOW_START_OK) {
3208         call->flags |= RX_CALL_SLOW_START_OK;
3209     }
3210     
3211 #ifdef RXDEBUG
3212     if (rx_Log) {
3213       fprintf( rx_Log, 
3214               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3215               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3216               skew, ntohl(ap->firstPacket));
3217         if (nAcks) {
3218             int offset;
3219             for (offset = 0; offset < nAcks; offset++) 
3220                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3221         }
3222         putc('\n', rx_Log);
3223     }
3224 #endif
3225
3226     /* if a server connection has been re-created, it doesn't remember what
3227         serial # it was up to.  An ack will tell us, since the serial field
3228         contains the largest serial received by the other side */
3229     MUTEX_ENTER(&conn->conn_data_lock);
3230     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3231         conn->serial = serial+1;
3232     }
3233     MUTEX_EXIT(&conn->conn_data_lock);
3234
3235     /* Update the outgoing packet skew value to the latest value of
3236      * the peer's incoming packet skew value.  The ack packet, of
3237      * course, could arrive out of order, but that won't affect things
3238      * much */
3239     MUTEX_ENTER(&peer->peer_lock);
3240     peer->outPacketSkew = skew;
3241
3242     /* Check for packets that no longer need to be transmitted, and
3243      * discard them.  This only applies to packets positively
3244      * acknowledged as having been sent to the peer's upper level.
3245      * All other packets must be retained.  So only packets with
3246      * sequence numbers < ap->firstPacket are candidates. */
3247     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3248         if (tp->header.seq >= first) break;
3249         call->tfirst = tp->header.seq + 1;
3250         if (tp->header.serial == serial) {
3251           rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3252 #ifdef ADAPT_WINDOW
3253           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3254 #endif
3255         }
3256         else if ((tp->firstSerial == serial)) {
3257           rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3258 #ifdef ADAPT_WINDOW
3259           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3260 #endif
3261         }
3262 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3263     /* XXX Hack. Because we have to release the global rx lock when sending
3264      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3265      * in rxi_Start sending packets out because packets may move to the
3266      * freePacketQueue as result of being here! So we drop these packets until
3267      * we're safely out of the traversing. Really ugly! 
3268      * To make it even uglier, if we're using fine grain locking, we can
3269      * set the ack bits in the packets and have rxi_Start remove the packets
3270      * when it's done transmitting.
3271      */
3272         if (!tp->acked) {
3273             newAckCount++;
3274         }
3275         if (call->flags & RX_CALL_TQ_BUSY) {
3276 #ifdef RX_ENABLE_LOCKS
3277             tp->acked = 1;
3278             call->flags |= RX_CALL_TQ_SOME_ACKED;
3279 #else /* RX_ENABLE_LOCKS */
3280             break;
3281 #endif /* RX_ENABLE_LOCKS */
3282         } else
3283 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3284         {
3285         queue_Remove(tp);
3286         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3287         }
3288     }
3289
3290 #ifdef ADAPT_WINDOW
3291     /* Give rate detector a chance to respond to ping requests */
3292     if (ap->reason == RX_ACK_PING_RESPONSE) {
3293         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3294     }
3295 #endif
3296
3297     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3298    
3299    /* Now go through explicit acks/nacks and record the results in
3300     * the waiting packets.  These are packets that can't be released
3301     * yet, even with a positive acknowledge.  This positive
3302     * acknowledge only means the packet has been received by the
3303     * peer, not that it will be retained long enough to be sent to
3304     * the peer's upper level.  In addition, reset the transmit timers
3305     * of any missing packets (those packets that must be missing
3306     * because this packet was out of sequence) */
3307
3308     call->nSoftAcked = 0;
3309     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3310         /* Update round trip time if the ack was stimulated on receipt
3311          * of this packet */
3312 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3313 #ifdef RX_ENABLE_LOCKS
3314         if (tp->header.seq >= first) {
3315 #endif /* RX_ENABLE_LOCKS */
3316 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3317         if (tp->header.serial == serial) {
3318           rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3319 #ifdef ADAPT_WINDOW
3320           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3321 #endif
3322         }
3323         else if ((tp->firstSerial == serial)) {
3324           rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3325 #ifdef ADAPT_WINDOW
3326           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3327 #endif
3328         }
3329 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3330 #ifdef RX_ENABLE_LOCKS
3331         }
3332 #endif /* RX_ENABLE_LOCKS */
3333 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3334
3335         /* Set the acknowledge flag per packet based on the
3336          * information in the ack packet. An acknowlegded packet can
3337          * be downgraded when the server has discarded a packet it
3338          * soacked previously, or when an ack packet is received
3339          * out of sequence. */
3340         if (tp->header.seq < first) {
3341             /* Implicit ack information */
3342             if (!tp->acked) {
3343                 newAckCount++;
3344             }
3345             tp->acked = 1;
3346         }
3347         else if (tp->header.seq < first + nAcks) {
3348             /* Explicit ack information:  set it in the packet appropriately */
3349             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3350                 if (!tp->acked) {
3351                     newAckCount++;
3352                     tp->acked = 1;
3353                 }
3354                 if (missing) {
3355                     nNacked++;
3356                 } else {
3357                     call->nSoftAcked++;
3358                 }
3359             } else {
3360                 tp->acked = 0;
3361                 missing = 1;
3362             }
3363         }
3364         else {
3365             tp->acked = 0;
3366             missing = 1;
3367         }
3368
3369         /* If packet isn't yet acked, and it has been transmitted at least 
3370          * once, reset retransmit time using latest timeout 
3371          * ie, this should readjust the retransmit timer for all outstanding 
3372          * packets...  So we don't just retransmit when we should know better*/
3373
3374         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3375           tp->retryTime = tp->timeSent;
3376           clock_Add(&tp->retryTime, &peer->timeout);
3377           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3378           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3379         }
3380     }
3381
3382     /* If the window has been extended by this acknowledge packet,
3383      * then wakeup a sender waiting in alloc for window space, or try
3384      * sending packets now, if he's been sitting on packets due to
3385      * lack of window space */
3386     if (call->tnext < (call->tfirst + call->twind))  {
3387 #ifdef  RX_ENABLE_LOCKS
3388         CV_SIGNAL(&call->cv_twind);
3389 #else
3390         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3391             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3392             osi_rxWakeup(&call->twind);
3393         }
3394 #endif
3395         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3396             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3397         }
3398     }
3399
3400     /* if the ack packet has a receivelen field hanging off it,
3401      * update our state */
3402     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3403       afs_uint32 tSize;
3404
3405       /* If the ack packet has a "recommended" size that is less than 
3406        * what I am using now, reduce my size to match */
3407       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3408                     sizeof(afs_int32), &tSize);
3409       tSize = (afs_uint32) ntohl(tSize);
3410       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3411
3412       /* Get the maximum packet size to send to this peer */
3413       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3414                     &tSize);
3415       tSize = (afs_uint32)ntohl(tSize);
3416       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3417       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3418
3419       /* sanity check - peer might have restarted with different params.
3420        * If peer says "send less", dammit, send less...  Peer should never 
3421        * be unable to accept packets of the size that prior AFS versions would
3422        * send without asking.  */
3423       if (peer->maxMTU != tSize) {
3424           peer->maxMTU = tSize;
3425           peer->MTU = MIN(tSize, peer->MTU);
3426           call->MTU = MIN(call->MTU, tSize);
3427           peer->congestSeq++;
3428       }
3429
3430       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3431           /* AFS 3.4a */
3432           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3433                         sizeof(afs_int32), &tSize);
3434           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3435           if (tSize < call->twind) {       /* smaller than our send */
3436               call->twind = tSize;         /* window, we must send less... */
3437               call->ssthresh = MIN(call->twind, call->ssthresh);
3438           }
3439
3440           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3441            * network MTU confused with the loopback MTU. Calculate the
3442            * maximum MTU here for use in the slow start code below.
3443            */
3444           maxMTU = peer->maxMTU;
3445           /* Did peer restart with older RX version? */
3446           if (peer->maxDgramPackets > 1) {
3447               peer->maxDgramPackets = 1;
3448           }
3449       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3450           /* AFS 3.5 */
3451           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3452                         sizeof(afs_int32), &tSize);
3453           tSize = (afs_uint32) ntohl(tSize);
3454           /*
3455            * As of AFS 3.5 we set the send window to match the receive window. 
3456            */
3457           if (tSize < call->twind) {
3458               call->twind = tSize;
3459               call->ssthresh = MIN(call->twind, call->ssthresh);
3460           } else if (tSize > call->twind) {
3461               call->twind = tSize;
3462           }
3463
3464           /*
3465            * As of AFS 3.5, a jumbogram is more than one fixed size
3466            * packet transmitted in a single UDP datagram. If the remote
3467            * MTU is smaller than our local MTU then never send a datagram
3468            * larger than the natural MTU.
3469            */
3470           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3471                         sizeof(afs_int32), &tSize);
3472           maxDgramPackets = (afs_uint32) ntohl(tSize);
3473           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3474           maxDgramPackets = MIN(maxDgramPackets,
3475                                 (int)(peer->ifDgramPackets));
3476           maxDgramPackets = MIN(maxDgramPackets, tSize);
3477           if (maxDgramPackets > 1) {
3478             peer->maxDgramPackets = maxDgramPackets;
3479             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3480           } else {
3481             peer->maxDgramPackets = 1;
3482             call->MTU = peer->natMTU;
3483           }
3484        } else if (peer->maxDgramPackets > 1) {
3485           /* Restarted with lower version of RX */
3486           peer->maxDgramPackets = 1;
3487        }
3488     } else if (peer->maxDgramPackets > 1 ||
3489                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3490         /* Restarted with lower version of RX */
3491         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3492         peer->natMTU = OLD_MAX_PACKET_SIZE;
3493         peer->MTU = OLD_MAX_PACKET_SIZE;
3494         peer->maxDgramPackets = 1;
3495         peer->nDgramPackets = 1;
3496         peer->congestSeq++;
3497         call->MTU = OLD_MAX_PACKET_SIZE;
3498     }
3499
3500     if (nNacked) {
3501         /*
3502          * Calculate how many datagrams were successfully received after
3503          * the first missing packet and adjust the negative ack counter
3504          * accordingly.
3505          */
3506         call->nAcks = 0;
3507         call->nNacks++;
3508         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3509         if (call->nNacks < nNacked) {
3510             call->nNacks = nNacked;
3511         }
3512     } else {
3513         if (newAckCount) {
3514             call->nAcks++;
3515         }
3516         call->nNacks = 0;
3517     }
3518
3519     if (call->flags & RX_CALL_FAST_RECOVER) {
3520         if (nNacked) {
3521             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3522         } else {
3523             call->flags &= ~RX_CALL_FAST_RECOVER;
3524             call->cwind = call->nextCwind;
3525             call->nextCwind = 0;
3526             call->nAcks = 0;
3527         }
3528         call->nCwindAcks = 0;
3529     }
3530     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3531         /* Three negative acks in a row trigger congestion recovery */
3532 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3533         MUTEX_EXIT(&peer->peer_lock);
3534         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3535                 /* someone else is waiting to start recovery */
3536                 return np;
3537         }
3538         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3539         while (call->flags & RX_CALL_TQ_BUSY) {
3540             call->flags |= RX_CALL_TQ_WAIT;
3541 #ifdef RX_ENABLE_LOCKS
3542             CV_WAIT(&call->cv_tq, &call->lock);
3543 #else /* RX_ENABLE_LOCKS */
3544             osi_rxSleep(&call->tq);
3545 #endif /* RX_ENABLE_LOCKS */
3546         }
3547         MUTEX_ENTER(&peer->peer_lock);
3548 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3549         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3550         call->flags |= RX_CALL_FAST_RECOVER;
3551         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3552         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3553                           rx_maxSendWindow);
3554         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3555         call->nextCwind = call->ssthresh;
3556         call->nAcks = 0;
3557         call->nNacks = 0;
3558         peer->MTU = call->MTU;
3559         peer->cwind = call->nextCwind;
3560         peer->nDgramPackets = call->nDgramPackets;
3561         peer->congestSeq++;
3562         call->congestSeq = peer->congestSeq;
3563         /* Reset the resend times on the packets that were nacked
3564          * so we will retransmit as soon as the window permits*/
3565         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3566             if (acked) {
3567                 if (!tp->acked) {
3568                     clock_Zero(&tp->retryTime);
3569                 }
3570             } else if (tp->acked) {
3571                 acked = 1;
3572             }
3573         }
3574     } else {
3575         /* If cwind is smaller than ssthresh, then increase
3576          * the window one packet for each ack we receive (exponential
3577          * growth).
3578          * If cwind is greater than or equal to ssthresh then increase
3579          * the congestion window by one packet for each cwind acks we
3580          * receive (linear growth).  */
3581         if (call->cwind < call->ssthresh) {
3582             call->cwind = MIN((int)call->ssthresh,
3583                               (int)(call->cwind + newAckCount));
3584             call->nCwindAcks = 0;
3585         } else {
3586             call->nCwindAcks += newAckCount;
3587             if (call->nCwindAcks >= call->cwind) {
3588                 call->nCwindAcks = 0;
3589                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3590             }
3591         }
3592         /*
3593          * If we have received several acknowledgements in a row then
3594          * it is time to increase the size of our datagrams
3595          */
3596         if ((int)call->nAcks > rx_nDgramThreshold) {
3597             if (peer->maxDgramPackets > 1) {
3598                 if (call->nDgramPackets < peer->maxDgramPackets) {
3599                     call->nDgramPackets++;
3600                 }
3601                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3602             } else if (call->MTU < peer->maxMTU) {
3603                 call->MTU += peer->natMTU;
3604                 call->MTU = MIN(call->MTU, peer->maxMTU);
3605             }
3606             call->nAcks = 0;
3607         }
3608     }
3609
3610     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3611
3612     /* Servers need to hold the call until all response packets have
3613      * been acknowledged. Soft acks are good enough since clients
3614      * are not allowed to clear their receive queues. */
3615     if (call->state == RX_STATE_HOLD &&
3616         call->tfirst + call->nSoftAcked >= call->tnext) {
3617         call->state = RX_STATE_DALLY;
3618         rxi_ClearTransmitQueue(call, 0);
3619     } else if (!queue_IsEmpty(&call->tq)) {
3620         rxi_Start(0, call, istack);
3621     }
3622     return np;
3623 }
3624
3625 /* Received a response to a challenge packet */
3626 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3627     register struct rx_connection *conn;
3628     register struct rx_packet *np;
3629     int istack;
3630 {
3631     int error;
3632
3633     /* Ignore the packet if we're the client */
3634     if (conn->type == RX_CLIENT_CONNECTION) return np;
3635
3636     /* If already authenticated, ignore the packet (it's probably a retry) */
3637     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3638         return np;
3639
3640     /* Otherwise, have the security object evaluate the response packet */
3641     error = RXS_CheckResponse(conn->securityObject, conn, np);
3642     if (error) {
3643         /* If the response is invalid, reset the connection, sending
3644          * an abort to the peer */
3645 #ifndef KERNEL
3646         rxi_Delay(1);
3647 #endif
3648         rxi_ConnectionError(conn, error);
3649         MUTEX_ENTER(&conn->conn_data_lock);
3650         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3651         MUTEX_EXIT(&conn->conn_data_lock);
3652         return np;
3653     }
3654     else {
3655         /* If the response is valid, any calls waiting to attach
3656          * servers can now do so */
3657         int i;
3658         for (i=0; i<RX_MAXCALLS; i++) {
3659             struct rx_call *call = conn->call[i];
3660             if (call) {
3661                 MUTEX_ENTER(&call->lock);
3662                  if (call->state == RX_STATE_PRECALL)
3663                      rxi_AttachServerProc(call, -1, NULL, NULL);
3664                 MUTEX_EXIT(&call->lock);
3665             }
3666         }
3667     }
3668     return np;
3669 }
3670
3671 /* A client has received an authentication challenge: the security
3672  * object is asked to cough up a respectable response packet to send
3673  * back to the server.  The server is responsible for retrying the
3674  * challenge if it fails to get a response. */
3675
3676 struct rx_packet *
3677 rxi_ReceiveChallengePacket(conn, np, istack)
3678     register struct rx_connection *conn;
3679     register struct rx_packet *np;
3680     int istack;
3681 {
3682     int error;
3683
3684     /* Ignore the challenge if we're the server */