Initial IBM OpenAFS 1.0 tree
[openafs.git] / src / rx / rx.c
1 /*
2 ****************************************************************************
3 *        Copyright IBM Corporation 1988, 1989 - All Rights Reserved        *
4 *                                                                          *
5 * Permission to use, copy, modify, and distribute this software and its    *
6 * documentation for any purpose and without fee is hereby granted,         *
7 * provided that the above copyright notice appear in all copies and        *
8 * that both that copyright notice and this permission notice appear in     *
9 * supporting documentation, and that the name of IBM not be used in        *
10 * advertising or publicity pertaining to distribution of the software      *
11 * without specific, written prior permission.                              *
12 *                                                                          *
13 * IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
14 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
15 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY      *
16 * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER  *
17 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING   *
18 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.    *
19 ****************************************************************************
20 */
21
22 /* RX:  Extended Remote Procedure Call */
23
24 #ifdef  KERNEL
25 #include "../afs/param.h"
26 #include "../afs/sysincludes.h"
27 #include "../afs/afsincludes.h"
28 #ifndef UKERNEL
29 #include "../h/types.h"
30 #include "../h/time.h"
31 #include "../h/stat.h"
32 #ifdef  AFS_OSF_ENV
33 #include <net/net_globals.h>
34 #endif  /* AFS_OSF_ENV */
35 #ifdef AFS_LINUX20_ENV
36 #include "../h/socket.h"
37 #endif
38 #include "../netinet/in.h"
39 #include "../afs/afs_args.h"
40 #include "../afs/afs_osi.h"
41 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
42 #include "../h/systm.h"
43 #endif
44 #ifdef RXDEBUG
45 #undef RXDEBUG      /* turn off debugging */
46 #endif /* RXDEBUG */
47 #if defined(AFS_SGI_ENV)
48 #include "../sys/debug.h"
49 #endif
50 #include "../afsint/afsint.h"
51 #ifdef  AFS_ALPHA_ENV
52 #undef kmem_alloc
53 #undef kmem_free
54 #undef mem_alloc
55 #undef mem_free
56 #undef register
57 #endif  /* AFS_ALPHA_ENV */
58 #else /* !UKERNEL */
59 #include "../afs/sysincludes.h"
60 #include "../afs/afsincludes.h"
61 #endif /* !UKERNEL */
62 #include "../afs/lock.h"
63 #include "../rx/rx_kmutex.h"
64 #include "../rx/rx_kernel.h"
65 #include "../rx/rx_clock.h"
66 #include "../rx/rx_queue.h"
67 #include "../rx/rx.h"
68 #include "../rx/rx_globals.h"
69 #include "../rx/rx_trace.h"
70 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
71 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
72 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
73 #include "../afsint/afsint.h"
74 extern afs_int32 afs_termState;
75 #ifdef AFS_AIX41_ENV
76 #include "sys/lockl.h"
77 #include "sys/lock_def.h"
78 #endif /* AFS_AIX41_ENV */
79 # include "../afsint/rxgen_consts.h"
80 #else /* KERNEL */
81 # include <afs/param.h>
82 # include <sys/types.h>
83 # include <errno.h>
84 #ifdef AFS_NT40_ENV
85 # include <stdlib.h>
86 # include <fcntl.h>
87 # include <afsutil.h>
88 #else
89 # include <sys/socket.h>
90 # include <sys/file.h>
91 # include <netdb.h>
92 # include <sys/stat.h>
93 # include <netinet/in.h>
94 # include <sys/time.h>
95 #endif
96 # include "rx.h"
97 # include "rx_user.h"
98 # include "rx_clock.h"
99 # include "rx_queue.h"
100 # include "rx_globals.h"
101 # include "rx_trace.h"
102 # include "rx_internal.h"
103 # include <afs/rxgen_consts.h>
104 #endif /* KERNEL */
105
106 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
107 struct rx_tq_debug {
108     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
109     afs_int32 rxi_start_in_error;
110 } rx_tq_debug;
111 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
112
113 /*
114  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
115  * currently allocated within rx.  This number is used to allocate the
116  * memory required to return the statistics when queried.
117  */
118
119 static unsigned int rxi_rpc_peer_stat_cnt;
120
121 /*
122  * rxi_rpc_process_stat_cnt counts the total number of local process stat
123  * structures currently allocated within rx.  The number is used to allocate
124  * the memory required to return the statistics when queried.
125  */
126
127 static unsigned int rxi_rpc_process_stat_cnt;
128
129 #if !defined(offsetof)
130 #include <stddef.h>     /* for definition of offsetof() */
131 #endif
132
133 #ifdef AFS_PTHREAD_ENV
134 #include <assert.h>
135
136 /*
137  * Use procedural initialization of mutexes/condition variables
138  * to ease NT porting
139  */
140
141 extern pthread_mutex_t rxkad_stats_mutex;
142 extern pthread_mutex_t des_init_mutex;
143 extern pthread_mutex_t des_random_mutex;
144 extern pthread_mutex_t rx_clock_mutex;
145 extern pthread_mutex_t rxi_connCacheMutex;
146 extern pthread_mutex_t rx_event_mutex;
147 extern pthread_mutex_t osi_malloc_mutex;
148 extern pthread_mutex_t event_handler_mutex;
149 extern pthread_mutex_t listener_mutex;
150 extern pthread_mutex_t rx_if_init_mutex;
151 extern pthread_mutex_t rx_if_mutex;
152 extern pthread_mutex_t rxkad_client_uid_mutex;
153 extern pthread_mutex_t rxkad_random_mutex;
154
155 extern pthread_cond_t rx_event_handler_cond;
156 extern pthread_cond_t rx_listener_cond;
157
158 static pthread_mutex_t epoch_mutex;
159 static pthread_mutex_t rx_init_mutex;
160 static pthread_mutex_t rx_debug_mutex;
161
162 static void rxi_InitPthread(void) {
163     assert(pthread_mutex_init(&rx_clock_mutex,
164                               (const pthread_mutexattr_t*)0)==0);
165     assert(pthread_mutex_init(&rxi_connCacheMutex,
166                               (const pthread_mutexattr_t*)0)==0);
167     assert(pthread_mutex_init(&rx_init_mutex,
168                               (const pthread_mutexattr_t*)0)==0);
169     assert(pthread_mutex_init(&epoch_mutex,
170                               (const pthread_mutexattr_t*)0)==0);
171     assert(pthread_mutex_init(&rx_event_mutex,
172                               (const pthread_mutexattr_t*)0)==0);
173     assert(pthread_mutex_init(&des_init_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&des_random_mutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&osi_malloc_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&event_handler_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&listener_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183     assert(pthread_mutex_init(&rx_if_init_mutex,
184                               (const pthread_mutexattr_t*)0)==0);
185     assert(pthread_mutex_init(&rx_if_mutex,
186                               (const pthread_mutexattr_t*)0)==0);
187     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
188                               (const pthread_mutexattr_t*)0)==0);
189     assert(pthread_mutex_init(&rxkad_random_mutex,
190                               (const pthread_mutexattr_t*)0)==0);
191     assert(pthread_mutex_init(&rxkad_stats_mutex,
192                               (const pthread_mutexattr_t*)0)==0);
193     assert(pthread_mutex_init(&rx_debug_mutex,
194                               (const pthread_mutexattr_t*)0)==0);
195
196     assert(pthread_cond_init(&rx_event_handler_cond,
197                               (const pthread_condattr_t*)0)==0);
198     assert(pthread_cond_init(&rx_listener_cond,
199                               (const pthread_condattr_t*)0)==0);
200     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
201 }
202
203 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
204 #define INIT_PTHREAD_LOCKS \
205 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
206 /*
207  * The rx_stats_mutex mutex protects the following global variables:
208  * rxi_dataQuota
209  * rxi_minDeficit
210  * rxi_availProcs
211  * rxi_totalMin
212  * rxi_lowConnRefCount
213  * rxi_lowPeerRefCount
214  * rxi_nCalls
215  * rxi_Alloccnt
216  * rxi_Allocsize
217  * rx_nFreePackets
218  * rx_tq_debug
219  * rx_stats
220  */
221 #else
222 #define INIT_PTHREAD_LOCKS
223 #endif
224
225 extern void rxi_DeleteCachedConnections(void);
226
227
228 /* Variables for handling the minProcs implementation.  availProcs gives the
229  * number of threads available in the pool at this moment (not counting dudes
230  * executing right now).  totalMin gives the total number of procs required
231  * for handling all minProcs requests.  minDeficit is a dynamic variable
232  * tracking the # of procs required to satisfy all of the remaining minProcs
233  * demands.
234  * For fine grain locking to work, the quota check and the reservation of
235  * a server thread has to come while rxi_availProcs and rxi_minDeficit
236  * are locked. To this end, the code has been modified under #ifdef
237  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
238  * same time. A new function, ReturnToServerPool() returns the allocation.
239  * 
240  * A call can be on several queue's (but only one at a time). When
241  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
242  * that no one else is touching the queue. To this end, we store the address
243  * of the queue lock in the call structure (under the call lock) when we
244  * put the call on a queue, and we clear the call_queue_lock when the
245  * call is removed from a queue (once the call lock has been obtained).
246  * This allows rxi_ResetCall to safely synchronize with others wishing
247  * to manipulate the queue.
248  */
249
250 extern void rxi_Delay(int);
251
252 static int rxi_ServerThreadSelectingCall;
253
254 #ifdef RX_ENABLE_LOCKS
255 static afs_kmutex_t rx_rpc_stats;
256 void rxi_StartUnlocked();
257 #endif
258
259 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
260 ** pretty good that the next packet coming in is from the same connection 
261 ** as the last packet, since we're send multiple packets in a transmit window.
262 */
263 struct rx_connection *rxLastConn; 
264
265 #ifdef RX_ENABLE_LOCKS
266 /* The locking hierarchy for rx fine grain locking is composed of five
267  * tiers:
268  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
269  * call->lock - locks call data fields.
270  * Most any other lock - these are all independent of each other.....
271  *      rx_freePktQ_lock
272  *      rx_freeCallQueue_lock
273  *      freeSQEList_lock
274  *      rx_connHashTable_lock
275  *      rx_serverPool_lock
276  *      rxi_keyCreate_lock
277  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
278
279  * lowest level:
280  *      peer_lock - locks peer data fields.
281  *      conn_data_lock - that more than one thread is not updating a conn data
282  *              field at the same time.
283  * Do we need a lock to protect the peer field in the conn structure?
284  *      conn->peer was previously a constant for all intents and so has no
285  *      lock protecting this field. The multihomed client delta introduced
286  *      a RX code change : change the peer field in the connection structure
287  *      to that remote inetrface from which the last packet for this
288  *      connection was sent out. This may become an issue if further changes
289  *      are made.
290  */
291 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
292 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
293 #ifdef RX_LOCKS_DB
294 /* rxdb_fileID is used to identify the lock location, along with line#. */
295 static int rxdb_fileID = RXDB_FILE_RX;
296 #endif /* RX_LOCKS_DB */
297 static void rxi_SetAcksInTransmitQueue();
298 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
299 #else /* RX_ENABLE_LOCKS */
300 #define SET_CALL_QUEUE_LOCK(C, L)
301 #define CLEAR_CALL_QUEUE_LOCK(C)
302 #endif /* RX_ENABLE_LOCKS */
303 static void rxi_DestroyConnectionNoLock();
304 void rxi_DestroyConnection();
305 void rxi_CleanupConnection();
306 struct rx_serverQueueEntry *rx_waitForPacket = 0;
307
308 /* ------------Exported Interfaces------------- */
309
310 /* This function allows rxkad to set the epoch to a suitably random number
311  * which rx_NewConnection will use in the future.  The principle purpose is to
312  * get rxnull connections to use the same epoch as the rxkad connections do, at
313  * least once the first rxkad connection is established.  This is important now
314  * that the host/port addresses aren't used in FindConnection: the uniqueness
315  * of epoch/cid matters and the start time won't do. */
316
317 #ifdef AFS_PTHREAD_ENV
318 /*
319  * This mutex protects the following global variables:
320  * rx_epoch
321  */
322
323 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
324 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
325 #else
326 #define LOCK_EPOCH
327 #define UNLOCK_EPOCH
328 #endif /* AFS_PTHREAD_ENV */
329
330 void rx_SetEpoch (epoch)
331   afs_uint32 epoch;
332 {
333     LOCK_EPOCH
334     rx_epoch = epoch;
335     UNLOCK_EPOCH
336 }
337
338 /* Initialize rx.  A port number may be mentioned, in which case this
339  * becomes the default port number for any service installed later.
340  * If 0 is provided for the port number, a random port will be chosen
341  * by the kernel.  Whether this will ever overlap anything in
342  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
343  * error. */
344 static int rxinit_status = 1;
345 #ifdef AFS_PTHREAD_ENV
346 /*
347  * This mutex protects the following global variables:
348  * rxinit_status
349  */
350
351 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
352 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
353 #else
354 #define LOCK_RX_INIT
355 #define UNLOCK_RX_INIT
356 #endif
357
358 int rx_Init(u_int port)
359 {
360 #ifdef KERNEL
361     osi_timeval_t tv;
362 #else /* KERNEL */
363     struct timeval tv;
364 #endif /* KERNEL */
365     char *htable, *ptable;
366     int tmp_status;
367
368     SPLVAR;
369
370     INIT_PTHREAD_LOCKS
371     LOCK_RX_INIT
372     if (rxinit_status == 0) {
373         tmp_status = rxinit_status;
374         UNLOCK_RX_INIT
375         return tmp_status; /* Already started; return previous error code. */
376     }
377
378 #ifdef AFS_NT40_ENV
379     if (afs_winsockInit()<0)
380         return -1;
381 #endif
382
383 #ifndef KERNEL
384     /*
385      * Initialize anything necessary to provide a non-premptive threading
386      * environment.
387      */
388     rxi_InitializeThreadSupport();
389 #endif
390
391     /* Allocate and initialize a socket for client and perhaps server
392      * connections. */
393
394     rx_socket = rxi_GetUDPSocket((u_short)port); 
395     if (rx_socket == OSI_NULLSOCKET) {
396         UNLOCK_RX_INIT
397         return RX_ADDRINUSE;
398     }
399     
400
401 #ifdef  RX_ENABLE_LOCKS
402 #ifdef RX_LOCKS_DB
403     rxdb_init();
404 #endif /* RX_LOCKS_DB */
405     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
406     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
407     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
408     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
409     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
410                MUTEX_DEFAULT,0);
411     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
412     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
413     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
414     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
415 #ifndef KERNEL
416     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
417 #endif /* !KERNEL */
418     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
419 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
420     if ( !uniprocessor )
421       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
422 #endif /* KERNEL && AFS_HPUX110_ENV */
423 #else /* RX_ENABLE_LOCKS */
424 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
425     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
426 #endif /* AFS_GLOBAL_SUNLOCK */
427 #endif /* RX_ENABLE_LOCKS */
428
429     rxi_nCalls = 0;
430     rx_connDeadTime = 12;
431     rx_tranquil     = 0;        /* reset flag */
432     bzero((char *)&rx_stats, sizeof(struct rx_stats));
433     htable = (char *)
434         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
435     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
436     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
437     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
438     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
439     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
440
441     /* Malloc up a bunch of packets & buffers */
442     rx_nFreePackets = 0;
443     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
444     queue_Init(&rx_freePacketQueue);
445     rxi_NeedMorePackets = FALSE;
446     rxi_MorePackets(rx_nPackets);
447     rx_CheckPackets();
448
449     NETPRI;
450     AFS_RXGLOCK();
451
452     clock_Init();
453
454 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
455     tv.tv_sec = clock_now.sec;
456     tv.tv_usec = clock_now.usec;
457     srand((unsigned int) tv.tv_usec);
458 #else
459     osi_GetTime(&tv);
460 #endif
461     /* *Slightly* random start time for the cid.  This is just to help
462      * out with the hashing function at the peer */
463     rx_port = port;
464     rx_stats.minRtt.sec = 9999999;
465 #ifdef  KERNEL
466     rx_SetEpoch (tv.tv_sec | 0x80000000);
467 #else
468     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
469                                          * will provide a randomer value. */
470 #endif
471     MUTEX_ENTER(&rx_stats_mutex);
472     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
473     MUTEX_EXIT(&rx_stats_mutex);
474     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
475     rx_connHashTable = (struct rx_connection **) htable;
476     rx_peerHashTable = (struct rx_peer **) ptable;
477
478     rx_lastAckDelay.sec = 0;
479     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
480     rx_hardAckDelay.sec = 0;
481     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
482     rx_softAckDelay.sec = 0;
483     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
484
485     rxevent_Init(20, rxi_ReScheduleEvents);
486
487     /* Initialize various global queues */
488     queue_Init(&rx_idleServerQueue);
489     queue_Init(&rx_incomingCallQueue);
490     queue_Init(&rx_freeCallQueue);
491
492 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
493     /* Initialize our list of usable IP addresses. */
494     rx_GetIFInfo();
495 #endif
496
497     /* Start listener process (exact function is dependent on the
498      * implementation environment--kernel or user space) */
499     rxi_StartListener();
500
501     AFS_RXGUNLOCK();
502     USERPRI;
503     tmp_status = rxinit_status = 0;
504     UNLOCK_RX_INIT
505     return tmp_status;
506 }
507
508 /* called with unincremented nRequestsRunning to see if it is OK to start
509  * a new thread in this service.  Could be "no" for two reasons: over the
510  * max quota, or would prevent others from reaching their min quota.
511  */
512 #ifdef RX_ENABLE_LOCKS
513 /* This verion of QuotaOK reserves quota if it's ok while the
514  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
515  */
516 static int QuotaOK(aservice)
517 register struct rx_service *aservice;
518 {
519     /* check if over max quota */
520     if (aservice->nRequestsRunning >= aservice->maxProcs) {
521         return 0;
522     }
523
524     /* under min quota, we're OK */
525     /* otherwise, can use only if there are enough to allow everyone
526      * to go to their min quota after this guy starts.
527      */
528     MUTEX_ENTER(&rx_stats_mutex);
529     if ((aservice->nRequestsRunning < aservice->minProcs) ||
530          (rxi_availProcs > rxi_minDeficit)) {
531         aservice->nRequestsRunning++;
532         /* just started call in minProcs pool, need fewer to maintain
533          * guarantee */
534         if (aservice->nRequestsRunning <= aservice->minProcs)
535             rxi_minDeficit--;
536         rxi_availProcs--;
537         MUTEX_EXIT(&rx_stats_mutex);
538         return 1;
539     }
540     MUTEX_EXIT(&rx_stats_mutex);
541
542     return 0;
543 }
544 static void ReturnToServerPool(aservice)
545 register struct rx_service *aservice;
546 {
547     aservice->nRequestsRunning--;
548     MUTEX_ENTER(&rx_stats_mutex);
549     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
550     rxi_availProcs++;
551     MUTEX_EXIT(&rx_stats_mutex);
552 }
553
554 #else /* RX_ENABLE_LOCKS */
555 static QuotaOK(aservice)
556 register struct rx_service *aservice; {
557     int rc=0;
558     /* under min quota, we're OK */
559     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
560
561     /* check if over max quota */
562     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
563
564     /* otherwise, can use only if there are enough to allow everyone
565      * to go to their min quota after this guy starts.
566      */
567     if (rxi_availProcs > rxi_minDeficit) rc = 1;
568     return rc;
569 }
570 #endif /* RX_ENABLE_LOCKS */
571
572 #ifndef KERNEL
573 /* Called by rx_StartServer to start up lwp's to service calls.
574    NExistingProcs gives the number of procs already existing, and which
575    therefore needn't be created. */
576 void rxi_StartServerProcs(nExistingProcs)
577     int nExistingProcs;
578 {
579     register struct rx_service *service;
580     register int i;
581     int maxdiff = 0;
582     int nProcs = 0;
583
584     /* For each service, reserve N processes, where N is the "minimum"
585        number of processes that MUST be able to execute a request in parallel,
586        at any time, for that process.  Also compute the maximum difference
587        between any service's maximum number of processes that can run
588        (i.e. the maximum number that ever will be run, and a guarantee
589        that this number will run if other services aren't running), and its
590        minimum number.  The result is the extra number of processes that
591        we need in order to provide the latter guarantee */
592     for (i=0; i<RX_MAX_SERVICES; i++) {
593         int diff;
594         service = rx_services[i];
595         if (service == (struct rx_service *) 0) break;
596         nProcs += service->minProcs;
597         diff = service->maxProcs - service->minProcs;
598         if (diff > maxdiff) maxdiff = diff;
599     }
600     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
601     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
602     for (i = 0; i<nProcs; i++) {
603         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
604     }
605 }
606 #endif /* KERNEL */
607
608 /* This routine must be called if any services are exported.  If the
609  * donateMe flag is set, the calling process is donated to the server
610  * process pool */
611 void rx_StartServer(donateMe)
612 {
613     register struct rx_service *service;
614     register int i;
615     SPLVAR;
616     clock_NewTime();
617
618     NETPRI;
619     AFS_RXGLOCK();
620     /* Start server processes, if necessary (exact function is dependent
621      * on the implementation environment--kernel or user space).  DonateMe
622      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
623      * case, one less new proc will be created rx_StartServerProcs.
624      */
625     rxi_StartServerProcs(donateMe);
626
627     /* count up the # of threads in minProcs, and add set the min deficit to
628      * be that value, too.
629      */
630     for (i=0; i<RX_MAX_SERVICES; i++) {
631         service = rx_services[i];
632         if (service == (struct rx_service *) 0) break;
633         MUTEX_ENTER(&rx_stats_mutex);
634         rxi_totalMin += service->minProcs;
635         /* below works even if a thread is running, since minDeficit would
636          * still have been decremented and later re-incremented.
637          */
638         rxi_minDeficit += service->minProcs;
639         MUTEX_EXIT(&rx_stats_mutex);
640     }
641
642     /* Turn on reaping of idle server connections */
643     rxi_ReapConnections();
644
645     AFS_RXGUNLOCK();
646     USERPRI;
647
648     if (donateMe) rx_ServerProc(); /* Never returns */
649     return;
650 }
651
652 /* Create a new client connection to the specified service, using the
653  * specified security object to implement the security model for this
654  * connection. */
655 struct rx_connection *
656 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
657     register afs_uint32 shost;      /* Server host */
658     u_short sport;                  /* Server port */
659     u_short sservice;               /* Server service id */
660     register struct rx_securityClass *securityObject;
661     int serviceSecurityIndex;
662 {
663     int hashindex;
664     afs_int32 cid;
665     register struct rx_connection *conn;
666
667     SPLVAR;
668
669     clock_NewTime();
670     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
671           shost, sport, sservice, securityObject, serviceSecurityIndex));
672
673     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
674      * the case of kmem_alloc? */
675     conn = rxi_AllocConnection();
676 #ifdef  RX_ENABLE_LOCKS
677     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
678     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
679     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
680 #endif
681     NETPRI;
682     AFS_RXGLOCK();
683     MUTEX_ENTER(&rx_connHashTable_lock);
684     cid = (rx_nextCid += RX_MAXCALLS);
685     conn->type = RX_CLIENT_CONNECTION;
686     conn->cid = cid;
687     conn->epoch = rx_epoch;
688     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
689     conn->serviceId = sservice;
690     conn->securityObject = securityObject;
691     /* This doesn't work in all compilers with void (they're buggy), so fake it
692      * with VOID */
693     conn->securityData = (VOID *) 0;
694     conn->securityIndex = serviceSecurityIndex;
695     rx_SetConnDeadTime(conn, rx_connDeadTime);
696     conn->ackRate = RX_FAST_ACK_RATE;
697     conn->nSpecific = 0;
698     conn->specific = NULL;
699     conn->challengeEvent = (struct rxevent *)0;
700     conn->delayedAbortEvent = (struct rxevent *)0;
701     conn->abortCount = 0;
702     conn->error = 0;
703
704     RXS_NewConnection(securityObject, conn);
705     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
706     
707     conn->refCount++; /* no lock required since only this thread knows... */
708     conn->next = rx_connHashTable[hashindex];
709     rx_connHashTable[hashindex] = conn;
710     MUTEX_ENTER(&rx_stats_mutex);
711     rx_stats.nClientConns++;
712     MUTEX_EXIT(&rx_stats_mutex);
713
714     MUTEX_EXIT(&rx_connHashTable_lock);
715     AFS_RXGUNLOCK();
716     USERPRI;
717     return conn;
718 }
719
720 void rx_SetConnDeadTime(conn, seconds)
721     register struct rx_connection *conn;
722     register int seconds;
723 {
724     /* The idea is to set the dead time to a value that allows several
725      * keepalives to be dropped without timing out the connection. */
726     conn->secondsUntilDead = MAX(seconds, 6);
727     conn->secondsUntilPing = conn->secondsUntilDead/6;
728 }
729
730 int rxi_lowPeerRefCount = 0;
731 int rxi_lowConnRefCount = 0;
732
733 /*
734  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
735  * NOTE: must not be called with rx_connHashTable_lock held.
736  */
737 void rxi_CleanupConnection(conn)
738     struct rx_connection *conn;
739 {
740     int i;
741
742     /* Notify the service exporter, if requested, that this connection
743      * is being destroyed */
744     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
745       (*conn->service->destroyConnProc)(conn);
746
747     /* Notify the security module that this connection is being destroyed */
748     RXS_DestroyConnection(conn->securityObject, conn);
749
750     /* If this is the last connection using the rx_peer struct, set its
751      * idle time to now. rxi_ReapConnections will reap it if it's still
752      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
753      */
754     MUTEX_ENTER(&rx_peerHashTable_lock);
755     if (--conn->peer->refCount <= 0) {
756         conn->peer->idleWhen = clock_Sec();
757         if (conn->peer->refCount < 0) {
758             conn->peer->refCount = 0; 
759             MUTEX_ENTER(&rx_stats_mutex);
760             rxi_lowPeerRefCount ++;
761             MUTEX_EXIT(&rx_stats_mutex);
762         }
763     }
764     MUTEX_EXIT(&rx_peerHashTable_lock);
765
766     MUTEX_ENTER(&rx_stats_mutex);
767     if (conn->type == RX_SERVER_CONNECTION)
768       rx_stats.nServerConns--;
769     else
770       rx_stats.nClientConns--;
771     MUTEX_EXIT(&rx_stats_mutex);
772
773 #ifndef KERNEL
774     if (conn->specific) {
775         for (i = 0 ; i < conn->nSpecific ; i++) {
776             if (conn->specific[i] && rxi_keyCreate_destructor[i])
777                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
778             conn->specific[i] = NULL;
779         }
780         free(conn->specific);
781     }
782     conn->specific = NULL;
783     conn->nSpecific = 0;
784 #endif /* !KERNEL */
785
786     MUTEX_DESTROY(&conn->conn_call_lock);
787     MUTEX_DESTROY(&conn->conn_data_lock);
788     CV_DESTROY(&conn->conn_call_cv);
789         
790     rxi_FreeConnection(conn);
791 }
792
793 /* Destroy the specified connection */
794 void rxi_DestroyConnection(conn)
795     register struct rx_connection *conn;
796 {
797     MUTEX_ENTER(&rx_connHashTable_lock);
798     rxi_DestroyConnectionNoLock(conn);
799     /* conn should be at the head of the cleanup list */
800     if (conn == rx_connCleanup_list) {
801         rx_connCleanup_list = rx_connCleanup_list->next;
802         MUTEX_EXIT(&rx_connHashTable_lock);
803         rxi_CleanupConnection(conn);
804     }
805 #ifdef RX_ENABLE_LOCKS
806     else {
807         MUTEX_EXIT(&rx_connHashTable_lock);
808     }
809 #endif /* RX_ENABLE_LOCKS */
810 }
811     
812 static void rxi_DestroyConnectionNoLock(conn)
813     register struct rx_connection *conn;
814 {
815     register struct rx_connection **conn_ptr;
816     register int havecalls = 0;
817     struct rx_packet *packet;
818     int i;
819     SPLVAR;
820
821     clock_NewTime();
822
823     NETPRI;
824     MUTEX_ENTER(&conn->conn_data_lock);
825     if (conn->refCount > 0)
826         conn->refCount--;
827     else {
828         MUTEX_ENTER(&rx_stats_mutex);
829         rxi_lowConnRefCount++;
830         MUTEX_EXIT(&rx_stats_mutex);
831     }
832
833     if (conn->refCount > 0) {
834         /* Busy; wait till the last guy before proceeding */
835         MUTEX_EXIT(&conn->conn_data_lock);
836         USERPRI;
837         return;
838     }
839
840     /* If the client previously called rx_NewCall, but it is still
841      * waiting, treat this as a running call, and wait to destroy the
842      * connection later when the call completes. */
843     if ((conn->type == RX_CLIENT_CONNECTION) &&
844         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
845         conn->flags |= RX_CONN_DESTROY_ME;
846         MUTEX_EXIT(&conn->conn_data_lock);
847         USERPRI;
848         return;
849     }
850     MUTEX_EXIT(&conn->conn_data_lock);
851
852     /* Check for extant references to this connection */
853     for (i = 0; i<RX_MAXCALLS; i++) {
854         register struct rx_call *call = conn->call[i];
855         if (call) {
856             havecalls = 1;
857             if (conn->type == RX_CLIENT_CONNECTION) {
858                 MUTEX_ENTER(&call->lock);
859                 if (call->delayedAckEvent) {
860                     /* Push the final acknowledgment out now--there
861                      * won't be a subsequent call to acknowledge the
862                      * last reply packets */
863                     rxevent_Cancel(call->delayedAckEvent, call,
864                                    RX_CALL_REFCOUNT_DELAY);
865                     rxi_AckAll((struct rxevent *)0, call, 0);
866                 }
867                 MUTEX_EXIT(&call->lock);
868             }
869         }
870     }
871 #ifdef RX_ENABLE_LOCKS
872     if (!havecalls) {
873         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
874             MUTEX_EXIT(&conn->conn_data_lock);
875         }
876         else {
877             /* Someone is accessing a packet right now. */
878             havecalls = 1;
879         }
880     }
881 #endif /* RX_ENABLE_LOCKS */
882
883     if (havecalls) {
884         /* Don't destroy the connection if there are any call
885          * structures still in use */
886         MUTEX_ENTER(&conn->conn_data_lock);
887         conn->flags |= RX_CONN_DESTROY_ME;
888         MUTEX_EXIT(&conn->conn_data_lock);
889         USERPRI;
890         return;
891     }
892
893     if (conn->delayedAbortEvent) {
894         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
895         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
896         if (packet) {
897             MUTEX_ENTER(&conn->conn_data_lock);
898             rxi_SendConnectionAbort(conn, packet, 0, 1);
899             MUTEX_EXIT(&conn->conn_data_lock);
900             rxi_FreePacket(packet);
901         }
902     }
903
904     /* Remove from connection hash table before proceeding */
905     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
906                                              conn->epoch, conn->type) ];
907     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
908         if (*conn_ptr == conn) {
909             *conn_ptr = conn->next;
910             break;
911         }
912     }
913     /* if the conn that we are destroying was the last connection, then we
914     * clear rxLastConn as well */
915     if ( rxLastConn == conn )
916         rxLastConn = 0;
917
918     /* Make sure the connection is completely reset before deleting it. */
919     /* get rid of pending events that could zap us later */
920     if (conn->challengeEvent) {
921         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
922     }
923  
924     /* Add the connection to the list of destroyed connections that
925      * need to be cleaned up. This is necessary to avoid deadlocks
926      * in the routines we call to inform others that this connection is
927      * being destroyed. */
928     conn->next = rx_connCleanup_list;
929     rx_connCleanup_list = conn;
930 }
931
932 /* Externally available version */
933 void rx_DestroyConnection(conn) 
934     register struct rx_connection *conn;
935 {
936     SPLVAR;
937
938     NETPRI;
939     AFS_RXGLOCK();
940     rxi_DestroyConnection (conn);
941     AFS_RXGUNLOCK();
942     USERPRI;
943 }
944
945 /* Start a new rx remote procedure call, on the specified connection.
946  * If wait is set to 1, wait for a free call channel; otherwise return
947  * 0.  Maxtime gives the maximum number of seconds this call may take,
948  * after rx_MakeCall returns.  After this time interval, a call to any
949  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
950  * For fine grain locking, we hold the conn_call_lock in order to 
951  * to ensure that we don't get signalle after we found a call in an active
952  * state and before we go to sleep.
953  */
954 struct rx_call *rx_NewCall(conn)
955     register struct rx_connection *conn;
956 {
957     register int i;
958     register struct rx_call *call;
959     struct clock queueTime;
960     SPLVAR;
961
962     clock_NewTime();
963     dpf (("rx_MakeCall(conn %x)\n", conn));
964
965     NETPRI;
966     clock_GetTime(&queueTime);
967     AFS_RXGLOCK();
968     MUTEX_ENTER(&conn->conn_call_lock);
969     for (;;) {
970         for (i=0; i<RX_MAXCALLS; i++) {
971             call = conn->call[i];
972             if (call) {
973                 MUTEX_ENTER(&call->lock);
974                 if (call->state == RX_STATE_DALLY) {
975                     rxi_ResetCall(call, 0);
976                     (*call->callNumber)++;
977                     break;
978                 }
979                 MUTEX_EXIT(&call->lock);
980             }
981             else {
982                 call = rxi_NewCall(conn, i);
983                 MUTEX_ENTER(&call->lock);
984                 break;
985             }
986         }
987         if (i < RX_MAXCALLS) {
988             break;
989         }
990         MUTEX_ENTER(&conn->conn_data_lock);
991         conn->flags |= RX_CONN_MAKECALL_WAITING;
992         MUTEX_EXIT(&conn->conn_data_lock);
993 #ifdef  RX_ENABLE_LOCKS
994         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
995 #else
996         osi_rxSleep(conn);
997 #endif
998     }
999
1000     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1001
1002     /* Client is initially in send mode */
1003     call->state = RX_STATE_ACTIVE;
1004     call->mode = RX_MODE_SENDING;
1005
1006     /* remember start time for call in case we have hard dead time limit */
1007     call->queueTime = queueTime;
1008     clock_GetTime(&call->startTime);
1009     hzero(call->bytesSent);
1010     hzero(call->bytesRcvd);
1011
1012     /* Turn on busy protocol. */
1013     rxi_KeepAliveOn(call);
1014
1015     MUTEX_EXIT(&call->lock);
1016     MUTEX_EXIT(&conn->conn_call_lock);
1017     AFS_RXGUNLOCK();
1018     USERPRI;
1019
1020 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1021     /* Now, if TQ wasn't cleared earlier, do it now. */
1022     AFS_RXGLOCK();
1023     MUTEX_ENTER(&call->lock);
1024     while (call->flags & RX_CALL_TQ_BUSY) {
1025         call->flags |= RX_CALL_TQ_WAIT;
1026 #ifdef RX_ENABLE_LOCKS
1027         CV_WAIT(&call->cv_tq, &call->lock);
1028 #else /* RX_ENABLE_LOCKS */
1029         osi_rxSleep(&call->tq);
1030 #endif /* RX_ENABLE_LOCKS */
1031     }
1032     if (call->flags & RX_CALL_TQ_CLEARME) {
1033         rxi_ClearTransmitQueue(call, 0);
1034         queue_Init(&call->tq);
1035     }
1036     MUTEX_EXIT(&call->lock);
1037     AFS_RXGUNLOCK();
1038 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1039
1040     return call;
1041 }
1042
1043 rxi_HasActiveCalls(aconn)
1044 register struct rx_connection *aconn; {
1045     register int i;
1046     register struct rx_call *tcall;
1047     SPLVAR;
1048
1049     NETPRI;
1050     for(i=0; i<RX_MAXCALLS; i++) {
1051       if (tcall = aconn->call[i]) {
1052         if ((tcall->state == RX_STATE_ACTIVE) 
1053             || (tcall->state == RX_STATE_PRECALL)) {
1054           USERPRI;
1055           return 1;
1056         }
1057       }
1058     }
1059     USERPRI;
1060     return 0;
1061 }
1062
1063 rxi_GetCallNumberVector(aconn, aint32s)
1064 register struct rx_connection *aconn;
1065 register afs_int32 *aint32s; {
1066     register int i;
1067     register struct rx_call *tcall;
1068     SPLVAR;
1069
1070     NETPRI;
1071     for(i=0; i<RX_MAXCALLS; i++) {
1072         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1073             aint32s[i] = aconn->callNumber[i]+1;
1074         else
1075             aint32s[i] = aconn->callNumber[i];
1076     }
1077     USERPRI;
1078     return 0;
1079 }
1080
1081 rxi_SetCallNumberVector(aconn, aint32s)
1082 register struct rx_connection *aconn;
1083 register afs_int32 *aint32s; {
1084     register int i;
1085     register struct rx_call *tcall;
1086     SPLVAR;
1087
1088     NETPRI;
1089     for(i=0; i<RX_MAXCALLS; i++) {
1090         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1091             aconn->callNumber[i] = aint32s[i] - 1;
1092         else
1093             aconn->callNumber[i] = aint32s[i];
1094     }
1095     USERPRI;
1096     return 0;
1097 }
1098
1099 /* Advertise a new service.  A service is named locally by a UDP port
1100  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1101  * on a failure. */
1102 struct rx_service *
1103 rx_NewService(port, serviceId, serviceName, securityObjects,
1104               nSecurityObjects, serviceProc)
1105     u_short port;
1106     u_short serviceId;
1107     char *serviceName;  /* Name for identification purposes (e.g. the
1108                          * service name might be used for probing for
1109                          * statistics) */
1110     struct rx_securityClass **securityObjects;
1111     int nSecurityObjects;
1112     afs_int32 (*serviceProc)();
1113 {    
1114     osi_socket socket = OSI_NULLSOCKET;
1115     register struct rx_service *tservice;    
1116     register int i;
1117     SPLVAR;
1118
1119     clock_NewTime();
1120
1121     if (serviceId == 0) {
1122         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1123          serviceName);
1124         return 0;
1125     }
1126     if (port == 0) {
1127         if (rx_port == 0) {
1128             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1129             return 0;
1130         }
1131         port = rx_port;
1132         socket = rx_socket;
1133     }
1134
1135     tservice = rxi_AllocService();
1136     NETPRI;
1137     AFS_RXGLOCK();
1138     for (i = 0; i<RX_MAX_SERVICES; i++) {
1139         register struct rx_service *service = rx_services[i];
1140         if (service) {
1141             if (port == service->servicePort) {
1142                 if (service->serviceId == serviceId) {
1143                     /* The identical service has already been
1144                      * installed; if the caller was intending to
1145                      * change the security classes used by this
1146                      * service, he/she loses. */
1147                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1148                     AFS_RXGUNLOCK();
1149                     USERPRI;
1150                     rxi_FreeService(tservice);
1151                     return service;
1152                 }
1153                 /* Different service, same port: re-use the socket
1154                  * which is bound to the same port */
1155                 socket = service->socket;
1156             }
1157         } else {
1158             if (socket == OSI_NULLSOCKET) {
1159                 /* If we don't already have a socket (from another
1160                  * service on same port) get a new one */
1161                 socket = rxi_GetUDPSocket(port);
1162                 if (socket == OSI_NULLSOCKET) {
1163                     AFS_RXGUNLOCK();
1164                     USERPRI;
1165                     rxi_FreeService(tservice);
1166                     return 0;
1167                 }
1168             }
1169             service = tservice;
1170             service->socket = socket;
1171             service->servicePort = port;
1172             service->serviceId = serviceId;
1173             service->serviceName = serviceName;
1174             service->nSecurityObjects = nSecurityObjects;
1175             service->securityObjects = securityObjects;
1176             service->minProcs = 0;
1177             service->maxProcs = 1;
1178             service->idleDeadTime = 60;
1179             service->connDeadTime = rx_connDeadTime;
1180             service->executeRequestProc = serviceProc;
1181             rx_services[i] = service;   /* not visible until now */
1182             AFS_RXGUNLOCK();
1183             USERPRI;
1184             return service;
1185         }
1186     }
1187     AFS_RXGUNLOCK();
1188     USERPRI;
1189     rxi_FreeService(tservice);
1190     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1191     return 0;
1192 }
1193
1194 /* Generic request processing loop. This routine should be called
1195  * by the implementation dependent rx_ServerProc. If socketp is
1196  * non-null, it will be set to the file descriptor that this thread
1197  * is now listening on. If socketp is null, this routine will never
1198  * returns. */
1199 void rxi_ServerProc(threadID, newcall, socketp)
1200 int threadID;
1201 struct rx_call *newcall;
1202 osi_socket *socketp;
1203 {
1204     register struct rx_call *call;
1205     register afs_int32 code;
1206     register struct rx_service *tservice = NULL;
1207
1208     for (;;) {
1209         if (newcall) {
1210             call = newcall;
1211             newcall = NULL;
1212         } else {
1213             call = rx_GetCall(threadID, tservice, socketp);
1214             if (socketp && *socketp != OSI_NULLSOCKET) {
1215                 /* We are now a listener thread */
1216                 return;
1217             }
1218         }
1219
1220         /* if server is restarting( typically smooth shutdown) then do not
1221          * allow any new calls.
1222          */
1223
1224         if ( rx_tranquil && (call != NULL) ) {
1225             SPLVAR;
1226
1227             NETPRI;
1228             AFS_RXGLOCK();
1229             MUTEX_ENTER(&call->lock);
1230
1231             rxi_CallError(call, RX_RESTARTING);
1232             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1233
1234             MUTEX_EXIT(&call->lock);
1235             AFS_RXGUNLOCK();
1236             USERPRI;
1237         }
1238
1239 #ifdef  KERNEL
1240         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1241 #ifdef RX_ENABLE_LOCKS
1242             AFS_GLOCK();
1243 #endif /* RX_ENABLE_LOCKS */
1244             afs_termState = AFSOP_STOP_AFS;
1245             afs_osi_Wakeup(&afs_termState);
1246 #ifdef RX_ENABLE_LOCKS
1247             AFS_GUNLOCK();
1248 #endif /* RX_ENABLE_LOCKS */
1249             return;
1250         }
1251 #endif
1252
1253         tservice = call->conn->service;
1254
1255         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1256
1257         code = call->conn->service->executeRequestProc(call);
1258
1259         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1260
1261         rx_EndCall(call, code);
1262         MUTEX_ENTER(&rx_stats_mutex);
1263         rxi_nCalls++;
1264         MUTEX_EXIT(&rx_stats_mutex);
1265     }
1266 }
1267
1268
1269 void rx_WakeupServerProcs()
1270 {
1271     struct rx_serverQueueEntry *np, *tqp;
1272     SPLVAR;
1273
1274     NETPRI;
1275     AFS_RXGLOCK();
1276     MUTEX_ENTER(&rx_serverPool_lock);
1277
1278 #ifdef RX_ENABLE_LOCKS
1279     if (rx_waitForPacket)
1280         CV_BROADCAST(&rx_waitForPacket->cv);
1281 #else /* RX_ENABLE_LOCKS */
1282     if (rx_waitForPacket)
1283         osi_rxWakeup(rx_waitForPacket);
1284 #endif /* RX_ENABLE_LOCKS */
1285     MUTEX_ENTER(&freeSQEList_lock);
1286     for (np = rx_FreeSQEList; np; np = tqp) {
1287       tqp = *(struct rx_serverQueueEntry **)np;
1288 #ifdef RX_ENABLE_LOCKS
1289       CV_BROADCAST(&np->cv);
1290 #else /* RX_ENABLE_LOCKS */
1291       osi_rxWakeup(np);
1292 #endif /* RX_ENABLE_LOCKS */
1293     }
1294     MUTEX_EXIT(&freeSQEList_lock);
1295     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1296 #ifdef RX_ENABLE_LOCKS
1297       CV_BROADCAST(&np->cv);
1298 #else /* RX_ENABLE_LOCKS */
1299       osi_rxWakeup(np);
1300 #endif /* RX_ENABLE_LOCKS */
1301     }
1302     MUTEX_EXIT(&rx_serverPool_lock);
1303     AFS_RXGUNLOCK();
1304     USERPRI;
1305 }
1306
1307 /* meltdown:
1308  * One thing that seems to happen is that all the server threads get
1309  * tied up on some empty or slow call, and then a whole bunch of calls
1310  * arrive at once, using up the packet pool, so now there are more 
1311  * empty calls.  The most critical resources here are server threads
1312  * and the free packet pool.  The "doreclaim" code seems to help in
1313  * general.  I think that eventually we arrive in this state: there
1314  * are lots of pending calls which do have all their packets present,
1315  * so they won't be reclaimed, are multi-packet calls, so they won't
1316  * be scheduled until later, and thus are tying up most of the free 
1317  * packet pool for a very long time.
1318  * future options:
1319  * 1.  schedule multi-packet calls if all the packets are present.  
1320  * Probably CPU-bound operation, useful to return packets to pool. 
1321  * Do what if there is a full window, but the last packet isn't here?
1322  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1323  * it sleeps and waits for that type of call.
1324  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1325  * the current dataquota business is badly broken.  The quota isn't adjusted
1326  * to reflect how many packets are presently queued for a running call.
1327  * So, when we schedule a queued call with a full window of packets queued
1328  * up for it, that *should* free up a window full of packets for other 2d-class
1329  * calls to be able to use from the packet pool.  But it doesn't.
1330  *
1331  * NB.  Most of the time, this code doesn't run -- since idle server threads
1332  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1333  * as a new call arrives.
1334  */
1335 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1336  * for an rx_Read. */
1337 #ifdef RX_ENABLE_LOCKS
1338 struct rx_call *
1339 rx_GetCall(tno, cur_service, socketp)
1340 int tno;
1341 struct rx_service *cur_service;
1342 osi_socket *socketp;
1343 {
1344     struct rx_serverQueueEntry *sq;
1345     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1346     struct rx_service *service;
1347     SPLVAR;
1348
1349     MUTEX_ENTER(&freeSQEList_lock);
1350
1351     if (sq = rx_FreeSQEList) {
1352         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1353         MUTEX_EXIT(&freeSQEList_lock);
1354     } else {    /* otherwise allocate a new one and return that */
1355         MUTEX_EXIT(&freeSQEList_lock);
1356         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1357         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1358         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1359     }
1360
1361     MUTEX_ENTER(&rx_serverPool_lock);
1362     if (cur_service != NULL) {
1363         ReturnToServerPool(cur_service);
1364     }
1365     while (1) {
1366         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1367             register struct rx_call *tcall, *ncall;
1368             choice2 = (struct rx_call *) 0;
1369             /* Scan for eligible incoming calls.  A call is not eligible
1370              * if the maximum number of calls for its service type are
1371              * already executing */
1372             /* One thread will process calls FCFS (to prevent starvation),
1373              * while the other threads may run ahead looking for calls which
1374              * have all their input data available immediately.  This helps 
1375              * keep threads from blocking, waiting for data from the client. */
1376             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1377               service = tcall->conn->service;
1378               if (!QuotaOK(service)) {
1379                 continue;
1380               }
1381               if (!tno || !tcall->queue_item_header.next  ) {
1382                 /* If we're thread 0, then  we'll just use 
1383                  * this call. If we haven't been able to find an optimal 
1384                  * choice, and we're at the end of the list, then use a 
1385                  * 2d choice if one has been identified.  Otherwise... */
1386                 call = (choice2 ? choice2 : tcall);
1387                 service = call->conn->service;
1388               } else if (!queue_IsEmpty(&tcall->rq)) {
1389                 struct rx_packet *rp;
1390                 rp = queue_First(&tcall->rq, rx_packet);
1391                 if (rp->header.seq == 1) {
1392                   if (!meltdown_1pkt ||             
1393                       (rp->header.flags & RX_LAST_PACKET)) {
1394                     call = tcall;
1395                   } else if (rxi_2dchoice && !choice2 &&
1396                              !(tcall->flags & RX_CALL_CLEARED) &&
1397                              (tcall->rprev > rxi_HardAckRate)) {
1398                     choice2 = tcall;
1399                   } else rxi_md2cnt++;
1400                 }
1401               }
1402               if (call)  {
1403                 break;
1404               } else {
1405                   ReturnToServerPool(service);
1406               }
1407             }
1408           }
1409
1410         if (call) {
1411             queue_Remove(call);
1412             rxi_ServerThreadSelectingCall = 1;
1413             MUTEX_EXIT(&rx_serverPool_lock);
1414             MUTEX_ENTER(&call->lock);
1415             MUTEX_ENTER(&rx_serverPool_lock);
1416
1417             if (queue_IsEmpty(&call->rq) ||
1418                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1419               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1420
1421             CLEAR_CALL_QUEUE_LOCK(call);
1422             if (call->error) {
1423                 MUTEX_EXIT(&call->lock);
1424                 ReturnToServerPool(service);
1425                 rxi_ServerThreadSelectingCall = 0;
1426                 CV_SIGNAL(&rx_serverPool_cv);
1427                 call = (struct rx_call*)0;
1428                 continue;
1429             }
1430             call->flags &= (~RX_CALL_WAIT_PROC);
1431             MUTEX_ENTER(&rx_stats_mutex);
1432             rx_nWaiting--;
1433             MUTEX_EXIT(&rx_stats_mutex);
1434             rxi_ServerThreadSelectingCall = 0;
1435             CV_SIGNAL(&rx_serverPool_cv);
1436             MUTEX_EXIT(&rx_serverPool_lock);
1437             break;
1438         }
1439         else {
1440             /* If there are no eligible incoming calls, add this process
1441              * to the idle server queue, to wait for one */
1442             sq->newcall = 0;
1443             sq->tno = tno;
1444             if (socketp) {
1445                 *socketp = OSI_NULLSOCKET;
1446             }
1447             sq->socketp = socketp;
1448             queue_Append(&rx_idleServerQueue, sq);
1449 #ifndef AFS_AIX41_ENV
1450             rx_waitForPacket = sq;
1451 #endif /* AFS_AIX41_ENV */
1452             do {
1453                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1454 #ifdef  KERNEL
1455                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1456                     MUTEX_EXIT(&rx_serverPool_lock);
1457                     return (struct rx_call *)0;
1458                 }
1459 #endif
1460             } while (!(call = sq->newcall) &&
1461                      !(socketp && *socketp != OSI_NULLSOCKET));
1462             MUTEX_EXIT(&rx_serverPool_lock);
1463             if (call) {
1464                 MUTEX_ENTER(&call->lock);
1465             }
1466             break;
1467         }
1468     }
1469
1470     MUTEX_ENTER(&freeSQEList_lock);
1471     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1472     rx_FreeSQEList = sq;
1473     MUTEX_EXIT(&freeSQEList_lock);
1474
1475     if (call) {
1476         clock_GetTime(&call->startTime);
1477         call->state = RX_STATE_ACTIVE;
1478         call->mode = RX_MODE_RECEIVING;
1479
1480         rxi_calltrace(RX_CALL_START, call);
1481         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1482              call->conn->service->servicePort, 
1483              call->conn->service->serviceId, call));
1484
1485         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1486         MUTEX_EXIT(&call->lock);
1487     } else {
1488         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1489     }
1490
1491     return call;
1492 }
1493 #else /* RX_ENABLE_LOCKS */
1494 struct rx_call *
1495 rx_GetCall(tno, cur_service, socketp)
1496   int tno;
1497   struct rx_service *cur_service;
1498   osi_socket *socketp;
1499 {
1500     struct rx_serverQueueEntry *sq;
1501     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1502     struct rx_service *service;
1503     SPLVAR;
1504
1505     NETPRI;
1506     AFS_RXGLOCK();
1507     MUTEX_ENTER(&freeSQEList_lock);
1508
1509     if (sq = rx_FreeSQEList) {
1510         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1511         MUTEX_EXIT(&freeSQEList_lock);
1512     } else {    /* otherwise allocate a new one and return that */
1513         MUTEX_EXIT(&freeSQEList_lock);
1514         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1515         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1516         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1517     }
1518     MUTEX_ENTER(&sq->lock);
1519
1520     if (cur_service != NULL) {
1521         cur_service->nRequestsRunning--;
1522         if (cur_service->nRequestsRunning < cur_service->minProcs)
1523             rxi_minDeficit++;
1524         rxi_availProcs++;
1525     }
1526     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1527         register struct rx_call *tcall, *ncall;
1528         /* Scan for eligible incoming calls.  A call is not eligible
1529          * if the maximum number of calls for its service type are
1530          * already executing */
1531         /* One thread will process calls FCFS (to prevent starvation),
1532          * while the other threads may run ahead looking for calls which
1533          * have all their input data available immediately.  This helps 
1534          * keep threads from blocking, waiting for data from the client. */
1535         choice2 = (struct rx_call *) 0;
1536         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1537           service = tcall->conn->service;
1538           if (QuotaOK(service)) {
1539              if (!tno || !tcall->queue_item_header.next  ) {
1540                  /* If we're thread 0, then  we'll just use 
1541                   * this call. If we haven't been able to find an optimal 
1542                   * choice, and we're at the end of the list, then use a 
1543                   * 2d choice if one has been identified.  Otherwise... */
1544                  call = (choice2 ? choice2 : tcall);
1545                  service = call->conn->service;
1546              } else if (!queue_IsEmpty(&tcall->rq)) {
1547                  struct rx_packet *rp;
1548                  rp = queue_First(&tcall->rq, rx_packet);
1549                  if (rp->header.seq == 1
1550                      && (!meltdown_1pkt ||
1551                          (rp->header.flags & RX_LAST_PACKET))) {
1552                      call = tcall;
1553                  } else if (rxi_2dchoice && !choice2 &&
1554                             !(tcall->flags & RX_CALL_CLEARED) &&
1555                             (tcall->rprev > rxi_HardAckRate)) {
1556                      choice2 = tcall;
1557                  } else rxi_md2cnt++;
1558              }
1559           }
1560           if (call) 
1561              break;
1562         }
1563       }
1564
1565     if (call) {
1566         queue_Remove(call);
1567         /* we can't schedule a call if there's no data!!! */
1568         /* send an ack if there's no data, if we're missing the
1569          * first packet, or we're missing something between first 
1570          * and last -- there's a "hole" in the incoming data. */
1571         if (queue_IsEmpty(&call->rq) ||
1572             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1573             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1574           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1575
1576         call->flags &= (~RX_CALL_WAIT_PROC);
1577         service->nRequestsRunning++;
1578         /* just started call in minProcs pool, need fewer to maintain
1579          * guarantee */
1580         if (service->nRequestsRunning <= service->minProcs)
1581             rxi_minDeficit--;
1582         rxi_availProcs--;
1583         rx_nWaiting--;
1584         /* MUTEX_EXIT(&call->lock); */
1585     }
1586     else {
1587         /* If there are no eligible incoming calls, add this process
1588          * to the idle server queue, to wait for one */
1589         sq->newcall = 0;
1590         if (socketp) {
1591             *socketp = OSI_NULLSOCKET;
1592         }
1593         sq->socketp = socketp;
1594         queue_Append(&rx_idleServerQueue, sq);
1595         do {
1596             osi_rxSleep(sq);
1597 #ifdef  KERNEL
1598                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1599                     AFS_RXGUNLOCK();
1600                     USERPRI;
1601                     return (struct rx_call *)0;
1602                 }
1603 #endif
1604         } while (!(call = sq->newcall) &&
1605                  !(socketp && *socketp != OSI_NULLSOCKET));
1606     }
1607     MUTEX_EXIT(&sq->lock);
1608
1609     MUTEX_ENTER(&freeSQEList_lock);
1610     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1611     rx_FreeSQEList = sq;
1612     MUTEX_EXIT(&freeSQEList_lock);
1613
1614     if (call) {
1615         clock_GetTime(&call->startTime);
1616         call->state = RX_STATE_ACTIVE;
1617         call->mode = RX_MODE_RECEIVING;
1618
1619         rxi_calltrace(RX_CALL_START, call);
1620         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1621          call->conn->service->servicePort, 
1622          call->conn->service->serviceId, call));
1623     } else {
1624         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1625     }
1626
1627     AFS_RXGUNLOCK();
1628     USERPRI;
1629
1630     return call;
1631 }
1632 #endif /* RX_ENABLE_LOCKS */
1633
1634
1635
1636 /* Establish a procedure to be called when a packet arrives for a
1637  * call.  This routine will be called at most once after each call,
1638  * and will also be called if there is an error condition on the or
1639  * the call is complete.  Used by multi rx to build a selection
1640  * function which determines which of several calls is likely to be a
1641  * good one to read from.  
1642  * NOTE: the way this is currently implemented it is probably only a
1643  * good idea to (1) use it immediately after a newcall (clients only)
1644  * and (2) only use it once.  Other uses currently void your warranty
1645  */
1646 void rx_SetArrivalProc(call, proc, handle, arg)
1647     register struct rx_call *call;
1648     register VOID (*proc)();
1649     register VOID *handle;
1650     register VOID *arg;
1651 {
1652     call->arrivalProc = proc;
1653     call->arrivalProcHandle = handle;
1654     call->arrivalProcArg = arg;
1655 }
1656
1657 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1658  * appropriate, and return the final error code from the conversation
1659  * to the caller */
1660
1661 afs_int32 rx_EndCall(call, rc)
1662     register struct rx_call *call;
1663     afs_int32 rc;
1664 {
1665     register struct rx_connection *conn = call->conn;
1666     register struct rx_service *service;
1667     register struct rx_packet *tp; /* Temporary packet pointer */
1668     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1669     afs_int32 error;
1670     SPLVAR;
1671
1672     dpf(("rx_EndCall(call %x)\n", call));
1673
1674     NETPRI;
1675     AFS_RXGLOCK();
1676     MUTEX_ENTER(&call->lock);
1677
1678     if (rc == 0 && call->error == 0) {
1679         call->abortCode = 0;
1680         call->abortCount = 0;
1681     }
1682
1683     call->arrivalProc = (VOID (*)()) 0;
1684     if (rc && call->error == 0) {
1685         rxi_CallError(call, rc);
1686         /* Send an abort message to the peer if this error code has
1687          * only just been set.  If it was set previously, assume the
1688          * peer has already been sent the error code or will request it 
1689          */
1690         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1691     }
1692     if (conn->type == RX_SERVER_CONNECTION) {
1693         /* Make sure reply or at least dummy reply is sent */
1694         if (call->mode == RX_MODE_RECEIVING) {
1695             rxi_WriteProc(call, 0, 0);
1696         }
1697         if (call->mode == RX_MODE_SENDING) {
1698             rxi_FlushWrite(call);
1699         }
1700         service = conn->service;
1701         rxi_calltrace(RX_CALL_END, call);
1702         /* Call goes to hold state until reply packets are acknowledged */
1703         if (call->tfirst + call->nSoftAcked < call->tnext) {
1704             call->state = RX_STATE_HOLD;
1705         } else {
1706             call->state = RX_STATE_DALLY;
1707             rxi_ClearTransmitQueue(call, 0);
1708             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1709             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1710         }
1711     }
1712     else { /* Client connection */
1713         char dummy;
1714         /* Make sure server receives input packets, in the case where
1715          * no reply arguments are expected */
1716         if ((call->mode == RX_MODE_SENDING)
1717          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1718             (void) rxi_ReadProc(call, &dummy, 1);
1719         }
1720         /* We need to release the call lock since it's lower than the
1721          * conn_call_lock and we don't want to hold the conn_call_lock
1722          * over the rx_ReadProc call. The conn_call_lock needs to be held
1723          * here for the case where rx_NewCall is perusing the calls on
1724          * the connection structure. We don't want to signal until
1725          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1726          * have checked this call, found it active and by the time it
1727          * goes to sleep, will have missed the signal.
1728          */
1729         MUTEX_EXIT(&call->lock);
1730         MUTEX_ENTER(&conn->conn_call_lock);
1731         MUTEX_ENTER(&call->lock);
1732         MUTEX_ENTER(&conn->conn_data_lock);
1733         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1734             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1735             MUTEX_EXIT(&conn->conn_data_lock);
1736 #ifdef  RX_ENABLE_LOCKS
1737             CV_BROADCAST(&conn->conn_call_cv);
1738 #else
1739             osi_rxWakeup(conn);
1740 #endif
1741         }
1742 #ifdef RX_ENABLE_LOCKS
1743         else {
1744             MUTEX_EXIT(&conn->conn_data_lock);
1745         }
1746 #endif /* RX_ENABLE_LOCKS */
1747         call->state = RX_STATE_DALLY;
1748     }
1749     error = call->error;
1750
1751     /* currentPacket, nLeft, and NFree must be zeroed here, because
1752      * ResetCall cannot: ResetCall may be called at splnet(), in the
1753      * kernel version, and may interrupt the macros rx_Read or
1754      * rx_Write, which run at normal priority for efficiency. */
1755     if (call->currentPacket) {
1756         rxi_FreePacket(call->currentPacket);
1757         call->currentPacket = (struct rx_packet *) 0;
1758         call->nLeft = call->nFree = call->curlen = 0;
1759     }
1760     else
1761         call->nLeft = call->nFree = call->curlen = 0;
1762
1763     /* Free any packets from the last call to ReadvProc/WritevProc */
1764     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1765         queue_Remove(tp);
1766         rxi_FreePacket(tp);
1767     }
1768
1769     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1770     MUTEX_EXIT(&call->lock);
1771     if (conn->type == RX_CLIENT_CONNECTION)
1772         MUTEX_EXIT(&conn->conn_call_lock);
1773     AFS_RXGUNLOCK();
1774     USERPRI;
1775     /*
1776      * Map errors to the local host's errno.h format.
1777      */
1778     error = ntoh_syserr_conv(error);
1779     return error;
1780 }
1781
1782 #if !defined(KERNEL)
1783
1784 /* Call this routine when shutting down a server or client (especially
1785  * clients).  This will allow Rx to gracefully garbage collect server
1786  * connections, and reduce the number of retries that a server might
1787  * make to a dead client.
1788  * This is not quite right, since some calls may still be ongoing and
1789  * we can't lock them to destroy them. */
1790 void rx_Finalize() {
1791     register struct rx_connection **conn_ptr, **conn_end;
1792
1793     INIT_PTHREAD_LOCKS
1794     LOCK_RX_INIT
1795     if (rxinit_status == 1) {
1796         UNLOCK_RX_INIT
1797         return; /* Already shutdown. */
1798     }
1799     rxi_DeleteCachedConnections();
1800     if (rx_connHashTable) {
1801         MUTEX_ENTER(&rx_connHashTable_lock);
1802         for (conn_ptr = &rx_connHashTable[0], 
1803              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1804              conn_ptr < conn_end; conn_ptr++) {
1805             struct rx_connection *conn, *next;
1806             for (conn = *conn_ptr; conn; conn = next) {
1807                 next = conn->next;
1808                 if (conn->type == RX_CLIENT_CONNECTION) {
1809                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1810                     conn->refCount++;
1811                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1812 #ifdef RX_ENABLE_LOCKS
1813                     rxi_DestroyConnectionNoLock(conn);
1814 #else /* RX_ENABLE_LOCKS */
1815                     rxi_DestroyConnection(conn);
1816 #endif /* RX_ENABLE_LOCKS */
1817                 }
1818             }
1819         }
1820 #ifdef RX_ENABLE_LOCKS
1821         while (rx_connCleanup_list) {
1822             struct rx_connection *conn;
1823             conn = rx_connCleanup_list;
1824             rx_connCleanup_list = rx_connCleanup_list->next;
1825             MUTEX_EXIT(&rx_connHashTable_lock);
1826             rxi_CleanupConnection(conn);
1827             MUTEX_ENTER(&rx_connHashTable_lock);
1828         }
1829         MUTEX_EXIT(&rx_connHashTable_lock);
1830 #endif /* RX_ENABLE_LOCKS */
1831     }
1832     rxi_flushtrace();
1833
1834     rxinit_status = 1;
1835     UNLOCK_RX_INIT
1836 }
1837 #endif
1838
1839 /* if we wakeup packet waiter too often, can get in loop with two
1840     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1841 void
1842 rxi_PacketsUnWait() {
1843
1844     if (!rx_waitingForPackets) {
1845         return;
1846     }
1847 #ifdef KERNEL
1848     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1849         return;                                     /* still over quota */
1850     }
1851 #endif /* KERNEL */
1852     rx_waitingForPackets = 0;
1853 #ifdef  RX_ENABLE_LOCKS
1854     CV_BROADCAST(&rx_waitingForPackets_cv);
1855 #else
1856     osi_rxWakeup(&rx_waitingForPackets);
1857 #endif
1858     return;
1859 }
1860
1861
1862 /* ------------------Internal interfaces------------------------- */
1863
1864 /* Return this process's service structure for the
1865  * specified socket and service */
1866 struct rx_service *rxi_FindService(socket, serviceId)
1867     register osi_socket socket;
1868     register u_short serviceId;
1869 {
1870     register struct rx_service **sp;    
1871     for (sp = &rx_services[0]; *sp; sp++) {
1872         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1873           return *sp;
1874     }
1875     return 0;
1876 }
1877
1878 /* Allocate a call structure, for the indicated channel of the
1879  * supplied connection.  The mode and state of the call must be set by
1880  * the caller. */
1881 struct rx_call *rxi_NewCall(conn, channel)
1882     register struct rx_connection *conn;
1883     register int channel;
1884 {
1885     register struct rx_call *call;
1886 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1887     register struct rx_call *cp;        /* Call pointer temp */
1888     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1889 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1890
1891     /* Grab an existing call structure, or allocate a new one.
1892      * Existing call structures are assumed to have been left reset by
1893      * rxi_FreeCall */
1894     MUTEX_ENTER(&rx_freeCallQueue_lock);
1895
1896 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1897     /*
1898      * EXCEPT that the TQ might not yet be cleared out.
1899      * Skip over those with in-use TQs.
1900      */
1901     call = NULL;
1902     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1903         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1904             call = cp;
1905             break;
1906         }
1907     }
1908     if (call) {
1909 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1910     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1911         call = queue_First(&rx_freeCallQueue, rx_call);
1912 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1913         queue_Remove(call);
1914         MUTEX_ENTER(&rx_stats_mutex);
1915         rx_stats.nFreeCallStructs--;
1916         MUTEX_EXIT(&rx_stats_mutex);
1917         MUTEX_EXIT(&rx_freeCallQueue_lock);
1918         MUTEX_ENTER(&call->lock);
1919         CLEAR_CALL_QUEUE_LOCK(call);
1920 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1921         /* Now, if TQ wasn't cleared earlier, do it now. */
1922         if (call->flags & RX_CALL_TQ_CLEARME) {
1923             rxi_ClearTransmitQueue(call, 0);
1924             queue_Init(&call->tq);
1925         }
1926 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1927         /* Bind the call to its connection structure */
1928         call->conn = conn;
1929         rxi_ResetCall(call, 1);
1930     }
1931     else {
1932         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1933
1934         MUTEX_EXIT(&rx_freeCallQueue_lock);
1935         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1936         MUTEX_ENTER(&call->lock);
1937         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1938         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1939         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1940
1941         MUTEX_ENTER(&rx_stats_mutex);
1942         rx_stats.nCallStructs++;
1943         MUTEX_EXIT(&rx_stats_mutex);
1944         /* Initialize once-only items */
1945         queue_Init(&call->tq);
1946         queue_Init(&call->rq);
1947         queue_Init(&call->iovq);
1948         /* Bind the call to its connection structure (prereq for reset) */
1949         call->conn = conn;
1950         rxi_ResetCall(call, 1);
1951     }
1952     call->channel = channel;
1953     call->callNumber = &conn->callNumber[channel];
1954     /* Note that the next expected call number is retained (in
1955      * conn->callNumber[i]), even if we reallocate the call structure
1956      */
1957     conn->call[channel] = call;
1958     /* if the channel's never been used (== 0), we should start at 1, otherwise
1959         the call number is valid from the last time this channel was used */
1960     if (*call->callNumber == 0) *call->callNumber = 1;
1961
1962     MUTEX_EXIT(&call->lock);
1963     return call;
1964 }
1965
1966 /* A call has been inactive long enough that so we can throw away
1967  * state, including the call structure, which is placed on the call
1968  * free list.
1969  * Call is locked upon entry.
1970  */
1971 #ifdef RX_ENABLE_LOCKS
1972 void rxi_FreeCall(call, haveCTLock)
1973     int haveCTLock; /* Set if called from rxi_ReapConnections */
1974 #else /* RX_ENABLE_LOCKS */
1975 void rxi_FreeCall(call)
1976 #endif /* RX_ENABLE_LOCKS */
1977     register struct rx_call *call;
1978 {
1979     register int channel = call->channel;
1980     register struct rx_connection *conn = call->conn;
1981
1982
1983     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
1984       (*call->callNumber)++;
1985     rxi_ResetCall(call, 0);
1986     call->conn->call[channel] = (struct rx_call *) 0;
1987
1988     MUTEX_ENTER(&rx_freeCallQueue_lock);
1989     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
1990 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1991     /* A call may be free even though its transmit queue is still in use.
1992      * Since we search the call list from head to tail, put busy calls at
1993      * the head of the list, and idle calls at the tail.
1994      */
1995     if (call->flags & RX_CALL_TQ_BUSY)
1996         queue_Prepend(&rx_freeCallQueue, call);
1997     else
1998         queue_Append(&rx_freeCallQueue, call);
1999 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2000     queue_Append(&rx_freeCallQueue, call);
2001 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2002     MUTEX_ENTER(&rx_stats_mutex);
2003     rx_stats.nFreeCallStructs++;
2004     MUTEX_EXIT(&rx_stats_mutex);
2005
2006     MUTEX_EXIT(&rx_freeCallQueue_lock);
2007  
2008     /* Destroy the connection if it was previously slated for
2009      * destruction, i.e. the Rx client code previously called
2010      * rx_DestroyConnection (client connections), or
2011      * rxi_ReapConnections called the same routine (server
2012      * connections).  Only do this, however, if there are no
2013      * outstanding calls. Note that for fine grain locking, there appears
2014      * to be a deadlock in that rxi_FreeCall has a call locked and
2015      * DestroyConnectionNoLock locks each call in the conn. But note a
2016      * few lines up where we have removed this call from the conn.
2017      * If someone else destroys a connection, they either have no
2018      * call lock held or are going through this section of code.
2019      */
2020     if (conn->flags & RX_CONN_DESTROY_ME) {
2021         MUTEX_ENTER(&conn->conn_data_lock);
2022         conn->refCount++;
2023         MUTEX_EXIT(&conn->conn_data_lock);
2024 #ifdef RX_ENABLE_LOCKS
2025         if (haveCTLock)
2026             rxi_DestroyConnectionNoLock(conn);
2027         else
2028             rxi_DestroyConnection(conn);
2029 #else /* RX_ENABLE_LOCKS */
2030         rxi_DestroyConnection(conn);
2031 #endif /* RX_ENABLE_LOCKS */
2032     }
2033 }
2034
2035 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2036 char *rxi_Alloc(size)
2037 register size_t size;
2038 {
2039     register char *p;
2040
2041 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2042     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2043      * implementation.
2044      */
2045     int glockOwner = ISAFS_GLOCK();
2046     if (!glockOwner)
2047         AFS_GLOCK();
2048 #endif
2049     MUTEX_ENTER(&rx_stats_mutex);
2050     rxi_Alloccnt++; rxi_Allocsize += size;
2051     MUTEX_EXIT(&rx_stats_mutex);
2052 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2053     if (size > AFS_SMALLOCSIZ) {
2054         p = (char *) osi_AllocMediumSpace(size);
2055     } else
2056         p = (char *) osi_AllocSmall(size, 1);
2057 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2058     if (!glockOwner)
2059         AFS_GUNLOCK();
2060 #endif
2061 #else
2062     p = (char *) osi_Alloc(size);
2063 #endif
2064     if (!p) osi_Panic("rxi_Alloc error");
2065     bzero(p, size);
2066     return p;
2067 }
2068
2069 void rxi_Free(addr, size)
2070 void *addr;
2071 register size_t size;
2072 {
2073 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2074     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2075      * implementation.
2076      */
2077     int glockOwner = ISAFS_GLOCK();
2078     if (!glockOwner)
2079         AFS_GLOCK();
2080 #endif
2081     MUTEX_ENTER(&rx_stats_mutex);
2082     rxi_Alloccnt--; rxi_Allocsize -= size;
2083     MUTEX_EXIT(&rx_stats_mutex);
2084 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2085     if (size > AFS_SMALLOCSIZ)
2086         osi_FreeMediumSpace(addr);
2087     else
2088         osi_FreeSmall(addr);
2089 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2090     if (!glockOwner)
2091         AFS_GUNLOCK();
2092 #endif
2093 #else
2094     osi_Free(addr, size);
2095 #endif    
2096 }
2097
2098 /* Find the peer process represented by the supplied (host,port)
2099  * combination.  If there is no appropriate active peer structure, a
2100  * new one will be allocated and initialized 
2101  * The origPeer, if set, is a pointer to a peer structure on which the
2102  * refcount will be be decremented. This is used to replace the peer
2103  * structure hanging off a connection structure */
2104 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2105     register afs_uint32 host;
2106     register u_short port;
2107     struct rx_peer *origPeer;
2108     int create;
2109 {
2110     register struct rx_peer *pp;
2111     int hashIndex;
2112     hashIndex = PEER_HASH(host, port);
2113     MUTEX_ENTER(&rx_peerHashTable_lock);
2114     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2115         if ((pp->host == host) && (pp->port == port)) break;
2116     }
2117     if (!pp) {
2118         if (create) {
2119             pp = rxi_AllocPeer(); /* This bzero's *pp */
2120             pp->host = host;      /* set here or in InitPeerParams is zero */
2121             pp->port = port;
2122             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2123             queue_Init(&pp->congestionQueue);
2124             queue_Init(&pp->rpcStats);
2125             pp->next = rx_peerHashTable[hashIndex];
2126             rx_peerHashTable[hashIndex] = pp;
2127             rxi_InitPeerParams(pp);
2128             MUTEX_ENTER(&rx_stats_mutex);
2129             rx_stats.nPeerStructs++;
2130             MUTEX_EXIT(&rx_stats_mutex);
2131         }
2132     }
2133     if (pp && create) {
2134         pp->refCount++;
2135     }
2136     if ( origPeer)
2137         origPeer->refCount--;
2138     MUTEX_EXIT(&rx_peerHashTable_lock);
2139     return pp;
2140 }
2141
2142
2143 /* Find the connection at (host, port) started at epoch, and with the
2144  * given connection id.  Creates the server connection if necessary.
2145  * The type specifies whether a client connection or a server
2146  * connection is desired.  In both cases, (host, port) specify the
2147  * peer's (host, pair) pair.  Client connections are not made
2148  * automatically by this routine.  The parameter socket gives the
2149  * socket descriptor on which the packet was received.  This is used,
2150  * in the case of server connections, to check that *new* connections
2151  * come via a valid (port, serviceId).  Finally, the securityIndex
2152  * parameter must match the existing index for the connection.  If a
2153  * server connection is created, it will be created using the supplied
2154  * index, if the index is valid for this service */
2155 struct rx_connection *
2156 rxi_FindConnection(socket, host, port, serviceId, cid, 
2157                    epoch, type, securityIndex)
2158     osi_socket socket;
2159     register afs_int32 host;
2160     register u_short port;
2161     u_short serviceId;
2162     afs_uint32 cid;
2163     afs_uint32 epoch;
2164     int type;
2165     u_int securityIndex;
2166 {
2167     int hashindex, flag;
2168     register struct rx_connection *conn;
2169     struct rx_peer *peer;
2170     hashindex = CONN_HASH(host, port, cid, epoch, type);
2171     MUTEX_ENTER(&rx_connHashTable_lock);
2172     rxLastConn ? (conn = rxLastConn, flag = 0) :
2173                  (conn = rx_connHashTable[hashindex], flag = 1);
2174     for (; conn; ) {
2175       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2176           && (epoch == conn->epoch)) {
2177         register struct rx_peer *pp = conn->peer;
2178         if (securityIndex != conn->securityIndex) {
2179             /* this isn't supposed to happen, but someone could forge a packet
2180                like this, and there seems to be some CM bug that makes this
2181                happen from time to time -- in which case, the fileserver
2182                asserts. */  
2183             MUTEX_EXIT(&rx_connHashTable_lock);
2184             return (struct rx_connection *) 0;
2185         }
2186         /* epoch's high order bits mean route for security reasons only on
2187          * the cid, not the host and port fields.
2188          */
2189         if (conn->epoch & 0x80000000) break;
2190         if (((type == RX_CLIENT_CONNECTION) 
2191              || (pp->host == host)) && (pp->port == port))
2192           break;
2193       }
2194       if ( !flag )
2195       {
2196         /* the connection rxLastConn that was used the last time is not the
2197         ** one we are looking for now. Hence, start searching in the hash */
2198         flag = 1;
2199         conn = rx_connHashTable[hashindex];
2200       }
2201       else
2202         conn = conn->next;
2203     }
2204     if (!conn) {
2205         struct rx_service *service;
2206         if (type == RX_CLIENT_CONNECTION) {
2207             MUTEX_EXIT(&rx_connHashTable_lock);
2208             return (struct rx_connection *) 0;
2209         }
2210         service = rxi_FindService(socket, serviceId);
2211         if (!service || (securityIndex >= service->nSecurityObjects) 
2212             || (service->securityObjects[securityIndex] == 0)) {
2213             MUTEX_EXIT(&rx_connHashTable_lock);
2214             return (struct rx_connection *) 0;
2215         }
2216         conn = rxi_AllocConnection(); /* This bzero's the connection */
2217         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2218                    MUTEX_DEFAULT,0);
2219         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2220                    MUTEX_DEFAULT,0);
2221         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2222         conn->next = rx_connHashTable[hashindex];
2223         rx_connHashTable[hashindex] = conn;
2224         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2225         conn->type = RX_SERVER_CONNECTION;
2226         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2227         conn->epoch = epoch;
2228         conn->cid = cid & RX_CIDMASK;
2229         /* conn->serial = conn->lastSerial = 0; */
2230         /* conn->timeout = 0; */
2231         conn->ackRate = RX_FAST_ACK_RATE;
2232         conn->service = service;
2233         conn->serviceId = serviceId;
2234         conn->securityIndex = securityIndex;
2235         conn->securityObject = service->securityObjects[securityIndex];
2236         conn->nSpecific = 0;
2237         conn->specific = NULL;
2238         rx_SetConnDeadTime(conn, service->connDeadTime);
2239         /* Notify security object of the new connection */
2240         RXS_NewConnection(conn->securityObject, conn);
2241         /* XXXX Connection timeout? */
2242         if (service->newConnProc) (*service->newConnProc)(conn);
2243         MUTEX_ENTER(&rx_stats_mutex);
2244         rx_stats.nServerConns++;
2245         MUTEX_EXIT(&rx_stats_mutex);
2246     }
2247     else
2248     {
2249     /* Ensure that the peer structure is set up in such a way that
2250     ** replies in this connection go back to that remote interface
2251     ** from which the last packet was sent out. In case, this packet's
2252     ** source IP address does not match the peer struct for this conn,
2253     ** then drop the refCount on conn->peer and get a new peer structure.
2254     ** We can check the host,port field in the peer structure without the
2255     ** rx_peerHashTable_lock because the peer structure has its refCount
2256     ** incremented and the only time the host,port in the peer struct gets
2257     ** updated is when the peer structure is created.
2258     */
2259         if (conn->peer->host == host )
2260                 peer = conn->peer; /* no change to the peer structure */
2261         else
2262                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2263     }
2264
2265     MUTEX_ENTER(&conn->conn_data_lock);
2266     conn->refCount++;
2267     conn->peer = peer;
2268     MUTEX_EXIT(&conn->conn_data_lock);
2269
2270     rxLastConn = conn;  /* store this connection as the last conn used */
2271     MUTEX_EXIT(&rx_connHashTable_lock);
2272     return conn;
2273 }
2274
2275 /* There are two packet tracing routines available for testing and monitoring
2276  * Rx.  One is called just after every packet is received and the other is
2277  * called just before every packet is sent.  Received packets, have had their
2278  * headers decoded, and packets to be sent have not yet had their headers
2279  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2280  * containing the network address.  Both can be modified.  The return value, if
2281  * non-zero, indicates that the packet should be dropped.  */
2282
2283 int (*rx_justReceived)() = 0;
2284 int (*rx_almostSent)() = 0;
2285
2286 /* A packet has been received off the interface.  Np is the packet, socket is
2287  * the socket number it was received from (useful in determining which service
2288  * this packet corresponds to), and (host, port) reflect the host,port of the
2289  * sender.  This call returns the packet to the caller if it is finished with
2290  * it, rather than de-allocating it, just as a small performance hack */
2291
2292 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2293     register struct rx_packet *np;
2294     osi_socket socket;
2295     afs_uint32 host;
2296     u_short port;
2297     int *tnop;
2298     struct rx_call **newcallp;
2299 {
2300     register struct rx_call *call;
2301     register struct rx_connection *conn;
2302     int channel;
2303     afs_uint32 currentCallNumber;
2304     int type;
2305     int skew;
2306 #ifdef RXDEBUG
2307     char *packetType;
2308 #endif
2309     struct rx_packet *tnp;
2310
2311 #ifdef RXDEBUG
2312 /* We don't print out the packet until now because (1) the time may not be
2313  * accurate enough until now in the lwp implementation (rx_Listener only gets
2314  * the time after the packet is read) and (2) from a protocol point of view,
2315  * this is the first time the packet has been seen */
2316     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2317         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2318     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2319          np->header.serial, packetType, host, port, np->header.serviceId,
2320          np->header.epoch, np->header.cid, np->header.callNumber, 
2321          np->header.seq, np->header.flags, np));
2322 #endif
2323
2324     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2325       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2326     }
2327
2328     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2329         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2330     }
2331 #ifdef RXDEBUG
2332     /* If an input tracer function is defined, call it with the packet and
2333      * network address.  Note this function may modify its arguments. */
2334     if (rx_justReceived) {
2335         struct sockaddr_in addr;
2336         int drop;
2337         addr.sin_family = AF_INET;
2338         addr.sin_port = port;
2339         addr.sin_addr.s_addr = host;
2340 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2341         addr.sin_len = sizeof(addr);
2342 #endif  /* AFS_OSF_ENV */
2343         drop = (*rx_justReceived) (np, &addr);
2344         /* drop packet if return value is non-zero */
2345         if (drop) return np;
2346         port = addr.sin_port;           /* in case fcn changed addr */
2347         host = addr.sin_addr.s_addr;
2348     }
2349 #endif
2350
2351     /* If packet was not sent by the client, then *we* must be the client */
2352     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2353         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2354
2355     /* Find the connection (or fabricate one, if we're the server & if
2356      * necessary) associated with this packet */
2357     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2358                               np->header.cid, np->header.epoch, type, 
2359                               np->header.securityIndex);
2360
2361     if (!conn) {
2362       /* If no connection found or fabricated, just ignore the packet.
2363        * (An argument could be made for sending an abort packet for
2364        * the conn) */
2365       return np;
2366     }   
2367
2368     MUTEX_ENTER(&conn->conn_data_lock);
2369     if (conn->maxSerial < np->header.serial)
2370       conn->maxSerial = np->header.serial;
2371     MUTEX_EXIT(&conn->conn_data_lock);
2372
2373     /* If the connection is in an error state, send an abort packet and ignore
2374      * the incoming packet */
2375     if (conn->error) {
2376         /* Don't respond to an abort packet--we don't want loops! */
2377         MUTEX_ENTER(&conn->conn_data_lock);
2378         if (np->header.type != RX_PACKET_TYPE_ABORT)
2379             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2380         conn->refCount--;
2381         MUTEX_EXIT(&conn->conn_data_lock);
2382         return np;
2383     }
2384
2385     /* Check for connection-only requests (i.e. not call specific). */
2386     if (np->header.callNumber == 0) {
2387         switch (np->header.type) {
2388             case RX_PACKET_TYPE_ABORT:
2389                 /* What if the supplied error is zero? */
2390                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2391                 MUTEX_ENTER(&conn->conn_data_lock);
2392                 conn->refCount--;
2393                 MUTEX_EXIT(&conn->conn_data_lock);
2394                 return np;
2395             case RX_PACKET_TYPE_CHALLENGE:
2396                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2397                 MUTEX_ENTER(&conn->conn_data_lock);
2398                 conn->refCount--;
2399                 MUTEX_EXIT(&conn->conn_data_lock);
2400                 return tnp;
2401             case RX_PACKET_TYPE_RESPONSE:
2402                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2403                 MUTEX_ENTER(&conn->conn_data_lock);
2404                 conn->refCount--;
2405                 MUTEX_EXIT(&conn->conn_data_lock);
2406                 return tnp;
2407             case RX_PACKET_TYPE_PARAMS:
2408             case RX_PACKET_TYPE_PARAMS+1:
2409             case RX_PACKET_TYPE_PARAMS+2:
2410                 /* ignore these packet types for now */
2411                 MUTEX_ENTER(&conn->conn_data_lock);
2412                 conn->refCount--;
2413                 MUTEX_EXIT(&conn->conn_data_lock);
2414                 return np;
2415
2416
2417             default:
2418                 /* Should not reach here, unless the peer is broken: send an
2419                  * abort packet */
2420                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2421                 MUTEX_ENTER(&conn->conn_data_lock);
2422                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2423                 conn->refCount--;
2424                 MUTEX_EXIT(&conn->conn_data_lock);
2425                 return tnp;
2426         }
2427     }
2428
2429     channel = np->header.cid & RX_CHANNELMASK;
2430     call = conn->call[channel];
2431 #ifdef  RX_ENABLE_LOCKS
2432     if (call)
2433         MUTEX_ENTER(&call->lock);
2434     /* Test to see if call struct is still attached to conn. */
2435     if (call != conn->call[channel]) {
2436         if (call)
2437             MUTEX_EXIT(&call->lock);
2438         if (type == RX_SERVER_CONNECTION) {
2439             call = conn->call[channel];
2440             /* If we started with no call attached and there is one now,
2441              * another thread is also running this routine and has gotten
2442              * the connection channel. We should drop this packet in the tests
2443              * below. If there was a call on this connection and it's now
2444              * gone, then we'll be making a new call below.
2445              * If there was previously a call and it's now different then
2446              * the old call was freed and another thread running this routine
2447              * has created a call on this channel. One of these two threads
2448              * has a packet for the old call and the code below handles those
2449              * cases.
2450              */
2451             if (call)
2452                 MUTEX_ENTER(&call->lock);
2453         }
2454         else {
2455             /* This packet can't be for this call. If the new call address is
2456              * 0 then no call is running on this channel. If there is a call
2457              * then, since this is a client connection we're getting data for
2458              * it must be for the previous call.
2459              */
2460             MUTEX_ENTER(&rx_stats_mutex);
2461             rx_stats.spuriousPacketsRead++;
2462             MUTEX_EXIT(&rx_stats_mutex);
2463             MUTEX_ENTER(&conn->conn_data_lock);
2464             conn->refCount--;
2465             MUTEX_EXIT(&conn->conn_data_lock);
2466             return np;
2467         }
2468     }
2469 #endif
2470     currentCallNumber = conn->callNumber[channel];
2471
2472     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2473         if (np->header.callNumber < currentCallNumber) {
2474             MUTEX_ENTER(&rx_stats_mutex);
2475             rx_stats.spuriousPacketsRead++;
2476             MUTEX_EXIT(&rx_stats_mutex);
2477 #ifdef  RX_ENABLE_LOCKS
2478             if (call)
2479                 MUTEX_EXIT(&call->lock);
2480 #endif
2481             MUTEX_ENTER(&conn->conn_data_lock);
2482             conn->refCount--;
2483             MUTEX_EXIT(&conn->conn_data_lock);
2484             return np;
2485         }
2486         if (!call) {
2487             call = rxi_NewCall(conn, channel);
2488             MUTEX_ENTER(&call->lock);
2489             *call->callNumber = np->header.callNumber;
2490             call->state = RX_STATE_PRECALL;
2491             clock_GetTime(&call->queueTime);
2492             hzero(call->bytesSent);
2493             hzero(call->bytesRcvd);
2494             rxi_KeepAliveOn(call);
2495         }
2496         else if (np->header.callNumber != currentCallNumber) {
2497             /* Wait until the transmit queue is idle before deciding
2498              * whether to reset the current call. Chances are that the
2499              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2500              * flag is cleared.
2501              */
2502 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2503             while ((call->state == RX_STATE_ACTIVE) &&
2504                    (call->flags & RX_CALL_TQ_BUSY)) {
2505                 call->flags |= RX_CALL_TQ_WAIT;
2506 #ifdef RX_ENABLE_LOCKS
2507                 CV_WAIT(&call->cv_tq, &call->lock);
2508 #else /* RX_ENABLE_LOCKS */
2509                 osi_rxSleep(&call->tq);
2510 #endif /* RX_ENABLE_LOCKS */
2511             }
2512 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2513             /* If the new call cannot be taken right now send a busy and set
2514              * the error condition in this call, so that it terminates as
2515              * quickly as possible */
2516             if (call->state == RX_STATE_ACTIVE) {
2517                 struct rx_packet *tp;
2518
2519                 rxi_CallError(call, RX_CALL_DEAD);
2520                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2521                 MUTEX_EXIT(&call->lock);
2522                 MUTEX_ENTER(&conn->conn_data_lock);
2523                 conn->refCount--;
2524                 MUTEX_EXIT(&conn->conn_data_lock);
2525                 return tp;
2526             }
2527             rxi_ResetCall(call, 0);
2528             *call->callNumber = np->header.callNumber;
2529             call->state = RX_STATE_PRECALL;
2530             clock_GetTime(&call->queueTime);
2531             hzero(call->bytesSent);
2532             hzero(call->bytesRcvd);
2533             /*
2534              * If the number of queued calls exceeds the overload
2535              * threshold then abort this call.
2536              */
2537             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2538                 struct rx_packet *tp;
2539
2540                 rxi_CallError(call, rx_BusyError);
2541                 tp = rxi_SendCallAbort(call, np, 1, 0);
2542                 MUTEX_EXIT(&call->lock);
2543                 MUTEX_ENTER(&conn->conn_data_lock);
2544                 conn->refCount--;
2545                 MUTEX_EXIT(&conn->conn_data_lock);
2546                 return tp;
2547             }
2548             rxi_KeepAliveOn(call);
2549         }
2550         else {
2551             /* Continuing call; do nothing here. */
2552         }
2553     } else { /* we're the client */
2554         /* Ignore all incoming acknowledgements for calls in DALLY state */
2555         if ( call && (call->state == RX_STATE_DALLY) 
2556          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2557             MUTEX_ENTER(&rx_stats_mutex);
2558             rx_stats.ignorePacketDally++;
2559             MUTEX_EXIT(&rx_stats_mutex);
2560 #ifdef  RX_ENABLE_LOCKS
2561             if (call) {
2562                 MUTEX_EXIT(&call->lock);
2563             }
2564 #endif
2565             MUTEX_ENTER(&conn->conn_data_lock);
2566             conn->refCount--;
2567             MUTEX_EXIT(&conn->conn_data_lock);
2568             return np;
2569         }
2570         
2571         /* Ignore anything that's not relevant to the current call.  If there
2572          * isn't a current call, then no packet is relevant. */
2573         if (!call || (np->header.callNumber != currentCallNumber)) {
2574             MUTEX_ENTER(&rx_stats_mutex);
2575             rx_stats.spuriousPacketsRead++;
2576             MUTEX_EXIT(&rx_stats_mutex);
2577 #ifdef  RX_ENABLE_LOCKS
2578             if (call) {
2579                 MUTEX_EXIT(&call->lock);
2580             }
2581 #endif
2582             MUTEX_ENTER(&conn->conn_data_lock);
2583             conn->refCount--;
2584             MUTEX_EXIT(&conn->conn_data_lock);
2585             return np;  
2586         }
2587         /* If the service security object index stamped in the packet does not
2588          * match the connection's security index, ignore the packet */
2589         if (np->header.securityIndex != conn->securityIndex) {
2590 #ifdef  RX_ENABLE_LOCKS
2591             MUTEX_EXIT(&call->lock);
2592 #endif
2593             MUTEX_ENTER(&conn->conn_data_lock);
2594             conn->refCount--;       
2595             MUTEX_EXIT(&conn->conn_data_lock);
2596             return np;
2597         }
2598
2599         /* If we're receiving the response, then all transmit packets are
2600          * implicitly acknowledged.  Get rid of them. */
2601         if (np->header.type == RX_PACKET_TYPE_DATA) {
2602 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2603             /* XXX Hack. Because we must release the global rx lock when
2604              * sending packets (osi_NetSend) we drop all acks while we're
2605              * traversing the tq in rxi_Start sending packets out because
2606              * packets may move to the freePacketQueue as result of being here!
2607              * So we drop these packets until we're safely out of the
2608              * traversing. Really ugly! 
2609              * For fine grain RX locking, we set the acked field in the
2610              * packets and let rxi_Start remove them from the transmit queue.
2611              */
2612             if (call->flags & RX_CALL_TQ_BUSY) {
2613 #ifdef  RX_ENABLE_LOCKS
2614                 rxi_SetAcksInTransmitQueue(call);
2615 #else
2616                 conn->refCount--;
2617                 return np;              /* xmitting; drop packet */
2618 #endif
2619             }
2620             else {
2621                 rxi_ClearTransmitQueue(call, 0);
2622             }
2623 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2624             rxi_ClearTransmitQueue(call, 0);
2625 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2626         } else {
2627           if (np->header.type == RX_PACKET_TYPE_ACK) {
2628         /* now check to see if this is an ack packet acknowledging that the
2629          * server actually *lost* some hard-acked data.  If this happens we
2630          * ignore this packet, as it may indicate that the server restarted in
2631          * the middle of a call.  It is also possible that this is an old ack
2632          * packet.  We don't abort the connection in this case, because this
2633          * *might* just be an old ack packet.  The right way to detect a server
2634          * restart in the midst of a call is to notice that the server epoch
2635          * changed, btw.  */
2636         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2637          * XXX unacknowledged.  I think that this is off-by-one, but
2638          * XXX I don't dare change it just yet, since it will
2639          * XXX interact badly with the server-restart detection 
2640          * XXX code in receiveackpacket.  */
2641             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2642                 MUTEX_ENTER(&rx_stats_mutex);
2643                 rx_stats.spuriousPacketsRead++;
2644                 MUTEX_EXIT(&rx_stats_mutex);
2645                 MUTEX_EXIT(&call->lock);
2646                 MUTEX_ENTER(&conn->conn_data_lock);
2647                 conn->refCount--;
2648                 MUTEX_EXIT(&conn->conn_data_lock);
2649                 return np;
2650             }
2651           }
2652         } /* else not a data packet */
2653     }
2654
2655     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2656     /* Set remote user defined status from packet */
2657     call->remoteStatus = np->header.userStatus;
2658
2659     /* Note the gap between the expected next packet and the actual
2660      * packet that arrived, when the new packet has a smaller serial number
2661      * than expected.  Rioses frequently reorder packets all by themselves,
2662      * so this will be quite important with very large window sizes.
2663      * Skew is checked against 0 here to avoid any dependence on the type of
2664      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2665      * true! 
2666      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2667      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2668      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2669      */
2670     MUTEX_ENTER(&conn->conn_data_lock);
2671     skew = conn->lastSerial - np->header.serial;
2672     conn->lastSerial = np->header.serial;
2673     MUTEX_EXIT(&conn->conn_data_lock);
2674     if (skew > 0) {
2675       register struct rx_peer *peer;
2676       peer = conn->peer;
2677       if (skew > peer->inPacketSkew) {
2678         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2679         peer->inPacketSkew = skew;
2680       }
2681     }
2682
2683     /* Now do packet type-specific processing */
2684     switch (np->header.type) {
2685         case RX_PACKET_TYPE_DATA:
2686             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2687                                        tnop, newcallp);
2688             break;
2689         case RX_PACKET_TYPE_ACK:
2690             /* Respond immediately to ack packets requesting acknowledgement
2691              * (ping packets) */
2692             if (np->header.flags & RX_REQUEST_ACK) {
2693                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2694                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2695             }
2696             np = rxi_ReceiveAckPacket(call, np, 1);
2697             break;
2698         case RX_PACKET_TYPE_ABORT:
2699             /* An abort packet: reset the connection, passing the error up to
2700              * the user */
2701             /* What if error is zero? */
2702             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2703             break;
2704         case RX_PACKET_TYPE_BUSY:
2705             /* XXXX */
2706             break;
2707         case RX_PACKET_TYPE_ACKALL:
2708             /* All packets acknowledged, so we can drop all packets previously
2709              * readied for sending */
2710 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2711             /* XXX Hack. We because we can't release the global rx lock when
2712              * sending packets (osi_NetSend) we drop all ack pkts while we're
2713              * traversing the tq in rxi_Start sending packets out because
2714              * packets may move to the freePacketQueue as result of being
2715              * here! So we drop these packets until we're safely out of the
2716              * traversing. Really ugly! 
2717              * For fine grain RX locking, we set the acked field in the packets
2718              * and let rxi_Start remove the packets from the transmit queue.
2719              */
2720             if (call->flags & RX_CALL_TQ_BUSY) {
2721 #ifdef  RX_ENABLE_LOCKS
2722                 rxi_SetAcksInTransmitQueue(call);
2723                 break;
2724 #else /* RX_ENABLE_LOCKS */
2725                 conn->refCount--;
2726                 return np;              /* xmitting; drop packet */
2727 #endif /* RX_ENABLE_LOCKS */
2728             }
2729 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2730             rxi_ClearTransmitQueue(call, 0);
2731             break;
2732         default:
2733             /* Should not reach here, unless the peer is broken: send an abort
2734              * packet */
2735             rxi_CallError(call, RX_PROTOCOL_ERROR);
2736             np = rxi_SendCallAbort(call, np, 1, 0);
2737             break;
2738     };
2739     /* Note when this last legitimate packet was received, for keep-alive
2740      * processing.  Note, we delay getting the time until now in the hope that
2741      * the packet will be delivered to the user before any get time is required
2742      * (if not, then the time won't actually be re-evaluated here). */
2743     call->lastReceiveTime = clock_Sec();
2744     MUTEX_EXIT(&call->lock);
2745     MUTEX_ENTER(&conn->conn_data_lock);
2746     conn->refCount--;
2747     MUTEX_EXIT(&conn->conn_data_lock);
2748     return np;
2749 }
2750
2751 /* return true if this is an "interesting" connection from the point of view
2752     of someone trying to debug the system */
2753 int rxi_IsConnInteresting(struct rx_connection *aconn)
2754 {
2755     register int i;
2756     register struct rx_call *tcall;
2757
2758     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2759         return 1;
2760     for(i=0;i<RX_MAXCALLS;i++) {
2761         tcall = aconn->call[i];
2762         if (tcall) {
2763             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2764                 return 1;
2765             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2766                 return 1;
2767         }
2768     }
2769     return 0;
2770 }
2771
2772 #ifdef KERNEL
2773 /* if this is one of the last few packets AND it wouldn't be used by the
2774    receiving call to immediately satisfy a read request, then drop it on
2775    the floor, since accepting it might prevent a lock-holding thread from
2776    making progress in its reading. If a call has been cleared while in
2777    the precall state then ignore all subsequent packets until the call
2778    is assigned to a thread. */
2779
2780 static TooLow(ap, acall)
2781   struct rx_call *acall;
2782   struct rx_packet *ap; {
2783     int rc=0;
2784     MUTEX_ENTER(&rx_stats_mutex);
2785     if (((ap->header.seq != 1) &&
2786          (acall->flags & RX_CALL_CLEARED) &&
2787          (acall->state == RX_STATE_PRECALL)) ||
2788         ((rx_nFreePackets < rxi_dataQuota+2) &&
2789          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2790            && (acall->flags & RX_CALL_READER_WAIT)))) {
2791         rc = 1;
2792     }
2793     MUTEX_EXIT(&rx_stats_mutex);
2794     return rc;
2795 }
2796 #endif /* KERNEL */
2797
2798 /* try to attach call, if authentication is complete */
2799 static void TryAttach(acall, socket, tnop, newcallp)
2800 register struct rx_call *acall;
2801 register osi_socket socket;
2802 register int *tnop;
2803 register struct rx_call **newcallp; {
2804     register struct rx_connection *conn;
2805     conn = acall->conn;
2806     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2807         /* Don't attach until we have any req'd. authentication. */
2808         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2809             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2810             /* Note:  this does not necessarily succeed; there
2811                may not any proc available */
2812         }
2813         else {
2814             rxi_ChallengeOn(acall->conn);
2815         }
2816     }
2817 }
2818
2819 /* A data packet has been received off the interface.  This packet is
2820  * appropriate to the call (the call is in the right state, etc.).  This
2821  * routine can return a packet to the caller, for re-use */
2822
2823 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2824                                         port, tnop, newcallp)
2825     register struct rx_call *call;
2826     register struct rx_packet *np;
2827     int istack;
2828     osi_socket socket;
2829     afs_uint32 host;
2830     u_short port;
2831     int *tnop;
2832     struct rx_call **newcallp;
2833 {
2834     int ackNeeded = 0;
2835     int newPackets = 0;
2836     int didHardAck = 0;
2837     int haveLast = 0;
2838     afs_uint32 seq, serial, flags;
2839     int isFirst;
2840     struct rx_packet *tnp;
2841     struct clock when;
2842     MUTEX_ENTER(&rx_stats_mutex);
2843     rx_stats.dataPacketsRead++;
2844     MUTEX_EXIT(&rx_stats_mutex);
2845
2846 #ifdef KERNEL
2847     /* If there are no packet buffers, drop this new packet, unless we can find
2848      * packet buffers from inactive calls */
2849     if (!call->error &&
2850         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2851         MUTEX_ENTER(&rx_freePktQ_lock);
2852         rxi_NeedMorePackets = TRUE;
2853         MUTEX_EXIT(&rx_freePktQ_lock);
2854         MUTEX_ENTER(&rx_stats_mutex);
2855         rx_stats.noPacketBuffersOnRead++;
2856         MUTEX_EXIT(&rx_stats_mutex);
2857         call->rprev = np->header.serial;
2858         rxi_calltrace(RX_TRACE_DROP, call);
2859         dpf (("packet %x dropped on receipt - quota problems", np));
2860         if (rxi_doreclaim)
2861             rxi_ClearReceiveQueue(call);
2862         clock_GetTime(&when);
2863         clock_Add(&when, &rx_softAckDelay);
2864         if (!call->delayedAckEvent ||
2865             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2866             rxevent_Cancel(call->delayedAckEvent, call,
2867                            RX_CALL_REFCOUNT_DELAY);
2868             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2869             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2870                                                  call, 0);
2871         }
2872         /* we've damaged this call already, might as well do it in. */
2873         return np;
2874     }
2875 #endif /* KERNEL */
2876
2877     /*
2878      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2879      * packet is one of several packets transmitted as a single
2880      * datagram. Do not send any soft or hard acks until all packets
2881      * in a jumbogram have been processed. Send negative acks right away.
2882      */
2883     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2884         /* tnp is non-null when there are more packets in the
2885          * current jumbo gram */
2886         if (tnp) {
2887             if (np)
2888                 rxi_FreePacket(np);
2889             np = tnp;
2890         }
2891
2892         seq = np->header.seq;
2893         serial = np->header.serial;
2894         flags = np->header.flags;
2895
2896         /* If the call is in an error state, send an abort message */
2897         if (call->error)
2898             return rxi_SendCallAbort(call, np, istack, 0);
2899
2900         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2901          * AFS 3.5 jumbogram. */
2902         if (flags & RX_JUMBO_PACKET) {
2903             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2904         } else {
2905             tnp = NULL;
2906         }
2907
2908         if (np->header.spare != 0) {
2909             MUTEX_ENTER(&call->conn->conn_data_lock);
2910             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2911             MUTEX_EXIT(&call->conn->conn_data_lock);
2912         }
2913
2914         /* The usual case is that this is the expected next packet */
2915         if (seq == call->rnext) {
2916
2917             /* Check to make sure it is not a duplicate of one already queued */
2918             if (queue_IsNotEmpty(&call->rq) 
2919                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2920                 MUTEX_ENTER(&rx_stats_mutex);
2921                 rx_stats.dupPacketsRead++;
2922                 MUTEX_EXIT(&rx_stats_mutex);
2923                 dpf (("packet %x dropped on receipt - duplicate", np));
2924                 rxevent_Cancel(call->delayedAckEvent, call,
2925                                RX_CALL_REFCOUNT_DELAY);
2926                 np = rxi_SendAck(call, np, seq, serial,
2927                                  flags, RX_ACK_DUPLICATE, istack);
2928                 ackNeeded = 0;
2929                 call->rprev = seq;
2930                 continue;
2931             }
2932
2933             /* It's the next packet. Stick it on the receive queue
2934              * for this call. Set newPackets to make sure we wake
2935              * the reader once all packets have been processed */
2936             queue_Prepend(&call->rq, np);
2937             call->nSoftAcks++;
2938             np = NULL; /* We can't use this anymore */
2939             newPackets = 1;
2940
2941             /* If an ack is requested then set a flag to make sure we
2942              * send an acknowledgement for this packet */
2943             if (flags & RX_REQUEST_ACK) {
2944                 ackNeeded = 1;
2945             }
2946
2947             /* Keep track of whether we have received the last packet */
2948             if (flags & RX_LAST_PACKET) {
2949                 call->flags |= RX_CALL_HAVE_LAST;
2950                 haveLast = 1;
2951             }
2952
2953             /* Check whether we have all of the packets for this call */
2954             if (call->flags & RX_CALL_HAVE_LAST) {
2955                 afs_uint32 tseq;                /* temporary sequence number */
2956                 struct rx_packet *tp;   /* Temporary packet pointer */
2957                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2958
2959                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2960                     if (tseq != tp->header.seq)
2961                         break;
2962                     if (tp->header.flags & RX_LAST_PACKET) {
2963                         call->flags |= RX_CALL_RECEIVE_DONE;
2964                         break;
2965                     }
2966                     tseq++;
2967                 }
2968             }
2969
2970             /* Provide asynchronous notification for those who want it
2971              * (e.g. multi rx) */
2972             if (call->arrivalProc) {
2973                 (*call->arrivalProc)(call, call->arrivalProcHandle,
2974                                      call->arrivalProcArg);
2975                 call->arrivalProc = (VOID (*)()) 0;
2976             }
2977
2978             /* Update last packet received */
2979             call->rprev = seq;
2980
2981             /* If there is no server process serving this call, grab
2982              * one, if available. We only need to do this once. If a
2983              * server thread is available, this thread becomes a server
2984              * thread and the server thread becomes a listener thread. */
2985             if (isFirst) {
2986                 TryAttach(call, socket, tnop, newcallp);
2987             }
2988         }       
2989         /* This is not the expected next packet. */
2990         else {
2991             /* Determine whether this is a new or old packet, and if it's
2992              * a new one, whether it fits into the current receive window.
2993              * Also figure out whether the packet was delivered in sequence.
2994              * We use the prev variable to determine whether the new packet
2995              * is the successor of its immediate predecessor in the
2996              * receive queue, and the missing flag to determine whether
2997              * any of this packets predecessors are missing.  */
2998
2999             afs_uint32 prev;            /* "Previous packet" sequence number */
3000             struct rx_packet *tp;       /* Temporary packet pointer */
3001             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3002             int missing;                /* Are any predecessors missing? */
3003
3004             /* If the new packet's sequence number has been sent to the
3005              * application already, then this is a duplicate */
3006             if (seq < call->rnext) {
3007                 MUTEX_ENTER(&rx_stats_mutex);
3008                 rx_stats.dupPacketsRead++;
3009                 MUTEX_EXIT(&rx_stats_mutex);
3010                 rxevent_Cancel(call->delayedAckEvent, call,
3011                                RX_CALL_REFCOUNT_DELAY);
3012                 np = rxi_SendAck(call, np, seq, serial,
3013                                  flags, RX_ACK_DUPLICATE, istack);
3014                 ackNeeded = 0;
3015                 call->rprev = seq;
3016                 continue;
3017             }
3018
3019             /* If the sequence number is greater than what can be
3020              * accomodated by the current window, then send a negative
3021              * acknowledge and drop the packet */
3022             if ((call->rnext + call->rwind) <= seq) {
3023                 rxevent_Cancel(call->delayedAckEvent, call,
3024                                RX_CALL_REFCOUNT_DELAY);
3025                 np = rxi_SendAck(call, np, seq, serial,
3026                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3027                 ackNeeded = 0;
3028                 call->rprev = seq;
3029                 continue;
3030             }
3031
3032             /* Look for the packet in the queue of old received packets */
3033             for (prev = call->rnext - 1, missing = 0,
3034                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3035                 /*Check for duplicate packet */
3036                 if (seq == tp->header.seq) {
3037                     MUTEX_ENTER(&rx_stats_mutex);
3038                     rx_stats.dupPacketsRead++;
3039                     MUTEX_EXIT(&rx_stats_mutex);
3040                     rxevent_Cancel(call->delayedAckEvent, call,
3041                                    RX_CALL_REFCOUNT_DELAY);
3042                     np = rxi_SendAck(call, np, seq, serial, 
3043                                      flags, RX_ACK_DUPLICATE, istack);
3044                     ackNeeded = 0;
3045                     call->rprev = seq;
3046                     goto nextloop;
3047                 }
3048                 /* If we find a higher sequence packet, break out and
3049                  * insert the new packet here. */
3050                 if (seq < tp->header.seq) break;
3051                 /* Check for missing packet */
3052                 if (tp->header.seq != prev+1) {
3053                     missing = 1;
3054                 }
3055
3056                 prev = tp->header.seq;
3057             }
3058
3059             /* Keep track of whether we have received the last packet. */
3060             if (flags & RX_LAST_PACKET) {
3061                 call->flags |= RX_CALL_HAVE_LAST;
3062             }
3063
3064             /* It's within the window: add it to the the receive queue.
3065              * tp is left by the previous loop either pointing at the
3066              * packet before which to insert the new packet, or at the
3067              * queue head if the queue is empty or the packet should be
3068              * appended. */
3069             queue_InsertBefore(tp, np);
3070             call->nSoftAcks++;
3071             np = NULL;
3072
3073             /* Check whether we have all of the packets for this call */
3074             if ((call->flags & RX_CALL_HAVE_LAST)
3075              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3076                 afs_uint32 tseq;                /* temporary sequence number */
3077
3078                 for (tseq = call->rnext,
3079                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3080                     if (tseq != tp->header.seq)
3081                         break;
3082                     if (tp->header.flags & RX_LAST_PACKET) {
3083                         call->flags |= RX_CALL_RECEIVE_DONE;
3084                         break;
3085                     }
3086                     tseq++;
3087                 }
3088             }
3089
3090             /* We need to send an ack of the packet is out of sequence, 
3091              * or if an ack was requested by the peer. */
3092             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3093                 ackNeeded = 1;
3094             }
3095
3096             /* Acknowledge the last packet for each call */
3097             if (flags & RX_LAST_PACKET) {
3098                 haveLast = 1;
3099             }
3100
3101             call->rprev = seq;
3102         }
3103 nextloop:;
3104     }
3105
3106     if (newPackets) {
3107         /*
3108          * If the receiver is waiting for an iovec, fill the iovec
3109          * using the data from the receive queue */
3110         if (call->flags & RX_CALL_IOVEC_WAIT) {
3111             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3112             /* the call may have been aborted */
3113             if (call->error) {
3114                 return NULL;
3115             }
3116             if (didHardAck) {
3117                 ackNeeded = 0;
3118             }
3119         }
3120
3121         /* Wakeup the reader if any */
3122         if ((call->flags & RX_CALL_READER_WAIT) &&
3123             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3124              (call->iovNext >= call->iovMax) ||
3125              (call->flags & RX_CALL_RECEIVE_DONE))) {
3126             call->flags &= ~RX_CALL_READER_WAIT;
3127 #ifdef  RX_ENABLE_LOCKS
3128             CV_BROADCAST(&call->cv_rq);
3129 #else
3130             osi_rxWakeup(&call->rq);
3131 #endif
3132         }
3133     }
3134
3135     /*
3136      * Send an ack when requested by the peer, or once every
3137      * rxi_SoftAckRate packets until the last packet has been
3138      * received. Always send a soft ack for the last packet in
3139      * the server's reply. */
3140     if (ackNeeded) {
3141         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3142         np = rxi_SendAck(call, np, seq, serial, flags,
3143                          RX_ACK_REQUESTED, istack);
3144     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3145         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3146         np = rxi_SendAck(call, np, seq, serial, flags,
3147                          RX_ACK_DELAY, istack);
3148     } else if (call->nSoftAcks) {
3149         clock_GetTime(&when);
3150         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3151             clock_Add(&when, &rx_lastAckDelay);
3152         } else {
3153             clock_Add(&when, &rx_softAckDelay);
3154         }
3155         if (!call->delayedAckEvent ||
3156             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3157             rxevent_Cancel(call->delayedAckEvent, call,
3158                            RX_CALL_REFCOUNT_DELAY);
3159             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3160             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3161                                                  call, 0);
3162         }
3163     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3164         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3165     }
3166
3167     return np;
3168 }
3169
3170 #ifdef  ADAPT_WINDOW
3171 static void rxi_ComputeRate();
3172 #endif
3173
3174 /* The real smarts of the whole thing.  */
3175 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3176     register struct rx_call *call;
3177     struct rx_packet *np;
3178     int istack;
3179 {
3180     struct rx_ackPacket *ap;
3181     int nAcks;
3182     register struct rx_packet *tp;
3183     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3184     register struct rx_connection *conn = call->conn;
3185     struct rx_peer *peer = conn->peer;
3186     afs_uint32 first;
3187     afs_uint32 serial;
3188     /* because there are CM's that are bogus, sending weird values for this. */
3189     afs_uint32 skew = 0;
3190     int needRxStart = 0;
3191     int nbytes;
3192     int missing;
3193     int acked;
3194     int nNacked = 0;
3195     int newAckCount = 0;
3196     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3197     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3198
3199     MUTEX_ENTER(&rx_stats_mutex);
3200     rx_stats.ackPacketsRead++;
3201     MUTEX_EXIT(&rx_stats_mutex);
3202     ap = (struct rx_ackPacket *) rx_DataOf(np);
3203     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3204     if (nbytes < 0)
3205       return np;       /* truncated ack packet */
3206
3207     /* depends on ack packet struct */
3208     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3209     first = ntohl(ap->firstPacket);
3210     serial = ntohl(ap->serial);
3211     /* temporarily disabled -- needs to degrade over time 
3212        skew = ntohs(ap->maxSkew); */
3213
3214     /* Ignore ack packets received out of order */
3215     if (first < call->tfirst) {
3216         return np;
3217     }
3218
3219     if (np->header.flags & RX_SLOW_START_OK) {
3220         call->flags |= RX_CALL_SLOW_START_OK;
3221     }
3222     
3223 #ifdef RXDEBUG
3224     if (rx_Log) {
3225       fprintf( rx_Log, 
3226               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3227               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3228               skew, ntohl(ap->firstPacket));
3229         if (nAcks) {
3230             int offset;
3231             for (offset = 0; offset < nAcks; offset++) 
3232                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3233         }
3234         putc('\n', rx_Log);
3235     }
3236 #endif
3237
3238     /* if a server connection has been re-created, it doesn't remember what
3239         serial # it was up to.  An ack will tell us, since the serial field
3240         contains the largest serial received by the other side */
3241     MUTEX_ENTER(&conn->conn_data_lock);
3242     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3243         conn->serial = serial+1;
3244     }
3245     MUTEX_EXIT(&conn->conn_data_lock);
3246
3247     /* Update the outgoing packet skew value to the latest value of
3248      * the peer's incoming packet skew value.  The ack packet, of
3249      * course, could arrive out of order, but that won't affect things
3250      * much */
3251     MUTEX_ENTER(&peer->peer_lock);
3252     peer->outPacketSkew = skew;
3253
3254     /* Check for packets that no longer need to be transmitted, and
3255      * discard them.  This only applies to packets positively
3256      * acknowledged as having been sent to the peer's upper level.
3257      * All other packets must be retained.  So only packets with
3258      * sequence numbers < ap->firstPacket are candidates. */
3259     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3260         if (tp->header.seq >= first) break;
3261         call->tfirst = tp->header.seq + 1;
3262         if (tp->header.serial == serial) {
3263           rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3264 #ifdef ADAPT_WINDOW
3265           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3266 #endif
3267         }
3268         else if ((tp->firstSerial == serial)) {
3269           rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3270 #ifdef ADAPT_WINDOW
3271           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3272 #endif
3273         }
3274 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3275     /* XXX Hack. Because we have to release the global rx lock when sending
3276      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3277      * in rxi_Start sending packets out because packets may move to the
3278      * freePacketQueue as result of being here! So we drop these packets until
3279      * we're safely out of the traversing. Really ugly! 
3280      * To make it even uglier, if we're using fine grain locking, we can
3281      * set the ack bits in the packets and have rxi_Start remove the packets
3282      * when it's done transmitting.
3283      */
3284         if (!tp->acked) {
3285             newAckCount++;
3286         }
3287         if (call->flags & RX_CALL_TQ_BUSY) {
3288 #ifdef RX_ENABLE_LOCKS
3289             tp->acked = 1;
3290             call->flags |= RX_CALL_TQ_SOME_ACKED;
3291 #else /* RX_ENABLE_LOCKS */
3292             break;
3293 #endif /* RX_ENABLE_LOCKS */
3294         } else
3295 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3296         {
3297         queue_Remove(tp);
3298         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3299         }
3300     }
3301
3302 #ifdef ADAPT_WINDOW
3303     /* Give rate detector a chance to respond to ping requests */
3304     if (ap->reason == RX_ACK_PING_RESPONSE) {
3305         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3306     }
3307 #endif
3308
3309     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3310    
3311    /* Now go through explicit acks/nacks and record the results in
3312     * the waiting packets.  These are packets that can't be released
3313     * yet, even with a positive acknowledge.  This positive
3314     * acknowledge only means the packet has been received by the
3315     * peer, not that it will be retained long enough to be sent to
3316     * the peer's upper level.  In addition, reset the transmit timers
3317     * of any missing packets (those packets that must be missing
3318     * because this packet was out of sequence) */
3319
3320     call->nSoftAcked = 0;
3321     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3322         /* Update round trip time if the ack was stimulated on receipt
3323          * of this packet */
3324 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3325 #ifdef RX_ENABLE_LOCKS
3326         if (tp->header.seq >= first) {
3327 #endif /* RX_ENABLE_LOCKS */
3328 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3329         if (tp->header.serial == serial) {
3330           rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3331 #ifdef ADAPT_WINDOW
3332           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3333 #endif
3334         }
3335         else if ((tp->firstSerial == serial)) {
3336           rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3337 #ifdef ADAPT_WINDOW
3338           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3339 #endif
3340         }
3341 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3342 #ifdef RX_ENABLE_LOCKS
3343         }
3344 #endif /* RX_ENABLE_LOCKS */
3345 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3346
3347         /* Set the acknowledge flag per packet based on the
3348          * information in the ack packet. An acknowlegded packet can
3349          * be downgraded when the server has discarded a packet it
3350          * soacked previously, or when an ack packet is received
3351          * out of sequence. */
3352         if (tp->header.seq < first) {
3353             /* Implicit ack information */
3354             if (!tp->acked) {
3355                 newAckCount++;
3356             }
3357             tp->acked = 1;
3358         }
3359         else if (tp->header.seq < first + nAcks) {
3360             /* Explicit ack information:  set it in the packet appropriately */
3361             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3362                 if (!tp->acked) {
3363                     newAckCount++;
3364                     tp->acked = 1;
3365                 }
3366                 if (missing) {
3367                     nNacked++;
3368                 } else {
3369                     call->nSoftAcked++;
3370                 }
3371             } else {
3372                 tp->acked = 0;
3373                 missing = 1;
3374             }
3375         }
3376         else {
3377             tp->acked = 0;
3378             missing = 1;
3379         }
3380
3381         /* If packet isn't yet acked, and it has been transmitted at least 
3382          * once, reset retransmit time using latest timeout 
3383          * ie, this should readjust the retransmit timer for all outstanding 
3384          * packets...  So we don't just retransmit when we should know better*/
3385
3386         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3387           tp->retryTime = tp->timeSent;
3388           clock_Add(&tp->retryTime, &peer->timeout);
3389           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3390           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3391         }
3392     }
3393
3394     /* If the window has been extended by this acknowledge packet,
3395      * then wakeup a sender waiting in alloc for window space, or try
3396      * sending packets now, if he's been sitting on packets due to
3397      * lack of window space */
3398     if (call->tnext < (call->tfirst + call->twind))  {
3399 #ifdef  RX_ENABLE_LOCKS
3400         CV_SIGNAL(&call->cv_twind);
3401 #else
3402         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3403             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3404             osi_rxWakeup(&call->twind);
3405         }
3406 #endif
3407         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3408             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3409         }
3410     }
3411
3412     /* if the ack packet has a receivelen field hanging off it,
3413      * update our state */
3414     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3415       afs_uint32 tSize;
3416
3417       /* If the ack packet has a "recommended" size that is less than 
3418        * what I am using now, reduce my size to match */
3419       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3420                     sizeof(afs_int32), &tSize);
3421       tSize = (afs_uint32) ntohl(tSize);
3422       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3423
3424       /* Get the maximum packet size to send to this peer */
3425       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3426                     &tSize);
3427       tSize = (afs_uint32)ntohl(tSize);
3428       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3429       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3430
3431       /* sanity check - peer might have restarted with different params.
3432        * If peer says "send less", dammit, send less...  Peer should never 
3433        * be unable to accept packets of the size that prior AFS versions would
3434        * send without asking.  */
3435       if (peer->maxMTU != tSize) {
3436           peer->maxMTU = tSize;
3437           peer->MTU = MIN(tSize, peer->MTU);
3438           call->MTU = MIN(call->MTU, tSize);
3439           peer->congestSeq++;
3440       }
3441
3442       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3443           /* AFS 3.4a */
3444           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3445                         sizeof(afs_int32), &tSize);
3446           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3447           if (tSize < call->twind) {       /* smaller than our send */
3448               call->twind = tSize;         /* window, we must send less... */
3449               call->ssthresh = MIN(call->twind, call->ssthresh);
3450           }
3451
3452           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3453            * network MTU confused with the loopback MTU. Calculate the
3454            * maximum MTU here for use in the slow start code below.
3455            */
3456           maxMTU = peer->maxMTU;
3457           /* Did peer restart with older RX version? */
3458           if (peer->maxDgramPackets > 1) {
3459               peer->maxDgramPackets = 1;
3460           }
3461       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3462           /* AFS 3.5 */
3463           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3464                         sizeof(afs_int32), &tSize);
3465           tSize = (afs_uint32) ntohl(tSize);
3466           /*
3467            * As of AFS 3.5 we set the send window to match the receive window. 
3468            */
3469           if (tSize < call->twind) {
3470               call->twind = tSize;
3471               call->ssthresh = MIN(call->twind, call->ssthresh);
3472           } else if (tSize > call->twind) {
3473               call->twind = tSize;
3474           }
3475
3476           /*
3477            * As of AFS 3.5, a jumbogram is more than one fixed size
3478            * packet transmitted in a single UDP datagram. If the remote
3479            * MTU is smaller than our local MTU then never send a datagram
3480            * larger than the natural MTU.
3481            */
3482           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3483                         sizeof(afs_int32), &tSize);
3484           maxDgramPackets = (afs_uint32) ntohl(tSize);
3485           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3486           maxDgramPackets = MIN(maxDgramPackets,
3487                                 (int)(peer->ifDgramPackets));
3488           maxDgramPackets = MIN(maxDgramPackets, tSize);
3489           if (maxDgramPackets > 1) {
3490             peer->maxDgramPackets = maxDgramPackets;
3491             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3492           } else {
3493             peer->maxDgramPackets = 1;
3494             call->MTU = peer->natMTU;
3495           }
3496        } else if (peer->maxDgramPackets > 1) {
3497           /* Restarted with lower version of RX */
3498           peer->maxDgramPackets = 1;
3499        }
3500     } else if (peer->maxDgramPackets > 1 ||
3501                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3502         /* Restarted with lower version of RX */
3503         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3504         peer->natMTU = OLD_MAX_PACKET_SIZE;
3505         peer->MTU = OLD_MAX_PACKET_SIZE;
3506         peer->maxDgramPackets = 1;
3507         peer->nDgramPackets = 1;
3508         peer->congestSeq++;
3509         call->MTU = OLD_MAX_PACKET_SIZE;
3510     }
3511
3512     if (nNacked) {
3513         /*
3514          * Calculate how many datagrams were successfully received after
3515          * the first missing packet and adjust the negative ack counter
3516          * accordingly.
3517          */
3518         call->nAcks = 0;
3519         call->nNacks++;
3520         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3521         if (call->nNacks < nNacked) {
3522             call->nNacks = nNacked;
3523         }
3524     } else {
3525         if (newAckCount) {
3526             call->nAcks++;
3527         }
3528         call->nNacks = 0;
3529     }
3530
3531     if (call->flags & RX_CALL_FAST_RECOVER) {
3532         if (nNacked) {
3533             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3534         } else {
3535             call->flags &= ~RX_CALL_FAST_RECOVER;
3536             call->cwind = call->nextCwind;
3537             call->nextCwind = 0;
3538             call->nAcks = 0;
3539         }
3540         call->nCwindAcks = 0;
3541     }
3542     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3543         /* Three negative acks in a row trigger congestion recovery */
3544 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3545         MUTEX_EXIT(&peer->peer_lock);
3546         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3547                 /* someone else is waiting to start recovery */
3548                 return np;
3549         }
3550         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3551         while (call->flags & RX_CALL_TQ_BUSY) {
3552             call->flags |= RX_CALL_TQ_WAIT;
3553 #ifdef RX_ENABLE_LOCKS
3554             CV_WAIT(&call->cv_tq, &call->lock);
3555 #else /* RX_ENABLE_LOCKS */
3556             osi_rxSleep(&call->tq);
3557 #endif /* RX_ENABLE_LOCKS */
3558         }
3559         MUTEX_ENTER(&peer->peer_lock);
3560 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3561         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3562         call->flags |= RX_CALL_FAST_RECOVER;
3563         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3564         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3565                           rx_maxSendWindow);
3566         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3567         call->nextCwind = call->ssthresh;
3568         call->nAcks = 0;
3569         call->nNacks = 0;
3570         peer->MTU = call->MTU;
3571         peer->cwind = call->nextCwind;
3572         peer->nDgramPackets = call->nDgramPackets;
3573         peer->congestSeq++;
3574         call->congestSeq = peer->congestSeq;
3575         /* Reset the resend times on the packets that were nacked
3576          * so we will retransmit as soon as the window permits*/
3577         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3578             if (acked) {
3579                 if (!tp->acked) {
3580                     clock_Zero(&tp->retryTime);
3581                 }
3582             } else if (tp->acked) {
3583                 acked = 1;
3584             }
3585         }
3586     } else {
3587         /* If cwind is smaller than ssthresh, then increase
3588          * the window one packet for each ack we receive (exponential
3589          * growth).
3590          * If cwind is greater than or equal to ssthresh then increase
3591          * the congestion window by one packet for each cwind acks we
3592          * receive (linear growth).  */
3593         if (call->cwind < call->ssthresh) {
3594             call->cwind = MIN((int)call->ssthresh,
3595                               (int)(call->cwind + newAckCount));
3596             call->nCwindAcks = 0;
3597         } else {
3598             call->nCwindAcks += newAckCount;
3599             if (call->nCwindAcks >= call->cwind) {
3600                 call->nCwindAcks = 0;
3601                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3602             }
3603         }
3604         /*
3605          * If we have received several acknowledgements in a row then
3606          * it is time to increase the size of our datagrams
3607          */
3608         if ((int)call->nAcks > rx_nDgramThreshold) {
3609             if (peer->maxDgramPackets > 1) {
3610                 if (call->nDgramPackets < peer->maxDgramPackets) {
3611                     call->nDgramPackets++;
3612                 }
3613                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3614             } else if (call->MTU < peer->maxMTU) {
3615                 call->MTU += peer->natMTU;
3616                 call->MTU = MIN(call->MTU, peer->maxMTU);
3617             }
3618             call->nAcks = 0;
3619         }
3620     }
3621
3622     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3623
3624     /* Servers need to hold the call until all response packets have
3625      * been acknowledged. Soft acks are good enough since clients
3626      * are not allowed to clear their receive queues. */
3627     if (call->state == RX_STATE_HOLD &&
3628         call->tfirst + call->nSoftAcked >= call->tnext) {
3629         call->state = RX_STATE_DALLY;
3630         rxi_ClearTransmitQueue(call, 0);
3631     } else if (!queue_IsEmpty(&call->tq)) {
3632         rxi_Start(0, call, istack);
3633     }
3634     return np;
3635 }
3636
3637 /* Received a response to a challenge packet */
3638 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3639     register struct rx_connection *conn;
3640     register struct rx_packet *np;
3641     int istack;
3642 {
3643     int error;
3644
3645     /* Ignore the packet if we're the client */
3646     if (conn->type == RX_CLIENT_CONNECTION) return np;
3647
3648     /* If already authenticated, ignore the packet (it's probably a retry) */
3649     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3650         return np;
3651
3652     /* Otherwise, have the security object evaluate the response packet */
3653     error = RXS_CheckResponse(conn->securityObject, conn, np);
3654     if (error) {
3655         /* If the response is invalid, reset the connection, sending
3656          * an abort to the peer */
3657 #ifndef KERNEL
3658         rxi_Delay(1);
3659 #endif
3660         rxi_ConnectionError(conn, error);
3661         MUTEX_ENTER(&conn->conn_data_lock);
3662         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3663         MUTEX_EXIT(&conn->conn_data_lock);
3664         return np;
3665     }
3666     else {
3667         /* If the response is valid, any calls waiting to attach
3668          * servers can now do so */
3669         int i;
3670         for (i=0; i<RX_MAXCALLS; i++) {
3671             struct rx_call *call = conn->call[i];
3672             if (call) {
3673                 MUTEX_ENTER(&call->lock);
3674                  if (call->state == RX_STATE_PRECALL)
3675                      rxi_AttachServerProc(call, -1, NULL, NULL);
3676                 MUTEX_EXIT(&call->lock);
3677             }
3678         }
3679     }
3680     return np;
3681 }
3682
3683 /* A client has received an authentication challenge: the security
3684  * object is asked to cough up a respectable response packet to send
3685  * back to the server.  The server is responsible for retrying the
3686  * challenge if it fails to get a response. */
3687
3688 struct rx_packet *
3689 rxi_ReceiveChallengePacket(conn, np, istack)
3690     register struct rx_connection *conn;
3691     register struct rx_packet *np;
3692     int istack;
3693 {
3694     int error;
3695
3696     /* Ignore the challenge if we're the server */
3697     if (conn->type == RX_SERVER_CONNECTION) return np;
3698
3699     /* Ignore the challenge if the connection is otherwise idle; someone's
3700      * trying to use us as an oracle. */
3701     if (!rxi_HasActiveCalls(conn)) return np;
3702
3703     /* Send the security object the challenge packet.  It is expected to fill
3704      * in the response. */
3705     error = RXS_GetResponse(conn->securityObject, conn, np);
3706
3707     /* If the security object is unable to return a valid response, reset the
3708      * connection and send an abort to the peer.  Otherwise send the response
3709      * packet to the peer connection. */
3710     if (error) {
3711         rxi_ConnectionError(conn, error);
3712         MUTEX_ENTER(&conn->conn_data_lock);
3713         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3714         MUTEX_EXIT(&conn->conn_data_lock);
3715     }
3716     else {
3717         np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3718                              RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3719     }
3720     return np;
3721 }
3722
3723
3724 /* Find an available server process to service the current request in
3725  * the given call structure.  If one isn't available, queue up this
3726  * call so it eventually gets one */
3727 void 
3728 rxi_AttachServerProc(call, socket, tnop, newcallp)
3729 register struct rx_call *call;
3730 register osi_socket socket;
3731 register int *tnop;
3732 register struct rx_call **newcallp;
3733 {
3734     register struct rx_serverQueueEntry *sq;
3735     register struct rx_service *service = call->conn->service;
3736 #ifdef RX_ENABLE_LOCKS
3737     register int haveQuota = 0;
3738 #endif /* RX_ENABLE_LOCKS */
3739     /* May already be attached */
3740     if (call->state == RX_STATE_ACTIVE) return;
3741
3742     MUTEX_ENTER(&rx_serverPool_lock);
3743 #ifdef RX_ENABLE_LOCKS
3744     while(rxi_ServerThreadSelectingCall) {
3745         MUTEX_EXIT(&call->lock);
3746         CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3747         MUTEX_EXIT(&rx_serverPool_lock);
3748         MUTEX_ENTER(&call->lock);
3749         MUTEX_ENTER(&rx_serverPool_lock);
3750         /* Call may have been attached */
3751         if (call->state == RX_STATE_ACTIVE) return;
3752     }
3753
3754     haveQuota = QuotaOK(service);
3755     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3756         /* If there are no processes available to service this call,
3757          * put the call on the incoming call queue (unless it's
3758          * already on the queue).
3759          */
3760         if (haveQuota)
3761             ReturnToServerPool(service);
3762         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3763             call->flags |= RX_CALL_WAIT_PROC;
3764             MUTEX_ENTER(&rx_stats_mutex);
3765             rx_nWaiting++;
3766             MUTEX_EXIT(&rx_stats_mutex);
3767             rxi_calltrace(RX_CALL_ARRIVAL, call);
3768             SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3769             queue_Append(&rx_incomingCallQueue, call);
3770         }
3771     }
3772 #else /* RX_ENABLE_LOCKS */
3773     if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3774         /* If there are no processes available to service this call,
3775          * put the call on the incoming call queue (unless it's
3776          * already on the queue).
3777          */
3778         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3779             call->flags |= RX_CALL_WAIT_PROC;
3780             rx_nWaiting++;
3781             rxi_calltrace(RX_CALL_ARRIVAL, call);
3782             queue_Append(&rx_incomingCallQueue, call);
3783         }
3784     }
3785 #endif /* RX_ENABLE_LOCKS */
3786     else {
3787         sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3788
3789         /* If hot threads are enabled, and both newcallp and sq->socketp
3790          * are non-null, then this thread will process the call, and the
3791          * idle server thread will start listening on this threads socket.
3792          */
3793         queue_Remove(sq);
3794         if (rx_enable_hot_thread && newcallp && sq->socketp) {
3795             *newcallp = call;
3796             *tnop = sq->tno;
3797             *sq->socketp = socket;
3798             clock_GetTime(&call->startTime);
3799             CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3800         } else {
3801             sq->newcall = call;
3802         }
3803         if (call->flags & RX_CALL_WAIT_PROC) {
3804             /* Conservative:  I don't think this should happen */
3805             call->flags &= ~RX_CALL_WAIT_PROC;
3806             MUTEX_ENTER(&rx_stats_mutex);
3807             rx_nWaiting--;
3808             MUTEX_EXIT(&rx_stats_mutex);
3809             queue_Remove(call);
3810         }
3811         call->state = RX_STATE_ACTIVE;
3812         call->mode = RX_MODE_RECEIVING;
3813         if (call->flags & RX_CALL_CLEARED) {
3814             /* send an ack now to start the packet flow up again */
3815             call->flags &= ~RX_CALL_CLEARED;
3816             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3817         }
3818 #ifdef  RX_ENABLE_LOCKS
3819         CV_SIGNAL(&sq->cv);
3820 #else
3821         service->nRequestsRunning++;
3822         if (service->nRequestsRunning <= service->minProcs)
3823           rxi_minDeficit--;
3824         rxi_availProcs--;
3825         osi_rxWakeup(sq);
3826 #endif
3827     }
3828     MUTEX_EXIT(&rx_serverPool_lock);
3829 }
3830
3831 /* Delay the sending of an acknowledge event for a short while, while
3832  * a new call is being prepared (in the case of a client) or a reply
3833  * is being prepared (in the case of a server).  Rather than sending
3834  * an ack packet, an ACKALL packet is sent. */
3835 void rxi_AckAll(event, call, dummy)
3836 struct rxevent *event;
3837 register struct rx_call *call;
3838 char *dummy;
3839 {
3840 #ifdef RX_ENABLE_LOCKS
3841     if (event) {
3842         MUTEX_ENTER(&call->lock);
3843         call->delayedAckEvent = (struct rxevent *) 0;
3844         CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3845     }
3846     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3847                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3848     if (event)
3849         MUTEX_EXIT(&call->lock);
3850 #else /* RX_ENABLE_LOCKS */
3851     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3852     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3853                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3854 #endif /* RX_ENABLE_LOCKS */
3855 }
3856
3857 void rxi_SendDelayedAck(event, call, dummy)     
3858 struct rxevent *event;
3859 register struct rx_call *call;
3860 char *dummy;
3861 {
3862 #ifdef RX_ENABLE_LOCKS
3863     if (event) {
3864         MUTEX_ENTER(&call->lock);
3865         if (event == call->delayedAckEvent)
3866             call->delayedAckEvent = (struct rxevent *) 0;
3867         CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3868     }
3869     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3870     if (event)
3871         MUTEX_EXIT(&call->lock);
3872 #else /* RX_ENABLE_LOCKS */
3873     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3874     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3875 #endif /* RX_ENABLE_LOCKS */
3876 }
3877
3878
3879 #ifdef RX_ENABLE_LOCKS
3880 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3881  * clearing them out.
3882  */
3883 static void rxi_SetAcksInTransmitQueue(call)
3884       register struct rx_call *call;
3885 {
3886     register struct rx_packet *p, *tp;
3887     int someAcked = 0;
3888
3889      for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3890          if (!p) 
3891              break;
3892          p->acked = 1;
3893          someAcked = 1;
3894      }
3895      if (someAcked) {
3896          call->flags |= RX_CALL_TQ_CLEARME;
3897          call->flags |= RX_CALL_TQ_SOME_ACKED;
3898      }
3899
3900      rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3901      rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3902      call->tfirst = call->tnext;
3903      call->nSoftAcked = 0;
3904
3905      if (call->flags & RX_CALL_FAST_RECOVER) {
3906         call->flags &= ~RX_CALL_FAST_RECOVER;
3907         call->cwind = call->nextCwind;
3908         call->nextCwind = 0;
3909      }
3910
3911      CV_SIGNAL(&call->cv_twind);
3912 }
3913 #endif /* RX_ENABLE_LOCKS */
3914
3915 /* Clear out the transmit queue for the current call (all packets have
3916  * been received by peer) */
3917 void rxi_ClearTransmitQueue(call, force)
3918     register struct rx_call *call;
3919     register int force;
3920 {
3921     register struct rx_packet *p, *tp;
3922
3923 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3924     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3925         int someAcked = 0;
3926         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3927           if (!p) 
3928              break;
3929           p->acked = 1;
3930           someAcked = 1;
3931         }
3932         if (someAcked) {
3933             call->flags |= RX_CALL_TQ_CLEARME;
3934             call->flags |= RX_CALL_TQ_SOME_ACKED;
3935         }
3936     } else {
3937 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3938         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3939             if (!p) 
3940                 break;
3941             queue_Remove(p);
3942             rxi_FreePacket(p);
3943         }
3944 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3945         call->flags &= ~RX_CALL_TQ_CLEARME;
3946     }
3947 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3948
3949     rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3950     rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3951     call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3952     call->nSoftAcked = 0;
3953
3954     if (call->flags & RX_CALL_FAST_RECOVER) {
3955         call->flags &= ~RX_CALL_FAST_RECOVER;
3956         call->cwind = call->nextCwind;
3957     }
3958
3959 #ifdef  RX_ENABLE_LOCKS
3960     CV_SIGNAL(&call->cv_twind);
3961 #else
3962     osi_rxWakeup(&call->twind);
3963 #endif
3964 }
3965
3966 void rxi_ClearReceiveQueue(call)
3967     register struct rx_call *call;
3968 {
3969     register struct rx_packet *p, *tp;
3970     if (queue_IsNotEmpty(&call->rq)) {
3971       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
3972         if (!p)
3973           break;
3974         queue_Remove(p);
3975         rxi_FreePacket(p);
3976         rx_packetReclaims++;
3977       }
3978       call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
3979     }
3980     if (call->state == RX_STATE_PRECALL) {
3981         call->flags |= RX_CALL_CLEARED;
3982     }
3983 }
3984
3985 /* Send an abort packet for the specified call */
3986 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
3987     register struct rx_call *call;
3988     struct rx_packet *packet;
3989     int istack;
3990     int force;
3991 {
3992   afs_int32 error;
3993   struct clock when;
3994
3995   if (!call->error)
3996     return packet;
3997
3998   /* Clients should never delay abort messages */
3999   if (rx_IsClientConn(call->conn))
4000     force = 1;
4001
4002   if (call->abortCode != call->error) {
4003     call->abortCode = call->error;
4004     call->abortCount = 0;
4005   }
4006
4007   if (force || rxi_callAbortThreshhold == 0 ||
4008       call->abortCount < rxi_callAbortThreshhold) {
4009     if (call->delayedAbortEvent) {
4010         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4011     }
4012     error = htonl(call->error);
4013     call->abortCount++;
4014     packet = rxi_SendSpecial(call, call->conn, packet,
4015                              RX_PACKET_TYPE_ABORT, (char *)&error,
4016                              sizeof(error), istack);
4017   } else if (!call->delayedAbortEvent) {
4018     clock_GetTime(&when);
4019     clock_Addmsec(&when, rxi_callAbortDelay);
4020     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4021     call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4022                                            call, 0);
4023   }
4024   return packet;
4025 }
4026
4027 /* Send an abort packet for the specified connection.  Packet is an
4028  * optional pointer to a packet that can be used to send the abort.
4029  * Once the number of abort messages reaches the threshhold, an
4030  * event is scheduled to send the abort. Setting the force flag
4031  * overrides sending delayed abort messages.
4032  *
4033  * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4034  *       to send the abort packet.
4035  */
4036 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4037     register struct rx_connection *conn;
4038     struct rx_packet *packet;
4039     int istack;
4040     int force;
4041 {
4042   afs_int32 error;
4043   struct clock when;
4044
4045   if (!conn->error)
4046     return packet;
4047
4048   /* Clients should never delay abort messages */
4049   if (rx_IsClientConn(conn))
4050     force = 1;
4051
4052   if (force || rxi_connAbortThreshhold == 0 ||
4053       conn->abortCount < rxi_connAbortThreshhold) {
4054     if (conn->delayedAbortEvent) {
4055         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4056     }
4057     error = htonl(conn->error);
4058     conn->abortCount++;
4059     MUTEX_EXIT(&conn->conn_data_lock);
4060     packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4061                              RX_PACKET_TYPE_ABORT, (char *)&error,
4062                              sizeof(error), istack);
4063     MUTEX_ENTER(&conn->conn_data_lock);
4064   } else if (!conn->delayedAbortEvent) {
4065     clock_GetTime(&when);
4066     clock_Addmsec(&when, rxi_connAbortDelay);
4067     conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4068                                            conn, 0);
4069   }
4070   return packet;
4071 }
4072
4073 /* Associate an error all of the calls owned by a connection.  Called
4074  * with error non-zero.  This is only for really fatal things, like
4075  * bad authentication responses.  The connection itself is set in
4076  * error at this point, so that future packets received will be
4077  * rejected. */
4078 void rxi_ConnectionError(conn, error)
4079     register struct rx_connection *conn;
4080     register afs_int32 error;
4081 {
4082     if (error) {
4083         register int i;
4084         if (conn->challengeEvent)
4085             rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4086         for (i=0; i<RX_MAXCALLS; i++) {
4087             struct rx_call *call = conn->call[i];
4088             if (call) {
4089                 MUTEX_ENTER(&call->lock);
4090                 rxi_CallError(call, error);
4091                 MUTEX_EXIT(&call->lock);
4092             }
4093         }
4094         conn->error = error;
4095         MUTEX_ENTER(&rx_stats_mutex);
4096         rx_stats.fatalErrors++;
4097         MUTEX_EXIT(&rx_stats_mutex);
4098     }
4099 }
4100
4101 void rxi_CallError(call, error)
4102     register struct rx_call *call;
4103     afs_int32 error;
4104 {
4105     if (call->error) error = call->error;
4106 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4107     if (!(call->flags & RX_CALL_TQ_BUSY)) {
4108         rxi_ResetCall(call, 0);
4109     }
4110 #else
4111         rxi_ResetCall(call, 0);
4112 #endif
4113     call->error = error;
4114     call->mode = RX_MODE_ERROR;
4115 }
4116
4117 /* Reset various fields in a call structure, and wakeup waiting
4118  * processes.  Some fields aren't changed: state & mode are not
4119  * touched (these must be set by the caller), and bufptr, nLeft, and
4120  * nFree are not reset, since these fields are manipulated by
4121  * unprotected macros, and may only be reset by non-interrupting code.
4122  */
4123 #ifdef ADAPT_WINDOW
4124 /* this code requires that call->conn be set properly as a pre-condition. */
4125 #endif /* ADAPT_WINDOW */
4126
4127 void rxi_ResetCall(call, newcall)
4128     register struct rx_call *call;
4129     register int newcall;
4130 {
4131     register int flags;
4132     register struct rx_peer *peer;
4133     struct rx_packet *packet;
4134
4135     /* Notify anyone who is waiting for asynchronous packet arrival */
4136     if (call->arrivalProc) {
4137         (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4138         call->arrivalProc = (VOID (*)()) 0;
4139     }
4140
4141     if (call->delayedAbortEvent) {
4142         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4143         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4144         if (packet) {
4145             rxi_SendCallAbort(call, packet, 0, 1);
4146             rxi_FreePacket(packet);
4147         }
4148     }
4149
4150     /*
4151      * Update the peer with the congestion information in this call
4152      * so other calls on this connection can pick up where this call
4153      * left off. If the congestion sequence numbers don't match then
4154      * another call experienced a retransmission.
4155      */
4156     peer = call->conn->peer;
4157     MUTEX_ENTER(&peer->peer_lock);
4158     if (!newcall) {
4159         if (call->congestSeq == peer->congestSeq) {
4160             peer->cwind = MAX(peer->cwind, call->cwind);
4161             peer->MTU = MAX(peer->MTU, call->MTU);
4162             peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4163         }
4164     } else {
4165         call->abortCode = 0;
4166         call->abortCount = 0;
4167     }
4168     if (peer->maxDgramPackets > 1) {
4169         call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4170     } else {
4171         call->MTU = peer->MTU;
4172     }
4173     call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4174     call->ssthresh = rx_maxSendWindow;
4175     call->nDgramPackets = peer->nDgramPackets;
4176     call->congestSeq = peer->congestSeq;
4177     MUTEX_EXIT(&peer->peer_lock);
4178
4179     flags = call->flags;
4180     rxi_ClearReceiveQueue(call);
4181 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4182     if (call->flags & RX_CALL_TQ_BUSY) {
4183         call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4184         call->flags |= (flags & RX_CALL_TQ_WAIT);
4185     } else
4186 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4187     {
4188         rxi_ClearTransmitQueue(call, 0);
4189         queue_Init(&call->tq);
4190         call->flags = 0;
4191     }
4192     queue_Init(&call->rq);
4193     call->error = 0;
4194     call->rwind = rx_initReceiveWindow; 
4195     call->twind = rx_initSendWindow; 
4196     call->nSoftAcked = 0;
4197     call->nextCwind = 0;
4198     call->nAcks = 0;
4199     call->nNacks = 0;
4200     call->nCwindAcks = 0;
4201     call->nSoftAcks = 0;
4202     call->nHardAcks = 0;
4203
4204     call->tfirst = call->rnext = call->tnext = 1;
4205     call->rprev = 0;
4206     call->lastAcked = 0;
4207     call->localStatus = call->remoteStatus = 0;
4208
4209     if (flags & RX_CALL_READER_WAIT)  {
4210 #ifdef  RX_ENABLE_LOCKS
4211         CV_BROADCAST(&call->cv_rq);
4212 #else
4213         osi_rxWakeup(&call->rq);
4214 #endif
4215     }
4216     if (flags & RX_CALL_WAIT_PACKETS) {
4217         MUTEX_ENTER(&rx_freePktQ_lock);
4218         rxi_PacketsUnWait();            /* XXX */
4219         MUTEX_EXIT(&rx_freePktQ_lock);
4220     }
4221
4222 #ifdef  RX_ENABLE_LOCKS
4223     CV_SIGNAL(&call->cv_twind);
4224 #else
4225     if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4226         osi_rxWakeup(&call->twind);
4227 #endif
4228
4229 #ifdef RX_ENABLE_LOCKS
4230     /* The following ensures that we don't mess with any queue while some
4231      * other thread might also be doing so. The call_queue_lock field is
4232      * is only modified under the call lock. If the call is in the process
4233      * of being removed from a queue, the call is not locked until the
4234      * the queue lock is dropped and only then is the call_queue_lock field
4235      * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4236      * Note that any other routine which removes a call from a queue has to
4237      * obtain the queue lock before examing the queue and removing the call.
4238      */
4239     if (call->call_queue_lock) {
4240         MUTEX_ENTER(call->call_queue_lock);
4241         if (queue_IsOnQueue(call)) {
4242             queue_Remove(call);
4243             if (flags & RX_CALL_WAIT_PROC) {
4244                 MUTEX_ENTER(&rx_stats_mutex);
4245                 rx_nWaiting--;
4246                 MUTEX_EXIT(&rx_stats_mutex);
4247             }
4248         }
4249         MUTEX_EXIT(call->call_queue_lock);
4250         CLEAR_CALL_QUEUE_LOCK(call);
4251     }
4252 #else /* RX_ENABLE_LOCKS */
4253     if (queue_IsOnQueue(call)) {
4254       queue_Remove(call);
4255       if (flags & RX_CALL_WAIT_PROC)
4256         rx_nWaiting--;
4257     }
4258 #endif /* RX_ENABLE_LOCKS */
4259
4260     rxi_KeepAliveOff(call);
4261     rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4262 }
4263
4264 /* Send an acknowledge for the indicated packet (seq,serial) of the
4265  * indicated call, for the indicated reason (reason).  This
4266  * acknowledge will specifically acknowledge receiving the packet, and
4267  * will also specify which other packets for this call have been
4268  * received.  This routine returns the packet that was used to the
4269  * caller.  The caller is responsible for freeing it or re-using it.
4270  * This acknowledgement also returns the highest sequence number
4271  * actually read out by the higher level to the sender; the sender
4272  * promises to keep around packets that have not been read by the
4273  * higher level yet (unless, of course, the sender decides to abort
4274  * the call altogether).  Any of p, seq, serial, pflags, or reason may
4275  * be set to zero without ill effect.  That is, if they are zero, they
4276  * will not convey any information.  
4277  * NOW there is a trailer field, after the ack where it will safely be
4278  * ignored by mundanes, which indicates the maximum size packet this 
4279  * host can swallow.  */  
4280 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4281     register struct rx_call *call;
4282     register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4283     int seq;                    /* Sequence number of the packet we are acking */
4284     int serial;                 /* Serial number of the packet */
4285     int pflags;                 /* Flags field from packet header */
4286     int reason;                 /* Reason an acknowledge was prompted */
4287     int istack;
4288 {
4289     struct rx_ackPacket *ap;
4290     register struct rx_packet *rqp;
4291     register struct rx_packet *nxp;  /* For queue_Scan */
4292     register struct rx_packet *p;
4293     u_char offset;
4294     afs_int32 templ;
4295
4296     /*
4297      * Open the receive window once a thread starts reading packets
4298      */
4299     if (call->rnext > 1) {
4300         call->rwind = rx_maxReceiveWindow;
4301     }
4302
4303     call->nHardAcks = 0;
4304     call->nSoftAcks = 0;
4305     if (call->rnext > call->lastAcked)
4306       call->lastAcked = call->rnext;
4307     p = optionalPacket;
4308
4309     if (p) {
4310       rx_computelen(p, p->length);  /* reset length, you never know */
4311     }                               /* where that's been...         */
4312     else
4313       if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4314           /* We won't send the ack, but don't panic. */
4315           return optionalPacket;
4316       }
4317
4318     templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4319     if (templ > 0) {
4320       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4321           if (!optionalPacket) rxi_FreePacket(p);
4322           return optionalPacket;
4323       }
4324       templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32); 
4325       if (rx_Contiguous(p)<templ) {
4326           if (!optionalPacket) rxi_FreePacket(p);
4327           return optionalPacket;
4328       }
4329     }    /* MTUXXX failing to send an ack is very serious.  We should */
4330          /* try as hard as possible to send even a partial ack; it's */
4331          /* better than nothing. */
4332
4333     ap = (struct rx_ackPacket *) rx_DataOf(p);
4334     ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4335     ap->reason = reason;
4336
4337     /* The skew computation used to be bogus, I think it's better now. */
4338     /* We should start paying attention to skew.    XXX  */
4339     ap->serial = htonl(call->conn->maxSerial);
4340     ap->maxSkew = 0;    /* used to be peer->inPacketSkew */
4341
4342     ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4343     ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4344
4345     /* No fear of running out of ack packet here because there can only be at most
4346      * one window full of unacknowledged packets.  The window size must be constrained 
4347      * to be less than the maximum ack size, of course.  Also, an ack should always
4348      * fit into a single packet -- it should not ever be fragmented.  */
4349     for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4350         if (!rqp || !call->rq.next 
4351             || (rqp->header.seq > (call->rnext + call->rwind))) {
4352           if (!optionalPacket) rxi_FreePacket(p);
4353           rxi_CallError(call, RX_CALL_DEAD);
4354           return optionalPacket;   
4355         }
4356
4357         while (rqp->header.seq > call->rnext + offset) 
4358           ap->acks[offset++] = RX_ACK_TYPE_NACK;