rx-init-mutex-before-calling-getudpsocket-20030313
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #include <afsconfig.h>
13 #ifdef  KERNEL
14 #include "afs/param.h"
15 #else
16 #include <afs/param.h>
17 #endif
18
19 RCSID("$Header$");
20
21 #ifdef KERNEL
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
24 #ifndef UKERNEL
25 #include "h/types.h"
26 #include "h/time.h"
27 #include "h/stat.h"
28 #ifdef  AFS_OSF_ENV
29 #include <net/net_globals.h>
30 #endif  /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
32 #include "h/socket.h"
33 #endif
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
38 #include "h/systm.h"
39 #endif
40 #ifdef RXDEBUG
41 #undef RXDEBUG      /* turn off debugging */
42 #endif /* RXDEBUG */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
45 #endif
46 #include "afsint.h"
47 #ifdef  AFS_ALPHA_ENV
48 #undef kmem_alloc
49 #undef kmem_free
50 #undef mem_alloc
51 #undef mem_free
52 #undef register
53 #endif  /* AFS_ALPHA_ENV */
54 #else /* !UKERNEL */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
57 #endif /* !UKERNEL */
58 #include "afs/lock.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
61 #include "rx_clock.h"
62 #include "rx_queue.h"
63 #include "rx.h"
64 #include "rx_globals.h"
65 #include "rx_trace.h"
66 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
68 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
69 #include "afsint.h"
70 extern afs_int32 afs_termState;
71 #ifdef AFS_AIX41_ENV
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
76 #else /* KERNEL */
77 # include <sys/types.h>
78 # include <errno.h>
79 #ifdef AFS_NT40_ENV
80 # include <stdlib.h>
81 # include <fcntl.h>
82 # include <afsutil.h>
83 #else
84 # include <sys/socket.h>
85 # include <sys/file.h>
86 # include <netdb.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
90 #endif
91 #ifdef HAVE_STRING_H
92 #include <string.h>
93 #else
94 #ifdef HAVE_STRINGS_H
95 #include <strings.h>
96 #endif
97 #endif
98 # include "rx.h"
99 # include "rx_user.h"
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
105 #endif /* KERNEL */
106
107 int (*registerProgram)() = 0;
108 int (*swapNameProgram)() = 0;
109
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
114 #endif
115
116 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
117 struct rx_tq_debug {
118     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119     afs_int32 rxi_start_in_error;
120 } rx_tq_debug;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
122
123 /*
124  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125  * currently allocated within rx.  This number is used to allocate the
126  * memory required to return the statistics when queried.
127  */
128
129 static unsigned int rxi_rpc_peer_stat_cnt;
130
131 /*
132  * rxi_rpc_process_stat_cnt counts the total number of local process stat
133  * structures currently allocated within rx.  The number is used to allocate
134  * the memory required to return the statistics when queried.
135  */
136
137 static unsigned int rxi_rpc_process_stat_cnt;
138
139 #if !defined(offsetof)
140 #include <stddef.h>     /* for definition of offsetof() */
141 #endif
142
143 #ifdef AFS_PTHREAD_ENV
144 #include <assert.h>
145
146 /*
147  * Use procedural initialization of mutexes/condition variables
148  * to ease NT porting
149  */
150
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
164
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
167
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
171
172 static void rxi_InitPthread(void) {
173     assert(pthread_mutex_init(&rx_clock_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&rxi_connCacheMutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&rx_init_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&epoch_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&rx_event_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183     assert(pthread_mutex_init(&des_init_mutex,
184                               (const pthread_mutexattr_t*)0)==0);
185     assert(pthread_mutex_init(&des_random_mutex,
186                               (const pthread_mutexattr_t*)0)==0);
187     assert(pthread_mutex_init(&osi_malloc_mutex,
188                               (const pthread_mutexattr_t*)0)==0);
189     assert(pthread_mutex_init(&event_handler_mutex,
190                               (const pthread_mutexattr_t*)0)==0);
191     assert(pthread_mutex_init(&listener_mutex,
192                               (const pthread_mutexattr_t*)0)==0);
193     assert(pthread_mutex_init(&rx_if_init_mutex,
194                               (const pthread_mutexattr_t*)0)==0);
195     assert(pthread_mutex_init(&rx_if_mutex,
196                               (const pthread_mutexattr_t*)0)==0);
197     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198                               (const pthread_mutexattr_t*)0)==0);
199     assert(pthread_mutex_init(&rxkad_random_mutex,
200                               (const pthread_mutexattr_t*)0)==0);
201     assert(pthread_mutex_init(&rxkad_stats_mutex,
202                               (const pthread_mutexattr_t*)0)==0);
203     assert(pthread_mutex_init(&rx_debug_mutex,
204                               (const pthread_mutexattr_t*)0)==0);
205
206     assert(pthread_cond_init(&rx_event_handler_cond,
207                               (const pthread_condattr_t*)0)==0);
208     assert(pthread_cond_init(&rx_listener_cond,
209                               (const pthread_condattr_t*)0)==0);
210     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
211 }
212
213 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
214 #define INIT_PTHREAD_LOCKS \
215 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
216 /*
217  * The rx_stats_mutex mutex protects the following global variables:
218  * rxi_dataQuota
219  * rxi_minDeficit
220  * rxi_availProcs
221  * rxi_totalMin
222  * rxi_lowConnRefCount
223  * rxi_lowPeerRefCount
224  * rxi_nCalls
225  * rxi_Alloccnt
226  * rxi_Allocsize
227  * rx_nFreePackets
228  * rx_tq_debug
229  * rx_stats
230  */
231 #else
232 #define INIT_PTHREAD_LOCKS
233 #endif
234
235
236 /* Variables for handling the minProcs implementation.  availProcs gives the
237  * number of threads available in the pool at this moment (not counting dudes
238  * executing right now).  totalMin gives the total number of procs required
239  * for handling all minProcs requests.  minDeficit is a dynamic variable
240  * tracking the # of procs required to satisfy all of the remaining minProcs
241  * demands.
242  * For fine grain locking to work, the quota check and the reservation of
243  * a server thread has to come while rxi_availProcs and rxi_minDeficit
244  * are locked. To this end, the code has been modified under #ifdef
245  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246  * same time. A new function, ReturnToServerPool() returns the allocation.
247  * 
248  * A call can be on several queue's (but only one at a time). When
249  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250  * that no one else is touching the queue. To this end, we store the address
251  * of the queue lock in the call structure (under the call lock) when we
252  * put the call on a queue, and we clear the call_queue_lock when the
253  * call is removed from a queue (once the call lock has been obtained).
254  * This allows rxi_ResetCall to safely synchronize with others wishing
255  * to manipulate the queue.
256  */
257
258 #ifdef RX_ENABLE_LOCKS
259 static afs_kmutex_t rx_rpc_stats;
260 void rxi_StartUnlocked();
261 #endif
262
263 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
264 ** pretty good that the next packet coming in is from the same connection 
265 ** as the last packet, since we're send multiple packets in a transmit window.
266 */
267 struct rx_connection *rxLastConn = 0; 
268
269 #ifdef RX_ENABLE_LOCKS
270 /* The locking hierarchy for rx fine grain locking is composed of these
271  * tiers:
272  *
273  * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
274  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
275  * call->lock - locks call data fields.
276  * These are independent of each other:
277  *      rx_freeCallQueue_lock
278  *      rxi_keyCreate_lock
279  * rx_serverPool_lock
280  * freeSQEList_lock
281  *
282  * serverQueueEntry->lock
283  * rx_rpc_stats
284  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
285  * peer->lock - locks peer data fields.
286  * conn_data_lock - that more than one thread is not updating a conn data
287  *                  field at the same time.
288  * rx_freePktQ_lock
289  *
290  * lowest level:
291  *      multi_handle->lock
292  *      rxevent_lock
293  *      rx_stats_mutex
294  *
295  * Do we need a lock to protect the peer field in the conn structure?
296  *      conn->peer was previously a constant for all intents and so has no
297  *      lock protecting this field. The multihomed client delta introduced
298  *      a RX code change : change the peer field in the connection structure
299  *      to that remote inetrface from which the last packet for this
300  *      connection was sent out. This may become an issue if further changes
301  *      are made.
302  */
303 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
304 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
305 #ifdef RX_LOCKS_DB
306 /* rxdb_fileID is used to identify the lock location, along with line#. */
307 static int rxdb_fileID = RXDB_FILE_RX;
308 #endif /* RX_LOCKS_DB */
309 #else /* RX_ENABLE_LOCKS */
310 #define SET_CALL_QUEUE_LOCK(C, L)
311 #define CLEAR_CALL_QUEUE_LOCK(C)
312 #endif /* RX_ENABLE_LOCKS */
313 struct rx_serverQueueEntry *rx_waitForPacket = 0;
314
315 /* ------------Exported Interfaces------------- */
316
317 /* This function allows rxkad to set the epoch to a suitably random number
318  * which rx_NewConnection will use in the future.  The principle purpose is to
319  * get rxnull connections to use the same epoch as the rxkad connections do, at
320  * least once the first rxkad connection is established.  This is important now
321  * that the host/port addresses aren't used in FindConnection: the uniqueness
322  * of epoch/cid matters and the start time won't do. */
323
324 #ifdef AFS_PTHREAD_ENV
325 /*
326  * This mutex protects the following global variables:
327  * rx_epoch
328  */
329
330 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
331 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
332 #else
333 #define LOCK_EPOCH
334 #define UNLOCK_EPOCH
335 #endif /* AFS_PTHREAD_ENV */
336
337 void rx_SetEpoch (afs_uint32 epoch)
338 {
339     LOCK_EPOCH
340     rx_epoch = epoch;
341     UNLOCK_EPOCH
342 }
343
344 /* Initialize rx.  A port number may be mentioned, in which case this
345  * becomes the default port number for any service installed later.
346  * If 0 is provided for the port number, a random port will be chosen
347  * by the kernel.  Whether this will ever overlap anything in
348  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
349  * error. */
350 static int rxinit_status = 1;
351 #ifdef AFS_PTHREAD_ENV
352 /*
353  * This mutex protects the following global variables:
354  * rxinit_status
355  */
356
357 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
358 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
359 #else
360 #define LOCK_RX_INIT
361 #define UNLOCK_RX_INIT
362 #endif
363
364 int rx_Init(u_int port)
365 {
366 #ifdef KERNEL
367     osi_timeval_t tv;
368 #else /* KERNEL */
369     struct timeval tv;
370 #endif /* KERNEL */
371     char *htable, *ptable;
372     int tmp_status;
373
374 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
375     __djgpp_set_quiet_socket(1);
376 #endif
377
378     SPLVAR;
379
380     INIT_PTHREAD_LOCKS
381     LOCK_RX_INIT
382     if (rxinit_status == 0) {
383         tmp_status = rxinit_status;
384         UNLOCK_RX_INIT
385         return tmp_status; /* Already started; return previous error code. */
386     }
387
388 #ifdef AFS_NT40_ENV
389     if (afs_winsockInit()<0)
390         return -1;
391 #endif
392
393 #ifndef KERNEL
394     /*
395      * Initialize anything necessary to provide a non-premptive threading
396      * environment.
397      */
398     rxi_InitializeThreadSupport();
399 #endif
400
401     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
402
403     /* Allocate and initialize a socket for client and perhaps server
404      * connections. */
405
406     rx_socket = rxi_GetUDPSocket((u_short)port); 
407     if (rx_socket == OSI_NULLSOCKET) {
408         UNLOCK_RX_INIT
409         return RX_ADDRINUSE;
410     }
411     
412
413 #ifdef  RX_ENABLE_LOCKS
414 #ifdef RX_LOCKS_DB
415     rxdb_init();
416 #endif /* RX_LOCKS_DB */
417     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
418     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
419     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
420     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
421                MUTEX_DEFAULT,0);
422     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
423     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
424     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
425     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
426 #ifndef KERNEL
427     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
428 #endif /* !KERNEL */
429 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
430     if ( !uniprocessor )
431       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
432 #endif /* KERNEL && AFS_HPUX110_ENV */
433 #else /* RX_ENABLE_LOCKS */
434 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
435     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
436 #endif /* AFS_GLOBAL_SUNLOCK */
437 #endif /* RX_ENABLE_LOCKS */
438
439     rxi_nCalls = 0;
440     rx_connDeadTime = 12;
441     rx_tranquil     = 0;        /* reset flag */
442     memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
443     htable = (char *)
444         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
445     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
446     memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
447     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
448     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
449     memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
450
451     /* Malloc up a bunch of packets & buffers */
452     rx_nFreePackets = 0;
453     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
454     queue_Init(&rx_freePacketQueue);
455     rxi_NeedMorePackets = FALSE;
456     rxi_MorePackets(rx_nPackets);
457     rx_CheckPackets();
458
459     NETPRI;
460     AFS_RXGLOCK();
461
462     clock_Init();
463
464 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
465     tv.tv_sec = clock_now.sec;
466     tv.tv_usec = clock_now.usec;
467     srand((unsigned int) tv.tv_usec);
468 #else
469     osi_GetTime(&tv);
470 #endif
471     if (port) {
472         rx_port = port;
473     } else {
474 #if defined(KERNEL) && !defined(UKERNEL)
475         /* Really, this should never happen in a real kernel */
476         rx_port = 0;
477 #else
478         struct sockaddr_in addr;
479         int addrlen = sizeof(addr);
480         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
481             rx_Finalize();
482             return -1;
483         }
484         rx_port = addr.sin_port;
485 #endif
486     }
487     rx_stats.minRtt.sec = 9999999;
488 #ifdef  KERNEL
489     rx_SetEpoch (tv.tv_sec | 0x80000000);
490 #else
491     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
492                                          * will provide a randomer value. */
493 #endif
494     MUTEX_ENTER(&rx_stats_mutex);
495     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
496     MUTEX_EXIT(&rx_stats_mutex);
497     /* *Slightly* random start time for the cid.  This is just to help
498      * out with the hashing function at the peer */
499     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
500     rx_connHashTable = (struct rx_connection **) htable;
501     rx_peerHashTable = (struct rx_peer **) ptable;
502
503     rx_lastAckDelay.sec = 0;
504     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
505     rx_hardAckDelay.sec = 0;
506     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
507     rx_softAckDelay.sec = 0;
508     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
509
510     rxevent_Init(20, rxi_ReScheduleEvents);
511
512     /* Initialize various global queues */
513     queue_Init(&rx_idleServerQueue);
514     queue_Init(&rx_incomingCallQueue);
515     queue_Init(&rx_freeCallQueue);
516
517 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
518     /* Initialize our list of usable IP addresses. */
519     rx_GetIFInfo();
520 #endif
521
522     /* Start listener process (exact function is dependent on the
523      * implementation environment--kernel or user space) */
524     rxi_StartListener();
525
526     AFS_RXGUNLOCK();
527     USERPRI;
528     tmp_status = rxinit_status = 0;
529     UNLOCK_RX_INIT
530     return tmp_status;
531 }
532
533 /* called with unincremented nRequestsRunning to see if it is OK to start
534  * a new thread in this service.  Could be "no" for two reasons: over the
535  * max quota, or would prevent others from reaching their min quota.
536  */
537 #ifdef RX_ENABLE_LOCKS
538 /* This verion of QuotaOK reserves quota if it's ok while the
539  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
540  */
541 static int QuotaOK(register struct rx_service *aservice)
542 {
543     /* check if over max quota */
544     if (aservice->nRequestsRunning >= aservice->maxProcs) {
545         return 0;
546     }
547
548     /* under min quota, we're OK */
549     /* otherwise, can use only if there are enough to allow everyone
550      * to go to their min quota after this guy starts.
551      */
552     MUTEX_ENTER(&rx_stats_mutex);
553     if ((aservice->nRequestsRunning < aservice->minProcs) ||
554          (rxi_availProcs > rxi_minDeficit)) {
555         aservice->nRequestsRunning++;
556         /* just started call in minProcs pool, need fewer to maintain
557          * guarantee */
558         if (aservice->nRequestsRunning <= aservice->minProcs)
559             rxi_minDeficit--;
560         rxi_availProcs--;
561         MUTEX_EXIT(&rx_stats_mutex);
562         return 1;
563     }
564     MUTEX_EXIT(&rx_stats_mutex);
565
566     return 0;
567 }
568
569 static void ReturnToServerPool(register struct rx_service *aservice)
570 {
571     aservice->nRequestsRunning--;
572     MUTEX_ENTER(&rx_stats_mutex);
573     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
574     rxi_availProcs++;
575     MUTEX_EXIT(&rx_stats_mutex);
576 }
577
578 #else /* RX_ENABLE_LOCKS */
579 static int QuotaOK(register struct rx_service *aservice)
580 {
581     int rc=0;
582     /* under min quota, we're OK */
583     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
584
585     /* check if over max quota */
586     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
587
588     /* otherwise, can use only if there are enough to allow everyone
589      * to go to their min quota after this guy starts.
590      */
591     if (rxi_availProcs > rxi_minDeficit) rc = 1;
592     return rc;
593 }
594 #endif /* RX_ENABLE_LOCKS */
595
596 #ifndef KERNEL
597 /* Called by rx_StartServer to start up lwp's to service calls.
598    NExistingProcs gives the number of procs already existing, and which
599    therefore needn't be created. */
600 void rxi_StartServerProcs(int nExistingProcs)
601 {
602     register struct rx_service *service;
603     register int i;
604     int maxdiff = 0;
605     int nProcs = 0;
606
607     /* For each service, reserve N processes, where N is the "minimum"
608        number of processes that MUST be able to execute a request in parallel,
609        at any time, for that process.  Also compute the maximum difference
610        between any service's maximum number of processes that can run
611        (i.e. the maximum number that ever will be run, and a guarantee
612        that this number will run if other services aren't running), and its
613        minimum number.  The result is the extra number of processes that
614        we need in order to provide the latter guarantee */
615     for (i=0; i<RX_MAX_SERVICES; i++) {
616         int diff;
617         service = rx_services[i];
618         if (service == (struct rx_service *) 0) break;
619         nProcs += service->minProcs;
620         diff = service->maxProcs - service->minProcs;
621         if (diff > maxdiff) maxdiff = diff;
622     }
623     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
624     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
625     for (i = 0; i<nProcs; i++) {
626         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
627     }
628 }
629 #endif /* KERNEL */
630
631 /* This routine must be called if any services are exported.  If the
632  * donateMe flag is set, the calling process is donated to the server
633  * process pool */
634 void rx_StartServer(int donateMe)
635 {
636     register struct rx_service *service;
637     register int i, nProcs=0;
638     SPLVAR;
639     clock_NewTime();
640
641     NETPRI;
642     AFS_RXGLOCK();
643     /* Start server processes, if necessary (exact function is dependent
644      * on the implementation environment--kernel or user space).  DonateMe
645      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
646      * case, one less new proc will be created rx_StartServerProcs.
647      */
648     rxi_StartServerProcs(donateMe);
649
650     /* count up the # of threads in minProcs, and add set the min deficit to
651      * be that value, too.
652      */
653     for (i=0; i<RX_MAX_SERVICES; i++) {
654         service = rx_services[i];
655         if (service == (struct rx_service *) 0) break;
656         MUTEX_ENTER(&rx_stats_mutex);
657         rxi_totalMin += service->minProcs;
658         /* below works even if a thread is running, since minDeficit would
659          * still have been decremented and later re-incremented.
660          */
661         rxi_minDeficit += service->minProcs;
662         MUTEX_EXIT(&rx_stats_mutex);
663     }
664
665     /* Turn on reaping of idle server connections */
666     rxi_ReapConnections();
667
668     AFS_RXGUNLOCK();
669     USERPRI;
670
671     if (donateMe) {
672 #ifndef AFS_NT40_ENV
673 #ifndef KERNEL
674         char name[32];
675 #ifdef AFS_PTHREAD_ENV
676         pid_t pid;
677         pid = (pid_t) pthread_self();
678 #else /* AFS_PTHREAD_ENV */
679         PROCESS pid;
680         LWP_CurrentProcess(&pid);
681 #endif /* AFS_PTHREAD_ENV */
682
683         sprintf(name,"srv_%d", ++nProcs);
684         if (registerProgram)
685             (*registerProgram)(pid, name);
686 #endif /* KERNEL */
687 #endif /* AFS_NT40_ENV */
688         rx_ServerProc(); /* Never returns */
689     }
690     return;
691 }
692
693 /* Create a new client connection to the specified service, using the
694  * specified security object to implement the security model for this
695  * connection. */
696 struct rx_connection *rx_NewConnection(register afs_uint32 shost, 
697         u_short sport, u_short sservice, 
698         register struct rx_securityClass *securityObject, int serviceSecurityIndex)
699 {
700     int hashindex;
701     afs_int32 cid;
702     register struct rx_connection *conn;
703
704     SPLVAR;
705
706     clock_NewTime();
707     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
708           shost, sport, sservice, securityObject, serviceSecurityIndex));
709
710     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
711      * the case of kmem_alloc? */
712     conn = rxi_AllocConnection();
713 #ifdef  RX_ENABLE_LOCKS
714     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
715     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
716     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
717 #endif
718     NETPRI;
719     AFS_RXGLOCK();
720     MUTEX_ENTER(&rx_connHashTable_lock);
721     cid = (rx_nextCid += RX_MAXCALLS);
722     conn->type = RX_CLIENT_CONNECTION;
723     conn->cid = cid;
724     conn->epoch = rx_epoch;
725     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
726     conn->serviceId = sservice;
727     conn->securityObject = securityObject;
728     /* This doesn't work in all compilers with void (they're buggy), so fake it
729      * with VOID */
730     conn->securityData = (VOID *) 0;
731     conn->securityIndex = serviceSecurityIndex;
732     rx_SetConnDeadTime(conn, rx_connDeadTime);
733     conn->ackRate = RX_FAST_ACK_RATE;
734     conn->nSpecific = 0;
735     conn->specific = NULL;
736     conn->challengeEvent = NULL;
737     conn->delayedAbortEvent = NULL;
738     conn->abortCount = 0;
739     conn->error = 0;
740
741     RXS_NewConnection(securityObject, conn);
742     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
743     
744     conn->refCount++; /* no lock required since only this thread knows... */
745     conn->next = rx_connHashTable[hashindex];
746     rx_connHashTable[hashindex] = conn;
747     MUTEX_ENTER(&rx_stats_mutex);
748     rx_stats.nClientConns++;
749     MUTEX_EXIT(&rx_stats_mutex);
750
751     MUTEX_EXIT(&rx_connHashTable_lock);
752     AFS_RXGUNLOCK();
753     USERPRI;
754     return conn;
755 }
756
757 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
758 {
759     /* The idea is to set the dead time to a value that allows several
760      * keepalives to be dropped without timing out the connection. */
761     conn->secondsUntilDead = MAX(seconds, 6);
762     conn->secondsUntilPing = conn->secondsUntilDead/6;
763 }
764
765 int rxi_lowPeerRefCount = 0;
766 int rxi_lowConnRefCount = 0;
767
768 /*
769  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
770  * NOTE: must not be called with rx_connHashTable_lock held.
771  */
772 void rxi_CleanupConnection(struct rx_connection *conn)
773 {
774     /* Notify the service exporter, if requested, that this connection
775      * is being destroyed */
776     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
777       (*conn->service->destroyConnProc)(conn);
778
779     /* Notify the security module that this connection is being destroyed */
780     RXS_DestroyConnection(conn->securityObject, conn);
781
782     /* If this is the last connection using the rx_peer struct, set its
783      * idle time to now. rxi_ReapConnections will reap it if it's still
784      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
785      */
786     MUTEX_ENTER(&rx_peerHashTable_lock);
787     if (--conn->peer->refCount <= 0) {
788         conn->peer->idleWhen = clock_Sec();
789         if (conn->peer->refCount < 0) {
790             conn->peer->refCount = 0; 
791             MUTEX_ENTER(&rx_stats_mutex);
792             rxi_lowPeerRefCount ++;
793             MUTEX_EXIT(&rx_stats_mutex);
794         }
795     }
796     MUTEX_EXIT(&rx_peerHashTable_lock);
797
798     MUTEX_ENTER(&rx_stats_mutex);
799     if (conn->type == RX_SERVER_CONNECTION)
800       rx_stats.nServerConns--;
801     else
802       rx_stats.nClientConns--;
803     MUTEX_EXIT(&rx_stats_mutex);
804
805 #ifndef KERNEL
806     if (conn->specific) {
807         int i;
808         for (i = 0 ; i < conn->nSpecific ; i++) {
809             if (conn->specific[i] && rxi_keyCreate_destructor[i])
810                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
811             conn->specific[i] = NULL;
812         }
813         free(conn->specific);
814     }
815     conn->specific = NULL;
816     conn->nSpecific = 0;
817 #endif /* !KERNEL */
818
819     MUTEX_DESTROY(&conn->conn_call_lock);
820     MUTEX_DESTROY(&conn->conn_data_lock);
821     CV_DESTROY(&conn->conn_call_cv);
822         
823     rxi_FreeConnection(conn);
824 }
825
826 /* Destroy the specified connection */
827 void rxi_DestroyConnection(register struct rx_connection *conn)
828 {
829     MUTEX_ENTER(&rx_connHashTable_lock);
830     rxi_DestroyConnectionNoLock(conn);
831     /* conn should be at the head of the cleanup list */
832     if (conn == rx_connCleanup_list) {
833         rx_connCleanup_list = rx_connCleanup_list->next;
834         MUTEX_EXIT(&rx_connHashTable_lock);
835         rxi_CleanupConnection(conn);
836     }
837 #ifdef RX_ENABLE_LOCKS
838     else {
839         MUTEX_EXIT(&rx_connHashTable_lock);
840     }
841 #endif /* RX_ENABLE_LOCKS */
842 }
843     
844 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
845 {
846     register struct rx_connection **conn_ptr;
847     register int havecalls = 0;
848     struct rx_packet *packet;
849     int i;
850     SPLVAR;
851
852     clock_NewTime();
853
854     NETPRI;
855     MUTEX_ENTER(&conn->conn_data_lock);
856     if (conn->refCount > 0)
857         conn->refCount--;
858     else {
859         MUTEX_ENTER(&rx_stats_mutex);
860         rxi_lowConnRefCount++;
861         MUTEX_EXIT(&rx_stats_mutex);
862     }
863
864     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
865         /* Busy; wait till the last guy before proceeding */
866         MUTEX_EXIT(&conn->conn_data_lock);
867         USERPRI;
868         return;
869     }
870
871     /* If the client previously called rx_NewCall, but it is still
872      * waiting, treat this as a running call, and wait to destroy the
873      * connection later when the call completes. */
874     if ((conn->type == RX_CLIENT_CONNECTION) &&
875         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
876         conn->flags |= RX_CONN_DESTROY_ME;
877         MUTEX_EXIT(&conn->conn_data_lock);
878         USERPRI;
879         return;
880     }
881     MUTEX_EXIT(&conn->conn_data_lock);
882
883     /* Check for extant references to this connection */
884     for (i = 0; i<RX_MAXCALLS; i++) {
885         register struct rx_call *call = conn->call[i];
886         if (call) {
887             havecalls = 1;
888             if (conn->type == RX_CLIENT_CONNECTION) {
889                 MUTEX_ENTER(&call->lock);
890                 if (call->delayedAckEvent) {
891                     /* Push the final acknowledgment out now--there
892                      * won't be a subsequent call to acknowledge the
893                      * last reply packets */
894                     rxevent_Cancel(call->delayedAckEvent, call,
895                                    RX_CALL_REFCOUNT_DELAY);
896                     if (call->state == RX_STATE_PRECALL ||
897                         call->state == RX_STATE_ACTIVE) {
898                         rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
899                     } else {
900                         rxi_AckAll(NULL, call, 0);
901                     }
902                 }
903                 MUTEX_EXIT(&call->lock);
904             }
905         }
906     }
907 #ifdef RX_ENABLE_LOCKS
908     if (!havecalls) {
909         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
910             MUTEX_EXIT(&conn->conn_data_lock);
911         }
912         else {
913             /* Someone is accessing a packet right now. */
914             havecalls = 1;
915         }
916     }
917 #endif /* RX_ENABLE_LOCKS */
918
919     if (havecalls) {
920         /* Don't destroy the connection if there are any call
921          * structures still in use */
922         MUTEX_ENTER(&conn->conn_data_lock);
923         conn->flags |= RX_CONN_DESTROY_ME;
924         MUTEX_EXIT(&conn->conn_data_lock);
925         USERPRI;
926         return;
927     }
928
929     if (conn->delayedAbortEvent) {
930         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
931         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
932         if (packet) {
933             MUTEX_ENTER(&conn->conn_data_lock);
934             rxi_SendConnectionAbort(conn, packet, 0, 1);
935             MUTEX_EXIT(&conn->conn_data_lock);
936             rxi_FreePacket(packet);
937         }
938     }
939
940     /* Remove from connection hash table before proceeding */
941     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
942                                              conn->epoch, conn->type) ];
943     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
944         if (*conn_ptr == conn) {
945             *conn_ptr = conn->next;
946             break;
947         }
948     }
949     /* if the conn that we are destroying was the last connection, then we
950     * clear rxLastConn as well */
951     if ( rxLastConn == conn )
952         rxLastConn = 0;
953
954     /* Make sure the connection is completely reset before deleting it. */
955     /* get rid of pending events that could zap us later */
956     if (conn->challengeEvent)
957         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
958     if (conn->checkReachEvent)
959         rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
960  
961     /* Add the connection to the list of destroyed connections that
962      * need to be cleaned up. This is necessary to avoid deadlocks
963      * in the routines we call to inform others that this connection is
964      * being destroyed. */
965     conn->next = rx_connCleanup_list;
966     rx_connCleanup_list = conn;
967 }
968
969 /* Externally available version */
970 void rx_DestroyConnection(register struct rx_connection *conn) 
971 {
972     SPLVAR;
973
974     NETPRI;
975     AFS_RXGLOCK();
976     rxi_DestroyConnection (conn);
977     AFS_RXGUNLOCK();
978     USERPRI;
979 }
980
981 /* Start a new rx remote procedure call, on the specified connection.
982  * If wait is set to 1, wait for a free call channel; otherwise return
983  * 0.  Maxtime gives the maximum number of seconds this call may take,
984  * after rx_MakeCall returns.  After this time interval, a call to any
985  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
986  * For fine grain locking, we hold the conn_call_lock in order to 
987  * to ensure that we don't get signalle after we found a call in an active
988  * state and before we go to sleep.
989  */
990 struct rx_call *rx_NewCall(register struct rx_connection *conn)
991 {
992     register int i;
993     register struct rx_call *call;
994     struct clock queueTime;
995     SPLVAR;
996
997     clock_NewTime();
998     dpf (("rx_MakeCall(conn %x)\n", conn));
999
1000     NETPRI;
1001     clock_GetTime(&queueTime);
1002     AFS_RXGLOCK();
1003     MUTEX_ENTER(&conn->conn_call_lock);
1004
1005     /*
1006      * Check if there are others waiting for a new call.
1007      * If so, let them go first to avoid starving them.
1008      * This is a fairly simple scheme, and might not be
1009      * a complete solution for large numbers of waiters.
1010      */
1011     if (conn->makeCallWaiters) {
1012 #ifdef  RX_ENABLE_LOCKS
1013         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1014 #else
1015         osi_rxSleep(conn);
1016 #endif
1017     }
1018
1019     for (;;) {
1020         for (i=0; i<RX_MAXCALLS; i++) {
1021             call = conn->call[i];
1022             if (call) {
1023                 MUTEX_ENTER(&call->lock);
1024                 if (call->state == RX_STATE_DALLY) {
1025                     rxi_ResetCall(call, 0);
1026                     (*call->callNumber)++;
1027                     break;
1028                 }
1029                 MUTEX_EXIT(&call->lock);
1030             }
1031             else {
1032                 call = rxi_NewCall(conn, i);
1033                 break;
1034             }
1035         }
1036         if (i < RX_MAXCALLS) {
1037             break;
1038         }
1039         MUTEX_ENTER(&conn->conn_data_lock);
1040         conn->flags |= RX_CONN_MAKECALL_WAITING;
1041         MUTEX_EXIT(&conn->conn_data_lock);
1042
1043         conn->makeCallWaiters++;
1044 #ifdef  RX_ENABLE_LOCKS
1045         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1046 #else
1047         osi_rxSleep(conn);
1048 #endif
1049         conn->makeCallWaiters--;
1050     }
1051     /*
1052      * Wake up anyone else who might be giving us a chance to
1053      * run (see code above that avoids resource starvation).
1054      */
1055 #ifdef  RX_ENABLE_LOCKS
1056     CV_BROADCAST(&conn->conn_call_cv);
1057 #else
1058     osi_rxWakeup(conn);
1059 #endif
1060
1061     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1062
1063     /* Client is initially in send mode */
1064     call->state = RX_STATE_ACTIVE;
1065     call->mode = RX_MODE_SENDING;
1066
1067     /* remember start time for call in case we have hard dead time limit */
1068     call->queueTime = queueTime;
1069     clock_GetTime(&call->startTime);
1070     hzero(call->bytesSent);
1071     hzero(call->bytesRcvd);
1072
1073     /* Turn on busy protocol. */
1074     rxi_KeepAliveOn(call);
1075
1076     MUTEX_EXIT(&call->lock);
1077     MUTEX_EXIT(&conn->conn_call_lock);
1078     AFS_RXGUNLOCK();
1079     USERPRI;
1080
1081 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1082     /* Now, if TQ wasn't cleared earlier, do it now. */
1083     AFS_RXGLOCK();
1084     MUTEX_ENTER(&call->lock);
1085     while (call->flags & RX_CALL_TQ_BUSY) {
1086         call->flags |= RX_CALL_TQ_WAIT;
1087 #ifdef RX_ENABLE_LOCKS
1088         CV_WAIT(&call->cv_tq, &call->lock);
1089 #else /* RX_ENABLE_LOCKS */
1090         osi_rxSleep(&call->tq);
1091 #endif /* RX_ENABLE_LOCKS */
1092     }
1093     if (call->flags & RX_CALL_TQ_CLEARME) {
1094         rxi_ClearTransmitQueue(call, 0);
1095         queue_Init(&call->tq);
1096     }
1097     MUTEX_EXIT(&call->lock);
1098     AFS_RXGUNLOCK();
1099 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1100
1101     return call;
1102 }
1103
1104 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1105 {
1106     register int i;
1107     register struct rx_call *tcall;
1108     SPLVAR;
1109
1110     NETPRI;
1111     for(i=0; i<RX_MAXCALLS; i++) {
1112       if ((tcall = aconn->call[i])) {
1113         if ((tcall->state == RX_STATE_ACTIVE) 
1114             || (tcall->state == RX_STATE_PRECALL)) {
1115           USERPRI;
1116           return 1;
1117         }
1118       }
1119     }
1120     USERPRI;
1121     return 0;
1122 }
1123
1124 int rxi_GetCallNumberVector(register struct rx_connection *aconn, 
1125         register afs_int32 *aint32s)
1126 {
1127     register int i;
1128     register struct rx_call *tcall;
1129     SPLVAR;
1130
1131     NETPRI;
1132     for(i=0; i<RX_MAXCALLS; i++) {
1133         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1134             aint32s[i] = aconn->callNumber[i]+1;
1135         else
1136             aint32s[i] = aconn->callNumber[i];
1137     }
1138     USERPRI;
1139     return 0;
1140 }
1141
1142 int rxi_SetCallNumberVector(register struct rx_connection *aconn, 
1143         register afs_int32 *aint32s)
1144 {
1145     register int i;
1146     register struct rx_call *tcall;
1147     SPLVAR;
1148
1149     NETPRI;
1150     for(i=0; i<RX_MAXCALLS; i++) {
1151         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1152             aconn->callNumber[i] = aint32s[i] - 1;
1153         else
1154             aconn->callNumber[i] = aint32s[i];
1155     }
1156     USERPRI;
1157     return 0;
1158 }
1159
1160 /* Advertise a new service.  A service is named locally by a UDP port
1161  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1162  * on a failure. 
1163  *
1164      char *serviceName;  Name for identification purposes (e.g. the
1165                          service name might be used for probing for
1166                          statistics) */
1167 struct rx_service *rx_NewService(u_short port, u_short serviceId, 
1168         char *serviceName, 
1169         struct rx_securityClass **securityObjects,
1170         int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1171 {    
1172     osi_socket socket = OSI_NULLSOCKET;
1173     register struct rx_service *tservice;    
1174     register int i;
1175     SPLVAR;
1176
1177     clock_NewTime();
1178
1179     if (serviceId == 0) {
1180         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1181          serviceName);
1182         return 0;
1183     }
1184     if (port == 0) {
1185         if (rx_port == 0) {
1186             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1187             return 0;
1188         }
1189         port = rx_port;
1190         socket = rx_socket;
1191     }
1192
1193     tservice = rxi_AllocService();
1194     NETPRI;
1195     AFS_RXGLOCK();
1196     for (i = 0; i<RX_MAX_SERVICES; i++) {
1197         register struct rx_service *service = rx_services[i];
1198         if (service) {
1199             if (port == service->servicePort) {
1200                 if (service->serviceId == serviceId) {
1201                     /* The identical service has already been
1202                      * installed; if the caller was intending to
1203                      * change the security classes used by this
1204                      * service, he/she loses. */
1205                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1206                     AFS_RXGUNLOCK();
1207                     USERPRI;
1208                     rxi_FreeService(tservice);
1209                     return service;
1210                 }
1211                 /* Different service, same port: re-use the socket
1212                  * which is bound to the same port */
1213                 socket = service->socket;
1214             }
1215         } else {
1216             if (socket == OSI_NULLSOCKET) {
1217                 /* If we don't already have a socket (from another
1218                  * service on same port) get a new one */
1219                 socket = rxi_GetUDPSocket(port);
1220                 if (socket == OSI_NULLSOCKET) {
1221                     AFS_RXGUNLOCK();
1222                     USERPRI;
1223                     rxi_FreeService(tservice);
1224                     return 0;
1225                 }
1226             }
1227             service = tservice;
1228             service->socket = socket;
1229             service->servicePort = port;
1230             service->serviceId = serviceId;
1231             service->serviceName = serviceName;
1232             service->nSecurityObjects = nSecurityObjects;
1233             service->securityObjects = securityObjects;
1234             service->minProcs = 0;
1235             service->maxProcs = 1;
1236             service->idleDeadTime = 60;
1237             service->connDeadTime = rx_connDeadTime;
1238             service->executeRequestProc = serviceProc;
1239             service->checkReach = 0;
1240             rx_services[i] = service;   /* not visible until now */
1241             AFS_RXGUNLOCK();
1242             USERPRI;
1243             return service;
1244         }
1245     }
1246     AFS_RXGUNLOCK();
1247     USERPRI;
1248     rxi_FreeService(tservice);
1249     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1250     return 0;
1251 }
1252
1253 /* Generic request processing loop. This routine should be called
1254  * by the implementation dependent rx_ServerProc. If socketp is
1255  * non-null, it will be set to the file descriptor that this thread
1256  * is now listening on. If socketp is null, this routine will never
1257  * returns. */
1258 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1259 {
1260     register struct rx_call *call;
1261     register afs_int32 code;
1262     register struct rx_service *tservice = NULL;
1263
1264     for (;;) {
1265         if (newcall) {
1266             call = newcall;
1267             newcall = NULL;
1268         } else {
1269             call = rx_GetCall(threadID, tservice, socketp);
1270             if (socketp && *socketp != OSI_NULLSOCKET) {
1271                 /* We are now a listener thread */
1272                 return;
1273             }
1274         }
1275
1276         /* if server is restarting( typically smooth shutdown) then do not
1277          * allow any new calls.
1278          */
1279
1280         if ( rx_tranquil && (call != NULL) ) {
1281             SPLVAR;
1282
1283             NETPRI;
1284             AFS_RXGLOCK();
1285             MUTEX_ENTER(&call->lock);
1286
1287             rxi_CallError(call, RX_RESTARTING);
1288             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1289
1290             MUTEX_EXIT(&call->lock);
1291             AFS_RXGUNLOCK();
1292             USERPRI;
1293         }
1294
1295 #ifdef  KERNEL
1296         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1297 #ifdef RX_ENABLE_LOCKS
1298             AFS_GLOCK();
1299 #endif /* RX_ENABLE_LOCKS */
1300             afs_termState = AFSOP_STOP_AFS;
1301             afs_osi_Wakeup(&afs_termState);
1302 #ifdef RX_ENABLE_LOCKS
1303             AFS_GUNLOCK();
1304 #endif /* RX_ENABLE_LOCKS */
1305             return;
1306         }
1307 #endif
1308
1309         tservice = call->conn->service;
1310
1311         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1312
1313         code = call->conn->service->executeRequestProc(call);
1314
1315         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1316
1317         rx_EndCall(call, code);
1318         MUTEX_ENTER(&rx_stats_mutex);
1319         rxi_nCalls++;
1320         MUTEX_EXIT(&rx_stats_mutex);
1321     }
1322 }
1323
1324
1325 void rx_WakeupServerProcs(void)
1326 {
1327     struct rx_serverQueueEntry *np, *tqp;
1328     SPLVAR;
1329
1330     NETPRI;
1331     AFS_RXGLOCK();
1332     MUTEX_ENTER(&rx_serverPool_lock);
1333
1334 #ifdef RX_ENABLE_LOCKS
1335     if (rx_waitForPacket)
1336         CV_BROADCAST(&rx_waitForPacket->cv);
1337 #else /* RX_ENABLE_LOCKS */
1338     if (rx_waitForPacket)
1339         osi_rxWakeup(rx_waitForPacket);
1340 #endif /* RX_ENABLE_LOCKS */
1341     MUTEX_ENTER(&freeSQEList_lock);
1342     for (np = rx_FreeSQEList; np; np = tqp) {
1343       tqp = *(struct rx_serverQueueEntry **)np;
1344 #ifdef RX_ENABLE_LOCKS
1345       CV_BROADCAST(&np->cv);
1346 #else /* RX_ENABLE_LOCKS */
1347       osi_rxWakeup(np);
1348 #endif /* RX_ENABLE_LOCKS */
1349     }
1350     MUTEX_EXIT(&freeSQEList_lock);
1351     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1352 #ifdef RX_ENABLE_LOCKS
1353       CV_BROADCAST(&np->cv);
1354 #else /* RX_ENABLE_LOCKS */
1355       osi_rxWakeup(np);
1356 #endif /* RX_ENABLE_LOCKS */
1357     }
1358     MUTEX_EXIT(&rx_serverPool_lock);
1359     AFS_RXGUNLOCK();
1360     USERPRI;
1361 }
1362
1363 /* meltdown:
1364  * One thing that seems to happen is that all the server threads get
1365  * tied up on some empty or slow call, and then a whole bunch of calls
1366  * arrive at once, using up the packet pool, so now there are more 
1367  * empty calls.  The most critical resources here are server threads
1368  * and the free packet pool.  The "doreclaim" code seems to help in
1369  * general.  I think that eventually we arrive in this state: there
1370  * are lots of pending calls which do have all their packets present,
1371  * so they won't be reclaimed, are multi-packet calls, so they won't
1372  * be scheduled until later, and thus are tying up most of the free 
1373  * packet pool for a very long time.
1374  * future options:
1375  * 1.  schedule multi-packet calls if all the packets are present.  
1376  * Probably CPU-bound operation, useful to return packets to pool. 
1377  * Do what if there is a full window, but the last packet isn't here?
1378  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1379  * it sleeps and waits for that type of call.
1380  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1381  * the current dataquota business is badly broken.  The quota isn't adjusted
1382  * to reflect how many packets are presently queued for a running call.
1383  * So, when we schedule a queued call with a full window of packets queued
1384  * up for it, that *should* free up a window full of packets for other 2d-class
1385  * calls to be able to use from the packet pool.  But it doesn't.
1386  *
1387  * NB.  Most of the time, this code doesn't run -- since idle server threads
1388  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1389  * as a new call arrives.
1390  */
1391 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1392  * for an rx_Read. */
1393 #ifdef RX_ENABLE_LOCKS
1394 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1395 {
1396     struct rx_serverQueueEntry *sq;
1397     register struct rx_call *call = (struct rx_call *) 0;
1398     struct rx_service *service = NULL;
1399     SPLVAR;
1400
1401     MUTEX_ENTER(&freeSQEList_lock);
1402
1403     if ((sq = rx_FreeSQEList)) {
1404         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1405         MUTEX_EXIT(&freeSQEList_lock);
1406     } else {    /* otherwise allocate a new one and return that */
1407         MUTEX_EXIT(&freeSQEList_lock);
1408         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1409         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1410         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1411     }
1412
1413     MUTEX_ENTER(&rx_serverPool_lock);
1414     if (cur_service != NULL) {
1415         ReturnToServerPool(cur_service);
1416     }
1417     while (1) {
1418         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1419             register struct rx_call *tcall, *ncall, *choice2 = NULL;
1420
1421             /* Scan for eligible incoming calls.  A call is not eligible
1422              * if the maximum number of calls for its service type are
1423              * already executing */
1424             /* One thread will process calls FCFS (to prevent starvation),
1425              * while the other threads may run ahead looking for calls which
1426              * have all their input data available immediately.  This helps 
1427              * keep threads from blocking, waiting for data from the client. */
1428             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1429                 service = tcall->conn->service;
1430                 if (!QuotaOK(service)) {
1431                     continue;
1432                 }
1433                 if (tno==rxi_fcfs_thread_num || !tcall->queue_item_header.next  ) {
1434                     /* If we're the fcfs thread , then  we'll just use 
1435                      * this call. If we haven't been able to find an optimal 
1436                      * choice, and we're at the end of the list, then use a 
1437                      * 2d choice if one has been identified.  Otherwise... */
1438                     call = (choice2 ? choice2 : tcall);
1439                     service = call->conn->service;
1440                 } else if (!queue_IsEmpty(&tcall->rq)) {
1441                     struct rx_packet *rp;
1442                     rp = queue_First(&tcall->rq, rx_packet);
1443                     if (rp->header.seq == 1) {
1444                         if (!meltdown_1pkt ||
1445                             (rp->header.flags & RX_LAST_PACKET)) {
1446                             call = tcall;
1447                         } else if (rxi_2dchoice && !choice2 &&
1448                                    !(tcall->flags & RX_CALL_CLEARED) &&
1449                                    (tcall->rprev > rxi_HardAckRate)) {
1450                             choice2 = tcall;
1451                         } else rxi_md2cnt++;
1452                     }
1453                 }
1454                 if (call) {
1455                     break;
1456                 } else {
1457                     ReturnToServerPool(service);
1458                 }
1459             }
1460         }
1461
1462         if (call) {
1463             queue_Remove(call);
1464             MUTEX_EXIT(&rx_serverPool_lock);
1465             MUTEX_ENTER(&call->lock);
1466
1467             if (call->state != RX_STATE_PRECALL || call->error) {
1468                 MUTEX_EXIT(&call->lock);
1469                 MUTEX_ENTER(&rx_serverPool_lock);
1470                 ReturnToServerPool(service);
1471                 call = NULL;
1472                 continue;
1473             }
1474
1475             if (queue_IsEmpty(&call->rq) ||
1476                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1477                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1478
1479             CLEAR_CALL_QUEUE_LOCK(call);
1480             call->flags &= ~RX_CALL_WAIT_PROC;
1481             MUTEX_ENTER(&rx_stats_mutex);
1482             rx_nWaiting--;
1483             MUTEX_EXIT(&rx_stats_mutex);
1484             break;
1485         }
1486         else {
1487             /* If there are no eligible incoming calls, add this process
1488              * to the idle server queue, to wait for one */
1489             sq->newcall = 0;
1490             sq->tno = tno;
1491             if (socketp) {
1492                 *socketp = OSI_NULLSOCKET;
1493             }
1494             sq->socketp = socketp;
1495             queue_Append(&rx_idleServerQueue, sq);
1496 #ifndef AFS_AIX41_ENV
1497             rx_waitForPacket = sq;
1498 #endif /* AFS_AIX41_ENV */
1499             do {
1500                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1501 #ifdef  KERNEL
1502                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1503                     MUTEX_EXIT(&rx_serverPool_lock);
1504                     return (struct rx_call *)0;
1505                 }
1506 #endif
1507             } while (!(call = sq->newcall) &&
1508                      !(socketp && *socketp != OSI_NULLSOCKET));
1509             MUTEX_EXIT(&rx_serverPool_lock);
1510             if (call) {
1511                 MUTEX_ENTER(&call->lock);
1512             }
1513             break;
1514         }
1515     }
1516
1517     MUTEX_ENTER(&freeSQEList_lock);
1518     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1519     rx_FreeSQEList = sq;
1520     MUTEX_EXIT(&freeSQEList_lock);
1521
1522     if (call) {
1523         clock_GetTime(&call->startTime);
1524         call->state = RX_STATE_ACTIVE;
1525         call->mode = RX_MODE_RECEIVING;
1526
1527         rxi_calltrace(RX_CALL_START, call);
1528         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1529              call->conn->service->servicePort, 
1530              call->conn->service->serviceId, call));
1531
1532         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1533         MUTEX_EXIT(&call->lock);
1534     } else {
1535         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1536     }
1537
1538     return call;
1539 }
1540 #else /* RX_ENABLE_LOCKS */
1541 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1542 {
1543     struct rx_serverQueueEntry *sq;
1544     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1545     struct rx_service *service = NULL;
1546     SPLVAR;
1547
1548     NETPRI;
1549     AFS_RXGLOCK();
1550     MUTEX_ENTER(&freeSQEList_lock);
1551
1552     if ((sq = rx_FreeSQEList)) {
1553         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1554         MUTEX_EXIT(&freeSQEList_lock);
1555     } else {    /* otherwise allocate a new one and return that */
1556         MUTEX_EXIT(&freeSQEList_lock);
1557         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1558         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1559         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1560     }
1561     MUTEX_ENTER(&sq->lock);
1562
1563     if (cur_service != NULL) {
1564         cur_service->nRequestsRunning--;
1565         if (cur_service->nRequestsRunning < cur_service->minProcs)
1566             rxi_minDeficit++;
1567         rxi_availProcs++;
1568     }
1569     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1570         register struct rx_call *tcall, *ncall;
1571         /* Scan for eligible incoming calls.  A call is not eligible
1572          * if the maximum number of calls for its service type are
1573          * already executing */
1574         /* One thread will process calls FCFS (to prevent starvation),
1575          * while the other threads may run ahead looking for calls which
1576          * have all their input data available immediately.  This helps 
1577          * keep threads from blocking, waiting for data from the client. */
1578         choice2 = (struct rx_call *) 0;
1579         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1580           service = tcall->conn->service;
1581           if (QuotaOK(service)) {
1582              if (tno==rxi_fcfs_thread_num || !tcall->queue_item_header.next  ) {
1583                  /* If we're the fcfs thread, then  we'll just use 
1584                   * this call. If we haven't been able to find an optimal 
1585                   * choice, and we're at the end of the list, then use a 
1586                   * 2d choice if one has been identified.  Otherwise... */
1587                  call = (choice2 ? choice2 : tcall);
1588                  service = call->conn->service;
1589              } else if (!queue_IsEmpty(&tcall->rq)) {
1590                  struct rx_packet *rp;
1591                  rp = queue_First(&tcall->rq, rx_packet);
1592                  if (rp->header.seq == 1
1593                      && (!meltdown_1pkt ||
1594                          (rp->header.flags & RX_LAST_PACKET))) {
1595                      call = tcall;
1596                  } else if (rxi_2dchoice && !choice2 &&
1597                             !(tcall->flags & RX_CALL_CLEARED) &&
1598                             (tcall->rprev > rxi_HardAckRate)) {
1599                      choice2 = tcall;
1600                  } else rxi_md2cnt++;
1601              }
1602           }
1603           if (call) 
1604              break;
1605         }
1606       }
1607
1608     if (call) {
1609         queue_Remove(call);
1610         /* we can't schedule a call if there's no data!!! */
1611         /* send an ack if there's no data, if we're missing the
1612          * first packet, or we're missing something between first 
1613          * and last -- there's a "hole" in the incoming data. */
1614         if (queue_IsEmpty(&call->rq) ||
1615             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1616             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1617           rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1618
1619         call->flags &= (~RX_CALL_WAIT_PROC);
1620         service->nRequestsRunning++;
1621         /* just started call in minProcs pool, need fewer to maintain
1622          * guarantee */
1623         if (service->nRequestsRunning <= service->minProcs)
1624             rxi_minDeficit--;
1625         rxi_availProcs--;
1626         rx_nWaiting--;
1627         /* MUTEX_EXIT(&call->lock); */
1628     }
1629     else {
1630         /* If there are no eligible incoming calls, add this process
1631          * to the idle server queue, to wait for one */
1632         sq->newcall = 0;
1633         if (socketp) {
1634             *socketp = OSI_NULLSOCKET;
1635         }
1636         sq->socketp = socketp;
1637         queue_Append(&rx_idleServerQueue, sq);
1638         do {
1639             osi_rxSleep(sq);
1640 #ifdef  KERNEL
1641                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1642                     AFS_RXGUNLOCK();
1643                     USERPRI;
1644                     return (struct rx_call *)0;
1645                 }
1646 #endif
1647         } while (!(call = sq->newcall) &&
1648                  !(socketp && *socketp != OSI_NULLSOCKET));
1649     }
1650     MUTEX_EXIT(&sq->lock);
1651
1652     MUTEX_ENTER(&freeSQEList_lock);
1653     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1654     rx_FreeSQEList = sq;
1655     MUTEX_EXIT(&freeSQEList_lock);
1656
1657     if (call) {
1658         clock_GetTime(&call->startTime);
1659         call->state = RX_STATE_ACTIVE;
1660         call->mode = RX_MODE_RECEIVING;
1661
1662         rxi_calltrace(RX_CALL_START, call);
1663         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1664          call->conn->service->servicePort, 
1665          call->conn->service->serviceId, call));
1666     } else {
1667         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1668     }
1669
1670     AFS_RXGUNLOCK();
1671     USERPRI;
1672
1673     return call;
1674 }
1675 #endif /* RX_ENABLE_LOCKS */
1676
1677
1678
1679 /* Establish a procedure to be called when a packet arrives for a
1680  * call.  This routine will be called at most once after each call,
1681  * and will also be called if there is an error condition on the or
1682  * the call is complete.  Used by multi rx to build a selection
1683  * function which determines which of several calls is likely to be a
1684  * good one to read from.  
1685  * NOTE: the way this is currently implemented it is probably only a
1686  * good idea to (1) use it immediately after a newcall (clients only)
1687  * and (2) only use it once.  Other uses currently void your warranty
1688  */
1689 void rx_SetArrivalProc(register struct rx_call *call, 
1690         register VOID (*proc)(register struct rx_call *call,
1691         register struct multi_handle *mh, register int index),
1692         register VOID *handle, register VOID *arg)
1693 {
1694     call->arrivalProc = proc;
1695     call->arrivalProcHandle = handle;
1696     call->arrivalProcArg = arg;
1697 }
1698
1699 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1700  * appropriate, and return the final error code from the conversation
1701  * to the caller */
1702
1703 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1704 {
1705     register struct rx_connection *conn = call->conn;
1706     register struct rx_service *service;
1707     register struct rx_packet *tp; /* Temporary packet pointer */
1708     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1709     afs_int32 error;
1710     SPLVAR;
1711
1712     dpf(("rx_EndCall(call %x)\n", call));
1713
1714     NETPRI;
1715     AFS_RXGLOCK();
1716     MUTEX_ENTER(&call->lock);
1717
1718     if (rc == 0 && call->error == 0) {
1719         call->abortCode = 0;
1720         call->abortCount = 0;
1721     }
1722
1723     call->arrivalProc = (VOID (*)()) 0;
1724     if (rc && call->error == 0) {
1725         rxi_CallError(call, rc);
1726         /* Send an abort message to the peer if this error code has
1727          * only just been set.  If it was set previously, assume the
1728          * peer has already been sent the error code or will request it 
1729          */
1730         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1731     }
1732     if (conn->type == RX_SERVER_CONNECTION) {
1733         /* Make sure reply or at least dummy reply is sent */
1734         if (call->mode == RX_MODE_RECEIVING) {
1735             rxi_WriteProc(call, 0, 0);
1736         }
1737         if (call->mode == RX_MODE_SENDING) {
1738             rxi_FlushWrite(call);
1739         }
1740         service = conn->service;
1741         rxi_calltrace(RX_CALL_END, call);
1742         /* Call goes to hold state until reply packets are acknowledged */
1743         if (call->tfirst + call->nSoftAcked < call->tnext) {
1744             call->state = RX_STATE_HOLD;
1745         } else {
1746             call->state = RX_STATE_DALLY;
1747             rxi_ClearTransmitQueue(call, 0);
1748             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1749             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1750         }
1751     }
1752     else { /* Client connection */
1753         char dummy;
1754         /* Make sure server receives input packets, in the case where
1755          * no reply arguments are expected */
1756         if ((call->mode == RX_MODE_SENDING)
1757          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1758             (void) rxi_ReadProc(call, &dummy, 1);
1759         }
1760
1761         /* If we had an outstanding delayed ack, be nice to the server
1762          * and force-send it now.
1763          */
1764         if (call->delayedAckEvent) {
1765             rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1766             call->delayedAckEvent = NULL;
1767             rxi_SendDelayedAck(NULL, call, NULL);
1768         }
1769
1770         /* We need to release the call lock since it's lower than the
1771          * conn_call_lock and we don't want to hold the conn_call_lock
1772          * over the rx_ReadProc call. The conn_call_lock needs to be held
1773          * here for the case where rx_NewCall is perusing the calls on
1774          * the connection structure. We don't want to signal until
1775          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1776          * have checked this call, found it active and by the time it
1777          * goes to sleep, will have missed the signal.
1778          */
1779         MUTEX_EXIT(&call->lock);
1780         MUTEX_ENTER(&conn->conn_call_lock);
1781         MUTEX_ENTER(&call->lock);
1782         MUTEX_ENTER(&conn->conn_data_lock);
1783         conn->flags |= RX_CONN_BUSY;
1784         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1785             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1786             MUTEX_EXIT(&conn->conn_data_lock);
1787 #ifdef  RX_ENABLE_LOCKS
1788             CV_BROADCAST(&conn->conn_call_cv);
1789 #else
1790             osi_rxWakeup(conn);
1791 #endif
1792         }
1793 #ifdef RX_ENABLE_LOCKS
1794         else {
1795             MUTEX_EXIT(&conn->conn_data_lock);
1796         }
1797 #endif /* RX_ENABLE_LOCKS */
1798         call->state = RX_STATE_DALLY;
1799     }
1800     error = call->error;
1801
1802     /* currentPacket, nLeft, and NFree must be zeroed here, because
1803      * ResetCall cannot: ResetCall may be called at splnet(), in the
1804      * kernel version, and may interrupt the macros rx_Read or
1805      * rx_Write, which run at normal priority for efficiency. */
1806     if (call->currentPacket) {
1807         rxi_FreePacket(call->currentPacket);
1808         call->currentPacket = (struct rx_packet *) 0;
1809         call->nLeft = call->nFree = call->curlen = 0;
1810     }
1811     else
1812         call->nLeft = call->nFree = call->curlen = 0;
1813
1814     /* Free any packets from the last call to ReadvProc/WritevProc */
1815     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1816         queue_Remove(tp);
1817         rxi_FreePacket(tp);
1818     }
1819
1820     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1821     MUTEX_EXIT(&call->lock);
1822     if (conn->type == RX_CLIENT_CONNECTION) {
1823         MUTEX_EXIT(&conn->conn_call_lock);
1824         conn->flags &= ~RX_CONN_BUSY;
1825     }
1826     AFS_RXGUNLOCK();
1827     USERPRI;
1828     /*
1829      * Map errors to the local host's errno.h format.
1830      */
1831     error = ntoh_syserr_conv(error);
1832     return error;
1833 }
1834
1835 #if !defined(KERNEL)
1836
1837 /* Call this routine when shutting down a server or client (especially
1838  * clients).  This will allow Rx to gracefully garbage collect server
1839  * connections, and reduce the number of retries that a server might
1840  * make to a dead client.
1841  * This is not quite right, since some calls may still be ongoing and
1842  * we can't lock them to destroy them. */
1843 void rx_Finalize(void)
1844 {
1845     register struct rx_connection **conn_ptr, **conn_end;
1846
1847     INIT_PTHREAD_LOCKS
1848     LOCK_RX_INIT
1849     if (rxinit_status == 1) {
1850         UNLOCK_RX_INIT
1851         return; /* Already shutdown. */
1852     }
1853     rxi_DeleteCachedConnections();
1854     if (rx_connHashTable) {
1855         MUTEX_ENTER(&rx_connHashTable_lock);
1856         for (conn_ptr = &rx_connHashTable[0], 
1857              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1858              conn_ptr < conn_end; conn_ptr++) {
1859             struct rx_connection *conn, *next;
1860             for (conn = *conn_ptr; conn; conn = next) {
1861                 next = conn->next;
1862                 if (conn->type == RX_CLIENT_CONNECTION) {
1863                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1864                     conn->refCount++;
1865                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1866 #ifdef RX_ENABLE_LOCKS
1867                     rxi_DestroyConnectionNoLock(conn);
1868 #else /* RX_ENABLE_LOCKS */
1869                     rxi_DestroyConnection(conn);
1870 #endif /* RX_ENABLE_LOCKS */
1871                 }
1872             }
1873         }
1874 #ifdef RX_ENABLE_LOCKS
1875         while (rx_connCleanup_list) {
1876             struct rx_connection *conn;
1877             conn = rx_connCleanup_list;
1878             rx_connCleanup_list = rx_connCleanup_list->next;
1879             MUTEX_EXIT(&rx_connHashTable_lock);
1880             rxi_CleanupConnection(conn);
1881             MUTEX_ENTER(&rx_connHashTable_lock);
1882         }
1883         MUTEX_EXIT(&rx_connHashTable_lock);
1884 #endif /* RX_ENABLE_LOCKS */
1885     }
1886     rxi_flushtrace();
1887
1888     rxinit_status = 1;
1889     UNLOCK_RX_INIT
1890 }
1891 #endif
1892
1893 /* if we wakeup packet waiter too often, can get in loop with two
1894     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1895 void rxi_PacketsUnWait(void)
1896 {
1897     if (!rx_waitingForPackets) {
1898         return;
1899     }
1900 #ifdef KERNEL
1901     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1902         return;                                     /* still over quota */
1903     }
1904 #endif /* KERNEL */
1905     rx_waitingForPackets = 0;
1906 #ifdef  RX_ENABLE_LOCKS
1907     CV_BROADCAST(&rx_waitingForPackets_cv);
1908 #else
1909     osi_rxWakeup(&rx_waitingForPackets);
1910 #endif
1911     return;
1912 }
1913
1914
1915 /* ------------------Internal interfaces------------------------- */
1916
1917 /* Return this process's service structure for the
1918  * specified socket and service */
1919 struct rx_service *rxi_FindService(register osi_socket socket, 
1920         register u_short serviceId)
1921 {
1922     register struct rx_service **sp;    
1923     for (sp = &rx_services[0]; *sp; sp++) {
1924         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1925           return *sp;
1926     }
1927     return 0;
1928 }
1929
1930 /* Allocate a call structure, for the indicated channel of the
1931  * supplied connection.  The mode and state of the call must be set by
1932  * the caller. Returns the call with mutex locked. */
1933 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1934         register int channel)
1935 {
1936     register struct rx_call *call;
1937 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1938     register struct rx_call *cp;        /* Call pointer temp */
1939     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1940 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1941
1942     /* Grab an existing call structure, or allocate a new one.
1943      * Existing call structures are assumed to have been left reset by
1944      * rxi_FreeCall */
1945     MUTEX_ENTER(&rx_freeCallQueue_lock);
1946
1947 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1948     /*
1949      * EXCEPT that the TQ might not yet be cleared out.
1950      * Skip over those with in-use TQs.
1951      */
1952     call = NULL;
1953     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1954         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1955             call = cp;
1956             break;
1957         }
1958     }
1959     if (call) {
1960 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1961     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1962         call = queue_First(&rx_freeCallQueue, rx_call);
1963 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1964         queue_Remove(call);
1965         MUTEX_ENTER(&rx_stats_mutex);
1966         rx_stats.nFreeCallStructs--;
1967         MUTEX_EXIT(&rx_stats_mutex);
1968         MUTEX_EXIT(&rx_freeCallQueue_lock);
1969         MUTEX_ENTER(&call->lock);
1970         CLEAR_CALL_QUEUE_LOCK(call);
1971 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1972         /* Now, if TQ wasn't cleared earlier, do it now. */
1973         if (call->flags & RX_CALL_TQ_CLEARME) {
1974             rxi_ClearTransmitQueue(call, 0);
1975             queue_Init(&call->tq);
1976         }
1977 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1978         /* Bind the call to its connection structure */
1979         call->conn = conn;
1980         rxi_ResetCall(call, 1);
1981     }
1982     else {
1983         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1984
1985         MUTEX_EXIT(&rx_freeCallQueue_lock);
1986         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1987         MUTEX_ENTER(&call->lock);
1988         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1989         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1990         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1991
1992         MUTEX_ENTER(&rx_stats_mutex);
1993         rx_stats.nCallStructs++;
1994         MUTEX_EXIT(&rx_stats_mutex);
1995         /* Initialize once-only items */
1996         queue_Init(&call->tq);
1997         queue_Init(&call->rq);
1998         queue_Init(&call->iovq);
1999         /* Bind the call to its connection structure (prereq for reset) */
2000         call->conn = conn;
2001         rxi_ResetCall(call, 1);
2002     }
2003     call->channel = channel;
2004     call->callNumber = &conn->callNumber[channel];
2005     /* Note that the next expected call number is retained (in
2006      * conn->callNumber[i]), even if we reallocate the call structure
2007      */
2008     conn->call[channel] = call;
2009     /* if the channel's never been used (== 0), we should start at 1, otherwise
2010         the call number is valid from the last time this channel was used */
2011     if (*call->callNumber == 0) *call->callNumber = 1;
2012
2013     return call;
2014 }
2015
2016 /* A call has been inactive long enough that so we can throw away
2017  * state, including the call structure, which is placed on the call
2018  * free list.
2019  * Call is locked upon entry.
2020  * haveCTLock set if called from rxi_ReapConnections
2021  */
2022 #ifdef RX_ENABLE_LOCKS
2023 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2024 #else /* RX_ENABLE_LOCKS */
2025 void rxi_FreeCall(register struct rx_call *call)
2026 #endif /* RX_ENABLE_LOCKS */
2027 {
2028     register int channel = call->channel;
2029     register struct rx_connection *conn = call->conn;
2030
2031
2032     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2033       (*call->callNumber)++;
2034     rxi_ResetCall(call, 0);
2035     call->conn->call[channel] = (struct rx_call *) 0;
2036
2037     MUTEX_ENTER(&rx_freeCallQueue_lock);
2038     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2039 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2040     /* A call may be free even though its transmit queue is still in use.
2041      * Since we search the call list from head to tail, put busy calls at
2042      * the head of the list, and idle calls at the tail.
2043      */
2044     if (call->flags & RX_CALL_TQ_BUSY)
2045         queue_Prepend(&rx_freeCallQueue, call);
2046     else
2047         queue_Append(&rx_freeCallQueue, call);
2048 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2049     queue_Append(&rx_freeCallQueue, call);
2050 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2051     MUTEX_ENTER(&rx_stats_mutex);
2052     rx_stats.nFreeCallStructs++;
2053     MUTEX_EXIT(&rx_stats_mutex);
2054
2055     MUTEX_EXIT(&rx_freeCallQueue_lock);
2056  
2057     /* Destroy the connection if it was previously slated for
2058      * destruction, i.e. the Rx client code previously called
2059      * rx_DestroyConnection (client connections), or
2060      * rxi_ReapConnections called the same routine (server
2061      * connections).  Only do this, however, if there are no
2062      * outstanding calls. Note that for fine grain locking, there appears
2063      * to be a deadlock in that rxi_FreeCall has a call locked and
2064      * DestroyConnectionNoLock locks each call in the conn. But note a
2065      * few lines up where we have removed this call from the conn.
2066      * If someone else destroys a connection, they either have no
2067      * call lock held or are going through this section of code.
2068      */
2069     if (conn->flags & RX_CONN_DESTROY_ME) {
2070         MUTEX_ENTER(&conn->conn_data_lock);
2071         conn->refCount++;
2072         MUTEX_EXIT(&conn->conn_data_lock);
2073 #ifdef RX_ENABLE_LOCKS
2074         if (haveCTLock)
2075             rxi_DestroyConnectionNoLock(conn);
2076         else
2077             rxi_DestroyConnection(conn);
2078 #else /* RX_ENABLE_LOCKS */
2079         rxi_DestroyConnection(conn);
2080 #endif /* RX_ENABLE_LOCKS */
2081     }
2082 }
2083
2084 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2085 char *rxi_Alloc(register size_t size)
2086 {
2087     register char *p;
2088
2089 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2090     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2091      * implementation.
2092      */
2093     int glockOwner = ISAFS_GLOCK();
2094     if (!glockOwner)
2095         AFS_GLOCK();
2096 #endif
2097     MUTEX_ENTER(&rx_stats_mutex);
2098     rxi_Alloccnt++; rxi_Allocsize += size;
2099     MUTEX_EXIT(&rx_stats_mutex);
2100 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2101     if (size > AFS_SMALLOCSIZ) {
2102         p = (char *) osi_AllocMediumSpace(size);
2103     } else
2104         p = (char *) osi_AllocSmall(size, 1);
2105 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2106     if (!glockOwner)
2107         AFS_GUNLOCK();
2108 #endif
2109 #else
2110     p = (char *) osi_Alloc(size);
2111 #endif
2112     if (!p) osi_Panic("rxi_Alloc error");
2113     memset(p, 0, size);
2114     return p;
2115 }
2116
2117 void rxi_Free(void *addr, register size_t size)
2118 {
2119 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2120     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2121      * implementation.
2122      */
2123     int glockOwner = ISAFS_GLOCK();
2124     if (!glockOwner)
2125         AFS_GLOCK();
2126 #endif
2127     MUTEX_ENTER(&rx_stats_mutex);
2128     rxi_Alloccnt--; rxi_Allocsize -= size;
2129     MUTEX_EXIT(&rx_stats_mutex);
2130 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2131     if (size > AFS_SMALLOCSIZ)
2132         osi_FreeMediumSpace(addr);
2133     else
2134         osi_FreeSmall(addr);
2135 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2136     if (!glockOwner)
2137         AFS_GUNLOCK();
2138 #endif
2139 #else
2140     osi_Free(addr, size);
2141 #endif    
2142 }
2143
2144 /* Find the peer process represented by the supplied (host,port)
2145  * combination.  If there is no appropriate active peer structure, a
2146  * new one will be allocated and initialized 
2147  * The origPeer, if set, is a pointer to a peer structure on which the
2148  * refcount will be be decremented. This is used to replace the peer
2149  * structure hanging off a connection structure */
2150 struct rx_peer *rxi_FindPeer(register afs_uint32 host, 
2151         register u_short port, struct rx_peer *origPeer, int create)
2152 {
2153     register struct rx_peer *pp;
2154     int hashIndex;
2155     hashIndex = PEER_HASH(host, port);
2156     MUTEX_ENTER(&rx_peerHashTable_lock);
2157     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2158         if ((pp->host == host) && (pp->port == port)) break;
2159     }
2160     if (!pp) {
2161         if (create) {
2162             pp = rxi_AllocPeer(); /* This bzero's *pp */
2163             pp->host = host;      /* set here or in InitPeerParams is zero */
2164             pp->port = port;
2165             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2166             queue_Init(&pp->congestionQueue);
2167             queue_Init(&pp->rpcStats);
2168             pp->next = rx_peerHashTable[hashIndex];
2169             rx_peerHashTable[hashIndex] = pp;
2170             rxi_InitPeerParams(pp);
2171             MUTEX_ENTER(&rx_stats_mutex);
2172             rx_stats.nPeerStructs++;
2173             MUTEX_EXIT(&rx_stats_mutex);
2174         }
2175     }
2176     if (pp && create) {
2177         pp->refCount++;
2178     }
2179     if ( origPeer)
2180         origPeer->refCount--;
2181     MUTEX_EXIT(&rx_peerHashTable_lock);
2182     return pp;
2183 }
2184
2185
2186 /* Find the connection at (host, port) started at epoch, and with the
2187  * given connection id.  Creates the server connection if necessary.
2188  * The type specifies whether a client connection or a server
2189  * connection is desired.  In both cases, (host, port) specify the
2190  * peer's (host, pair) pair.  Client connections are not made
2191  * automatically by this routine.  The parameter socket gives the
2192  * socket descriptor on which the packet was received.  This is used,
2193  * in the case of server connections, to check that *new* connections
2194  * come via a valid (port, serviceId).  Finally, the securityIndex
2195  * parameter must match the existing index for the connection.  If a
2196  * server connection is created, it will be created using the supplied
2197  * index, if the index is valid for this service */
2198 struct rx_connection *rxi_FindConnection(osi_socket socket, 
2199         register afs_int32 host, register u_short port, u_short serviceId, 
2200         afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2201 {
2202     int hashindex, flag;
2203     register struct rx_connection *conn;
2204     hashindex = CONN_HASH(host, port, cid, epoch, type);
2205     MUTEX_ENTER(&rx_connHashTable_lock);
2206     rxLastConn ? (conn = rxLastConn, flag = 0) :
2207                  (conn = rx_connHashTable[hashindex], flag = 1);
2208     for (; conn; ) {
2209       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2210           && (epoch == conn->epoch)) {
2211         register struct rx_peer *pp = conn->peer;
2212         if (securityIndex != conn->securityIndex) {
2213             /* this isn't supposed to happen, but someone could forge a packet
2214                like this, and there seems to be some CM bug that makes this
2215                happen from time to time -- in which case, the fileserver
2216                asserts. */  
2217             MUTEX_EXIT(&rx_connHashTable_lock);
2218             return (struct rx_connection *) 0;
2219         }
2220         if (pp->host == host && pp->port == port)
2221             break;
2222         if (type == RX_CLIENT_CONNECTION && pp->port == port)
2223             break;
2224         if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2225             break;
2226       }
2227       if ( !flag )
2228       {
2229         /* the connection rxLastConn that was used the last time is not the
2230         ** one we are looking for now. Hence, start searching in the hash */
2231         flag = 1;
2232         conn = rx_connHashTable[hashindex];
2233       }
2234       else
2235         conn = conn->next;
2236     }
2237     if (!conn) {
2238         struct rx_service *service;
2239         if (type == RX_CLIENT_CONNECTION) {
2240             MUTEX_EXIT(&rx_connHashTable_lock);
2241             return (struct rx_connection *) 0;
2242         }
2243         service = rxi_FindService(socket, serviceId);
2244         if (!service || (securityIndex >= service->nSecurityObjects) 
2245             || (service->securityObjects[securityIndex] == 0)) {
2246             MUTEX_EXIT(&rx_connHashTable_lock);
2247             return (struct rx_connection *) 0;
2248         }
2249         conn = rxi_AllocConnection(); /* This bzero's the connection */
2250         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2251                    MUTEX_DEFAULT,0);
2252         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2253                    MUTEX_DEFAULT,0);
2254         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2255         conn->next = rx_connHashTable[hashindex];
2256         rx_connHashTable[hashindex] = conn;
2257         conn->peer = rxi_FindPeer(host, port, 0, 1);
2258         conn->type = RX_SERVER_CONNECTION;
2259         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2260         conn->epoch = epoch;
2261         conn->cid = cid & RX_CIDMASK;
2262         /* conn->serial = conn->lastSerial = 0; */
2263         /* conn->timeout = 0; */
2264         conn->ackRate = RX_FAST_ACK_RATE;
2265         conn->service = service;
2266         conn->serviceId = serviceId;
2267         conn->securityIndex = securityIndex;
2268         conn->securityObject = service->securityObjects[securityIndex];
2269         conn->nSpecific = 0;
2270         conn->specific = NULL;
2271         rx_SetConnDeadTime(conn, service->connDeadTime);
2272         rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2273         /* Notify security object of the new connection */
2274         RXS_NewConnection(conn->securityObject, conn);
2275         /* XXXX Connection timeout? */
2276         if (service->newConnProc) (*service->newConnProc)(conn);
2277         MUTEX_ENTER(&rx_stats_mutex);
2278         rx_stats.nServerConns++;
2279         MUTEX_EXIT(&rx_stats_mutex);
2280     }
2281
2282     MUTEX_ENTER(&conn->conn_data_lock);
2283     conn->refCount++;
2284     MUTEX_EXIT(&conn->conn_data_lock);
2285
2286     rxLastConn = conn;  /* store this connection as the last conn used */
2287     MUTEX_EXIT(&rx_connHashTable_lock);
2288     return conn;
2289 }
2290
2291 /* There are two packet tracing routines available for testing and monitoring
2292  * Rx.  One is called just after every packet is received and the other is
2293  * called just before every packet is sent.  Received packets, have had their
2294  * headers decoded, and packets to be sent have not yet had their headers
2295  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2296  * containing the network address.  Both can be modified.  The return value, if
2297  * non-zero, indicates that the packet should be dropped.  */
2298
2299 int (*rx_justReceived)() = 0;
2300 int (*rx_almostSent)() = 0;
2301
2302 /* A packet has been received off the interface.  Np is the packet, socket is
2303  * the socket number it was received from (useful in determining which service
2304  * this packet corresponds to), and (host, port) reflect the host,port of the
2305  * sender.  This call returns the packet to the caller if it is finished with
2306  * it, rather than de-allocating it, just as a small performance hack */
2307
2308 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np, 
2309         osi_socket socket, afs_uint32 host, u_short port, 
2310         int *tnop, struct rx_call **newcallp)
2311 {
2312     register struct rx_call *call;
2313     register struct rx_connection *conn;
2314     int channel;
2315     afs_uint32 currentCallNumber;
2316     int type;
2317     int skew;
2318 #ifdef RXDEBUG
2319     char *packetType;
2320 #endif
2321     struct rx_packet *tnp;
2322
2323 #ifdef RXDEBUG
2324 /* We don't print out the packet until now because (1) the time may not be
2325  * accurate enough until now in the lwp implementation (rx_Listener only gets
2326  * the time after the packet is read) and (2) from a protocol point of view,
2327  * this is the first time the packet has been seen */
2328     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2329         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2330     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2331          np->header.serial, packetType, host, port, np->header.serviceId,
2332          np->header.epoch, np->header.cid, np->header.callNumber, 
2333          np->header.seq, np->header.flags, np));
2334 #endif
2335
2336     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2337       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2338     }
2339
2340     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2341         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2342     }
2343 #ifdef RXDEBUG
2344     /* If an input tracer function is defined, call it with the packet and
2345      * network address.  Note this function may modify its arguments. */
2346     if (rx_justReceived) {
2347         struct sockaddr_in addr;
2348         int drop;
2349         addr.sin_family = AF_INET;
2350         addr.sin_port = port;
2351         addr.sin_addr.s_addr = host;
2352 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN 
2353         addr.sin_len = sizeof(addr);
2354 #endif  /* AFS_OSF_ENV */
2355         drop = (*rx_justReceived) (np, &addr);
2356         /* drop packet if return value is non-zero */
2357         if (drop) return np;
2358         port = addr.sin_port;           /* in case fcn changed addr */
2359         host = addr.sin_addr.s_addr;
2360     }
2361 #endif
2362
2363     /* If packet was not sent by the client, then *we* must be the client */
2364     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2365         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2366
2367     /* Find the connection (or fabricate one, if we're the server & if
2368      * necessary) associated with this packet */
2369     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2370                               np->header.cid, np->header.epoch, type, 
2371                               np->header.securityIndex);
2372
2373     if (!conn) {
2374       /* If no connection found or fabricated, just ignore the packet.
2375        * (An argument could be made for sending an abort packet for
2376        * the conn) */
2377       return np;
2378     }   
2379
2380     MUTEX_ENTER(&conn->conn_data_lock);
2381     if (conn->maxSerial < np->header.serial)
2382         conn->maxSerial = np->header.serial;
2383     MUTEX_EXIT(&conn->conn_data_lock);
2384
2385     /* If the connection is in an error state, send an abort packet and ignore
2386      * the incoming packet */
2387     if (conn->error) {
2388         /* Don't respond to an abort packet--we don't want loops! */
2389         MUTEX_ENTER(&conn->conn_data_lock);
2390         if (np->header.type != RX_PACKET_TYPE_ABORT)
2391             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2392         conn->refCount--;
2393         MUTEX_EXIT(&conn->conn_data_lock);
2394         return np;
2395     }
2396
2397     /* Check for connection-only requests (i.e. not call specific). */
2398     if (np->header.callNumber == 0) {
2399         switch (np->header.type) {
2400             case RX_PACKET_TYPE_ABORT:
2401                 /* What if the supplied error is zero? */
2402                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2403                 MUTEX_ENTER(&conn->conn_data_lock);
2404                 conn->refCount--;
2405                 MUTEX_EXIT(&conn->conn_data_lock);
2406                 return np;
2407             case RX_PACKET_TYPE_CHALLENGE:
2408                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2409                 MUTEX_ENTER(&conn->conn_data_lock);
2410                 conn->refCount--;
2411                 MUTEX_EXIT(&conn->conn_data_lock);
2412                 return tnp;
2413             case RX_PACKET_TYPE_RESPONSE:
2414                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2415                 MUTEX_ENTER(&conn->conn_data_lock);
2416                 conn->refCount--;
2417                 MUTEX_EXIT(&conn->conn_data_lock);
2418                 return tnp;
2419             case RX_PACKET_TYPE_PARAMS:
2420             case RX_PACKET_TYPE_PARAMS+1:
2421             case RX_PACKET_TYPE_PARAMS+2:
2422                 /* ignore these packet types for now */
2423                 MUTEX_ENTER(&conn->conn_data_lock);
2424                 conn->refCount--;
2425                 MUTEX_EXIT(&conn->conn_data_lock);
2426                 return np;
2427
2428
2429             default:
2430                 /* Should not reach here, unless the peer is broken: send an
2431                  * abort packet */
2432                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2433                 MUTEX_ENTER(&conn->conn_data_lock);
2434                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2435                 conn->refCount--;
2436                 MUTEX_EXIT(&conn->conn_data_lock);
2437                 return tnp;
2438         }
2439     }
2440
2441     channel = np->header.cid & RX_CHANNELMASK;
2442     call = conn->call[channel];
2443 #ifdef  RX_ENABLE_LOCKS
2444     if (call)
2445         MUTEX_ENTER(&call->lock);
2446     /* Test to see if call struct is still attached to conn. */
2447     if (call != conn->call[channel]) {
2448         if (call)
2449             MUTEX_EXIT(&call->lock);
2450         if (type == RX_SERVER_CONNECTION) {
2451             call = conn->call[channel];
2452             /* If we started with no call attached and there is one now,
2453              * another thread is also running this routine and has gotten
2454              * the connection channel. We should drop this packet in the tests
2455              * below. If there was a call on this connection and it's now
2456              * gone, then we'll be making a new call below.
2457              * If there was previously a call and it's now different then
2458              * the old call was freed and another thread running this routine
2459              * has created a call on this channel. One of these two threads
2460              * has a packet for the old call and the code below handles those
2461              * cases.
2462              */
2463             if (call)
2464                 MUTEX_ENTER(&call->lock);
2465         }
2466         else {
2467             /* This packet can't be for this call. If the new call address is
2468              * 0 then no call is running on this channel. If there is a call
2469              * then, since this is a client connection we're getting data for
2470              * it must be for the previous call.
2471              */
2472             MUTEX_ENTER(&rx_stats_mutex);
2473             rx_stats.spuriousPacketsRead++;
2474             MUTEX_EXIT(&rx_stats_mutex);
2475             MUTEX_ENTER(&conn->conn_data_lock);
2476             conn->refCount--;
2477             MUTEX_EXIT(&conn->conn_data_lock);
2478             return np;
2479         }
2480     }
2481 #endif
2482     currentCallNumber = conn->callNumber[channel];
2483
2484     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2485         if (np->header.callNumber < currentCallNumber) {
2486             MUTEX_ENTER(&rx_stats_mutex);
2487             rx_stats.spuriousPacketsRead++;
2488             MUTEX_EXIT(&rx_stats_mutex);
2489 #ifdef  RX_ENABLE_LOCKS
2490             if (call)
2491                 MUTEX_EXIT(&call->lock);
2492 #endif
2493             MUTEX_ENTER(&conn->conn_data_lock);
2494             conn->refCount--;
2495             MUTEX_EXIT(&conn->conn_data_lock);
2496             return np;
2497         }
2498         if (!call) {
2499             MUTEX_ENTER(&conn->conn_call_lock);
2500             call = rxi_NewCall(conn, channel);
2501             MUTEX_EXIT(&conn->conn_call_lock);
2502             *call->callNumber = np->header.callNumber;
2503             call->state = RX_STATE_PRECALL;
2504             clock_GetTime(&call->queueTime);
2505             hzero(call->bytesSent);
2506             hzero(call->bytesRcvd);
2507             rxi_KeepAliveOn(call);
2508         }
2509         else if (np->header.callNumber != currentCallNumber) {
2510             /* Wait until the transmit queue is idle before deciding
2511              * whether to reset the current call. Chances are that the
2512              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2513              * flag is cleared.
2514              */
2515 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2516             while ((call->state == RX_STATE_ACTIVE) &&
2517                    (call->flags & RX_CALL_TQ_BUSY)) {
2518                 call->flags |= RX_CALL_TQ_WAIT;
2519 #ifdef RX_ENABLE_LOCKS
2520                 CV_WAIT(&call->cv_tq, &call->lock);
2521 #else /* RX_ENABLE_LOCKS */
2522                 osi_rxSleep(&call->tq);
2523 #endif /* RX_ENABLE_LOCKS */
2524             }
2525 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2526             /* If the new call cannot be taken right now send a busy and set
2527              * the error condition in this call, so that it terminates as
2528              * quickly as possible */
2529             if (call->state == RX_STATE_ACTIVE) {
2530                 struct rx_packet *tp;
2531
2532                 rxi_CallError(call, RX_CALL_DEAD);
2533                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2534                 MUTEX_EXIT(&call->lock);
2535                 MUTEX_ENTER(&conn->conn_data_lock);
2536                 conn->refCount--;
2537                 MUTEX_EXIT(&conn->conn_data_lock);
2538                 return tp;
2539             }
2540             rxi_ResetCall(call, 0);
2541             *call->callNumber = np->header.callNumber;
2542             call->state = RX_STATE_PRECALL;
2543             clock_GetTime(&call->queueTime);
2544             hzero(call->bytesSent);
2545             hzero(call->bytesRcvd);
2546             /*
2547              * If the number of queued calls exceeds the overload
2548              * threshold then abort this call.
2549              */
2550             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2551                 struct rx_packet *tp;
2552
2553                 rxi_CallError(call, rx_BusyError);
2554                 tp = rxi_SendCallAbort(call, np, 1, 0);
2555                 MUTEX_EXIT(&call->lock);
2556                 MUTEX_ENTER(&conn->conn_data_lock);
2557                 conn->refCount--;
2558                 MUTEX_EXIT(&conn->conn_data_lock);
2559                 return tp;
2560             }
2561             rxi_KeepAliveOn(call);
2562         }
2563         else {
2564             /* Continuing call; do nothing here. */
2565         }
2566     } else { /* we're the client */
2567         /* Ignore all incoming acknowledgements for calls in DALLY state */
2568         if ( call && (call->state == RX_STATE_DALLY) 
2569          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2570             MUTEX_ENTER(&rx_stats_mutex);
2571             rx_stats.ignorePacketDally++;
2572             MUTEX_EXIT(&rx_stats_mutex);
2573 #ifdef  RX_ENABLE_LOCKS
2574             if (call) {
2575                 MUTEX_EXIT(&call->lock);
2576             }
2577 #endif
2578             MUTEX_ENTER(&conn->conn_data_lock);
2579             conn->refCount--;
2580             MUTEX_EXIT(&conn->conn_data_lock);
2581             return np;
2582         }
2583         
2584         /* Ignore anything that's not relevant to the current call.  If there
2585          * isn't a current call, then no packet is relevant. */
2586         if (!call || (np->header.callNumber != currentCallNumber)) {
2587             MUTEX_ENTER(&rx_stats_mutex);
2588             rx_stats.spuriousPacketsRead++;
2589             MUTEX_EXIT(&rx_stats_mutex);
2590 #ifdef  RX_ENABLE_LOCKS
2591             if (call) {
2592                 MUTEX_EXIT(&call->lock);
2593             }
2594 #endif
2595             MUTEX_ENTER(&conn->conn_data_lock);
2596             conn->refCount--;
2597             MUTEX_EXIT(&conn->conn_data_lock);
2598             return np;  
2599         }
2600         /* If the service security object index stamped in the packet does not
2601          * match the connection's security index, ignore the packet */
2602         if (np->header.securityIndex != conn->securityIndex) {
2603 #ifdef  RX_ENABLE_LOCKS
2604             MUTEX_EXIT(&call->lock);
2605 #endif
2606             MUTEX_ENTER(&conn->conn_data_lock);
2607             conn->refCount--;       
2608             MUTEX_EXIT(&conn->conn_data_lock);
2609             return np;
2610         }
2611
2612         /* If we're receiving the response, then all transmit packets are
2613          * implicitly acknowledged.  Get rid of them. */
2614         if (np->header.type == RX_PACKET_TYPE_DATA) {
2615 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2616             /* XXX Hack. Because we must release the global rx lock when
2617              * sending packets (osi_NetSend) we drop all acks while we're
2618              * traversing the tq in rxi_Start sending packets out because
2619              * packets may move to the freePacketQueue as result of being here!
2620              * So we drop these packets until we're safely out of the
2621              * traversing. Really ugly! 
2622              * For fine grain RX locking, we set the acked field in the
2623              * packets and let rxi_Start remove them from the transmit queue.
2624              */
2625             if (call->flags & RX_CALL_TQ_BUSY) {
2626 #ifdef  RX_ENABLE_LOCKS
2627                 rxi_SetAcksInTransmitQueue(call);
2628 #else
2629                 conn->refCount--;
2630                 return np;              /* xmitting; drop packet */
2631 #endif
2632             }
2633             else {
2634                 rxi_ClearTransmitQueue(call, 0);
2635             }
2636 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2637             rxi_ClearTransmitQueue(call, 0);
2638 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2639         } else {
2640           if (np->header.type == RX_PACKET_TYPE_ACK) {
2641         /* now check to see if this is an ack packet acknowledging that the
2642          * server actually *lost* some hard-acked data.  If this happens we
2643          * ignore this packet, as it may indicate that the server restarted in
2644          * the middle of a call.  It is also possible that this is an old ack
2645          * packet.  We don't abort the connection in this case, because this
2646          * *might* just be an old ack packet.  The right way to detect a server
2647          * restart in the midst of a call is to notice that the server epoch
2648          * changed, btw.  */
2649         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2650          * XXX unacknowledged.  I think that this is off-by-one, but
2651          * XXX I don't dare change it just yet, since it will
2652          * XXX interact badly with the server-restart detection 
2653          * XXX code in receiveackpacket.  */
2654             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2655                 MUTEX_ENTER(&rx_stats_mutex);
2656                 rx_stats.spuriousPacketsRead++;
2657                 MUTEX_EXIT(&rx_stats_mutex);
2658                 MUTEX_EXIT(&call->lock);
2659                 MUTEX_ENTER(&conn->conn_data_lock);
2660                 conn->refCount--;
2661                 MUTEX_EXIT(&conn->conn_data_lock);
2662                 return np;
2663             }
2664           }
2665         } /* else not a data packet */
2666     }
2667
2668     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2669     /* Set remote user defined status from packet */
2670     call->remoteStatus = np->header.userStatus;
2671
2672     /* Note the gap between the expected next packet and the actual
2673      * packet that arrived, when the new packet has a smaller serial number
2674      * than expected.  Rioses frequently reorder packets all by themselves,
2675      * so this will be quite important with very large window sizes.
2676      * Skew is checked against 0 here to avoid any dependence on the type of
2677      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2678      * true! 
2679      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2680      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2681      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2682      */
2683     MUTEX_ENTER(&conn->conn_data_lock);
2684     skew = conn->lastSerial - np->header.serial;
2685     conn->lastSerial = np->header.serial;
2686     MUTEX_EXIT(&conn->conn_data_lock);
2687     if (skew > 0) {
2688       register struct rx_peer *peer;
2689       peer = conn->peer;
2690       if (skew > peer->inPacketSkew) {
2691         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2692         peer->inPacketSkew = skew;
2693       }
2694     }
2695
2696     /* Now do packet type-specific processing */
2697     switch (np->header.type) {
2698         case RX_PACKET_TYPE_DATA:
2699             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2700                                        tnop, newcallp);
2701             break;
2702         case RX_PACKET_TYPE_ACK:
2703             /* Respond immediately to ack packets requesting acknowledgement
2704              * (ping packets) */
2705             if (np->header.flags & RX_REQUEST_ACK) {
2706                 if (call->error)
2707                     (void) rxi_SendCallAbort(call, 0, 1, 0);
2708                 else
2709                     (void) rxi_SendAck(call, 0, np->header.serial,
2710                                        RX_ACK_PING_RESPONSE, 1);
2711             }
2712             np = rxi_ReceiveAckPacket(call, np, 1);
2713             break;
2714         case RX_PACKET_TYPE_ABORT:
2715             /* An abort packet: reset the connection, passing the error up to
2716              * the user */
2717             /* What if error is zero? */
2718             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2719             break;
2720         case RX_PACKET_TYPE_BUSY:
2721             /* XXXX */
2722             break;
2723         case RX_PACKET_TYPE_ACKALL:
2724             /* All packets acknowledged, so we can drop all packets previously
2725              * readied for sending */
2726 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2727             /* XXX Hack. We because we can't release the global rx lock when
2728              * sending packets (osi_NetSend) we drop all ack pkts while we're
2729              * traversing the tq in rxi_Start sending packets out because
2730              * packets may move to the freePacketQueue as result of being
2731              * here! So we drop these packets until we're safely out of the
2732              * traversing. Really ugly! 
2733              * For fine grain RX locking, we set the acked field in the packets
2734              * and let rxi_Start remove the packets from the transmit queue.
2735              */
2736             if (call->flags & RX_CALL_TQ_BUSY) {
2737 #ifdef  RX_ENABLE_LOCKS
2738                 rxi_SetAcksInTransmitQueue(call);
2739                 break;
2740 #else /* RX_ENABLE_LOCKS */
2741                 conn->refCount--;
2742                 return np;              /* xmitting; drop packet */
2743 #endif /* RX_ENABLE_LOCKS */
2744             }
2745 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2746             rxi_ClearTransmitQueue(call, 0);
2747             break;
2748         default:
2749             /* Should not reach here, unless the peer is broken: send an abort
2750              * packet */
2751             rxi_CallError(call, RX_PROTOCOL_ERROR);
2752             np = rxi_SendCallAbort(call, np, 1, 0);
2753             break;
2754     };
2755     /* Note when this last legitimate packet was received, for keep-alive
2756      * processing.  Note, we delay getting the time until now in the hope that
2757      * the packet will be delivered to the user before any get time is required
2758      * (if not, then the time won't actually be re-evaluated here). */
2759     call->lastReceiveTime = clock_Sec();
2760     MUTEX_EXIT(&call->lock);
2761     MUTEX_ENTER(&conn->conn_data_lock);
2762     conn->refCount--;
2763     MUTEX_EXIT(&conn->conn_data_lock);
2764     return np;
2765 }
2766
2767 /* return true if this is an "interesting" connection from the point of view
2768     of someone trying to debug the system */
2769 int rxi_IsConnInteresting(struct rx_connection *aconn)
2770 {
2771     register int i;
2772     register struct rx_call *tcall;
2773
2774     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2775         return 1;
2776     for(i=0;i<RX_MAXCALLS;i++) {
2777         tcall = aconn->call[i];
2778         if (tcall) {
2779             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2780                 return 1;
2781             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2782                 return 1;
2783         }
2784     }
2785     return 0;
2786 }
2787
2788 #ifdef KERNEL
2789 /* if this is one of the last few packets AND it wouldn't be used by the
2790    receiving call to immediately satisfy a read request, then drop it on
2791    the floor, since accepting it might prevent a lock-holding thread from
2792    making progress in its reading. If a call has been cleared while in
2793    the precall state then ignore all subsequent packets until the call
2794    is assigned to a thread. */
2795
2796 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2797 {
2798     int rc=0;
2799     MUTEX_ENTER(&rx_stats_mutex);
2800     if (((ap->header.seq != 1) &&
2801          (acall->flags & RX_CALL_CLEARED) &&
2802          (acall->state == RX_STATE_PRECALL)) ||
2803         ((rx_nFreePackets < rxi_dataQuota+2) &&
2804          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2805            && (acall->flags & RX_CALL_READER_WAIT)))) {
2806         rc = 1;
2807     }
2808     MUTEX_EXIT(&rx_stats_mutex);
2809     return rc;
2810 }
2811 #endif /* KERNEL */
2812
2813 static void rxi_CheckReachEvent(struct rxevent *event, 
2814         struct rx_connection *conn, struct rx_call *acall)
2815 {
2816     struct rx_call *call = acall;
2817     struct clock when;
2818     int i, waiting;
2819
2820     MUTEX_ENTER(&conn->conn_data_lock);
2821     conn->checkReachEvent = NULL;
2822     waiting = conn->flags & RX_CONN_ATTACHWAIT;
2823     if (event) conn->refCount--;
2824     MUTEX_EXIT(&conn->conn_data_lock);
2825
2826     if (waiting) {
2827         if (!call) {
2828             MUTEX_ENTER(&conn->conn_call_lock);
2829             MUTEX_ENTER(&conn->conn_data_lock);
2830             for (i=0; i<RX_MAXCALLS; i++) {
2831                 struct rx_call *tc = conn->call[i];
2832                 if (tc && tc->state == RX_STATE_PRECALL) {
2833                     call = tc;
2834                     break;
2835                 }
2836             }
2837             if (!call)
2838                 /* Indicate that rxi_CheckReachEvent is no longer running by
2839                  * clearing the flag.  Must be atomic under conn_data_lock to
2840                  * avoid a new call slipping by: rxi_CheckConnReach holds
2841                  * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2842                  */
2843                 conn->flags &= ~RX_CONN_ATTACHWAIT;
2844             MUTEX_EXIT(&conn->conn_data_lock);
2845             MUTEX_EXIT(&conn->conn_call_lock);
2846         }
2847
2848         if (call) {
2849             if (call != acall) MUTEX_ENTER(&call->lock);
2850             rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
2851             if (call != acall) MUTEX_EXIT(&call->lock);
2852
2853             clock_GetTime(&when);
2854             when.sec += RX_CHECKREACH_TIMEOUT;
2855             MUTEX_ENTER(&conn->conn_data_lock);
2856             if (!conn->checkReachEvent) {
2857                 conn->refCount++;
2858                 conn->checkReachEvent =
2859                     rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2860             }
2861             MUTEX_EXIT(&conn->conn_data_lock);
2862         }
2863     }
2864 }
2865
2866 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2867 {
2868     struct rx_service *service = conn->service;
2869     struct rx_peer *peer = conn->peer;
2870     afs_uint32 now, lastReach;
2871
2872     if (service->checkReach == 0)
2873         return 0;
2874
2875     now = clock_Sec();
2876     MUTEX_ENTER(&peer->peer_lock);
2877     lastReach = peer->lastReachTime;
2878     MUTEX_EXIT(&peer->peer_lock);
2879     if (now - lastReach < RX_CHECKREACH_TTL)
2880         return 0;
2881
2882     MUTEX_ENTER(&conn->conn_data_lock);
2883     if (conn->flags & RX_CONN_ATTACHWAIT) {
2884         MUTEX_EXIT(&conn->conn_data_lock);
2885         return 1;
2886     }
2887     conn->flags |= RX_CONN_ATTACHWAIT;
2888     MUTEX_EXIT(&conn->conn_data_lock);
2889     if (!conn->checkReachEvent)
2890         rxi_CheckReachEvent(NULL, conn, call);
2891
2892     return 1;
2893 }
2894
2895 /* try to attach call, if authentication is complete */
2896 static void TryAttach(register struct rx_call *acall, 
2897         register osi_socket socket, register int *tnop, 
2898         register struct rx_call **newcallp, int reachOverride)
2899 {
2900     struct rx_connection *conn = acall->conn;
2901
2902     if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2903         /* Don't attach until we have any req'd. authentication. */
2904         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2905             if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2906                 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2907                 /* Note:  this does not necessarily succeed; there
2908                  * may not any proc available
2909                  */
2910         }
2911         else {
2912             rxi_ChallengeOn(acall->conn);
2913         }
2914     }
2915 }
2916
2917 /* A data packet has been received off the interface.  This packet is
2918  * appropriate to the call (the call is in the right state, etc.).  This
2919  * routine can return a packet to the caller, for re-use */
2920
2921 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call, 
2922         register struct rx_packet *np, int istack, osi_socket socket, 
2923         afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2924 {
2925     int ackNeeded = 0;  /* 0 means no, otherwise ack_reason */
2926     int newPackets = 0;
2927     int didHardAck = 0;
2928     int haveLast = 0;
2929     afs_uint32 seq, serial, flags;
2930     int isFirst;
2931     struct rx_packet *tnp;
2932     struct clock when;
2933     MUTEX_ENTER(&rx_stats_mutex);
2934     rx_stats.dataPacketsRead++;
2935     MUTEX_EXIT(&rx_stats_mutex);
2936
2937 #ifdef KERNEL
2938     /* If there are no packet buffers, drop this new packet, unless we can find
2939      * packet buffers from inactive calls */
2940     if (!call->error &&
2941         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2942         MUTEX_ENTER(&rx_freePktQ_lock);
2943         rxi_NeedMorePackets = TRUE;
2944         MUTEX_EXIT(&rx_freePktQ_lock);
2945         MUTEX_ENTER(&rx_stats_mutex);
2946         rx_stats.noPacketBuffersOnRead++;
2947         MUTEX_EXIT(&rx_stats_mutex);
2948         call->rprev = np->header.serial;
2949         rxi_calltrace(RX_TRACE_DROP, call);
2950         dpf (("packet %x dropped on receipt - quota problems", np));
2951         if (rxi_doreclaim)
2952             rxi_ClearReceiveQueue(call);
2953         clock_GetTime(&when);
2954         clock_Add(&when, &rx_softAckDelay);
2955         if (!call->delayedAckEvent ||
2956             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2957             rxevent_Cancel(call->delayedAckEvent, call,
2958                            RX_CALL_REFCOUNT_DELAY);
2959             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2960             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2961                                                  call, 0);
2962         }
2963         /* we've damaged this call already, might as well do it in. */
2964         return np;
2965     }
2966 #endif /* KERNEL */
2967
2968     /*
2969      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2970      * packet is one of several packets transmitted as a single
2971      * datagram. Do not send any soft or hard acks until all packets
2972      * in a jumbogram have been processed. Send negative acks right away.
2973      */
2974     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2975         /* tnp is non-null when there are more packets in the
2976          * current jumbo gram */
2977         if (tnp) {
2978             if (np)
2979                 rxi_FreePacket(np);
2980             np = tnp;
2981         }
2982
2983         seq = np->header.seq;
2984         serial = np->header.serial;
2985         flags = np->header.flags;
2986
2987         /* If the call is in an error state, send an abort message */
2988         if (call->error)
2989             return rxi_SendCallAbort(call, np, istack, 0);
2990
2991         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2992          * AFS 3.5 jumbogram. */
2993         if (flags & RX_JUMBO_PACKET) {
2994             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2995         } else {
2996             tnp = NULL;
2997         }
2998
2999         if (np->header.spare != 0) {
3000             MUTEX_ENTER(&call->conn->conn_data_lock);
3001             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3002             MUTEX_EXIT(&call->conn->conn_data_lock);
3003         }
3004
3005         /* The usual case is that this is the expected next packet */
3006         if (seq == call->rnext) {
3007
3008             /* Check to make sure it is not a duplicate of one already queued */
3009             if (queue_IsNotEmpty(&call->rq) 
3010                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3011                 MUTEX_ENTER(&rx_stats_mutex);
3012                 rx_stats.dupPacketsRead++;
3013                 MUTEX_EXIT(&rx_stats_mutex);
3014                 dpf (("packet %x dropped on receipt - duplicate", np));
3015                 rxevent_Cancel(call->delayedAckEvent, call,
3016                                RX_CALL_REFCOUNT_DELAY);
3017                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3018                 ackNeeded = 0;
3019                 call->rprev = seq;
3020                 continue;
3021             }
3022
3023             /* It's the next packet. Stick it on the receive queue
3024              * for this call. Set newPackets to make sure we wake
3025              * the reader once all packets have been processed */
3026             queue_Prepend(&call->rq, np);
3027             call->nSoftAcks++;
3028             np = NULL; /* We can't use this anymore */
3029             newPackets = 1;
3030
3031             /* If an ack is requested then set a flag to make sure we
3032              * send an acknowledgement for this packet */
3033             if (flags & RX_REQUEST_ACK) {
3034                 ackNeeded = RX_ACK_REQUESTED;
3035             }
3036
3037             /* Keep track of whether we have received the last packet */
3038             if (flags & RX_LAST_PACKET) {
3039                 call->flags |= RX_CALL_HAVE_LAST;
3040                 haveLast = 1;
3041             }
3042
3043             /* Check whether we have all of the packets for this call */
3044             if (call->flags & RX_CALL_HAVE_LAST) {
3045                 afs_uint32 tseq;                /* temporary sequence number */
3046                 struct rx_packet *tp;   /* Temporary packet pointer */
3047                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
3048
3049                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3050                     if (tseq != tp->header.seq)
3051                         break;
3052                     if (tp->header.flags & RX_LAST_PACKET) {
3053                         call->flags |= RX_CALL_RECEIVE_DONE;
3054                         break;
3055                     }
3056                     tseq++;
3057                 }
3058             }
3059
3060             /* Provide asynchronous notification for those who want it
3061              * (e.g. multi rx) */
3062             if (call->arrivalProc) {
3063                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3064                                      (int) call->arrivalProcArg);
3065                 call->arrivalProc = (VOID (*)()) 0;
3066             }
3067
3068             /* Update last packet received */
3069             call->rprev = seq;
3070
3071             /* If there is no server process serving this call, grab
3072              * one, if available. We only need to do this once. If a
3073              * server thread is available, this thread becomes a server
3074              * thread and the server thread becomes a listener thread. */
3075             if (isFirst) {
3076                 TryAttach(call, socket, tnop, newcallp, 0);
3077             }
3078         }       
3079         /* This is not the expected next packet. */
3080         else {
3081             /* Determine whether this is a new or old packet, and if it's
3082              * a new one, whether it fits into the current receive window.
3083              * Also figure out whether the packet was delivered in sequence.
3084              * We use the prev variable to determine whether the new packet
3085              * is the successor of its immediate predecessor in the
3086              * receive queue, and the missing flag to determine whether
3087              * any of this packets predecessors are missing.  */
3088
3089             afs_uint32 prev;            /* "Previous packet" sequence number */
3090             struct rx_packet *tp;       /* Temporary packet pointer */
3091             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3092             int missing;                /* Are any predecessors missing? */
3093
3094             /* If the new packet's sequence number has been sent to the
3095              * application already, then this is a duplicate */
3096             if (seq < call->rnext) {
3097                 MUTEX_ENTER(&rx_stats_mutex);
3098                 rx_stats.dupPacketsRead++;
3099                 MUTEX_EXIT(&rx_stats_mutex);
3100                 rxevent_Cancel(call->delayedAckEvent, call,
3101                                RX_CALL_REFCOUNT_DELAY);
3102                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3103                 ackNeeded = 0;
3104                 call->rprev = seq;
3105                 continue;
3106             }
3107
3108             /* If the sequence number is greater than what can be
3109              * accomodated by the current window, then send a negative
3110              * acknowledge and drop the packet */
3111             if ((call->rnext + call->rwind) <= seq) {
3112                 rxevent_Cancel(call->delayedAckEvent, call,
3113                                RX_CALL_REFCOUNT_DELAY);
3114                 np = rxi_SendAck(call, np, serial,
3115                                  RX_ACK_EXCEEDS_WINDOW, istack);
3116                 ackNeeded = 0;
3117                 call->rprev = seq;
3118                 continue;
3119             }
3120
3121             /* Look for the packet in the queue of old received packets */
3122             for (prev = call->rnext - 1, missing = 0,
3123                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3124                 /*Check for duplicate packet */
3125                 if (seq == tp->header.seq) {
3126                     MUTEX_ENTER(&rx_stats_mutex);
3127                     rx_stats.dupPacketsRead++;
3128                     MUTEX_EXIT(&rx_stats_mutex);
3129                     rxevent_Cancel(call->delayedAckEvent, call,
3130                                    RX_CALL_REFCOUNT_DELAY);
3131                     np = rxi_SendAck(call, np, serial, 
3132                                      RX_ACK_DUPLICATE, istack);
3133                     ackNeeded = 0;
3134                     call->rprev = seq;
3135                     goto nextloop;
3136                 }
3137                 /* If we find a higher sequence packet, break out and
3138                  * insert the new packet here. */
3139                 if (seq < tp->header.seq) break;
3140                 /* Check for missing packet */
3141                 if (tp->header.seq != prev+1) {
3142                     missing = 1;
3143                 }
3144
3145                 prev = tp->header.seq;
3146             }
3147
3148             /* Keep track of whether we have received the last packet. */
3149             if (flags & RX_LAST_PACKET) {
3150                 call->flags |= RX_CALL_HAVE_LAST;
3151             }
3152
3153             /* It's within the window: add it to the the receive queue.
3154              * tp is left by the previous loop either pointing at the
3155              * packet before which to insert the new packet, or at the
3156              * queue head if the queue is empty or the packet should be
3157              * appended. */
3158             queue_InsertBefore(tp, np);
3159             call->nSoftAcks++;
3160             np = NULL;
3161
3162             /* Check whether we have all of the packets for this call */
3163             if ((call->flags & RX_CALL_HAVE_LAST)
3164              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3165                 afs_uint32 tseq;                /* temporary sequence number */
3166
3167                 for (tseq = call->rnext,
3168                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3169                     if (tseq != tp->header.seq)
3170                         break;
3171                     if (tp->header.flags & RX_LAST_PACKET) {
3172                         call->flags |= RX_CALL_RECEIVE_DONE;
3173                         break;
3174                     }
3175                     tseq++;
3176                 }
3177             }
3178
3179             /* We need to send an ack of the packet is out of sequence, 
3180              * or if an ack was requested by the peer. */
3181             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3182                 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3183             }
3184
3185             /* Acknowledge the last packet for each call */
3186             if (flags & RX_LAST_PACKET) {
3187                 haveLast = 1;
3188             }
3189
3190             call->rprev = seq;
3191         }
3192 nextloop:;
3193     }
3194
3195     if (newPackets) {
3196         /*
3197          * If the receiver is waiting for an iovec, fill the iovec
3198          * using the data from the receive queue */
3199         if (call->flags & RX_CALL_IOVEC_WAIT) {
3200             didHardAck = rxi_FillReadVec(call, serial); 
3201             /* the call may have been aborted */
3202             if (call->error) {
3203                 return NULL;
3204             }
3205             if (didHardAck) {
3206                 ackNeeded = 0;
3207             }
3208         }
3209
3210         /* Wakeup the reader if any */
3211         if ((call->flags & RX_CALL_READER_WAIT) &&
3212             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3213              (call->iovNext >= call->iovMax) ||
3214              (call->flags & RX_CALL_RECEIVE_DONE))) {
3215             call->flags &= ~RX_CALL_READER_WAIT;
3216 #ifdef  RX_ENABLE_LOCKS
3217             CV_BROADCAST(&call->cv_rq);
3218 #else
3219             osi_rxWakeup(&call->rq);
3220 #endif
3221         }
3222     }
3223
3224     /*
3225      * Send an ack when requested by the peer, or once every
3226      * rxi_SoftAckRate packets until the last packet has been
3227      * received. Always send a soft ack for the last packet in
3228      * the server's reply. */
3229     if (ackNeeded) {
3230         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3231         np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3232     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3233         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3234         np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3235     } else if (call->nSoftAcks) {
3236         clock_GetTime(&when);
3237         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3238             clock_Add(&when, &rx_lastAckDelay);
3239         } else {
3240             clock_Add(&when, &rx_softAckDelay);
3241         }
3242         if (!call->delayedAckEvent ||
3243             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3244             rxevent_Cancel(call->delayedAckEvent, call,
3245                            RX_CALL_REFCOUNT_DELAY);
3246             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3247             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3248                                                  call, 0);
3249         }
3250     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3251         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3252     }
3253
3254     return np;
3255 }
3256
3257 #ifdef  ADAPT_WINDOW
3258 static void rxi_ComputeRate();
3259 #endif
3260
3261 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3262 {
3263     struct rx_peer *peer = conn->peer;
3264
3265     MUTEX_ENTER(&peer->peer_lock);
3266     peer->lastReachTime = clock_Sec();
3267     MUTEX_EXIT(&peer->peer_lock);
3268
3269     MUTEX_ENTER(&conn->conn_data_lock);
3270     if (conn->flags & RX_CONN_ATTACHWAIT) {
3271         int i;
3272
3273         conn->flags &= ~RX_CONN_ATTACHWAIT;
3274         MUTEX_EXIT(&conn->conn_data_lock);
3275
3276         for (i=0; i<RX_MAXCALLS; i++) {
3277             struct rx_call *call = conn->call[i];
3278             if (call) {
3279                 if (call != acall) MUTEX_ENTER(&call->lock);
3280                 /* tnop can be null if newcallp is null */
3281                 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3282                 if (call != acall) MUTEX_EXIT(&call->lock);
3283             }
3284         }
3285     } else
3286         MUTEX_EXIT(&conn->conn_data_lock);
3287 }
3288
3289 /* rxi_ComputePeerNetStats
3290  *
3291  * Called exclusively by rxi_ReceiveAckPacket to compute network link
3292  * estimates (like RTT and throughput) based on ack packets.  Caller
3293  * must ensure that the packet in question is the right one (i.e.
3294  * serial number matches).
3295  */
3296 static void
3297 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3298         struct rx_ackPacket *ap, struct rx_packet *np)
3299 {
3300     struct rx_peer *peer = call->conn->peer;
3301
3302     /* Use RTT if not delayed by client. */
3303     if (ap->reason != RX_ACK_DELAY)
3304         rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3305 #ifdef ADAPT_WINDOW
3306     rxi_ComputeRate(peer, call, p, np, ap->reason);
3307 #endif
3308 }
3309
3310 /* The real smarts of the whole thing.  */
3311 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call, 
3312         struct rx_packet *np, int istack)
3313 {
3314     struct rx_ackPacket *ap;
3315     int nAcks;
3316     register struct rx_packet *tp;
3317     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3318     register struct rx_connection *conn = call->conn;
3319     struct rx_peer *peer = conn->peer;
3320     afs_uint32 first;
3321     afs_uint32 serial;
3322     /* because there are CM's that are bogus, sending weird values for this. */
3323     afs_uint32 skew = 0;
3324     int nbytes;
3325     int missing;
3326     int acked;
3327     int nNacked = 0;
3328     int newAckCount = 0;
3329     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3330     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3331
3332     MUTEX_ENTER(&rx_stats_mutex);
3333     rx_stats.ackPacketsRead++;
3334     MUTEX_EXIT(&rx_stats_mutex);
3335     ap = (struct rx_ackPacket *) rx_DataOf(np);
3336     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3337     if (nbytes < 0)
3338       return np;       /* truncated ack packet */
3339
3340     /* depends on ack packet struct */
3341     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3342     first = ntohl(ap->firstPacket);
3343     serial = ntohl(ap->serial);
3344     /* temporarily disabled -- needs to degrade over time 
3345        skew = ntohs(ap->maxSkew); */
3346
3347     /* Ignore ack packets received out of order */
3348     if (first < call->tfirst) {
3349         return np;
3350     }
3351
3352     if (np->header.flags & RX_SLOW_START_OK) {
3353         call->flags |= RX_CALL_SLOW_START_OK;
3354     }
3355
3356     if (ap->reason == RX_ACK_PING_RESPONSE)
3357         rxi_UpdatePeerReach(conn, call);
3358     
3359 #ifdef RXDEBUG
3360     if (rx_Log) {
3361       fprintf( rx_Log, 
3362               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3363               ap->reason, ntohl(ap->previousPacket), 
3364               (unsigned int) np->header.seq, (unsigned int) serial, 
3365               (unsigned int) skew, ntohl(ap->firstPacket));
3366         if (nAcks) {
3367             int offset;
3368             for (offset = 0; offset < nAcks; offset++) 
3369                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3370         }
3371         putc('\n', rx_Log);
3372     }
3373 #endif
3374
3375     /* Update the outgoing packet skew value to the latest value of
3376      * the peer's incoming packet skew value.  The ack packet, of
3377      * course, could arrive out of order, but that won't affect things
3378      * much */
3379     MUTEX_ENTER(&peer->peer_lock);
3380     peer->outPacketSkew = skew;
3381
3382     /* Check for packets that no longer need to be transmitted, and
3383      * discard them.  This only applies to packets positively
3384      * acknowledged as having been sent to the peer's upper level.
3385      * All other packets must be retained.  So only packets with
3386      * sequence numbers < ap->firstPacket are candidates. */
3387     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3388         if (tp->header.seq >= first) break;
3389         call->tfirst = tp->header.seq + 1;
3390         if (serial && (tp->header.serial == serial ||
3391                        tp->firstSerial == serial))
3392             rxi_ComputePeerNetStats(call, tp, ap, np);
3393 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3394     /* XXX Hack. Because we have to release the global rx lock when sending
3395      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3396      * in rxi_Start sending packets out because packets may move to the
3397      * freePacketQueue as result of being here! So we drop these packets until
3398      * we're safely out of the traversing. Really ugly! 
3399      * To make it even uglier, if we're using fine grain locking, we can
3400      * set the ack bits in the packets and have rxi_Start remove the packets
3401      * when it's done transmitting.
3402      */
3403         if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3404             newAckCount++;
3405         }
3406         if (call->flags & RX_CALL_TQ_BUSY) {
3407 #ifdef RX_ENABLE_LOCKS
3408             tp->flags |= RX_PKTFLAG_ACKED;
3409             call->flags |= RX_CALL_TQ_SOME_ACKED;
3410 #else /* RX_ENABLE_LOCKS */
3411             break;
3412 #endif /* RX_ENABLE_LOCKS */
3413         } else
3414 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3415         {
3416         queue_Remove(tp);
3417         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3418         }
3419     }
3420
3421 #ifdef ADAPT_WINDOW
3422     /* Give rate detector a chance to respond to ping requests */
3423     if (ap->reason == RX_ACK_PING_RESPONSE) {
3424         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3425     }
3426 #endif
3427
3428     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3429    
3430    /* Now go through explicit acks/nacks and record the results in
3431     * the waiting packets.  These are packets that can't be released
3432     * yet, even with a positive acknowledge.  This positive
3433     * acknowledge only means the packet has been received by the
3434     * peer, not that it will be retained long enough to be sent to
3435     * the peer's upper level.  In addition, reset the transmit timers
3436     * of any missing packets (those packets that must be missing
3437     * because this packet was out of sequence) */
3438
3439     call->nSoftAcked = 0;
3440     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3441         /* Update round trip time if the ack was stimulated on receipt
3442          * of this packet */
3443 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3444 #ifdef RX_ENABLE_LOCKS
3445         if (tp->header.seq >= first)
3446 #endif /* RX_ENABLE_LOCKS */
3447 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3448             if (serial && (tp->header.serial == serial ||
3449                            tp->firstSerial == serial))
3450                 rxi_ComputePeerNetStats(call, tp, ap, np);
3451
3452         /* Set the acknowledge flag per packet based on the
3453          * information in the ack packet. An acknowlegded packet can
3454          * be downgraded when the server has discarded a packet it
3455          * soacked previously, or when an ack packet is received
3456          * out of sequence. */
3457         if (tp->header.seq < first) {
3458             /* Implicit ack information */
3459             if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3460                 newAckCount++;
3461             }
3462             tp->flags |= RX_PKTFLAG_ACKED;
3463         }
3464         else if (tp->header.seq < first + nAcks) {
3465             /* Explicit ack information:  set it in the packet appropriately */
3466             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3467                 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3468                     newAckCount++;
3469                     tp->flags |= RX_PKTFLAG_ACKED;
3470                 }
3471                 if (missing) {
3472                     nNacked++;
3473                 } else {
3474                     call->nSoftAcked++;
3475                 }
3476             } else {
3477                 tp->flags &= ~RX_PKTFLAG_ACKED;
3478                 missing = 1;
3479             }
3480         }
3481         else {
3482             tp->flags &= ~RX_PKTFLAG_ACKED;
3483             missing = 1;
3484         }
3485
3486         /* If packet isn't yet acked, and it has been transmitted at least 
3487          * once, reset retransmit time using latest timeout 
3488          * ie, this should readjust the retransmit timer for all outstanding 
3489          * packets...  So we don't just retransmit when we should know better*/
3490
3491         if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3492           tp->retryTime = tp->timeSent;
3493           clock_Add(&tp->retryTime, &peer->timeout);
3494           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3495           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3496         }
3497     }
3498
3499     /* If the window has been extended by this acknowledge packet,
3500      * then wakeup a sender waiting in alloc for window space, or try
3501      * sending packets now, if he's been sitting on packets due to
3502      * lack of window space */
3503     if (call->tnext < (call->tfirst + call->twind))  {
3504 #ifdef  RX_ENABLE_LOCKS
3505         CV_SIGNAL(&call->cv_twind);
3506 #else
3507         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3508             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3509             osi_rxWakeup(&call->twind);
3510         }
3511 #endif
3512         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3513             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3514         }
3515     }
3516
3517     /* if the ack packet has a receivelen field hanging off it,
3518      * update our state */
3519     if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3520       afs_uint32 tSize;
3521
3522       /* If the ack packet has a "recommended" size that is less than 
3523        * what I am using now, reduce my size to match */
3524       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3525                     sizeof(afs_int32), &tSize);
3526       tSize = (afs_uint32) ntohl(tSize);
3527       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3528
3529       /* Get the maximum packet size to send to this peer */
3530       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3531                     &tSize);
3532       tSize = (afs_uint32)ntohl(tSize);
3533       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3534       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3535
3536       /* sanity check - peer might have restarted with different params.
3537        * If peer says "send less", dammit, send less...  Peer should never 
3538        * be unable to accept packets of the size that prior AFS versions would
3539        * send without asking.  */
3540       if (peer->maxMTU != tSize) {
3541           peer->maxMTU = tSize;
3542           peer->MTU = MIN(tSize, peer->MTU);
3543           call->MTU = MIN(call->MTU, tSize);
3544           peer->congestSeq++;
3545       }
3546
3547       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3548           /* AFS 3.4a */
3549           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3550                         sizeof(afs_int32), &tSize);
3551           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3552           if (tSize < call->twind) {       /* smaller than our send */
3553               call->twind = tSize;         /* window, we must send less... */
3554               call->ssthresh = MIN(call->twind, call->ssthresh);
3555           }
3556
3557           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3558            * network MTU confused with the loopback MTU. Calculate the
3559            * maximum MTU here for use in the slow start code below.
3560            */
3561           maxMTU = peer->maxMTU;
3562           /* Did peer restart with older RX version? */
3563           if (peer->maxDgramPackets > 1) {
3564               peer->maxDgramPackets = 1;
3565           }
3566       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3567           /* AFS 3.5 */
3568           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3569                         sizeof(afs_int32), &tSize);
3570           tSize = (afs_uint32) ntohl(tSize);
3571           /*
3572            * As of AFS 3.5 we set the send window to match the receive window. 
3573            */
3574           if (tSize < call->twind) {
3575               call->twind = tSize;
3576               call->ssthresh = MIN(call->twind, call->ssthresh);
3577           } else if (tSize > call->twind) {
3578               call->twind = tSize;
3579           }
3580
3581           /*
3582            * As of AFS 3.5, a jumbogram is more than one fixed size
3583            * packet transmitted in a single UDP datagram. If the remote
3584            * MTU is smaller than our local MTU then never send a datagram
3585            * larger than the natural MTU.
3586            */
3587           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3588                         sizeof(afs_int32), &tSize);
3589           maxDgramPackets = (afs_uint32) ntohl(tSize);
3590           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3591           maxDgramPackets = MIN(maxDgramPackets,
3592                                 (int)(peer->ifDgramPackets));
3593           maxDgramPackets = MIN(maxDgramPackets, tSize);
3594           if (maxDgramPackets > 1) {
3595             peer->maxDgramPackets = maxDgramPackets;
3596             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3597           } else {
3598             peer->maxDgramPackets = 1;
3599             call->MTU = peer->natMTU;
3600           }
3601        } else if (peer->maxDgramPackets > 1) {
3602           /* Restarted with lower version of RX */
3603           peer->maxDgramPackets = 1;
3604        }
3605     } else if (peer->maxDgramPackets > 1 ||
3606                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3607         /* Restarted with lower version of RX */
3608         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3609         peer->natMTU = OLD_MAX_PACKET_SIZE;
3610         peer->MTU = OLD_MAX_PACKET_SIZE;
3611         peer->maxDgramPackets = 1;
3612         peer->nDgramPackets = 1;
3613         peer->congestSeq++;
3614         call->MTU = OLD_MAX_PACKET_SIZE;
3615     }
3616
3617     if (nNacked) {
3618         /*
3619          * Calculate how many datagrams were successfully received after
3620          * the first missing packet and adjust the negative ack counter
3621          * accordingly.
3622          */
3623         call->nAcks = 0;
3624         call->nNacks++;
3625         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3626         if (call->nNacks < nNacked) {
3627             call->nNacks = nNacked;
3628         }
3629     } else {
3630         if (newAckCount) {
3631             call->nAcks++;
3632         }
3633         call->nNacks = 0;
3634     }
3635
3636     if (call->flags & RX_CALL_FAST_RECOVER) {
3637         if (nNacked) {
3638             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3639         } else {
3640             call->flags &= ~RX_CALL_FAST_RECOVER;
3641             call->cwind = call->nextCwind;
3642             call->nextCwind = 0;
3643             call->nAcks = 0;
3644         }
3645         call->nCwindAcks = 0;
3646     }
3647     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3648         /* Three negative acks in a row trigger congestion recovery */
3649 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3650         MUTEX_EXIT(&peer->peer_lock);
3651         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3652                 /* someone else is waiting to start recovery */
3653                 return np;
3654         }
3655         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3656         while (call->flags & RX_CALL_TQ_BUSY) {
3657             call->flags |= RX_CALL_TQ_WAIT;
3658 #ifdef RX_ENABLE_LOCKS
3659             CV_WAIT(&call->cv_tq, &call->lock);
3660 #else /* RX_ENABLE_LOCKS */
3661             osi_rxSleep(&call->tq);
3662 #endif /* RX_ENABLE_LOCKS */
3663         }
3664         MUTEX_ENTER(&peer->peer_lock);
3665 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3666         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3667         call->flags |= RX_CALL_FAST_RECOVER;
3668         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3669         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3670                           rx_maxSendWindow);
3671         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3672         call->nextCwind = call->ssthresh;
3673         call->nAcks = 0;
3674         call->nNacks = 0;
3675         peer->MTU = call->MTU;
3676         peer->cwind = call->nextCwind;
3677         peer->nDgramPackets = call->nDgramPackets;
3678         peer->congestSeq++;
3679         call->congestSeq = peer->congestSeq;
3680         /* Reset the resend times on the packets that were nacked
3681          * so we will&