rx-thread-id-startup-20030303
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #include <afsconfig.h>
13 #ifdef  KERNEL
14 #include "afs/param.h"
15 #else
16 #include <afs/param.h>
17 #endif
18
19 RCSID("$Header$");
20
21 #ifdef KERNEL
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
24 #ifndef UKERNEL
25 #include "h/types.h"
26 #include "h/time.h"
27 #include "h/stat.h"
28 #ifdef  AFS_OSF_ENV
29 #include <net/net_globals.h>
30 #endif  /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
32 #include "h/socket.h"
33 #endif
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
38 #include "h/systm.h"
39 #endif
40 #ifdef RXDEBUG
41 #undef RXDEBUG      /* turn off debugging */
42 #endif /* RXDEBUG */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
45 #endif
46 #include "afsint.h"
47 #ifdef  AFS_ALPHA_ENV
48 #undef kmem_alloc
49 #undef kmem_free
50 #undef mem_alloc
51 #undef mem_free
52 #undef register
53 #endif  /* AFS_ALPHA_ENV */
54 #else /* !UKERNEL */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
57 #endif /* !UKERNEL */
58 #include "afs/lock.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
61 #include "rx_clock.h"
62 #include "rx_queue.h"
63 #include "rx.h"
64 #include "rx_globals.h"
65 #include "rx_trace.h"
66 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
68 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
69 #include "afsint.h"
70 extern afs_int32 afs_termState;
71 #ifdef AFS_AIX41_ENV
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
76 #else /* KERNEL */
77 # include <sys/types.h>
78 # include <errno.h>
79 #ifdef AFS_NT40_ENV
80 # include <stdlib.h>
81 # include <fcntl.h>
82 # include <afsutil.h>
83 #else
84 # include <sys/socket.h>
85 # include <sys/file.h>
86 # include <netdb.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
90 #endif
91 #ifdef HAVE_STRING_H
92 #include <string.h>
93 #else
94 #ifdef HAVE_STRINGS_H
95 #include <strings.h>
96 #endif
97 #endif
98 # include "rx.h"
99 # include "rx_user.h"
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
105 #endif /* KERNEL */
106
107 int (*registerProgram)() = 0;
108 int (*swapNameProgram)() = 0;
109
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
114 #endif
115
116 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
117 struct rx_tq_debug {
118     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119     afs_int32 rxi_start_in_error;
120 } rx_tq_debug;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
122
123 /*
124  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125  * currently allocated within rx.  This number is used to allocate the
126  * memory required to return the statistics when queried.
127  */
128
129 static unsigned int rxi_rpc_peer_stat_cnt;
130
131 /*
132  * rxi_rpc_process_stat_cnt counts the total number of local process stat
133  * structures currently allocated within rx.  The number is used to allocate
134  * the memory required to return the statistics when queried.
135  */
136
137 static unsigned int rxi_rpc_process_stat_cnt;
138
139 #if !defined(offsetof)
140 #include <stddef.h>     /* for definition of offsetof() */
141 #endif
142
143 #ifdef AFS_PTHREAD_ENV
144 #include <assert.h>
145
146 /*
147  * Use procedural initialization of mutexes/condition variables
148  * to ease NT porting
149  */
150
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
164
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
167
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
171
172 static void rxi_InitPthread(void) {
173     assert(pthread_mutex_init(&rx_clock_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&rxi_connCacheMutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&rx_init_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&epoch_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&rx_event_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183     assert(pthread_mutex_init(&des_init_mutex,
184                               (const pthread_mutexattr_t*)0)==0);
185     assert(pthread_mutex_init(&des_random_mutex,
186                               (const pthread_mutexattr_t*)0)==0);
187     assert(pthread_mutex_init(&osi_malloc_mutex,
188                               (const pthread_mutexattr_t*)0)==0);
189     assert(pthread_mutex_init(&event_handler_mutex,
190                               (const pthread_mutexattr_t*)0)==0);
191     assert(pthread_mutex_init(&listener_mutex,
192                               (const pthread_mutexattr_t*)0)==0);
193     assert(pthread_mutex_init(&rx_if_init_mutex,
194                               (const pthread_mutexattr_t*)0)==0);
195     assert(pthread_mutex_init(&rx_if_mutex,
196                               (const pthread_mutexattr_t*)0)==0);
197     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198                               (const pthread_mutexattr_t*)0)==0);
199     assert(pthread_mutex_init(&rxkad_random_mutex,
200                               (const pthread_mutexattr_t*)0)==0);
201     assert(pthread_mutex_init(&rxkad_stats_mutex,
202                               (const pthread_mutexattr_t*)0)==0);
203     assert(pthread_mutex_init(&rx_debug_mutex,
204                               (const pthread_mutexattr_t*)0)==0);
205
206     assert(pthread_cond_init(&rx_event_handler_cond,
207                               (const pthread_condattr_t*)0)==0);
208     assert(pthread_cond_init(&rx_listener_cond,
209                               (const pthread_condattr_t*)0)==0);
210     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
211 }
212
213 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
214 #define INIT_PTHREAD_LOCKS \
215 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
216 /*
217  * The rx_stats_mutex mutex protects the following global variables:
218  * rxi_dataQuota
219  * rxi_minDeficit
220  * rxi_availProcs
221  * rxi_totalMin
222  * rxi_lowConnRefCount
223  * rxi_lowPeerRefCount
224  * rxi_nCalls
225  * rxi_Alloccnt
226  * rxi_Allocsize
227  * rx_nFreePackets
228  * rx_tq_debug
229  * rx_stats
230  */
231 #else
232 #define INIT_PTHREAD_LOCKS
233 #endif
234
235
236 /* Variables for handling the minProcs implementation.  availProcs gives the
237  * number of threads available in the pool at this moment (not counting dudes
238  * executing right now).  totalMin gives the total number of procs required
239  * for handling all minProcs requests.  minDeficit is a dynamic variable
240  * tracking the # of procs required to satisfy all of the remaining minProcs
241  * demands.
242  * For fine grain locking to work, the quota check and the reservation of
243  * a server thread has to come while rxi_availProcs and rxi_minDeficit
244  * are locked. To this end, the code has been modified under #ifdef
245  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246  * same time. A new function, ReturnToServerPool() returns the allocation.
247  * 
248  * A call can be on several queue's (but only one at a time). When
249  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250  * that no one else is touching the queue. To this end, we store the address
251  * of the queue lock in the call structure (under the call lock) when we
252  * put the call on a queue, and we clear the call_queue_lock when the
253  * call is removed from a queue (once the call lock has been obtained).
254  * This allows rxi_ResetCall to safely synchronize with others wishing
255  * to manipulate the queue.
256  */
257
258 #ifdef RX_ENABLE_LOCKS
259 static afs_kmutex_t rx_rpc_stats;
260 void rxi_StartUnlocked();
261 #endif
262
263 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
264 ** pretty good that the next packet coming in is from the same connection 
265 ** as the last packet, since we're send multiple packets in a transmit window.
266 */
267 struct rx_connection *rxLastConn = 0; 
268
269 #ifdef RX_ENABLE_LOCKS
270 /* The locking hierarchy for rx fine grain locking is composed of these
271  * tiers:
272  *
273  * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
274  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
275  * call->lock - locks call data fields.
276  * These are independent of each other:
277  *      rx_freeCallQueue_lock
278  *      rxi_keyCreate_lock
279  * rx_serverPool_lock
280  * freeSQEList_lock
281  *
282  * serverQueueEntry->lock
283  * rx_rpc_stats
284  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
285  * peer->lock - locks peer data fields.
286  * conn_data_lock - that more than one thread is not updating a conn data
287  *                  field at the same time.
288  * rx_freePktQ_lock
289  *
290  * lowest level:
291  *      multi_handle->lock
292  *      rxevent_lock
293  *      rx_stats_mutex
294  *
295  * Do we need a lock to protect the peer field in the conn structure?
296  *      conn->peer was previously a constant for all intents and so has no
297  *      lock protecting this field. The multihomed client delta introduced
298  *      a RX code change : change the peer field in the connection structure
299  *      to that remote inetrface from which the last packet for this
300  *      connection was sent out. This may become an issue if further changes
301  *      are made.
302  */
303 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
304 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
305 #ifdef RX_LOCKS_DB
306 /* rxdb_fileID is used to identify the lock location, along with line#. */
307 static int rxdb_fileID = RXDB_FILE_RX;
308 #endif /* RX_LOCKS_DB */
309 #else /* RX_ENABLE_LOCKS */
310 #define SET_CALL_QUEUE_LOCK(C, L)
311 #define CLEAR_CALL_QUEUE_LOCK(C)
312 #endif /* RX_ENABLE_LOCKS */
313 struct rx_serverQueueEntry *rx_waitForPacket = 0;
314
315 /* ------------Exported Interfaces------------- */
316
317 /* This function allows rxkad to set the epoch to a suitably random number
318  * which rx_NewConnection will use in the future.  The principle purpose is to
319  * get rxnull connections to use the same epoch as the rxkad connections do, at
320  * least once the first rxkad connection is established.  This is important now
321  * that the host/port addresses aren't used in FindConnection: the uniqueness
322  * of epoch/cid matters and the start time won't do. */
323
324 #ifdef AFS_PTHREAD_ENV
325 /*
326  * This mutex protects the following global variables:
327  * rx_epoch
328  */
329
330 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
331 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
332 #else
333 #define LOCK_EPOCH
334 #define UNLOCK_EPOCH
335 #endif /* AFS_PTHREAD_ENV */
336
337 void rx_SetEpoch (afs_uint32 epoch)
338 {
339     LOCK_EPOCH
340     rx_epoch = epoch;
341     UNLOCK_EPOCH
342 }
343
344 /* Initialize rx.  A port number may be mentioned, in which case this
345  * becomes the default port number for any service installed later.
346  * If 0 is provided for the port number, a random port will be chosen
347  * by the kernel.  Whether this will ever overlap anything in
348  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
349  * error. */
350 static int rxinit_status = 1;
351 #ifdef AFS_PTHREAD_ENV
352 /*
353  * This mutex protects the following global variables:
354  * rxinit_status
355  */
356
357 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
358 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
359 #else
360 #define LOCK_RX_INIT
361 #define UNLOCK_RX_INIT
362 #endif
363
364 int rx_Init(u_int port)
365 {
366 #ifdef KERNEL
367     osi_timeval_t tv;
368 #else /* KERNEL */
369     struct timeval tv;
370 #endif /* KERNEL */
371     char *htable, *ptable;
372     int tmp_status;
373
374 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
375     __djgpp_set_quiet_socket(1);
376 #endif
377
378     SPLVAR;
379
380     INIT_PTHREAD_LOCKS
381     LOCK_RX_INIT
382     if (rxinit_status == 0) {
383         tmp_status = rxinit_status;
384         UNLOCK_RX_INIT
385         return tmp_status; /* Already started; return previous error code. */
386     }
387
388 #ifdef AFS_NT40_ENV
389     if (afs_winsockInit()<0)
390         return -1;
391 #endif
392
393 #ifndef KERNEL
394     /*
395      * Initialize anything necessary to provide a non-premptive threading
396      * environment.
397      */
398     rxi_InitializeThreadSupport();
399 #endif
400
401     /* Allocate and initialize a socket for client and perhaps server
402      * connections. */
403
404     rx_socket = rxi_GetUDPSocket((u_short)port); 
405     if (rx_socket == OSI_NULLSOCKET) {
406         UNLOCK_RX_INIT
407         return RX_ADDRINUSE;
408     }
409     
410
411 #ifdef  RX_ENABLE_LOCKS
412 #ifdef RX_LOCKS_DB
413     rxdb_init();
414 #endif /* RX_LOCKS_DB */
415     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
416     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
417     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
418     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
419     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
420                MUTEX_DEFAULT,0);
421     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
422     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
423     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
424     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
425 #ifndef KERNEL
426     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
427 #endif /* !KERNEL */
428 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
429     if ( !uniprocessor )
430       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
431 #endif /* KERNEL && AFS_HPUX110_ENV */
432 #else /* RX_ENABLE_LOCKS */
433 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
434     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
435 #endif /* AFS_GLOBAL_SUNLOCK */
436 #endif /* RX_ENABLE_LOCKS */
437
438     rxi_nCalls = 0;
439     rx_connDeadTime = 12;
440     rx_tranquil     = 0;        /* reset flag */
441     memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
442     htable = (char *)
443         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
444     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
445     memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
446     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
447     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
448     memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
449
450     /* Malloc up a bunch of packets & buffers */
451     rx_nFreePackets = 0;
452     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
453     queue_Init(&rx_freePacketQueue);
454     rxi_NeedMorePackets = FALSE;
455     rxi_MorePackets(rx_nPackets);
456     rx_CheckPackets();
457
458     NETPRI;
459     AFS_RXGLOCK();
460
461     clock_Init();
462
463 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
464     tv.tv_sec = clock_now.sec;
465     tv.tv_usec = clock_now.usec;
466     srand((unsigned int) tv.tv_usec);
467 #else
468     osi_GetTime(&tv);
469 #endif
470     if (port) {
471         rx_port = port;
472     } else {
473 #if defined(KERNEL) && !defined(UKERNEL)
474         /* Really, this should never happen in a real kernel */
475         rx_port = 0;
476 #else
477         struct sockaddr_in addr;
478         int addrlen = sizeof(addr);
479         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
480             rx_Finalize();
481             return -1;
482         }
483         rx_port = addr.sin_port;
484 #endif
485     }
486     rx_stats.minRtt.sec = 9999999;
487 #ifdef  KERNEL
488     rx_SetEpoch (tv.tv_sec | 0x80000000);
489 #else
490     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
491                                          * will provide a randomer value. */
492 #endif
493     MUTEX_ENTER(&rx_stats_mutex);
494     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
495     MUTEX_EXIT(&rx_stats_mutex);
496     /* *Slightly* random start time for the cid.  This is just to help
497      * out with the hashing function at the peer */
498     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
499     rx_connHashTable = (struct rx_connection **) htable;
500     rx_peerHashTable = (struct rx_peer **) ptable;
501
502     rx_lastAckDelay.sec = 0;
503     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
504     rx_hardAckDelay.sec = 0;
505     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
506     rx_softAckDelay.sec = 0;
507     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
508
509     rxevent_Init(20, rxi_ReScheduleEvents);
510
511     /* Initialize various global queues */
512     queue_Init(&rx_idleServerQueue);
513     queue_Init(&rx_incomingCallQueue);
514     queue_Init(&rx_freeCallQueue);
515
516 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
517     /* Initialize our list of usable IP addresses. */
518     rx_GetIFInfo();
519 #endif
520
521     /* Start listener process (exact function is dependent on the
522      * implementation environment--kernel or user space) */
523     rxi_StartListener();
524
525     AFS_RXGUNLOCK();
526     USERPRI;
527     tmp_status = rxinit_status = 0;
528     UNLOCK_RX_INIT
529     return tmp_status;
530 }
531
532 /* called with unincremented nRequestsRunning to see if it is OK to start
533  * a new thread in this service.  Could be "no" for two reasons: over the
534  * max quota, or would prevent others from reaching their min quota.
535  */
536 #ifdef RX_ENABLE_LOCKS
537 /* This verion of QuotaOK reserves quota if it's ok while the
538  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
539  */
540 static int QuotaOK(register struct rx_service *aservice)
541 {
542     /* check if over max quota */
543     if (aservice->nRequestsRunning >= aservice->maxProcs) {
544         return 0;
545     }
546
547     /* under min quota, we're OK */
548     /* otherwise, can use only if there are enough to allow everyone
549      * to go to their min quota after this guy starts.
550      */
551     MUTEX_ENTER(&rx_stats_mutex);
552     if ((aservice->nRequestsRunning < aservice->minProcs) ||
553          (rxi_availProcs > rxi_minDeficit)) {
554         aservice->nRequestsRunning++;
555         /* just started call in minProcs pool, need fewer to maintain
556          * guarantee */
557         if (aservice->nRequestsRunning <= aservice->minProcs)
558             rxi_minDeficit--;
559         rxi_availProcs--;
560         MUTEX_EXIT(&rx_stats_mutex);
561         return 1;
562     }
563     MUTEX_EXIT(&rx_stats_mutex);
564
565     return 0;
566 }
567
568 static void ReturnToServerPool(register struct rx_service *aservice)
569 {
570     aservice->nRequestsRunning--;
571     MUTEX_ENTER(&rx_stats_mutex);
572     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
573     rxi_availProcs++;
574     MUTEX_EXIT(&rx_stats_mutex);
575 }
576
577 #else /* RX_ENABLE_LOCKS */
578 static int QuotaOK(register struct rx_service *aservice)
579 {
580     int rc=0;
581     /* under min quota, we're OK */
582     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
583
584     /* check if over max quota */
585     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
586
587     /* otherwise, can use only if there are enough to allow everyone
588      * to go to their min quota after this guy starts.
589      */
590     if (rxi_availProcs > rxi_minDeficit) rc = 1;
591     return rc;
592 }
593 #endif /* RX_ENABLE_LOCKS */
594
595 #ifndef KERNEL
596 /* Called by rx_StartServer to start up lwp's to service calls.
597    NExistingProcs gives the number of procs already existing, and which
598    therefore needn't be created. */
599 void rxi_StartServerProcs(int nExistingProcs)
600 {
601     register struct rx_service *service;
602     register int i;
603     int maxdiff = 0;
604     int nProcs = 0;
605
606     /* For each service, reserve N processes, where N is the "minimum"
607        number of processes that MUST be able to execute a request in parallel,
608        at any time, for that process.  Also compute the maximum difference
609        between any service's maximum number of processes that can run
610        (i.e. the maximum number that ever will be run, and a guarantee
611        that this number will run if other services aren't running), and its
612        minimum number.  The result is the extra number of processes that
613        we need in order to provide the latter guarantee */
614     for (i=0; i<RX_MAX_SERVICES; i++) {
615         int diff;
616         service = rx_services[i];
617         if (service == (struct rx_service *) 0) break;
618         nProcs += service->minProcs;
619         diff = service->maxProcs - service->minProcs;
620         if (diff > maxdiff) maxdiff = diff;
621     }
622     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
623     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
624     for (i = 0; i<nProcs; i++) {
625         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
626     }
627 }
628 #endif /* KERNEL */
629
630 /* This routine must be called if any services are exported.  If the
631  * donateMe flag is set, the calling process is donated to the server
632  * process pool */
633 void rx_StartServer(int donateMe)
634 {
635     register struct rx_service *service;
636     register int i, nProcs=0;
637     SPLVAR;
638     clock_NewTime();
639
640     NETPRI;
641     AFS_RXGLOCK();
642     /* Start server processes, if necessary (exact function is dependent
643      * on the implementation environment--kernel or user space).  DonateMe
644      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
645      * case, one less new proc will be created rx_StartServerProcs.
646      */
647     rxi_StartServerProcs(donateMe);
648
649     /* count up the # of threads in minProcs, and add set the min deficit to
650      * be that value, too.
651      */
652     for (i=0; i<RX_MAX_SERVICES; i++) {
653         service = rx_services[i];
654         if (service == (struct rx_service *) 0) break;
655         MUTEX_ENTER(&rx_stats_mutex);
656         rxi_totalMin += service->minProcs;
657         /* below works even if a thread is running, since minDeficit would
658          * still have been decremented and later re-incremented.
659          */
660         rxi_minDeficit += service->minProcs;
661         MUTEX_EXIT(&rx_stats_mutex);
662     }
663
664     /* Turn on reaping of idle server connections */
665     rxi_ReapConnections();
666
667     AFS_RXGUNLOCK();
668     USERPRI;
669
670     if (donateMe) {
671 #ifndef AFS_NT40_ENV
672 #ifndef KERNEL
673         char name[32];
674 #ifdef AFS_PTHREAD_ENV
675         pid_t pid;
676         pid = (pid_t) pthread_self();
677 #else /* AFS_PTHREAD_ENV */
678         PROCESS pid;
679         LWP_CurrentProcess(&pid);
680 #endif /* AFS_PTHREAD_ENV */
681
682         sprintf(name,"srv_%d", ++nProcs);
683         if (registerProgram)
684             (*registerProgram)(pid, name);
685 #endif /* KERNEL */
686 #endif /* AFS_NT40_ENV */
687         rx_ServerProc(); /* Never returns */
688     }
689     return;
690 }
691
692 /* Create a new client connection to the specified service, using the
693  * specified security object to implement the security model for this
694  * connection. */
695 struct rx_connection *rx_NewConnection(register afs_uint32 shost, 
696         u_short sport, u_short sservice, 
697         register struct rx_securityClass *securityObject, int serviceSecurityIndex)
698 {
699     int hashindex;
700     afs_int32 cid;
701     register struct rx_connection *conn;
702
703     SPLVAR;
704
705     clock_NewTime();
706     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
707           shost, sport, sservice, securityObject, serviceSecurityIndex));
708
709     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
710      * the case of kmem_alloc? */
711     conn = rxi_AllocConnection();
712 #ifdef  RX_ENABLE_LOCKS
713     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
714     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
715     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
716 #endif
717     NETPRI;
718     AFS_RXGLOCK();
719     MUTEX_ENTER(&rx_connHashTable_lock);
720     cid = (rx_nextCid += RX_MAXCALLS);
721     conn->type = RX_CLIENT_CONNECTION;
722     conn->cid = cid;
723     conn->epoch = rx_epoch;
724     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
725     conn->serviceId = sservice;
726     conn->securityObject = securityObject;
727     /* This doesn't work in all compilers with void (they're buggy), so fake it
728      * with VOID */
729     conn->securityData = (VOID *) 0;
730     conn->securityIndex = serviceSecurityIndex;
731     rx_SetConnDeadTime(conn, rx_connDeadTime);
732     conn->ackRate = RX_FAST_ACK_RATE;
733     conn->nSpecific = 0;
734     conn->specific = NULL;
735     conn->challengeEvent = NULL;
736     conn->delayedAbortEvent = NULL;
737     conn->abortCount = 0;
738     conn->error = 0;
739
740     RXS_NewConnection(securityObject, conn);
741     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
742     
743     conn->refCount++; /* no lock required since only this thread knows... */
744     conn->next = rx_connHashTable[hashindex];
745     rx_connHashTable[hashindex] = conn;
746     MUTEX_ENTER(&rx_stats_mutex);
747     rx_stats.nClientConns++;
748     MUTEX_EXIT(&rx_stats_mutex);
749
750     MUTEX_EXIT(&rx_connHashTable_lock);
751     AFS_RXGUNLOCK();
752     USERPRI;
753     return conn;
754 }
755
756 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
757 {
758     /* The idea is to set the dead time to a value that allows several
759      * keepalives to be dropped without timing out the connection. */
760     conn->secondsUntilDead = MAX(seconds, 6);
761     conn->secondsUntilPing = conn->secondsUntilDead/6;
762 }
763
764 int rxi_lowPeerRefCount = 0;
765 int rxi_lowConnRefCount = 0;
766
767 /*
768  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
769  * NOTE: must not be called with rx_connHashTable_lock held.
770  */
771 void rxi_CleanupConnection(struct rx_connection *conn)
772 {
773     /* Notify the service exporter, if requested, that this connection
774      * is being destroyed */
775     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
776       (*conn->service->destroyConnProc)(conn);
777
778     /* Notify the security module that this connection is being destroyed */
779     RXS_DestroyConnection(conn->securityObject, conn);
780
781     /* If this is the last connection using the rx_peer struct, set its
782      * idle time to now. rxi_ReapConnections will reap it if it's still
783      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
784      */
785     MUTEX_ENTER(&rx_peerHashTable_lock);
786     if (--conn->peer->refCount <= 0) {
787         conn->peer->idleWhen = clock_Sec();
788         if (conn->peer->refCount < 0) {
789             conn->peer->refCount = 0; 
790             MUTEX_ENTER(&rx_stats_mutex);
791             rxi_lowPeerRefCount ++;
792             MUTEX_EXIT(&rx_stats_mutex);
793         }
794     }
795     MUTEX_EXIT(&rx_peerHashTable_lock);
796
797     MUTEX_ENTER(&rx_stats_mutex);
798     if (conn->type == RX_SERVER_CONNECTION)
799       rx_stats.nServerConns--;
800     else
801       rx_stats.nClientConns--;
802     MUTEX_EXIT(&rx_stats_mutex);
803
804 #ifndef KERNEL
805     if (conn->specific) {
806         int i;
807         for (i = 0 ; i < conn->nSpecific ; i++) {
808             if (conn->specific[i] && rxi_keyCreate_destructor[i])
809                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
810             conn->specific[i] = NULL;
811         }
812         free(conn->specific);
813     }
814     conn->specific = NULL;
815     conn->nSpecific = 0;
816 #endif /* !KERNEL */
817
818     MUTEX_DESTROY(&conn->conn_call_lock);
819     MUTEX_DESTROY(&conn->conn_data_lock);
820     CV_DESTROY(&conn->conn_call_cv);
821         
822     rxi_FreeConnection(conn);
823 }
824
825 /* Destroy the specified connection */
826 void rxi_DestroyConnection(register struct rx_connection *conn)
827 {
828     MUTEX_ENTER(&rx_connHashTable_lock);
829     rxi_DestroyConnectionNoLock(conn);
830     /* conn should be at the head of the cleanup list */
831     if (conn == rx_connCleanup_list) {
832         rx_connCleanup_list = rx_connCleanup_list->next;
833         MUTEX_EXIT(&rx_connHashTable_lock);
834         rxi_CleanupConnection(conn);
835     }
836 #ifdef RX_ENABLE_LOCKS
837     else {
838         MUTEX_EXIT(&rx_connHashTable_lock);
839     }
840 #endif /* RX_ENABLE_LOCKS */
841 }
842     
843 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
844 {
845     register struct rx_connection **conn_ptr;
846     register int havecalls = 0;
847     struct rx_packet *packet;
848     int i;
849     SPLVAR;
850
851     clock_NewTime();
852
853     NETPRI;
854     MUTEX_ENTER(&conn->conn_data_lock);
855     if (conn->refCount > 0)
856         conn->refCount--;
857     else {
858         MUTEX_ENTER(&rx_stats_mutex);
859         rxi_lowConnRefCount++;
860         MUTEX_EXIT(&rx_stats_mutex);
861     }
862
863     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
864         /* Busy; wait till the last guy before proceeding */
865         MUTEX_EXIT(&conn->conn_data_lock);
866         USERPRI;
867         return;
868     }
869
870     /* If the client previously called rx_NewCall, but it is still
871      * waiting, treat this as a running call, and wait to destroy the
872      * connection later when the call completes. */
873     if ((conn->type == RX_CLIENT_CONNECTION) &&
874         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
875         conn->flags |= RX_CONN_DESTROY_ME;
876         MUTEX_EXIT(&conn->conn_data_lock);
877         USERPRI;
878         return;
879     }
880     MUTEX_EXIT(&conn->conn_data_lock);
881
882     /* Check for extant references to this connection */
883     for (i = 0; i<RX_MAXCALLS; i++) {
884         register struct rx_call *call = conn->call[i];
885         if (call) {
886             havecalls = 1;
887             if (conn->type == RX_CLIENT_CONNECTION) {
888                 MUTEX_ENTER(&call->lock);
889                 if (call->delayedAckEvent) {
890                     /* Push the final acknowledgment out now--there
891                      * won't be a subsequent call to acknowledge the
892                      * last reply packets */
893                     rxevent_Cancel(call->delayedAckEvent, call,
894                                    RX_CALL_REFCOUNT_DELAY);
895                     if (call->state == RX_STATE_PRECALL ||
896                         call->state == RX_STATE_ACTIVE) {
897                         rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
898                     } else {
899                         rxi_AckAll(NULL, call, 0);
900                     }
901                 }
902                 MUTEX_EXIT(&call->lock);
903             }
904         }
905     }
906 #ifdef RX_ENABLE_LOCKS
907     if (!havecalls) {
908         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
909             MUTEX_EXIT(&conn->conn_data_lock);
910         }
911         else {
912             /* Someone is accessing a packet right now. */
913             havecalls = 1;
914         }
915     }
916 #endif /* RX_ENABLE_LOCKS */
917
918     if (havecalls) {
919         /* Don't destroy the connection if there are any call
920          * structures still in use */
921         MUTEX_ENTER(&conn->conn_data_lock);
922         conn->flags |= RX_CONN_DESTROY_ME;
923         MUTEX_EXIT(&conn->conn_data_lock);
924         USERPRI;
925         return;
926     }
927
928     if (conn->delayedAbortEvent) {
929         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
930         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
931         if (packet) {
932             MUTEX_ENTER(&conn->conn_data_lock);
933             rxi_SendConnectionAbort(conn, packet, 0, 1);
934             MUTEX_EXIT(&conn->conn_data_lock);
935             rxi_FreePacket(packet);
936         }
937     }
938
939     /* Remove from connection hash table before proceeding */
940     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
941                                              conn->epoch, conn->type) ];
942     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
943         if (*conn_ptr == conn) {
944             *conn_ptr = conn->next;
945             break;
946         }
947     }
948     /* if the conn that we are destroying was the last connection, then we
949     * clear rxLastConn as well */
950     if ( rxLastConn == conn )
951         rxLastConn = 0;
952
953     /* Make sure the connection is completely reset before deleting it. */
954     /* get rid of pending events that could zap us later */
955     if (conn->challengeEvent)
956         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
957     if (conn->checkReachEvent)
958         rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
959  
960     /* Add the connection to the list of destroyed connections that
961      * need to be cleaned up. This is necessary to avoid deadlocks
962      * in the routines we call to inform others that this connection is
963      * being destroyed. */
964     conn->next = rx_connCleanup_list;
965     rx_connCleanup_list = conn;
966 }
967
968 /* Externally available version */
969 void rx_DestroyConnection(register struct rx_connection *conn) 
970 {
971     SPLVAR;
972
973     NETPRI;
974     AFS_RXGLOCK();
975     rxi_DestroyConnection (conn);
976     AFS_RXGUNLOCK();
977     USERPRI;
978 }
979
980 /* Start a new rx remote procedure call, on the specified connection.
981  * If wait is set to 1, wait for a free call channel; otherwise return
982  * 0.  Maxtime gives the maximum number of seconds this call may take,
983  * after rx_MakeCall returns.  After this time interval, a call to any
984  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
985  * For fine grain locking, we hold the conn_call_lock in order to 
986  * to ensure that we don't get signalle after we found a call in an active
987  * state and before we go to sleep.
988  */
989 struct rx_call *rx_NewCall(register struct rx_connection *conn)
990 {
991     register int i;
992     register struct rx_call *call;
993     struct clock queueTime;
994     SPLVAR;
995
996     clock_NewTime();
997     dpf (("rx_MakeCall(conn %x)\n", conn));
998
999     NETPRI;
1000     clock_GetTime(&queueTime);
1001     AFS_RXGLOCK();
1002     MUTEX_ENTER(&conn->conn_call_lock);
1003
1004     /*
1005      * Check if there are others waiting for a new call.
1006      * If so, let them go first to avoid starving them.
1007      * This is a fairly simple scheme, and might not be
1008      * a complete solution for large numbers of waiters.
1009      */
1010     if (conn->makeCallWaiters) {
1011 #ifdef  RX_ENABLE_LOCKS
1012         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1013 #else
1014         osi_rxSleep(conn);
1015 #endif
1016     }
1017
1018     for (;;) {
1019         for (i=0; i<RX_MAXCALLS; i++) {
1020             call = conn->call[i];
1021             if (call) {
1022                 MUTEX_ENTER(&call->lock);
1023                 if (call->state == RX_STATE_DALLY) {
1024                     rxi_ResetCall(call, 0);
1025                     (*call->callNumber)++;
1026                     break;
1027                 }
1028                 MUTEX_EXIT(&call->lock);
1029             }
1030             else {
1031                 call = rxi_NewCall(conn, i);
1032                 break;
1033             }
1034         }
1035         if (i < RX_MAXCALLS) {
1036             break;
1037         }
1038         MUTEX_ENTER(&conn->conn_data_lock);
1039         conn->flags |= RX_CONN_MAKECALL_WAITING;
1040         MUTEX_EXIT(&conn->conn_data_lock);
1041
1042         conn->makeCallWaiters++;
1043 #ifdef  RX_ENABLE_LOCKS
1044         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1045 #else
1046         osi_rxSleep(conn);
1047 #endif
1048         conn->makeCallWaiters--;
1049     }
1050     /*
1051      * Wake up anyone else who might be giving us a chance to
1052      * run (see code above that avoids resource starvation).
1053      */
1054 #ifdef  RX_ENABLE_LOCKS
1055     CV_BROADCAST(&conn->conn_call_cv);
1056 #else
1057     osi_rxWakeup(conn);
1058 #endif
1059
1060     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1061
1062     /* Client is initially in send mode */
1063     call->state = RX_STATE_ACTIVE;
1064     call->mode = RX_MODE_SENDING;
1065
1066     /* remember start time for call in case we have hard dead time limit */
1067     call->queueTime = queueTime;
1068     clock_GetTime(&call->startTime);
1069     hzero(call->bytesSent);
1070     hzero(call->bytesRcvd);
1071
1072     /* Turn on busy protocol. */
1073     rxi_KeepAliveOn(call);
1074
1075     MUTEX_EXIT(&call->lock);
1076     MUTEX_EXIT(&conn->conn_call_lock);
1077     AFS_RXGUNLOCK();
1078     USERPRI;
1079
1080 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1081     /* Now, if TQ wasn't cleared earlier, do it now. */
1082     AFS_RXGLOCK();
1083     MUTEX_ENTER(&call->lock);
1084     while (call->flags & RX_CALL_TQ_BUSY) {
1085         call->flags |= RX_CALL_TQ_WAIT;
1086 #ifdef RX_ENABLE_LOCKS
1087         CV_WAIT(&call->cv_tq, &call->lock);
1088 #else /* RX_ENABLE_LOCKS */
1089         osi_rxSleep(&call->tq);
1090 #endif /* RX_ENABLE_LOCKS */
1091     }
1092     if (call->flags & RX_CALL_TQ_CLEARME) {
1093         rxi_ClearTransmitQueue(call, 0);
1094         queue_Init(&call->tq);
1095     }
1096     MUTEX_EXIT(&call->lock);
1097     AFS_RXGUNLOCK();
1098 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1099
1100     return call;
1101 }
1102
1103 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1104 {
1105     register int i;
1106     register struct rx_call *tcall;
1107     SPLVAR;
1108
1109     NETPRI;
1110     for(i=0; i<RX_MAXCALLS; i++) {
1111       if ((tcall = aconn->call[i])) {
1112         if ((tcall->state == RX_STATE_ACTIVE) 
1113             || (tcall->state == RX_STATE_PRECALL)) {
1114           USERPRI;
1115           return 1;
1116         }
1117       }
1118     }
1119     USERPRI;
1120     return 0;
1121 }
1122
1123 int rxi_GetCallNumberVector(register struct rx_connection *aconn, 
1124         register afs_int32 *aint32s)
1125 {
1126     register int i;
1127     register struct rx_call *tcall;
1128     SPLVAR;
1129
1130     NETPRI;
1131     for(i=0; i<RX_MAXCALLS; i++) {
1132         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1133             aint32s[i] = aconn->callNumber[i]+1;
1134         else
1135             aint32s[i] = aconn->callNumber[i];
1136     }
1137     USERPRI;
1138     return 0;
1139 }
1140
1141 int rxi_SetCallNumberVector(register struct rx_connection *aconn, 
1142         register afs_int32 *aint32s)
1143 {
1144     register int i;
1145     register struct rx_call *tcall;
1146     SPLVAR;
1147
1148     NETPRI;
1149     for(i=0; i<RX_MAXCALLS; i++) {
1150         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1151             aconn->callNumber[i] = aint32s[i] - 1;
1152         else
1153             aconn->callNumber[i] = aint32s[i];
1154     }
1155     USERPRI;
1156     return 0;
1157 }
1158
1159 /* Advertise a new service.  A service is named locally by a UDP port
1160  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1161  * on a failure. 
1162  *
1163      char *serviceName;  Name for identification purposes (e.g. the
1164                          service name might be used for probing for
1165                          statistics) */
1166 struct rx_service *rx_NewService(u_short port, u_short serviceId, 
1167         char *serviceName, 
1168         struct rx_securityClass **securityObjects,
1169         int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1170 {    
1171     osi_socket socket = OSI_NULLSOCKET;
1172     register struct rx_service *tservice;    
1173     register int i;
1174     SPLVAR;
1175
1176     clock_NewTime();
1177
1178     if (serviceId == 0) {
1179         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1180          serviceName);
1181         return 0;
1182     }
1183     if (port == 0) {
1184         if (rx_port == 0) {
1185             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1186             return 0;
1187         }
1188         port = rx_port;
1189         socket = rx_socket;
1190     }
1191
1192     tservice = rxi_AllocService();
1193     NETPRI;
1194     AFS_RXGLOCK();
1195     for (i = 0; i<RX_MAX_SERVICES; i++) {
1196         register struct rx_service *service = rx_services[i];
1197         if (service) {
1198             if (port == service->servicePort) {
1199                 if (service->serviceId == serviceId) {
1200                     /* The identical service has already been
1201                      * installed; if the caller was intending to
1202                      * change the security classes used by this
1203                      * service, he/she loses. */
1204                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1205                     AFS_RXGUNLOCK();
1206                     USERPRI;
1207                     rxi_FreeService(tservice);
1208                     return service;
1209                 }
1210                 /* Different service, same port: re-use the socket
1211                  * which is bound to the same port */
1212                 socket = service->socket;
1213             }
1214         } else {
1215             if (socket == OSI_NULLSOCKET) {
1216                 /* If we don't already have a socket (from another
1217                  * service on same port) get a new one */
1218                 socket = rxi_GetUDPSocket(port);
1219                 if (socket == OSI_NULLSOCKET) {
1220                     AFS_RXGUNLOCK();
1221                     USERPRI;
1222                     rxi_FreeService(tservice);
1223                     return 0;
1224                 }
1225             }
1226             service = tservice;
1227             service->socket = socket;
1228             service->servicePort = port;
1229             service->serviceId = serviceId;
1230             service->serviceName = serviceName;
1231             service->nSecurityObjects = nSecurityObjects;
1232             service->securityObjects = securityObjects;
1233             service->minProcs = 0;
1234             service->maxProcs = 1;
1235             service->idleDeadTime = 60;
1236             service->connDeadTime = rx_connDeadTime;
1237             service->executeRequestProc = serviceProc;
1238             service->checkReach = 0;
1239             rx_services[i] = service;   /* not visible until now */
1240             AFS_RXGUNLOCK();
1241             USERPRI;
1242             return service;
1243         }
1244     }
1245     AFS_RXGUNLOCK();
1246     USERPRI;
1247     rxi_FreeService(tservice);
1248     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1249     return 0;
1250 }
1251
1252 /* Generic request processing loop. This routine should be called
1253  * by the implementation dependent rx_ServerProc. If socketp is
1254  * non-null, it will be set to the file descriptor that this thread
1255  * is now listening on. If socketp is null, this routine will never
1256  * returns. */
1257 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1258 {
1259     register struct rx_call *call;
1260     register afs_int32 code;
1261     register struct rx_service *tservice = NULL;
1262
1263     for (;;) {
1264         if (newcall) {
1265             call = newcall;
1266             newcall = NULL;
1267         } else {
1268             call = rx_GetCall(threadID, tservice, socketp);
1269             if (socketp && *socketp != OSI_NULLSOCKET) {
1270                 /* We are now a listener thread */
1271                 return;
1272             }
1273         }
1274
1275         /* if server is restarting( typically smooth shutdown) then do not
1276          * allow any new calls.
1277          */
1278
1279         if ( rx_tranquil && (call != NULL) ) {
1280             SPLVAR;
1281
1282             NETPRI;
1283             AFS_RXGLOCK();
1284             MUTEX_ENTER(&call->lock);
1285
1286             rxi_CallError(call, RX_RESTARTING);
1287             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1288
1289             MUTEX_EXIT(&call->lock);
1290             AFS_RXGUNLOCK();
1291             USERPRI;
1292         }
1293
1294 #ifdef  KERNEL
1295         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1296 #ifdef RX_ENABLE_LOCKS
1297             AFS_GLOCK();
1298 #endif /* RX_ENABLE_LOCKS */
1299             afs_termState = AFSOP_STOP_AFS;
1300             afs_osi_Wakeup(&afs_termState);
1301 #ifdef RX_ENABLE_LOCKS
1302             AFS_GUNLOCK();
1303 #endif /* RX_ENABLE_LOCKS */
1304             return;
1305         }
1306 #endif
1307
1308         tservice = call->conn->service;
1309
1310         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1311
1312         code = call->conn->service->executeRequestProc(call);
1313
1314         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1315
1316         rx_EndCall(call, code);
1317         MUTEX_ENTER(&rx_stats_mutex);
1318         rxi_nCalls++;
1319         MUTEX_EXIT(&rx_stats_mutex);
1320     }
1321 }
1322
1323
1324 void rx_WakeupServerProcs(void)
1325 {
1326     struct rx_serverQueueEntry *np, *tqp;
1327     SPLVAR;
1328
1329     NETPRI;
1330     AFS_RXGLOCK();
1331     MUTEX_ENTER(&rx_serverPool_lock);
1332
1333 #ifdef RX_ENABLE_LOCKS
1334     if (rx_waitForPacket)
1335         CV_BROADCAST(&rx_waitForPacket->cv);
1336 #else /* RX_ENABLE_LOCKS */
1337     if (rx_waitForPacket)
1338         osi_rxWakeup(rx_waitForPacket);
1339 #endif /* RX_ENABLE_LOCKS */
1340     MUTEX_ENTER(&freeSQEList_lock);
1341     for (np = rx_FreeSQEList; np; np = tqp) {
1342       tqp = *(struct rx_serverQueueEntry **)np;
1343 #ifdef RX_ENABLE_LOCKS
1344       CV_BROADCAST(&np->cv);
1345 #else /* RX_ENABLE_LOCKS */
1346       osi_rxWakeup(np);
1347 #endif /* RX_ENABLE_LOCKS */
1348     }
1349     MUTEX_EXIT(&freeSQEList_lock);
1350     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1351 #ifdef RX_ENABLE_LOCKS
1352       CV_BROADCAST(&np->cv);
1353 #else /* RX_ENABLE_LOCKS */
1354       osi_rxWakeup(np);
1355 #endif /* RX_ENABLE_LOCKS */
1356     }
1357     MUTEX_EXIT(&rx_serverPool_lock);
1358     AFS_RXGUNLOCK();
1359     USERPRI;
1360 }
1361
1362 /* meltdown:
1363  * One thing that seems to happen is that all the server threads get
1364  * tied up on some empty or slow call, and then a whole bunch of calls
1365  * arrive at once, using up the packet pool, so now there are more 
1366  * empty calls.  The most critical resources here are server threads
1367  * and the free packet pool.  The "doreclaim" code seems to help in
1368  * general.  I think that eventually we arrive in this state: there
1369  * are lots of pending calls which do have all their packets present,
1370  * so they won't be reclaimed, are multi-packet calls, so they won't
1371  * be scheduled until later, and thus are tying up most of the free 
1372  * packet pool for a very long time.
1373  * future options:
1374  * 1.  schedule multi-packet calls if all the packets are present.  
1375  * Probably CPU-bound operation, useful to return packets to pool. 
1376  * Do what if there is a full window, but the last packet isn't here?
1377  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1378  * it sleeps and waits for that type of call.
1379  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1380  * the current dataquota business is badly broken.  The quota isn't adjusted
1381  * to reflect how many packets are presently queued for a running call.
1382  * So, when we schedule a queued call with a full window of packets queued
1383  * up for it, that *should* free up a window full of packets for other 2d-class
1384  * calls to be able to use from the packet pool.  But it doesn't.
1385  *
1386  * NB.  Most of the time, this code doesn't run -- since idle server threads
1387  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1388  * as a new call arrives.
1389  */
1390 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1391  * for an rx_Read. */
1392 #ifdef RX_ENABLE_LOCKS
1393 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1394 {
1395     struct rx_serverQueueEntry *sq;
1396     register struct rx_call *call = (struct rx_call *) 0;
1397     struct rx_service *service = NULL;
1398     SPLVAR;
1399
1400     MUTEX_ENTER(&freeSQEList_lock);
1401
1402     if ((sq = rx_FreeSQEList)) {
1403         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1404         MUTEX_EXIT(&freeSQEList_lock);
1405     } else {    /* otherwise allocate a new one and return that */
1406         MUTEX_EXIT(&freeSQEList_lock);
1407         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1408         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1409         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1410     }
1411
1412     MUTEX_ENTER(&rx_serverPool_lock);
1413     if (cur_service != NULL) {
1414         ReturnToServerPool(cur_service);
1415     }
1416     while (1) {
1417         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1418             register struct rx_call *tcall, *ncall, *choice2 = NULL;
1419
1420             /* Scan for eligible incoming calls.  A call is not eligible
1421              * if the maximum number of calls for its service type are
1422              * already executing */
1423             /* One thread will process calls FCFS (to prevent starvation),
1424              * while the other threads may run ahead looking for calls which
1425              * have all their input data available immediately.  This helps 
1426              * keep threads from blocking, waiting for data from the client. */
1427             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1428                 service = tcall->conn->service;
1429                 if (!QuotaOK(service)) {
1430                     continue;
1431                 }
1432                 if (tno==rxi_fcfs_thread_num || !tcall->queue_item_header.next  ) {
1433                     /* If we're the fcfs thread , then  we'll just use 
1434                      * this call. If we haven't been able to find an optimal 
1435                      * choice, and we're at the end of the list, then use a 
1436                      * 2d choice if one has been identified.  Otherwise... */
1437                     call = (choice2 ? choice2 : tcall);
1438                     service = call->conn->service;
1439                 } else if (!queue_IsEmpty(&tcall->rq)) {
1440                     struct rx_packet *rp;
1441                     rp = queue_First(&tcall->rq, rx_packet);
1442                     if (rp->header.seq == 1) {
1443                         if (!meltdown_1pkt ||
1444                             (rp->header.flags & RX_LAST_PACKET)) {
1445                             call = tcall;
1446                         } else if (rxi_2dchoice && !choice2 &&
1447                                    !(tcall->flags & RX_CALL_CLEARED) &&
1448                                    (tcall->rprev > rxi_HardAckRate)) {
1449                             choice2 = tcall;
1450                         } else rxi_md2cnt++;
1451                     }
1452                 }
1453                 if (call) {
1454                     break;
1455                 } else {
1456                     ReturnToServerPool(service);
1457                 }
1458             }
1459         }
1460
1461         if (call) {
1462             queue_Remove(call);
1463             MUTEX_EXIT(&rx_serverPool_lock);
1464             MUTEX_ENTER(&call->lock);
1465
1466             if (call->state != RX_STATE_PRECALL || call->error) {
1467                 MUTEX_EXIT(&call->lock);
1468                 MUTEX_ENTER(&rx_serverPool_lock);
1469                 ReturnToServerPool(service);
1470                 call = NULL;
1471                 continue;
1472             }
1473
1474             if (queue_IsEmpty(&call->rq) ||
1475                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1476                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1477
1478             CLEAR_CALL_QUEUE_LOCK(call);
1479             call->flags &= ~RX_CALL_WAIT_PROC;
1480             MUTEX_ENTER(&rx_stats_mutex);
1481             rx_nWaiting--;
1482             MUTEX_EXIT(&rx_stats_mutex);
1483             break;
1484         }
1485         else {
1486             /* If there are no eligible incoming calls, add this process
1487              * to the idle server queue, to wait for one */
1488             sq->newcall = 0;
1489             sq->tno = tno;
1490             if (socketp) {
1491                 *socketp = OSI_NULLSOCKET;
1492             }
1493             sq->socketp = socketp;
1494             queue_Append(&rx_idleServerQueue, sq);
1495 #ifndef AFS_AIX41_ENV
1496             rx_waitForPacket = sq;
1497 #endif /* AFS_AIX41_ENV */
1498             do {
1499                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1500 #ifdef  KERNEL
1501                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1502                     MUTEX_EXIT(&rx_serverPool_lock);
1503                     return (struct rx_call *)0;
1504                 }
1505 #endif
1506             } while (!(call = sq->newcall) &&
1507                      !(socketp && *socketp != OSI_NULLSOCKET));
1508             MUTEX_EXIT(&rx_serverPool_lock);
1509             if (call) {
1510                 MUTEX_ENTER(&call->lock);
1511             }
1512             break;
1513         }
1514     }
1515
1516     MUTEX_ENTER(&freeSQEList_lock);
1517     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1518     rx_FreeSQEList = sq;
1519     MUTEX_EXIT(&freeSQEList_lock);
1520
1521     if (call) {
1522         clock_GetTime(&call->startTime);
1523         call->state = RX_STATE_ACTIVE;
1524         call->mode = RX_MODE_RECEIVING;
1525
1526         rxi_calltrace(RX_CALL_START, call);
1527         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1528              call->conn->service->servicePort, 
1529              call->conn->service->serviceId, call));
1530
1531         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1532         MUTEX_EXIT(&call->lock);
1533     } else {
1534         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1535     }
1536
1537     return call;
1538 }
1539 #else /* RX_ENABLE_LOCKS */
1540 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1541 {
1542     struct rx_serverQueueEntry *sq;
1543     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1544     struct rx_service *service = NULL;
1545     SPLVAR;
1546
1547     NETPRI;
1548     AFS_RXGLOCK();
1549     MUTEX_ENTER(&freeSQEList_lock);
1550
1551     if ((sq = rx_FreeSQEList)) {
1552         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1553         MUTEX_EXIT(&freeSQEList_lock);
1554     } else {    /* otherwise allocate a new one and return that */
1555         MUTEX_EXIT(&freeSQEList_lock);
1556         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1557         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1558         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1559     }
1560     MUTEX_ENTER(&sq->lock);
1561
1562     if (cur_service != NULL) {
1563         cur_service->nRequestsRunning--;
1564         if (cur_service->nRequestsRunning < cur_service->minProcs)
1565             rxi_minDeficit++;
1566         rxi_availProcs++;
1567     }
1568     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1569         register struct rx_call *tcall, *ncall;
1570         /* Scan for eligible incoming calls.  A call is not eligible
1571          * if the maximum number of calls for its service type are
1572          * already executing */
1573         /* One thread will process calls FCFS (to prevent starvation),
1574          * while the other threads may run ahead looking for calls which
1575          * have all their input data available immediately.  This helps 
1576          * keep threads from blocking, waiting for data from the client. */
1577         choice2 = (struct rx_call *) 0;
1578         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1579           service = tcall->conn->service;
1580           if (QuotaOK(service)) {
1581              if (tno==rxi_fcfs_thread_num || !tcall->queue_item_header.next  ) {
1582                  /* If we're the fcfs thread, then  we'll just use 
1583                   * this call. If we haven't been able to find an optimal 
1584                   * choice, and we're at the end of the list, then use a 
1585                   * 2d choice if one has been identified.  Otherwise... */
1586                  call = (choice2 ? choice2 : tcall);
1587                  service = call->conn->service;
1588              } else if (!queue_IsEmpty(&tcall->rq)) {
1589                  struct rx_packet *rp;
1590                  rp = queue_First(&tcall->rq, rx_packet);
1591                  if (rp->header.seq == 1
1592                      && (!meltdown_1pkt ||
1593                          (rp->header.flags & RX_LAST_PACKET))) {
1594                      call = tcall;
1595                  } else if (rxi_2dchoice && !choice2 &&
1596                             !(tcall->flags & RX_CALL_CLEARED) &&
1597                             (tcall->rprev > rxi_HardAckRate)) {
1598                      choice2 = tcall;
1599                  } else rxi_md2cnt++;
1600              }
1601           }
1602           if (call) 
1603              break;
1604         }
1605       }
1606
1607     if (call) {
1608         queue_Remove(call);
1609         /* we can't schedule a call if there's no data!!! */
1610         /* send an ack if there's no data, if we're missing the
1611          * first packet, or we're missing something between first 
1612          * and last -- there's a "hole" in the incoming data. */
1613         if (queue_IsEmpty(&call->rq) ||
1614             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1615             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1616           rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1617
1618         call->flags &= (~RX_CALL_WAIT_PROC);
1619         service->nRequestsRunning++;
1620         /* just started call in minProcs pool, need fewer to maintain
1621          * guarantee */
1622         if (service->nRequestsRunning <= service->minProcs)
1623             rxi_minDeficit--;
1624         rxi_availProcs--;
1625         rx_nWaiting--;
1626         /* MUTEX_EXIT(&call->lock); */
1627     }
1628     else {
1629         /* If there are no eligible incoming calls, add this process
1630          * to the idle server queue, to wait for one */
1631         sq->newcall = 0;
1632         if (socketp) {
1633             *socketp = OSI_NULLSOCKET;
1634         }
1635         sq->socketp = socketp;
1636         queue_Append(&rx_idleServerQueue, sq);
1637         do {
1638             osi_rxSleep(sq);
1639 #ifdef  KERNEL
1640                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1641                     AFS_RXGUNLOCK();
1642                     USERPRI;
1643                     return (struct rx_call *)0;
1644                 }
1645 #endif
1646         } while (!(call = sq->newcall) &&
1647                  !(socketp && *socketp != OSI_NULLSOCKET));
1648     }
1649     MUTEX_EXIT(&sq->lock);
1650
1651     MUTEX_ENTER(&freeSQEList_lock);
1652     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1653     rx_FreeSQEList = sq;
1654     MUTEX_EXIT(&freeSQEList_lock);
1655
1656     if (call) {
1657         clock_GetTime(&call->startTime);
1658         call->state = RX_STATE_ACTIVE;
1659         call->mode = RX_MODE_RECEIVING;
1660
1661         rxi_calltrace(RX_CALL_START, call);
1662         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1663          call->conn->service->servicePort, 
1664          call->conn->service->serviceId, call));
1665     } else {
1666         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1667     }
1668
1669     AFS_RXGUNLOCK();
1670     USERPRI;
1671
1672     return call;
1673 }
1674 #endif /* RX_ENABLE_LOCKS */
1675
1676
1677
1678 /* Establish a procedure to be called when a packet arrives for a
1679  * call.  This routine will be called at most once after each call,
1680  * and will also be called if there is an error condition on the or
1681  * the call is complete.  Used by multi rx to build a selection
1682  * function which determines which of several calls is likely to be a
1683  * good one to read from.  
1684  * NOTE: the way this is currently implemented it is probably only a
1685  * good idea to (1) use it immediately after a newcall (clients only)
1686  * and (2) only use it once.  Other uses currently void your warranty
1687  */
1688 void rx_SetArrivalProc(register struct rx_call *call, 
1689         register VOID (*proc)(register struct rx_call *call,
1690         register struct multi_handle *mh, register int index),
1691         register VOID *handle, register VOID *arg)
1692 {
1693     call->arrivalProc = proc;
1694     call->arrivalProcHandle = handle;
1695     call->arrivalProcArg = arg;
1696 }
1697
1698 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1699  * appropriate, and return the final error code from the conversation
1700  * to the caller */
1701
1702 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1703 {
1704     register struct rx_connection *conn = call->conn;
1705     register struct rx_service *service;
1706     register struct rx_packet *tp; /* Temporary packet pointer */
1707     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1708     afs_int32 error;
1709     SPLVAR;
1710
1711     dpf(("rx_EndCall(call %x)\n", call));
1712
1713     NETPRI;
1714     AFS_RXGLOCK();
1715     MUTEX_ENTER(&call->lock);
1716
1717     if (rc == 0 && call->error == 0) {
1718         call->abortCode = 0;
1719         call->abortCount = 0;
1720     }
1721
1722     call->arrivalProc = (VOID (*)()) 0;
1723     if (rc && call->error == 0) {
1724         rxi_CallError(call, rc);
1725         /* Send an abort message to the peer if this error code has
1726          * only just been set.  If it was set previously, assume the
1727          * peer has already been sent the error code or will request it 
1728          */
1729         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1730     }
1731     if (conn->type == RX_SERVER_CONNECTION) {
1732         /* Make sure reply or at least dummy reply is sent */
1733         if (call->mode == RX_MODE_RECEIVING) {
1734             rxi_WriteProc(call, 0, 0);
1735         }
1736         if (call->mode == RX_MODE_SENDING) {
1737             rxi_FlushWrite(call);
1738         }
1739         service = conn->service;
1740         rxi_calltrace(RX_CALL_END, call);
1741         /* Call goes to hold state until reply packets are acknowledged */
1742         if (call->tfirst + call->nSoftAcked < call->tnext) {
1743             call->state = RX_STATE_HOLD;
1744         } else {
1745             call->state = RX_STATE_DALLY;
1746             rxi_ClearTransmitQueue(call, 0);
1747             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1748             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1749         }
1750     }
1751     else { /* Client connection */
1752         char dummy;
1753         /* Make sure server receives input packets, in the case where
1754          * no reply arguments are expected */
1755         if ((call->mode == RX_MODE_SENDING)
1756          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1757             (void) rxi_ReadProc(call, &dummy, 1);
1758         }
1759
1760         /* If we had an outstanding delayed ack, be nice to the server
1761          * and force-send it now.
1762          */
1763         if (call->delayedAckEvent) {
1764             rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1765             call->delayedAckEvent = NULL;
1766             rxi_SendDelayedAck(NULL, call, NULL);
1767         }
1768
1769         /* We need to release the call lock since it's lower than the
1770          * conn_call_lock and we don't want to hold the conn_call_lock
1771          * over the rx_ReadProc call. The conn_call_lock needs to be held
1772          * here for the case where rx_NewCall is perusing the calls on
1773          * the connection structure. We don't want to signal until
1774          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1775          * have checked this call, found it active and by the time it
1776          * goes to sleep, will have missed the signal.
1777          */
1778         MUTEX_EXIT(&call->lock);
1779         MUTEX_ENTER(&conn->conn_call_lock);
1780         MUTEX_ENTER(&call->lock);
1781         MUTEX_ENTER(&conn->conn_data_lock);
1782         conn->flags |= RX_CONN_BUSY;
1783         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1784             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1785             MUTEX_EXIT(&conn->conn_data_lock);
1786 #ifdef  RX_ENABLE_LOCKS
1787             CV_BROADCAST(&conn->conn_call_cv);
1788 #else
1789             osi_rxWakeup(conn);
1790 #endif
1791         }
1792 #ifdef RX_ENABLE_LOCKS
1793         else {
1794             MUTEX_EXIT(&conn->conn_data_lock);
1795         }
1796 #endif /* RX_ENABLE_LOCKS */
1797         call->state = RX_STATE_DALLY;
1798     }
1799     error = call->error;
1800
1801     /* currentPacket, nLeft, and NFree must be zeroed here, because
1802      * ResetCall cannot: ResetCall may be called at splnet(), in the
1803      * kernel version, and may interrupt the macros rx_Read or
1804      * rx_Write, which run at normal priority for efficiency. */
1805     if (call->currentPacket) {
1806         rxi_FreePacket(call->currentPacket);
1807         call->currentPacket = (struct rx_packet *) 0;
1808         call->nLeft = call->nFree = call->curlen = 0;
1809     }
1810     else
1811         call->nLeft = call->nFree = call->curlen = 0;
1812
1813     /* Free any packets from the last call to ReadvProc/WritevProc */
1814     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1815         queue_Remove(tp);
1816         rxi_FreePacket(tp);
1817     }
1818
1819     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1820     MUTEX_EXIT(&call->lock);
1821     if (conn->type == RX_CLIENT_CONNECTION) {
1822         MUTEX_EXIT(&conn->conn_call_lock);
1823         conn->flags &= ~RX_CONN_BUSY;
1824     }
1825     AFS_RXGUNLOCK();
1826     USERPRI;
1827     /*
1828      * Map errors to the local host's errno.h format.
1829      */
1830     error = ntoh_syserr_conv(error);
1831     return error;
1832 }
1833
1834 #if !defined(KERNEL)
1835
1836 /* Call this routine when shutting down a server or client (especially
1837  * clients).  This will allow Rx to gracefully garbage collect server
1838  * connections, and reduce the number of retries that a server might
1839  * make to a dead client.
1840  * This is not quite right, since some calls may still be ongoing and
1841  * we can't lock them to destroy them. */
1842 void rx_Finalize(void)
1843 {
1844     register struct rx_connection **conn_ptr, **conn_end;
1845
1846     INIT_PTHREAD_LOCKS
1847     LOCK_RX_INIT
1848     if (rxinit_status == 1) {
1849         UNLOCK_RX_INIT
1850         return; /* Already shutdown. */
1851     }
1852     rxi_DeleteCachedConnections();
1853     if (rx_connHashTable) {
1854         MUTEX_ENTER(&rx_connHashTable_lock);
1855         for (conn_ptr = &rx_connHashTable[0], 
1856              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1857              conn_ptr < conn_end; conn_ptr++) {
1858             struct rx_connection *conn, *next;
1859             for (conn = *conn_ptr; conn; conn = next) {
1860                 next = conn->next;
1861                 if (conn->type == RX_CLIENT_CONNECTION) {
1862                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1863                     conn->refCount++;
1864                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1865 #ifdef RX_ENABLE_LOCKS
1866                     rxi_DestroyConnectionNoLock(conn);
1867 #else /* RX_ENABLE_LOCKS */
1868                     rxi_DestroyConnection(conn);
1869 #endif /* RX_ENABLE_LOCKS */
1870                 }
1871             }
1872         }
1873 #ifdef RX_ENABLE_LOCKS
1874         while (rx_connCleanup_list) {
1875             struct rx_connection *conn;
1876             conn = rx_connCleanup_list;
1877             rx_connCleanup_list = rx_connCleanup_list->next;
1878             MUTEX_EXIT(&rx_connHashTable_lock);
1879             rxi_CleanupConnection(conn);
1880             MUTEX_ENTER(&rx_connHashTable_lock);
1881         }
1882         MUTEX_EXIT(&rx_connHashTable_lock);
1883 #endif /* RX_ENABLE_LOCKS */
1884     }
1885     rxi_flushtrace();
1886
1887     rxinit_status = 1;
1888     UNLOCK_RX_INIT
1889 }
1890 #endif
1891
1892 /* if we wakeup packet waiter too often, can get in loop with two
1893     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1894 void rxi_PacketsUnWait(void)
1895 {
1896     if (!rx_waitingForPackets) {
1897         return;
1898     }
1899 #ifdef KERNEL
1900     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1901         return;                                     /* still over quota */
1902     }
1903 #endif /* KERNEL */
1904     rx_waitingForPackets = 0;
1905 #ifdef  RX_ENABLE_LOCKS
1906     CV_BROADCAST(&rx_waitingForPackets_cv);
1907 #else
1908     osi_rxWakeup(&rx_waitingForPackets);
1909 #endif
1910     return;
1911 }
1912
1913
1914 /* ------------------Internal interfaces------------------------- */
1915
1916 /* Return this process's service structure for the
1917  * specified socket and service */
1918 struct rx_service *rxi_FindService(register osi_socket socket, 
1919         register u_short serviceId)
1920 {
1921     register struct rx_service **sp;    
1922     for (sp = &rx_services[0]; *sp; sp++) {
1923         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1924           return *sp;
1925     }
1926     return 0;
1927 }
1928
1929 /* Allocate a call structure, for the indicated channel of the
1930  * supplied connection.  The mode and state of the call must be set by
1931  * the caller. Returns the call with mutex locked. */
1932 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1933         register int channel)
1934 {
1935     register struct rx_call *call;
1936 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1937     register struct rx_call *cp;        /* Call pointer temp */
1938     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1939 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1940
1941     /* Grab an existing call structure, or allocate a new one.
1942      * Existing call structures are assumed to have been left reset by
1943      * rxi_FreeCall */
1944     MUTEX_ENTER(&rx_freeCallQueue_lock);
1945
1946 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1947     /*
1948      * EXCEPT that the TQ might not yet be cleared out.
1949      * Skip over those with in-use TQs.
1950      */
1951     call = NULL;
1952     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1953         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1954             call = cp;
1955             break;
1956         }
1957     }
1958     if (call) {
1959 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1960     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1961         call = queue_First(&rx_freeCallQueue, rx_call);
1962 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1963         queue_Remove(call);
1964         MUTEX_ENTER(&rx_stats_mutex);
1965         rx_stats.nFreeCallStructs--;
1966         MUTEX_EXIT(&rx_stats_mutex);
1967         MUTEX_EXIT(&rx_freeCallQueue_lock);
1968         MUTEX_ENTER(&call->lock);
1969         CLEAR_CALL_QUEUE_LOCK(call);
1970 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1971         /* Now, if TQ wasn't cleared earlier, do it now. */
1972         if (call->flags & RX_CALL_TQ_CLEARME) {
1973             rxi_ClearTransmitQueue(call, 0);
1974             queue_Init(&call->tq);
1975         }
1976 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1977         /* Bind the call to its connection structure */
1978         call->conn = conn;
1979         rxi_ResetCall(call, 1);
1980     }
1981     else {
1982         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1983
1984         MUTEX_EXIT(&rx_freeCallQueue_lock);
1985         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1986         MUTEX_ENTER(&call->lock);
1987         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1988         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1989         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1990
1991         MUTEX_ENTER(&rx_stats_mutex);
1992         rx_stats.nCallStructs++;
1993         MUTEX_EXIT(&rx_stats_mutex);
1994         /* Initialize once-only items */
1995         queue_Init(&call->tq);
1996         queue_Init(&call->rq);
1997         queue_Init(&call->iovq);
1998         /* Bind the call to its connection structure (prereq for reset) */
1999         call->conn = conn;
2000         rxi_ResetCall(call, 1);
2001     }
2002     call->channel = channel;
2003     call->callNumber = &conn->callNumber[channel];
2004     /* Note that the next expected call number is retained (in
2005      * conn->callNumber[i]), even if we reallocate the call structure
2006      */
2007     conn->call[channel] = call;
2008     /* if the channel's never been used (== 0), we should start at 1, otherwise
2009         the call number is valid from the last time this channel was used */
2010     if (*call->callNumber == 0) *call->callNumber = 1;
2011
2012     return call;
2013 }
2014
2015 /* A call has been inactive long enough that so we can throw away
2016  * state, including the call structure, which is placed on the call
2017  * free list.
2018  * Call is locked upon entry.
2019  * haveCTLock set if called from rxi_ReapConnections
2020  */
2021 #ifdef RX_ENABLE_LOCKS
2022 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2023 #else /* RX_ENABLE_LOCKS */
2024 void rxi_FreeCall(register struct rx_call *call)
2025 #endif /* RX_ENABLE_LOCKS */
2026 {
2027     register int channel = call->channel;
2028     register struct rx_connection *conn = call->conn;
2029
2030
2031     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2032       (*call->callNumber)++;
2033     rxi_ResetCall(call, 0);
2034     call->conn->call[channel] = (struct rx_call *) 0;
2035
2036     MUTEX_ENTER(&rx_freeCallQueue_lock);
2037     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2038 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2039     /* A call may be free even though its transmit queue is still in use.
2040      * Since we search the call list from head to tail, put busy calls at
2041      * the head of the list, and idle calls at the tail.
2042      */
2043     if (call->flags & RX_CALL_TQ_BUSY)
2044         queue_Prepend(&rx_freeCallQueue, call);
2045     else
2046         queue_Append(&rx_freeCallQueue, call);
2047 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2048     queue_Append(&rx_freeCallQueue, call);
2049 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2050     MUTEX_ENTER(&rx_stats_mutex);
2051     rx_stats.nFreeCallStructs++;
2052     MUTEX_EXIT(&rx_stats_mutex);
2053
2054     MUTEX_EXIT(&rx_freeCallQueue_lock);
2055  
2056     /* Destroy the connection if it was previously slated for
2057      * destruction, i.e. the Rx client code previously called
2058      * rx_DestroyConnection (client connections), or
2059      * rxi_ReapConnections called the same routine (server
2060      * connections).  Only do this, however, if there are no
2061      * outstanding calls. Note that for fine grain locking, there appears
2062      * to be a deadlock in that rxi_FreeCall has a call locked and
2063      * DestroyConnectionNoLock locks each call in the conn. But note a
2064      * few lines up where we have removed this call from the conn.
2065      * If someone else destroys a connection, they either have no
2066      * call lock held or are going through this section of code.
2067      */
2068     if (conn->flags & RX_CONN_DESTROY_ME) {
2069         MUTEX_ENTER(&conn->conn_data_lock);
2070         conn->refCount++;
2071         MUTEX_EXIT(&conn->conn_data_lock);
2072 #ifdef RX_ENABLE_LOCKS
2073         if (haveCTLock)
2074             rxi_DestroyConnectionNoLock(conn);
2075         else
2076             rxi_DestroyConnection(conn);
2077 #else /* RX_ENABLE_LOCKS */
2078         rxi_DestroyConnection(conn);
2079 #endif /* RX_ENABLE_LOCKS */
2080     }
2081 }
2082
2083 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2084 char *rxi_Alloc(register size_t size)
2085 {
2086     register char *p;
2087
2088 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2089     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2090      * implementation.
2091      */
2092     int glockOwner = ISAFS_GLOCK();
2093     if (!glockOwner)
2094         AFS_GLOCK();
2095 #endif
2096     MUTEX_ENTER(&rx_stats_mutex);
2097     rxi_Alloccnt++; rxi_Allocsize += size;
2098     MUTEX_EXIT(&rx_stats_mutex);
2099 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2100     if (size > AFS_SMALLOCSIZ) {
2101         p = (char *) osi_AllocMediumSpace(size);
2102     } else
2103         p = (char *) osi_AllocSmall(size, 1);
2104 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2105     if (!glockOwner)
2106         AFS_GUNLOCK();
2107 #endif
2108 #else
2109     p = (char *) osi_Alloc(size);
2110 #endif
2111     if (!p) osi_Panic("rxi_Alloc error");
2112     memset(p, 0, size);
2113     return p;
2114 }
2115
2116 void rxi_Free(void *addr, register size_t size)
2117 {
2118 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2119     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2120      * implementation.
2121      */
2122     int glockOwner = ISAFS_GLOCK();
2123     if (!glockOwner)
2124         AFS_GLOCK();
2125 #endif
2126     MUTEX_ENTER(&rx_stats_mutex);
2127     rxi_Alloccnt--; rxi_Allocsize -= size;
2128     MUTEX_EXIT(&rx_stats_mutex);
2129 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2130     if (size > AFS_SMALLOCSIZ)
2131         osi_FreeMediumSpace(addr);
2132     else
2133         osi_FreeSmall(addr);
2134 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2135     if (!glockOwner)
2136         AFS_GUNLOCK();
2137 #endif
2138 #else
2139     osi_Free(addr, size);
2140 #endif    
2141 }
2142
2143 /* Find the peer process represented by the supplied (host,port)
2144  * combination.  If there is no appropriate active peer structure, a
2145  * new one will be allocated and initialized 
2146  * The origPeer, if set, is a pointer to a peer structure on which the
2147  * refcount will be be decremented. This is used to replace the peer
2148  * structure hanging off a connection structure */
2149 struct rx_peer *rxi_FindPeer(register afs_uint32 host, 
2150         register u_short port, struct rx_peer *origPeer, int create)
2151 {
2152     register struct rx_peer *pp;
2153     int hashIndex;
2154     hashIndex = PEER_HASH(host, port);
2155     MUTEX_ENTER(&rx_peerHashTable_lock);
2156     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2157         if ((pp->host == host) && (pp->port == port)) break;
2158     }
2159     if (!pp) {
2160         if (create) {
2161             pp = rxi_AllocPeer(); /* This bzero's *pp */
2162             pp->host = host;      /* set here or in InitPeerParams is zero */
2163             pp->port = port;
2164             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2165             queue_Init(&pp->congestionQueue);
2166             queue_Init(&pp->rpcStats);
2167             pp->next = rx_peerHashTable[hashIndex];
2168             rx_peerHashTable[hashIndex] = pp;
2169             rxi_InitPeerParams(pp);
2170             MUTEX_ENTER(&rx_stats_mutex);
2171             rx_stats.nPeerStructs++;
2172             MUTEX_EXIT(&rx_stats_mutex);
2173         }
2174     }
2175     if (pp && create) {
2176         pp->refCount++;
2177     }
2178     if ( origPeer)
2179         origPeer->refCount--;
2180     MUTEX_EXIT(&rx_peerHashTable_lock);
2181     return pp;
2182 }
2183
2184
2185 /* Find the connection at (host, port) started at epoch, and with the
2186  * given connection id.  Creates the server connection if necessary.
2187  * The type specifies whether a client connection or a server
2188  * connection is desired.  In both cases, (host, port) specify the
2189  * peer's (host, pair) pair.  Client connections are not made
2190  * automatically by this routine.  The parameter socket gives the
2191  * socket descriptor on which the packet was received.  This is used,
2192  * in the case of server connections, to check that *new* connections
2193  * come via a valid (port, serviceId).  Finally, the securityIndex
2194  * parameter must match the existing index for the connection.  If a
2195  * server connection is created, it will be created using the supplied
2196  * index, if the index is valid for this service */
2197 struct rx_connection *rxi_FindConnection(osi_socket socket, 
2198         register afs_int32 host, register u_short port, u_short serviceId, 
2199         afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2200 {
2201     int hashindex, flag;
2202     register struct rx_connection *conn;
2203     hashindex = CONN_HASH(host, port, cid, epoch, type);
2204     MUTEX_ENTER(&rx_connHashTable_lock);
2205     rxLastConn ? (conn = rxLastConn, flag = 0) :
2206                  (conn = rx_connHashTable[hashindex], flag = 1);
2207     for (; conn; ) {
2208       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2209           && (epoch == conn->epoch)) {
2210         register struct rx_peer *pp = conn->peer;
2211         if (securityIndex != conn->securityIndex) {
2212             /* this isn't supposed to happen, but someone could forge a packet
2213                like this, and there seems to be some CM bug that makes this
2214                happen from time to time -- in which case, the fileserver
2215                asserts. */  
2216             MUTEX_EXIT(&rx_connHashTable_lock);
2217             return (struct rx_connection *) 0;
2218         }
2219         if (pp->host == host && pp->port == port)
2220             break;
2221         if (type == RX_CLIENT_CONNECTION && pp->port == port)
2222             break;
2223         if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2224             break;
2225       }
2226       if ( !flag )
2227       {
2228         /* the connection rxLastConn that was used the last time is not the
2229         ** one we are looking for now. Hence, start searching in the hash */
2230         flag = 1;
2231         conn = rx_connHashTable[hashindex];
2232       }
2233       else
2234         conn = conn->next;
2235     }
2236     if (!conn) {
2237         struct rx_service *service;
2238         if (type == RX_CLIENT_CONNECTION) {
2239             MUTEX_EXIT(&rx_connHashTable_lock);
2240             return (struct rx_connection *) 0;
2241         }
2242         service = rxi_FindService(socket, serviceId);
2243         if (!service || (securityIndex >= service->nSecurityObjects) 
2244             || (service->securityObjects[securityIndex] == 0)) {
2245             MUTEX_EXIT(&rx_connHashTable_lock);
2246             return (struct rx_connection *) 0;
2247         }
2248         conn = rxi_AllocConnection(); /* This bzero's the connection */
2249         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2250                    MUTEX_DEFAULT,0);
2251         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2252                    MUTEX_DEFAULT,0);
2253         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2254         conn->next = rx_connHashTable[hashindex];
2255         rx_connHashTable[hashindex] = conn;
2256         conn->peer = rxi_FindPeer(host, port, 0, 1);
2257         conn->type = RX_SERVER_CONNECTION;
2258         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2259         conn->epoch = epoch;
2260         conn->cid = cid & RX_CIDMASK;
2261         /* conn->serial = conn->lastSerial = 0; */
2262         /* conn->timeout = 0; */
2263         conn->ackRate = RX_FAST_ACK_RATE;
2264         conn->service = service;
2265         conn->serviceId = serviceId;
2266         conn->securityIndex = securityIndex;
2267         conn->securityObject = service->securityObjects[securityIndex];
2268         conn->nSpecific = 0;
2269         conn->specific = NULL;
2270         rx_SetConnDeadTime(conn, service->connDeadTime);
2271         rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2272         /* Notify security object of the new connection */
2273         RXS_NewConnection(conn->securityObject, conn);
2274         /* XXXX Connection timeout? */
2275         if (service->newConnProc) (*service->newConnProc)(conn);
2276         MUTEX_ENTER(&rx_stats_mutex);
2277         rx_stats.nServerConns++;
2278         MUTEX_EXIT(&rx_stats_mutex);
2279     }
2280
2281     MUTEX_ENTER(&conn->conn_data_lock);
2282     conn->refCount++;
2283     MUTEX_EXIT(&conn->conn_data_lock);
2284
2285     rxLastConn = conn;  /* store this connection as the last conn used */
2286     MUTEX_EXIT(&rx_connHashTable_lock);
2287     return conn;
2288 }
2289
2290 /* There are two packet tracing routines available for testing and monitoring
2291  * Rx.  One is called just after every packet is received and the other is
2292  * called just before every packet is sent.  Received packets, have had their
2293  * headers decoded, and packets to be sent have not yet had their headers
2294  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2295  * containing the network address.  Both can be modified.  The return value, if
2296  * non-zero, indicates that the packet should be dropped.  */
2297
2298 int (*rx_justReceived)() = 0;
2299 int (*rx_almostSent)() = 0;
2300
2301 /* A packet has been received off the interface.  Np is the packet, socket is
2302  * the socket number it was received from (useful in determining which service
2303  * this packet corresponds to), and (host, port) reflect the host,port of the
2304  * sender.  This call returns the packet to the caller if it is finished with
2305  * it, rather than de-allocating it, just as a small performance hack */
2306
2307 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np, 
2308         osi_socket socket, afs_uint32 host, u_short port, 
2309         int *tnop, struct rx_call **newcallp)
2310 {
2311     register struct rx_call *call;
2312     register struct rx_connection *conn;
2313     int channel;
2314     afs_uint32 currentCallNumber;
2315     int type;
2316     int skew;
2317 #ifdef RXDEBUG
2318     char *packetType;
2319 #endif
2320     struct rx_packet *tnp;
2321
2322 #ifdef RXDEBUG
2323 /* We don't print out the packet until now because (1) the time may not be
2324  * accurate enough until now in the lwp implementation (rx_Listener only gets
2325  * the time after the packet is read) and (2) from a protocol point of view,
2326  * this is the first time the packet has been seen */
2327     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2328         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2329     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2330          np->header.serial, packetType, host, port, np->header.serviceId,
2331          np->header.epoch, np->header.cid, np->header.callNumber, 
2332          np->header.seq, np->header.flags, np));
2333 #endif
2334
2335     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2336       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2337     }
2338
2339     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2340         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2341     }
2342 #ifdef RXDEBUG
2343     /* If an input tracer function is defined, call it with the packet and
2344      * network address.  Note this function may modify its arguments. */
2345     if (rx_justReceived) {
2346         struct sockaddr_in addr;
2347         int drop;
2348         addr.sin_family = AF_INET;
2349         addr.sin_port = port;
2350         addr.sin_addr.s_addr = host;
2351 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN 
2352         addr.sin_len = sizeof(addr);
2353 #endif  /* AFS_OSF_ENV */
2354         drop = (*rx_justReceived) (np, &addr);
2355         /* drop packet if return value is non-zero */
2356         if (drop) return np;
2357         port = addr.sin_port;           /* in case fcn changed addr */
2358         host = addr.sin_addr.s_addr;
2359     }
2360 #endif
2361
2362     /* If packet was not sent by the client, then *we* must be the client */
2363     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2364         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2365
2366     /* Find the connection (or fabricate one, if we're the server & if
2367      * necessary) associated with this packet */
2368     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2369                               np->header.cid, np->header.epoch, type, 
2370                               np->header.securityIndex);
2371
2372     if (!conn) {
2373       /* If no connection found or fabricated, just ignore the packet.
2374        * (An argument could be made for sending an abort packet for
2375        * the conn) */
2376       return np;
2377     }   
2378
2379     MUTEX_ENTER(&conn->conn_data_lock);
2380     if (conn->maxSerial < np->header.serial)
2381         conn->maxSerial = np->header.serial;
2382     MUTEX_EXIT(&conn->conn_data_lock);
2383
2384     /* If the connection is in an error state, send an abort packet and ignore
2385      * the incoming packet */
2386     if (conn->error) {
2387         /* Don't respond to an abort packet--we don't want loops! */
2388         MUTEX_ENTER(&conn->conn_data_lock);
2389         if (np->header.type != RX_PACKET_TYPE_ABORT)
2390             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2391         conn->refCount--;
2392         MUTEX_EXIT(&conn->conn_data_lock);
2393         return np;
2394     }
2395
2396     /* Check for connection-only requests (i.e. not call specific). */
2397     if (np->header.callNumber == 0) {
2398         switch (np->header.type) {
2399             case RX_PACKET_TYPE_ABORT:
2400                 /* What if the supplied error is zero? */
2401                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2402                 MUTEX_ENTER(&conn->conn_data_lock);
2403                 conn->refCount--;
2404                 MUTEX_EXIT(&conn->conn_data_lock);
2405                 return np;
2406             case RX_PACKET_TYPE_CHALLENGE:
2407                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2408                 MUTEX_ENTER(&conn->conn_data_lock);
2409                 conn->refCount--;
2410                 MUTEX_EXIT(&conn->conn_data_lock);
2411                 return tnp;
2412             case RX_PACKET_TYPE_RESPONSE:
2413                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2414                 MUTEX_ENTER(&conn->conn_data_lock);
2415                 conn->refCount--;
2416                 MUTEX_EXIT(&conn->conn_data_lock);
2417                 return tnp;
2418             case RX_PACKET_TYPE_PARAMS:
2419             case RX_PACKET_TYPE_PARAMS+1:
2420             case RX_PACKET_TYPE_PARAMS+2:
2421                 /* ignore these packet types for now */
2422                 MUTEX_ENTER(&conn->conn_data_lock);
2423                 conn->refCount--;
2424                 MUTEX_EXIT(&conn->conn_data_lock);
2425                 return np;
2426
2427
2428             default:
2429                 /* Should not reach here, unless the peer is broken: send an
2430                  * abort packet */
2431                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2432                 MUTEX_ENTER(&conn->conn_data_lock);
2433                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2434                 conn->refCount--;
2435                 MUTEX_EXIT(&conn->conn_data_lock);
2436                 return tnp;
2437         }
2438     }
2439
2440     channel = np->header.cid & RX_CHANNELMASK;
2441     call = conn->call[channel];
2442 #ifdef  RX_ENABLE_LOCKS
2443     if (call)
2444         MUTEX_ENTER(&call->lock);
2445     /* Test to see if call struct is still attached to conn. */
2446     if (call != conn->call[channel]) {
2447         if (call)
2448             MUTEX_EXIT(&call->lock);
2449         if (type == RX_SERVER_CONNECTION) {
2450             call = conn->call[channel];
2451             /* If we started with no call attached and there is one now,
2452              * another thread is also running this routine and has gotten
2453              * the connection channel. We should drop this packet in the tests
2454              * below. If there was a call on this connection and it's now
2455              * gone, then we'll be making a new call below.
2456              * If there was previously a call and it's now different then
2457              * the old call was freed and another thread running this routine
2458              * has created a call on this channel. One of these two threads
2459              * has a packet for the old call and the code below handles those
2460              * cases.
2461              */
2462             if (call)
2463                 MUTEX_ENTER(&call->lock);
2464         }
2465         else {
2466             /* This packet can't be for this call. If the new call address is
2467              * 0 then no call is running on this channel. If there is a call
2468              * then, since this is a client connection we're getting data for
2469              * it must be for the previous call.
2470              */
2471             MUTEX_ENTER(&rx_stats_mutex);
2472             rx_stats.spuriousPacketsRead++;
2473             MUTEX_EXIT(&rx_stats_mutex);
2474             MUTEX_ENTER(&conn->conn_data_lock);
2475             conn->refCount--;
2476             MUTEX_EXIT(&conn->conn_data_lock);
2477             return np;
2478         }
2479     }
2480 #endif
2481     currentCallNumber = conn->callNumber[channel];
2482
2483     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2484         if (np->header.callNumber < currentCallNumber) {
2485             MUTEX_ENTER(&rx_stats_mutex);
2486             rx_stats.spuriousPacketsRead++;
2487             MUTEX_EXIT(&rx_stats_mutex);
2488 #ifdef  RX_ENABLE_LOCKS
2489             if (call)
2490                 MUTEX_EXIT(&call->lock);
2491 #endif
2492             MUTEX_ENTER(&conn->conn_data_lock);
2493             conn->refCount--;
2494             MUTEX_EXIT(&conn->conn_data_lock);
2495             return np;
2496         }
2497         if (!call) {
2498             MUTEX_ENTER(&conn->conn_call_lock);
2499             call = rxi_NewCall(conn, channel);
2500             MUTEX_EXIT(&conn->conn_call_lock);
2501             *call->callNumber = np->header.callNumber;
2502             call->state = RX_STATE_PRECALL;
2503             clock_GetTime(&call->queueTime);
2504             hzero(call->bytesSent);
2505             hzero(call->bytesRcvd);
2506             rxi_KeepAliveOn(call);
2507         }
2508         else if (np->header.callNumber != currentCallNumber) {
2509             /* Wait until the transmit queue is idle before deciding
2510              * whether to reset the current call. Chances are that the
2511              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2512              * flag is cleared.
2513              */
2514 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2515             while ((call->state == RX_STATE_ACTIVE) &&
2516                    (call->flags & RX_CALL_TQ_BUSY)) {
2517                 call->flags |= RX_CALL_TQ_WAIT;
2518 #ifdef RX_ENABLE_LOCKS
2519                 CV_WAIT(&call->cv_tq, &call->lock);
2520 #else /* RX_ENABLE_LOCKS */
2521                 osi_rxSleep(&call->tq);
2522 #endif /* RX_ENABLE_LOCKS */
2523             }
2524 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2525             /* If the new call cannot be taken right now send a busy and set
2526              * the error condition in this call, so that it terminates as
2527              * quickly as possible */
2528             if (call->state == RX_STATE_ACTIVE) {
2529                 struct rx_packet *tp;
2530
2531                 rxi_CallError(call, RX_CALL_DEAD);
2532                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2533                 MUTEX_EXIT(&call->lock);
2534                 MUTEX_ENTER(&conn->conn_data_lock);
2535                 conn->refCount--;
2536                 MUTEX_EXIT(&conn->conn_data_lock);
2537                 return tp;
2538             }
2539             rxi_ResetCall(call, 0);
2540             *call->callNumber = np->header.callNumber;
2541             call->state = RX_STATE_PRECALL;
2542             clock_GetTime(&call->queueTime);
2543             hzero(call->bytesSent);
2544             hzero(call->bytesRcvd);
2545             /*
2546              * If the number of queued calls exceeds the overload
2547              * threshold then abort this call.
2548              */
2549             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2550                 struct rx_packet *tp;
2551
2552                 rxi_CallError(call, rx_BusyError);
2553                 tp = rxi_SendCallAbort(call, np, 1, 0);
2554                 MUTEX_EXIT(&call->lock);
2555                 MUTEX_ENTER(&conn->conn_data_lock);
2556                 conn->refCount--;
2557                 MUTEX_EXIT(&conn->conn_data_lock);
2558                 return tp;
2559             }
2560             rxi_KeepAliveOn(call);
2561         }
2562         else {
2563             /* Continuing call; do nothing here. */
2564         }
2565     } else { /* we're the client */
2566         /* Ignore all incoming acknowledgements for calls in DALLY state */
2567         if ( call && (call->state == RX_STATE_DALLY) 
2568          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2569             MUTEX_ENTER(&rx_stats_mutex);
2570             rx_stats.ignorePacketDally++;
2571             MUTEX_EXIT(&rx_stats_mutex);
2572 #ifdef  RX_ENABLE_LOCKS
2573             if (call) {
2574                 MUTEX_EXIT(&call->lock);
2575             }
2576 #endif
2577             MUTEX_ENTER(&conn->conn_data_lock);
2578             conn->refCount--;
2579             MUTEX_EXIT(&conn->conn_data_lock);
2580             return np;
2581         }
2582         
2583         /* Ignore anything that's not relevant to the current call.  If there
2584          * isn't a current call, then no packet is relevant. */
2585         if (!call || (np->header.callNumber != currentCallNumber)) {
2586             MUTEX_ENTER(&rx_stats_mutex);
2587             rx_stats.spuriousPacketsRead++;
2588             MUTEX_EXIT(&rx_stats_mutex);
2589 #ifdef  RX_ENABLE_LOCKS
2590             if (call) {
2591                 MUTEX_EXIT(&call->lock);
2592             }
2593 #endif
2594             MUTEX_ENTER(&conn->conn_data_lock);
2595             conn->refCount--;
2596             MUTEX_EXIT(&conn->conn_data_lock);
2597             return np;  
2598         }
2599         /* If the service security object index stamped in the packet does not
2600          * match the connection's security index, ignore the packet */
2601         if (np->header.securityIndex != conn->securityIndex) {
2602 #ifdef  RX_ENABLE_LOCKS
2603             MUTEX_EXIT(&call->lock);
2604 #endif
2605             MUTEX_ENTER(&conn->conn_data_lock);
2606             conn->refCount--;       
2607             MUTEX_EXIT(&conn->conn_data_lock);
2608             return np;
2609         }
2610
2611         /* If we're receiving the response, then all transmit packets are
2612          * implicitly acknowledged.  Get rid of them. */
2613         if (np->header.type == RX_PACKET_TYPE_DATA) {
2614 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2615             /* XXX Hack. Because we must release the global rx lock when
2616              * sending packets (osi_NetSend) we drop all acks while we're
2617              * traversing the tq in rxi_Start sending packets out because
2618              * packets may move to the freePacketQueue as result of being here!
2619              * So we drop these packets until we're safely out of the
2620              * traversing. Really ugly! 
2621              * For fine grain RX locking, we set the acked field in the
2622              * packets and let rxi_Start remove them from the transmit queue.
2623              */
2624             if (call->flags & RX_CALL_TQ_BUSY) {
2625 #ifdef  RX_ENABLE_LOCKS
2626                 rxi_SetAcksInTransmitQueue(call);
2627 #else
2628                 conn->refCount--;
2629                 return np;              /* xmitting; drop packet */
2630 #endif
2631             }
2632             else {
2633                 rxi_ClearTransmitQueue(call, 0);
2634             }
2635 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2636             rxi_ClearTransmitQueue(call, 0);
2637 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2638         } else {
2639           if (np->header.type == RX_PACKET_TYPE_ACK) {
2640         /* now check to see if this is an ack packet acknowledging that the
2641          * server actually *lost* some hard-acked data.  If this happens we
2642          * ignore this packet, as it may indicate that the server restarted in
2643          * the middle of a call.  It is also possible that this is an old ack
2644          * packet.  We don't abort the connection in this case, because this
2645          * *might* just be an old ack packet.  The right way to detect a server
2646          * restart in the midst of a call is to notice that the server epoch
2647          * changed, btw.  */
2648         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2649          * XXX unacknowledged.  I think that this is off-by-one, but
2650          * XXX I don't dare change it just yet, since it will
2651          * XXX interact badly with the server-restart detection 
2652          * XXX code in receiveackpacket.  */
2653             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2654                 MUTEX_ENTER(&rx_stats_mutex);
2655                 rx_stats.spuriousPacketsRead++;
2656                 MUTEX_EXIT(&rx_stats_mutex);
2657                 MUTEX_EXIT(&call->lock);
2658                 MUTEX_ENTER(&conn->conn_data_lock);
2659                 conn->refCount--;
2660                 MUTEX_EXIT(&conn->conn_data_lock);
2661                 return np;
2662             }
2663           }
2664         } /* else not a data packet */
2665     }
2666
2667     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2668     /* Set remote user defined status from packet */
2669     call->remoteStatus = np->header.userStatus;
2670
2671     /* Note the gap between the expected next packet and the actual
2672      * packet that arrived, when the new packet has a smaller serial number
2673      * than expected.  Rioses frequently reorder packets all by themselves,
2674      * so this will be quite important with very large window sizes.
2675      * Skew is checked against 0 here to avoid any dependence on the type of
2676      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2677      * true! 
2678      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2679      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2680      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2681      */
2682     MUTEX_ENTER(&conn->conn_data_lock);
2683     skew = conn->lastSerial - np->header.serial;
2684     conn->lastSerial = np->header.serial;
2685     MUTEX_EXIT(&conn->conn_data_lock);
2686     if (skew > 0) {
2687       register struct rx_peer *peer;
2688       peer = conn->peer;
2689       if (skew > peer->inPacketSkew) {
2690         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2691         peer->inPacketSkew = skew;
2692       }
2693     }
2694
2695     /* Now do packet type-specific processing */
2696     switch (np->header.type) {
2697         case RX_PACKET_TYPE_DATA:
2698             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2699                                        tnop, newcallp);
2700             break;
2701         case RX_PACKET_TYPE_ACK:
2702             /* Respond immediately to ack packets requesting acknowledgement
2703              * (ping packets) */
2704             if (np->header.flags & RX_REQUEST_ACK) {
2705                 if (call->error)
2706                     (void) rxi_SendCallAbort(call, 0, 1, 0);
2707                 else
2708                     (void) rxi_SendAck(call, 0, np->header.serial,
2709                                        RX_ACK_PING_RESPONSE, 1);
2710             }
2711             np = rxi_ReceiveAckPacket(call, np, 1);
2712             break;
2713         case RX_PACKET_TYPE_ABORT:
2714             /* An abort packet: reset the connection, passing the error up to
2715              * the user */
2716             /* What if error is zero? */
2717             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2718             break;
2719         case RX_PACKET_TYPE_BUSY:
2720             /* XXXX */
2721             break;
2722         case RX_PACKET_TYPE_ACKALL:
2723             /* All packets acknowledged, so we can drop all packets previously
2724              * readied for sending */
2725 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2726             /* XXX Hack. We because we can't release the global rx lock when
2727              * sending packets (osi_NetSend) we drop all ack pkts while we're
2728              * traversing the tq in rxi_Start sending packets out because
2729              * packets may move to the freePacketQueue as result of being
2730              * here! So we drop these packets until we're safely out of the
2731              * traversing. Really ugly! 
2732              * For fine grain RX locking, we set the acked field in the packets
2733              * and let rxi_Start remove the packets from the transmit queue.
2734              */
2735             if (call->flags & RX_CALL_TQ_BUSY) {
2736 #ifdef  RX_ENABLE_LOCKS
2737                 rxi_SetAcksInTransmitQueue(call);
2738                 break;
2739 #else /* RX_ENABLE_LOCKS */
2740                 conn->refCount--;
2741                 return np;              /* xmitting; drop packet */
2742 #endif /* RX_ENABLE_LOCKS */
2743             }
2744 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2745             rxi_ClearTransmitQueue(call, 0);
2746             break;
2747         default:
2748             /* Should not reach here, unless the peer is broken: send an abort
2749              * packet */
2750             rxi_CallError(call, RX_PROTOCOL_ERROR);
2751             np = rxi_SendCallAbort(call, np, 1, 0);
2752             break;
2753     };
2754     /* Note when this last legitimate packet was received, for keep-alive
2755      * processing.  Note, we delay getting the time until now in the hope that
2756      * the packet will be delivered to the user before any get time is required
2757      * (if not, then the time won't actually be re-evaluated here). */
2758     call->lastReceiveTime = clock_Sec();
2759     MUTEX_EXIT(&call->lock);
2760     MUTEX_ENTER(&conn->conn_data_lock);
2761     conn->refCount--;
2762     MUTEX_EXIT(&conn->conn_data_lock);
2763     return np;
2764 }
2765
2766 /* return true if this is an "interesting" connection from the point of view
2767     of someone trying to debug the system */
2768 int rxi_IsConnInteresting(struct rx_connection *aconn)
2769 {
2770     register int i;
2771     register struct rx_call *tcall;
2772
2773     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2774         return 1;
2775     for(i=0;i<RX_MAXCALLS;i++) {
2776         tcall = aconn->call[i];
2777         if (tcall) {
2778             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2779                 return 1;
2780             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2781                 return 1;
2782         }
2783     }
2784     return 0;
2785 }
2786
2787 #ifdef KERNEL
2788 /* if this is one of the last few packets AND it wouldn't be used by the
2789    receiving call to immediately satisfy a read request, then drop it on
2790    the floor, since accepting it might prevent a lock-holding thread from
2791    making progress in its reading. If a call has been cleared while in
2792    the precall state then ignore all subsequent packets until the call
2793    is assigned to a thread. */
2794
2795 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2796 {
2797     int rc=0;
2798     MUTEX_ENTER(&rx_stats_mutex);
2799     if (((ap->header.seq != 1) &&
2800          (acall->flags & RX_CALL_CLEARED) &&
2801          (acall->state == RX_STATE_PRECALL)) ||
2802         ((rx_nFreePackets < rxi_dataQuota+2) &&
2803          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2804            && (acall->flags & RX_CALL_READER_WAIT)))) {
2805         rc = 1;
2806     }
2807     MUTEX_EXIT(&rx_stats_mutex);
2808     return rc;
2809 }
2810 #endif /* KERNEL */
2811
2812 static void rxi_CheckReachEvent(struct rxevent *event, 
2813         struct rx_connection *conn, struct rx_call *acall)
2814 {
2815     struct rx_call *call = acall;
2816     struct clock when;
2817     int i, waiting;
2818
2819     MUTEX_ENTER(&conn->conn_data_lock);
2820     conn->checkReachEvent = NULL;
2821     waiting = conn->flags & RX_CONN_ATTACHWAIT;
2822     if (event) conn->refCount--;
2823     MUTEX_EXIT(&conn->conn_data_lock);
2824
2825     if (waiting) {
2826         if (!call) {
2827             MUTEX_ENTER(&conn->conn_call_lock);
2828             MUTEX_ENTER(&conn->conn_data_lock);
2829             for (i=0; i<RX_MAXCALLS; i++) {
2830                 struct rx_call *tc = conn->call[i];
2831                 if (tc && tc->state == RX_STATE_PRECALL) {
2832                     call = tc;
2833                     break;
2834                 }
2835             }
2836             if (!call)
2837                 /* Indicate that rxi_CheckReachEvent is no longer running by
2838                  * clearing the flag.  Must be atomic under conn_data_lock to
2839                  * avoid a new call slipping by: rxi_CheckConnReach holds
2840                  * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2841                  */
2842                 conn->flags &= ~RX_CONN_ATTACHWAIT;
2843             MUTEX_EXIT(&conn->conn_data_lock);
2844             MUTEX_EXIT(&conn->conn_call_lock);
2845         }
2846
2847         if (call) {
2848             if (call != acall) MUTEX_ENTER(&call->lock);
2849             rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
2850             if (call != acall) MUTEX_EXIT(&call->lock);
2851
2852             clock_GetTime(&when);
2853             when.sec += RX_CHECKREACH_TIMEOUT;
2854             MUTEX_ENTER(&conn->conn_data_lock);
2855             if (!conn->checkReachEvent) {
2856                 conn->refCount++;
2857                 conn->checkReachEvent =
2858                     rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2859             }
2860             MUTEX_EXIT(&conn->conn_data_lock);
2861         }
2862     }
2863 }
2864
2865 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2866 {
2867     struct rx_service *service = conn->service;
2868     struct rx_peer *peer = conn->peer;
2869     afs_uint32 now, lastReach;
2870
2871     if (service->checkReach == 0)
2872         return 0;
2873
2874     now = clock_Sec();
2875     MUTEX_ENTER(&peer->peer_lock);
2876     lastReach = peer->lastReachTime;
2877     MUTEX_EXIT(&peer->peer_lock);
2878     if (now - lastReach < RX_CHECKREACH_TTL)
2879         return 0;
2880
2881     MUTEX_ENTER(&conn->conn_data_lock);
2882     if (conn->flags & RX_CONN_ATTACHWAIT) {
2883         MUTEX_EXIT(&conn->conn_data_lock);
2884         return 1;
2885     }
2886     conn->flags |= RX_CONN_ATTACHWAIT;
2887     MUTEX_EXIT(&conn->conn_data_lock);
2888     if (!conn->checkReachEvent)
2889         rxi_CheckReachEvent(NULL, conn, call);
2890
2891     return 1;
2892 }
2893
2894 /* try to attach call, if authentication is complete */
2895 static void TryAttach(register struct rx_call *acall, 
2896         register osi_socket socket, register int *tnop, 
2897         register struct rx_call **newcallp, int reachOverride)
2898 {
2899     struct rx_connection *conn = acall->conn;
2900
2901     if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2902         /* Don't attach until we have any req'd. authentication. */
2903         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2904             if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2905                 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2906                 /* Note:  this does not necessarily succeed; there
2907                  * may not any proc available
2908                  */
2909         }
2910         else {
2911             rxi_ChallengeOn(acall->conn);
2912         }
2913     }
2914 }
2915
2916 /* A data packet has been received off the interface.  This packet is
2917  * appropriate to the call (the call is in the right state, etc.).  This
2918  * routine can return a packet to the caller, for re-use */
2919
2920 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call, 
2921         register struct rx_packet *np, int istack, osi_socket socket, 
2922         afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2923 {
2924     int ackNeeded = 0;  /* 0 means no, otherwise ack_reason */
2925     int newPackets = 0;
2926     int didHardAck = 0;
2927     int haveLast = 0;
2928     afs_uint32 seq, serial, flags;
2929     int isFirst;
2930     struct rx_packet *tnp;
2931     struct clock when;
2932     MUTEX_ENTER(&rx_stats_mutex);
2933     rx_stats.dataPacketsRead++;
2934     MUTEX_EXIT(&rx_stats_mutex);
2935
2936 #ifdef KERNEL
2937     /* If there are no packet buffers, drop this new packet, unless we can find
2938      * packet buffers from inactive calls */
2939     if (!call->error &&
2940         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2941         MUTEX_ENTER(&rx_freePktQ_lock);
2942         rxi_NeedMorePackets = TRUE;
2943         MUTEX_EXIT(&rx_freePktQ_lock);
2944         MUTEX_ENTER(&rx_stats_mutex);
2945         rx_stats.noPacketBuffersOnRead++;
2946         MUTEX_EXIT(&rx_stats_mutex);
2947         call->rprev = np->header.serial;
2948         rxi_calltrace(RX_TRACE_DROP, call);
2949         dpf (("packet %x dropped on receipt - quota problems", np));
2950         if (rxi_doreclaim)
2951             rxi_ClearReceiveQueue(call);
2952         clock_GetTime(&when);
2953         clock_Add(&when, &rx_softAckDelay);
2954         if (!call->delayedAckEvent ||
2955             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2956             rxevent_Cancel(call->delayedAckEvent, call,
2957                            RX_CALL_REFCOUNT_DELAY);
2958             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2959             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2960                                                  call, 0);
2961         }
2962         /* we've damaged this call already, might as well do it in. */
2963         return np;
2964     }
2965 #endif /* KERNEL */
2966
2967     /*
2968      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2969      * packet is one of several packets transmitted as a single
2970      * datagram. Do not send any soft or hard acks until all packets
2971      * in a jumbogram have been processed. Send negative acks right away.
2972      */
2973     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2974         /* tnp is non-null when there are more packets in the
2975          * current jumbo gram */
2976         if (tnp) {
2977             if (np)
2978                 rxi_FreePacket(np);
2979             np = tnp;
2980         }
2981
2982         seq = np->header.seq;
2983         serial = np->header.serial;
2984         flags = np->header.flags;
2985
2986         /* If the call is in an error state, send an abort message */
2987         if (call->error)
2988             return rxi_SendCallAbort(call, np, istack, 0);
2989
2990         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2991          * AFS 3.5 jumbogram. */
2992         if (flags & RX_JUMBO_PACKET) {
2993             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2994         } else {
2995             tnp = NULL;
2996         }
2997
2998         if (np->header.spare != 0) {
2999             MUTEX_ENTER(&call->conn->conn_data_lock);
3000             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3001             MUTEX_EXIT(&call->conn->conn_data_lock);
3002         }
3003
3004         /* The usual case is that this is the expected next packet */
3005         if (seq == call->rnext) {
3006
3007             /* Check to make sure it is not a duplicate of one already queued */
3008             if (queue_IsNotEmpty(&call->rq) 
3009                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3010                 MUTEX_ENTER(&rx_stats_mutex);
3011                 rx_stats.dupPacketsRead++;
3012                 MUTEX_EXIT(&rx_stats_mutex);
3013                 dpf (("packet %x dropped on receipt - duplicate", np));
3014                 rxevent_Cancel(call->delayedAckEvent, call,
3015                                RX_CALL_REFCOUNT_DELAY);
3016                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3017                 ackNeeded = 0;
3018                 call->rprev = seq;
3019                 continue;
3020             }
3021
3022             /* It's the next packet. Stick it on the receive queue
3023              * for this call. Set newPackets to make sure we wake
3024              * the reader once all packets have been processed */
3025             queue_Prepend(&call->rq, np);
3026             call->nSoftAcks++;
3027             np = NULL; /* We can't use this anymore */
3028             newPackets = 1;
3029
3030             /* If an ack is requested then set a flag to make sure we
3031              * send an acknowledgement for this packet */
3032             if (flags & RX_REQUEST_ACK) {
3033                 ackNeeded = RX_ACK_REQUESTED;
3034             }
3035
3036             /* Keep track of whether we have received the last packet */
3037             if (flags & RX_LAST_PACKET) {
3038                 call->flags |= RX_CALL_HAVE_LAST;
3039                 haveLast = 1;
3040             }
3041
3042             /* Check whether we have all of the packets for this call */
3043             if (call->flags & RX_CALL_HAVE_LAST) {
3044                 afs_uint32 tseq;                /* temporary sequence number */
3045                 struct rx_packet *tp;   /* Temporary packet pointer */
3046                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
3047
3048                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3049                     if (tseq != tp->header.seq)
3050                         break;
3051                     if (tp->header.flags & RX_LAST_PACKET) {
3052                         call->flags |= RX_CALL_RECEIVE_DONE;
3053                         break;
3054                     }
3055                     tseq++;
3056                 }
3057             }
3058
3059             /* Provide asynchronous notification for those who want it
3060              * (e.g. multi rx) */
3061             if (call->arrivalProc) {
3062                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3063                                      (int) call->arrivalProcArg);
3064                 call->arrivalProc = (VOID (*)()) 0;
3065             }
3066
3067             /* Update last packet received */
3068             call->rprev = seq;
3069
3070             /* If there is no server process serving this call, grab
3071              * one, if available. We only need to do this once. If a
3072              * server thread is available, this thread becomes a server
3073              * thread and the server thread becomes a listener thread. */
3074             if (isFirst) {
3075                 TryAttach(call, socket, tnop, newcallp, 0);
3076             }
3077         }       
3078         /* This is not the expected next packet. */
3079         else {
3080             /* Determine whether this is a new or old packet, and if it's
3081              * a new one, whether it fits into the current receive window.
3082              * Also figure out whether the packet was delivered in sequence.
3083              * We use the prev variable to determine whether the new packet
3084              * is the successor of its immediate predecessor in the
3085              * receive queue, and the missing flag to determine whether
3086              * any of this packets predecessors are missing.  */
3087
3088             afs_uint32 prev;            /* "Previous packet" sequence number */
3089             struct rx_packet *tp;       /* Temporary packet pointer */
3090             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3091             int missing;                /* Are any predecessors missing? */
3092
3093             /* If the new packet's sequence number has been sent to the
3094              * application already, then this is a duplicate */
3095             if (seq < call->rnext) {
3096                 MUTEX_ENTER(&rx_stats_mutex);
3097                 rx_stats.dupPacketsRead++;
3098                 MUTEX_EXIT(&rx_stats_mutex);
3099                 rxevent_Cancel(call->delayedAckEvent, call,
3100                                RX_CALL_REFCOUNT_DELAY);
3101                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3102                 ackNeeded = 0;
3103                 call->rprev = seq;
3104                 continue;
3105             }
3106
3107             /* If the sequence number is greater than what can be
3108              * accomodated by the current window, then send a negative
3109              * acknowledge and drop the packet */
3110             if ((call->rnext + call->rwind) <= seq) {
3111                 rxevent_Cancel(call->delayedAckEvent, call,
3112                                RX_CALL_REFCOUNT_DELAY);
3113                 np = rxi_SendAck(call, np, serial,
3114                                  RX_ACK_EXCEEDS_WINDOW, istack);
3115                 ackNeeded = 0;
3116                 call->rprev = seq;
3117                 continue;
3118             }
3119
3120             /* Look for the packet in the queue of old received packets */
3121             for (prev = call->rnext - 1, missing = 0,
3122                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3123                 /*Check for duplicate packet */
3124                 if (seq == tp->header.seq) {
3125                     MUTEX_ENTER(&rx_stats_mutex);
3126                     rx_stats.dupPacketsRead++;
3127                     MUTEX_EXIT(&rx_stats_mutex);
3128                     rxevent_Cancel(call->delayedAckEvent, call,
3129                                    RX_CALL_REFCOUNT_DELAY);
3130                     np = rxi_SendAck(call, np, serial, 
3131                                      RX_ACK_DUPLICATE, istack);
3132                     ackNeeded = 0;
3133                     call->rprev = seq;
3134                     goto nextloop;
3135                 }
3136                 /* If we find a higher sequence packet, break out and
3137                  * insert the new packet here. */
3138                 if (seq < tp->header.seq) break;
3139                 /* Check for missing packet */
3140                 if (tp->header.seq != prev+1) {
3141                     missing = 1;
3142                 }
3143
3144                 prev = tp->header.seq;
3145             }
3146
3147             /* Keep track of whether we have received the last packet. */
3148             if (flags & RX_LAST_PACKET) {
3149                 call->flags |= RX_CALL_HAVE_LAST;
3150             }
3151
3152             /* It's within the window: add it to the the receive queue.
3153              * tp is left by the previous loop either pointing at the
3154              * packet before which to insert the new packet, or at the
3155              * queue head if the queue is empty or the packet should be
3156              * appended. */
3157             queue_InsertBefore(tp, np);
3158             call->nSoftAcks++;
3159             np = NULL;
3160
3161             /* Check whether we have all of the packets for this call */
3162             if ((call->flags & RX_CALL_HAVE_LAST)
3163              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3164                 afs_uint32 tseq;                /* temporary sequence number */
3165
3166                 for (tseq = call->rnext,
3167                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3168                     if (tseq != tp->header.seq)
3169                         break;
3170                     if (tp->header.flags & RX_LAST_PACKET) {
3171                         call->flags |= RX_CALL_RECEIVE_DONE;
3172                         break;
3173                     }
3174                     tseq++;
3175                 }
3176             }
3177
3178             /* We need to send an ack of the packet is out of sequence, 
3179              * or if an ack was requested by the peer. */
3180             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3181                 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3182             }
3183
3184             /* Acknowledge the last packet for each call */
3185             if (flags & RX_LAST_PACKET) {
3186                 haveLast = 1;
3187             }
3188
3189             call->rprev = seq;
3190         }
3191 nextloop:;
3192     }
3193
3194     if (newPackets) {
3195         /*
3196          * If the receiver is waiting for an iovec, fill the iovec
3197          * using the data from the receive queue */
3198         if (call->flags & RX_CALL_IOVEC_WAIT) {
3199             didHardAck = rxi_FillReadVec(call, serial); 
3200             /* the call may have been aborted */
3201             if (call->error) {
3202                 return NULL;
3203             }
3204             if (didHardAck) {
3205                 ackNeeded = 0;
3206             }
3207         }
3208
3209         /* Wakeup the reader if any */
3210         if ((call->flags & RX_CALL_READER_WAIT) &&
3211             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3212              (call->iovNext >= call->iovMax) ||
3213              (call->flags & RX_CALL_RECEIVE_DONE))) {
3214             call->flags &= ~RX_CALL_READER_WAIT;
3215 #ifdef  RX_ENABLE_LOCKS
3216             CV_BROADCAST(&call->cv_rq);
3217 #else
3218             osi_rxWakeup(&call->rq);
3219 #endif
3220         }
3221     }
3222
3223     /*
3224      * Send an ack when requested by the peer, or once every
3225      * rxi_SoftAckRate packets until the last packet has been
3226      * received. Always send a soft ack for the last packet in
3227      * the server's reply. */
3228     if (ackNeeded) {
3229         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3230         np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3231     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3232         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3233         np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3234     } else if (call->nSoftAcks) {
3235         clock_GetTime(&when);
3236         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3237             clock_Add(&when, &rx_lastAckDelay);
3238         } else {
3239             clock_Add(&when, &rx_softAckDelay);
3240         }
3241         if (!call->delayedAckEvent ||
3242             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3243             rxevent_Cancel(call->delayedAckEvent, call,
3244                            RX_CALL_REFCOUNT_DELAY);
3245             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3246             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3247                                                  call, 0);
3248         }
3249     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3250         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3251     }
3252
3253     return np;
3254 }
3255
3256 #ifdef  ADAPT_WINDOW
3257 static void rxi_ComputeRate();
3258 #endif
3259
3260 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3261 {
3262     struct rx_peer *peer = conn->peer;
3263
3264     MUTEX_ENTER(&peer->peer_lock);
3265     peer->lastReachTime = clock_Sec();
3266     MUTEX_EXIT(&peer->peer_lock);
3267
3268     MUTEX_ENTER(&conn->conn_data_lock);
3269     if (conn->flags & RX_CONN_ATTACHWAIT) {
3270         int i;
3271
3272         conn->flags &= ~RX_CONN_ATTACHWAIT;
3273         MUTEX_EXIT(&conn->conn_data_lock);
3274
3275         for (i=0; i<RX_MAXCALLS; i++) {
3276             struct rx_call *call = conn->call[i];
3277             if (call) {
3278                 if (call != acall) MUTEX_ENTER(&call->lock);
3279                 /* tnop can be null if newcallp is null */
3280                 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3281                 if (call != acall) MUTEX_EXIT(&call->lock);
3282             }
3283         }
3284     } else
3285         MUTEX_EXIT(&conn->conn_data_lock);
3286 }
3287
3288 /* rxi_ComputePeerNetStats
3289  *
3290  * Called exclusively by rxi_ReceiveAckPacket to compute network link
3291  * estimates (like RTT and throughput) based on ack packets.  Caller
3292  * must ensure that the packet in question is the right one (i.e.
3293  * serial number matches).
3294  */
3295 static void
3296 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3297         struct rx_ackPacket *ap, struct rx_packet *np)
3298 {
3299     struct rx_peer *peer = call->conn->peer;
3300
3301     /* Use RTT if not delayed by client. */
3302     if (ap->reason != RX_ACK_DELAY)
3303         rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3304 #ifdef ADAPT_WINDOW
3305     rxi_ComputeRate(peer, call, p, np, ap->reason);
3306 #endif
3307 }
3308
3309 /* The real smarts of the whole thing.  */
3310 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call, 
3311         struct rx_packet *np, int istack)
3312 {
3313     struct rx_ackPacket *ap;
3314     int nAcks;
3315     register struct rx_packet *tp;
3316     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3317     register struct rx_connection *conn = call->conn;
3318     struct rx_peer *peer = conn->peer;
3319     afs_uint32 first;
3320     afs_uint32 serial;
3321     /* because there are CM's that are bogus, sending weird values for this. */
3322     afs_uint32 skew = 0;
3323     int nbytes;
3324     int missing;
3325     int acked;
3326     int nNacked = 0;
3327     int newAckCount = 0;
3328     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3329     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3330
3331     MUTEX_ENTER(&rx_stats_mutex);
3332     rx_stats.ackPacketsRead++;
3333     MUTEX_EXIT(&rx_stats_mutex);
3334     ap = (struct rx_ackPacket *) rx_DataOf(np);
3335     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3336     if (nbytes < 0)
3337       return np;       /* truncated ack packet */
3338
3339     /* depends on ack packet struct */
3340     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3341     first = ntohl(ap->firstPacket);
3342     serial = ntohl(ap->serial);
3343     /* temporarily disabled -- needs to degrade over time 
3344        skew = ntohs(ap->maxSkew); */
3345
3346     /* Ignore ack packets received out of order */
3347     if (first < call->tfirst) {
3348         return np;
3349     }
3350
3351     if (np->header.flags & RX_SLOW_START_OK) {
3352         call->flags |= RX_CALL_SLOW_START_OK;
3353     }
3354
3355     if (ap->reason == RX_ACK_PING_RESPONSE)
3356         rxi_UpdatePeerReach(conn, call);
3357     
3358 #ifdef RXDEBUG
3359     if (rx_Log) {
3360       fprintf( rx_Log, 
3361               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3362               ap->reason, ntohl(ap->previousPacket), 
3363               (unsigned int) np->header.seq, (unsigned int) serial, 
3364               (unsigned int) skew, ntohl(ap->firstPacket));
3365         if (nAcks) {
3366             int offset;
3367             for (offset = 0; offset < nAcks; offset++) 
3368                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3369         }
3370         putc('\n', rx_Log);
3371     }
3372 #endif
3373
3374     /* Update the outgoing packet skew value to the latest value of
3375      * the peer's incoming packet skew value.  The ack packet, of
3376      * course, could arrive out of order, but that won't affect things
3377      * much */
3378     MUTEX_ENTER(&peer->peer_lock);
3379     peer->outPacketSkew = skew;
3380
3381     /* Check for packets that no longer need to be transmitted, and
3382      * discard them.  This only applies to packets positively
3383      * acknowledged as having been sent to the peer's upper level.
3384      * All other packets must be retained.  So only packets with
3385      * sequence numbers < ap->firstPacket are candidates. */
3386     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3387         if (tp->header.seq >= first) break;
3388         call->tfirst = tp->header.seq + 1;
3389         if (serial && (tp->header.serial == serial ||
3390                        tp->firstSerial == serial))
3391             rxi_ComputePeerNetStats(call, tp, ap, np);
3392 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3393     /* XXX Hack. Because we have to release the global rx lock when sending
3394      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3395      * in rxi_Start sending packets out because packets may move to the
3396      * freePacketQueue as result of being here! So we drop these packets until
3397      * we're safely out of the traversing. Really ugly! 
3398      * To make it even uglier, if we're using fine grain locking, we can
3399      * set the ack bits in the packets and have rxi_Start remove the packets
3400      * when it's done transmitting.
3401      */
3402         if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3403             newAckCount++;
3404         }
3405         if (call->flags & RX_CALL_TQ_BUSY) {
3406 #ifdef RX_ENABLE_LOCKS
3407             tp->flags |= RX_PKTFLAG_ACKED;
3408             call->flags |= RX_CALL_TQ_SOME_ACKED;
3409 #else /* RX_ENABLE_LOCKS */
3410             break;
3411 #endif /* RX_ENABLE_LOCKS */
3412         } else
3413 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3414         {
3415         queue_Remove(tp);
3416         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3417         }
3418     }
3419
3420 #ifdef ADAPT_WINDOW
3421     /* Give rate detector a chance to respond to ping requests */
3422     if (ap->reason == RX_ACK_PING_RESPONSE) {
3423         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3424     }
3425 #endif
3426
3427     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3428    
3429    /* Now go through explicit acks/nacks and record the results in
3430     * the waiting packets.  These are packets that can't be released
3431     * yet, even with a positive acknowledge.  This positive
3432     * acknowledge only means the packet has been received by the
3433     * peer, not that it will be retained long enough to be sent to
3434     * the peer's upper level.  In addition, reset the transmit timers
3435     * of any missing packets (those packets that must be missing
3436     * because this packet was out of sequence) */
3437
3438     call->nSoftAcked = 0;
3439     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3440         /* Update round trip time if the ack was stimulated on receipt
3441          * of this packet */
3442 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3443 #ifdef RX_ENABLE_LOCKS
3444         if (tp->header.seq >= first)
3445 #endif /* RX_ENABLE_LOCKS */
3446 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3447             if (serial && (tp->header.serial == serial ||
3448                            tp->firstSerial == serial))
3449                 rxi_ComputePeerNetStats(call, tp, ap, np);
3450
3451         /* Set the acknowledge flag per packet based on the
3452          * information in the ack packet. An acknowlegded packet can
3453          * be downgraded when the server has discarded a packet it
3454          * soacked previously, or when an ack packet is received
3455          * out of sequence. */
3456         if (tp->header.seq < first) {
3457             /* Implicit ack information */
3458             if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3459                 newAckCount++;
3460             }
3461             tp->flags |= RX_PKTFLAG_ACKED;
3462         }
3463         else if (tp->header.seq < first + nAcks) {
3464             /* Explicit ack information:  set it in the packet appropriately */
3465             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3466                 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3467                     newAckCount++;
3468                     tp->flags |= RX_PKTFLAG_ACKED;
3469                 }
3470                 if (missing) {
3471                     nNacked++;
3472                 } else {
3473                     call->nSoftAcked++;
3474                 }
3475             } else {
3476                 tp->flags &= ~RX_PKTFLAG_ACKED;
3477                 missing = 1;
3478             }
3479         }
3480         else {
3481             tp->flags &= ~RX_PKTFLAG_ACKED;
3482             missing = 1;
3483         }
3484
3485         /* If packet isn't yet acked, and it has been transmitted at least 
3486          * once, reset retransmit time using latest timeout 
3487          * ie, this should readjust the retransmit timer for all outstanding 
3488          * packets...  So we don't just retransmit when we should know better*/
3489
3490         if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3491           tp->retryTime = tp->timeSent;
3492           clock_Add(&tp->retryTime, &peer->timeout);
3493           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3494           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3495         }
3496     }
3497
3498     /* If the window has been extended by this acknowledge packet,
3499      * then wakeup a sender waiting in alloc for window space, or try
3500      * sending packets now, if he's been sitting on packets due to
3501      * lack of window space */
3502     if (call->tnext < (call->tfirst + call->twind))  {
3503 #ifdef  RX_ENABLE_LOCKS
3504         CV_SIGNAL(&call->cv_twind);
3505 #else
3506         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3507             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3508             osi_rxWakeup(&call->twind);
3509         }
3510 #endif
3511         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3512             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3513         }
3514     }
3515
3516     /* if the ack packet has a receivelen field hanging off it,
3517      * update our state */
3518     if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3519       afs_uint32 tSize;
3520
3521       /* If the ack packet has a "recommended" size that is less than 
3522        * what I am using now, reduce my size to match */
3523       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3524                     sizeof(afs_int32), &tSize);
3525       tSize = (afs_uint32) ntohl(tSize);
3526       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3527
3528       /* Get the maximum packet size to send to this peer */
3529       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3530                     &tSize);
3531       tSize = (afs_uint32)ntohl(tSize);
3532       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3533       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3534
3535       /* sanity check - peer might have restarted with different params.
3536        * If peer says "send less", dammit, send less...  Peer should never 
3537        * be unable to accept packets of the size that prior AFS versions would
3538        * send without asking.  */
3539       if (peer->maxMTU != tSize) {
3540           peer->maxMTU = tSize;
3541           peer->MTU = MIN(tSize, peer->MTU);
3542           call->MTU = MIN(call->MTU, tSize);
3543           peer->congestSeq++;
3544       }
3545
3546       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3547           /* AFS 3.4a */
3548           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3549                         sizeof(afs_int32), &tSize);
3550           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3551           if (tSize < call->twind) {       /* smaller than our send */
3552               call->twind = tSize;         /* window, we must send less... */
3553               call->ssthresh = MIN(call->twind, call->ssthresh);
3554           }
3555
3556           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3557            * network MTU confused with the loopback MTU. Calculate the
3558            * maximum MTU here for use in the slow start code below.
3559            */
3560           maxMTU = peer->maxMTU;
3561           /* Did peer restart with older RX version? */
3562           if (peer->maxDgramPackets > 1) {
3563               peer->maxDgramPackets = 1;
3564           }
3565       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3566           /* AFS 3.5 */
3567           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3568                         sizeof(afs_int32), &tSize);
3569           tSize = (afs_uint32) ntohl(tSize);
3570           /*
3571            * As of AFS 3.5 we set the send window to match the receive window. 
3572            */
3573           if (tSize < call->twind) {
3574               call->twind = tSize;
3575               call->ssthresh = MIN(call->twind, call->ssthresh);
3576           } else if (tSize > call->twind) {
3577               call->twind = tSize;
3578           }
3579
3580           /*
3581            * As of AFS 3.5, a jumbogram is more than one fixed size
3582            * packet transmitted in a single UDP datagram. If the remote
3583            * MTU is smaller than our local MTU then never send a datagram
3584            * larger than the natural MTU.
3585            */
3586           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3587                         sizeof(afs_int32), &tSize);
3588           maxDgramPackets = (afs_uint32) ntohl(tSize);
3589           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3590           maxDgramPackets = MIN(maxDgramPackets,
3591                                 (int)(peer->ifDgramPackets));
3592           maxDgramPackets = MIN(maxDgramPackets, tSize);
3593           if (maxDgramPackets > 1) {
3594             peer->maxDgramPackets = maxDgramPackets;
3595             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3596           } else {
3597             peer->maxDgramPackets = 1;
3598             call->MTU = peer->natMTU;
3599           }
3600        } else if (peer->maxDgramPackets > 1) {
3601           /* Restarted with lower version of RX */
3602           peer->maxDgramPackets = 1;
3603        }
3604     } else if (peer->maxDgramPackets > 1 ||
3605                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3606         /* Restarted with lower version of RX */
3607         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3608         peer->natMTU = OLD_MAX_PACKET_SIZE;
3609         peer->MTU = OLD_MAX_PACKET_SIZE;
3610         peer->maxDgramPackets = 1;
3611         peer->nDgramPackets = 1;
3612         peer->congestSeq++;
3613         call->MTU = OLD_MAX_PACKET_SIZE;
3614     }
3615
3616     if (nNacked) {
3617         /*
3618          * Calculate how many datagrams were successfully received after
3619          * the first missing packet and adjust the negative ack counter
3620          * accordingly.
3621          */
3622         call->nAcks = 0;
3623         call->nNacks++;
3624         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3625         if (call->nNacks < nNacked) {
3626             call->nNacks = nNacked;
3627         }
3628     } else {
3629         if (newAckCount) {
3630             call->nAcks++;
3631         }
3632         call->nNacks = 0;
3633     }
3634
3635     if (call->flags & RX_CALL_FAST_RECOVER) {
3636         if (nNacked) {
3637             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3638         } else {
3639             call->flags &= ~RX_CALL_FAST_RECOVER;
3640             call->cwind = call->nextCwind;
3641             call->nextCwind = 0;
3642             call->nAcks = 0;
3643         }
3644         call->nCwindAcks = 0;
3645     }
3646     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3647         /* Three negative acks in a row trigger congestion recovery */
3648 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3649         MUTEX_EXIT(&peer->peer_lock);
3650         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3651                 /* someone else is waiting to start recovery */
3652                 return np;
3653         }
3654         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3655         while (call->flags & RX_CALL_TQ_BUSY) {
3656             call->flags |= RX_CALL_TQ_WAIT;
3657 #ifdef RX_ENABLE_LOCKS
3658             CV_WAIT(&call->cv_tq, &call->lock);
3659 #else /* RX_ENABLE_LOCKS */
3660             osi_rxSleep(&call->tq);
3661 #endif /* RX_ENABLE_LOCKS */
3662         }
3663         MUTEX_ENTER(&peer->peer_lock);
3664 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3665         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3666         call->flags |= RX_CALL_FAST_RECOVER;
3667         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3668         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3669                           rx_maxSendWindow);
3670         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3671         call->nextCwind = call->ssthresh;
3672         call->nAcks = 0;
3673         call->nNacks = 0;
3674         peer->MTU = call->MTU;
3675         peer->cwind = call->nextCwind;
3676         peer->nDgramPackets = call->nDgramPackets;
3677         peer->congestSeq++;
3678         call->congestSeq = peer->congestSeq;
3679         /* Reset the resend times on the packets that were nacked
3680          * so we will retransmit as soon as the window permits*/