4b71a6aca59cf2a65e5aa7fd5493839969e9c435
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #include <afsconfig.h>
13 #ifdef  KERNEL
14 #include "afs/param.h"
15 #else
16 #include <afs/param.h>
17 #endif
18
19 RCSID("$Header$");
20
21 #ifdef KERNEL
22 #include "afs/sysincludes.h"
23 #include "afsincludes.h"
24 #ifndef UKERNEL
25 #include "h/types.h"
26 #include "h/time.h"
27 #include "h/stat.h"
28 #ifdef  AFS_OSF_ENV
29 #include <net/net_globals.h>
30 #endif  /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
32 #include "h/socket.h"
33 #endif
34 #include "netinet/in.h"
35 #include "afs/afs_args.h"
36 #include "afs/afs_osi.h"
37 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
38 #include "h/systm.h"
39 #endif
40 #ifdef RXDEBUG
41 #undef RXDEBUG      /* turn off debugging */
42 #endif /* RXDEBUG */
43 #if defined(AFS_SGI_ENV)
44 #include "sys/debug.h"
45 #endif
46 #include "afsint.h"
47 #ifdef  AFS_ALPHA_ENV
48 #undef kmem_alloc
49 #undef kmem_free
50 #undef mem_alloc
51 #undef mem_free
52 #undef register
53 #endif  /* AFS_ALPHA_ENV */
54 #else /* !UKERNEL */
55 #include "afs/sysincludes.h"
56 #include "afsincludes.h"
57 #endif /* !UKERNEL */
58 #include "afs/lock.h"
59 #include "rx_kmutex.h"
60 #include "rx_kernel.h"
61 #include "rx_clock.h"
62 #include "rx_queue.h"
63 #include "rx.h"
64 #include "rx_globals.h"
65 #include "rx_trace.h"
66 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
68 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
69 #include "afsint.h"
70 extern afs_int32 afs_termState;
71 #ifdef AFS_AIX41_ENV
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "rxgen_consts.h"
76 #else /* KERNEL */
77 # include <sys/types.h>
78 # include <errno.h>
79 #ifdef AFS_NT40_ENV
80 # include <stdlib.h>
81 # include <fcntl.h>
82 # include <afsutil.h>
83 #else
84 # include <sys/socket.h>
85 # include <sys/file.h>
86 # include <netdb.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
90 #endif
91 #ifdef HAVE_STRING_H
92 #include <string.h>
93 #else
94 #ifdef HAVE_STRINGS_H
95 #include <strings.h>
96 #endif
97 #endif
98 # include "rx.h"
99 # include "rx_user.h"
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include <afs/rxgen_consts.h>
105 #endif /* KERNEL */
106
107 int (*registerProgram)() = 0;
108 int (*swapNameProgram)() = 0;
109
110 /* Local static routines */
111 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
112 #ifdef RX_ENABLE_LOCKS
113 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
114 #endif
115
116 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
117 struct rx_tq_debug {
118     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
119     afs_int32 rxi_start_in_error;
120 } rx_tq_debug;
121 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
122
123 /*
124  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
125  * currently allocated within rx.  This number is used to allocate the
126  * memory required to return the statistics when queried.
127  */
128
129 static unsigned int rxi_rpc_peer_stat_cnt;
130
131 /*
132  * rxi_rpc_process_stat_cnt counts the total number of local process stat
133  * structures currently allocated within rx.  The number is used to allocate
134  * the memory required to return the statistics when queried.
135  */
136
137 static unsigned int rxi_rpc_process_stat_cnt;
138
139 #if !defined(offsetof)
140 #include <stddef.h>     /* for definition of offsetof() */
141 #endif
142
143 #ifdef AFS_PTHREAD_ENV
144 #include <assert.h>
145
146 /*
147  * Use procedural initialization of mutexes/condition variables
148  * to ease NT porting
149  */
150
151 extern pthread_mutex_t rxkad_stats_mutex;
152 extern pthread_mutex_t des_init_mutex;
153 extern pthread_mutex_t des_random_mutex;
154 extern pthread_mutex_t rx_clock_mutex;
155 extern pthread_mutex_t rxi_connCacheMutex;
156 extern pthread_mutex_t rx_event_mutex;
157 extern pthread_mutex_t osi_malloc_mutex;
158 extern pthread_mutex_t event_handler_mutex;
159 extern pthread_mutex_t listener_mutex;
160 extern pthread_mutex_t rx_if_init_mutex;
161 extern pthread_mutex_t rx_if_mutex;
162 extern pthread_mutex_t rxkad_client_uid_mutex;
163 extern pthread_mutex_t rxkad_random_mutex;
164
165 extern pthread_cond_t rx_event_handler_cond;
166 extern pthread_cond_t rx_listener_cond;
167
168 static pthread_mutex_t epoch_mutex;
169 static pthread_mutex_t rx_init_mutex;
170 static pthread_mutex_t rx_debug_mutex;
171
172 static void rxi_InitPthread(void) {
173     assert(pthread_mutex_init(&rx_clock_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&rxi_connCacheMutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&rx_init_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&epoch_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&rx_event_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183     assert(pthread_mutex_init(&des_init_mutex,
184                               (const pthread_mutexattr_t*)0)==0);
185     assert(pthread_mutex_init(&des_random_mutex,
186                               (const pthread_mutexattr_t*)0)==0);
187     assert(pthread_mutex_init(&osi_malloc_mutex,
188                               (const pthread_mutexattr_t*)0)==0);
189     assert(pthread_mutex_init(&event_handler_mutex,
190                               (const pthread_mutexattr_t*)0)==0);
191     assert(pthread_mutex_init(&listener_mutex,
192                               (const pthread_mutexattr_t*)0)==0);
193     assert(pthread_mutex_init(&rx_if_init_mutex,
194                               (const pthread_mutexattr_t*)0)==0);
195     assert(pthread_mutex_init(&rx_if_mutex,
196                               (const pthread_mutexattr_t*)0)==0);
197     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
198                               (const pthread_mutexattr_t*)0)==0);
199     assert(pthread_mutex_init(&rxkad_random_mutex,
200                               (const pthread_mutexattr_t*)0)==0);
201     assert(pthread_mutex_init(&rxkad_stats_mutex,
202                               (const pthread_mutexattr_t*)0)==0);
203     assert(pthread_mutex_init(&rx_debug_mutex,
204                               (const pthread_mutexattr_t*)0)==0);
205
206     assert(pthread_cond_init(&rx_event_handler_cond,
207                               (const pthread_condattr_t*)0)==0);
208     assert(pthread_cond_init(&rx_listener_cond,
209                               (const pthread_condattr_t*)0)==0);
210     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
211 }
212
213 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
214 #define INIT_PTHREAD_LOCKS \
215 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
216 /*
217  * The rx_stats_mutex mutex protects the following global variables:
218  * rxi_dataQuota
219  * rxi_minDeficit
220  * rxi_availProcs
221  * rxi_totalMin
222  * rxi_lowConnRefCount
223  * rxi_lowPeerRefCount
224  * rxi_nCalls
225  * rxi_Alloccnt
226  * rxi_Allocsize
227  * rx_nFreePackets
228  * rx_tq_debug
229  * rx_stats
230  */
231 #else
232 #define INIT_PTHREAD_LOCKS
233 #endif
234
235
236 /* Variables for handling the minProcs implementation.  availProcs gives the
237  * number of threads available in the pool at this moment (not counting dudes
238  * executing right now).  totalMin gives the total number of procs required
239  * for handling all minProcs requests.  minDeficit is a dynamic variable
240  * tracking the # of procs required to satisfy all of the remaining minProcs
241  * demands.
242  * For fine grain locking to work, the quota check and the reservation of
243  * a server thread has to come while rxi_availProcs and rxi_minDeficit
244  * are locked. To this end, the code has been modified under #ifdef
245  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
246  * same time. A new function, ReturnToServerPool() returns the allocation.
247  * 
248  * A call can be on several queue's (but only one at a time). When
249  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
250  * that no one else is touching the queue. To this end, we store the address
251  * of the queue lock in the call structure (under the call lock) when we
252  * put the call on a queue, and we clear the call_queue_lock when the
253  * call is removed from a queue (once the call lock has been obtained).
254  * This allows rxi_ResetCall to safely synchronize with others wishing
255  * to manipulate the queue.
256  */
257
258 #ifdef RX_ENABLE_LOCKS
259 static int rxi_ServerThreadSelectingCall;
260 static afs_kmutex_t rx_rpc_stats;
261 void rxi_StartUnlocked();
262 #endif
263
264 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
265 ** pretty good that the next packet coming in is from the same connection 
266 ** as the last packet, since we're send multiple packets in a transmit window.
267 */
268 struct rx_connection *rxLastConn = 0; 
269
270 #ifdef RX_ENABLE_LOCKS
271 /* The locking hierarchy for rx fine grain locking is composed of these
272  * tiers:
273  *
274  * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
275  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
276  * call->lock - locks call data fields.
277  * These are independent of each other:
278  *      rx_freeCallQueue_lock
279  *      rxi_keyCreate_lock
280  * rx_serverPool_lock
281  * freeSQEList_lock
282  *
283  * serverQueueEntry->lock
284  * rx_rpc_stats
285  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
286  * peer->lock - locks peer data fields.
287  * conn_data_lock - that more than one thread is not updating a conn data
288  *                  field at the same time.
289  * rx_freePktQ_lock
290  *
291  * lowest level:
292  *      multi_handle->lock
293  *      rxevent_lock
294  *      rx_stats_mutex
295  *
296  * Do we need a lock to protect the peer field in the conn structure?
297  *      conn->peer was previously a constant for all intents and so has no
298  *      lock protecting this field. The multihomed client delta introduced
299  *      a RX code change : change the peer field in the connection structure
300  *      to that remote inetrface from which the last packet for this
301  *      connection was sent out. This may become an issue if further changes
302  *      are made.
303  */
304 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
305 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
306 #ifdef RX_LOCKS_DB
307 /* rxdb_fileID is used to identify the lock location, along with line#. */
308 static int rxdb_fileID = RXDB_FILE_RX;
309 #endif /* RX_LOCKS_DB */
310 #else /* RX_ENABLE_LOCKS */
311 #define SET_CALL_QUEUE_LOCK(C, L)
312 #define CLEAR_CALL_QUEUE_LOCK(C)
313 #endif /* RX_ENABLE_LOCKS */
314 struct rx_serverQueueEntry *rx_waitForPacket = 0;
315
316 /* ------------Exported Interfaces------------- */
317
318 /* This function allows rxkad to set the epoch to a suitably random number
319  * which rx_NewConnection will use in the future.  The principle purpose is to
320  * get rxnull connections to use the same epoch as the rxkad connections do, at
321  * least once the first rxkad connection is established.  This is important now
322  * that the host/port addresses aren't used in FindConnection: the uniqueness
323  * of epoch/cid matters and the start time won't do. */
324
325 #ifdef AFS_PTHREAD_ENV
326 /*
327  * This mutex protects the following global variables:
328  * rx_epoch
329  */
330
331 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
332 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
333 #else
334 #define LOCK_EPOCH
335 #define UNLOCK_EPOCH
336 #endif /* AFS_PTHREAD_ENV */
337
338 void rx_SetEpoch (afs_uint32 epoch)
339 {
340     LOCK_EPOCH
341     rx_epoch = epoch;
342     UNLOCK_EPOCH
343 }
344
345 /* Initialize rx.  A port number may be mentioned, in which case this
346  * becomes the default port number for any service installed later.
347  * If 0 is provided for the port number, a random port will be chosen
348  * by the kernel.  Whether this will ever overlap anything in
349  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
350  * error. */
351 static int rxinit_status = 1;
352 #ifdef AFS_PTHREAD_ENV
353 /*
354  * This mutex protects the following global variables:
355  * rxinit_status
356  */
357
358 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
359 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
360 #else
361 #define LOCK_RX_INIT
362 #define UNLOCK_RX_INIT
363 #endif
364
365 int rx_Init(u_int port)
366 {
367 #ifdef KERNEL
368     osi_timeval_t tv;
369 #else /* KERNEL */
370     struct timeval tv;
371 #endif /* KERNEL */
372     char *htable, *ptable;
373     int tmp_status;
374
375 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
376     __djgpp_set_quiet_socket(1);
377 #endif
378
379     SPLVAR;
380
381     INIT_PTHREAD_LOCKS
382     LOCK_RX_INIT
383     if (rxinit_status == 0) {
384         tmp_status = rxinit_status;
385         UNLOCK_RX_INIT
386         return tmp_status; /* Already started; return previous error code. */
387     }
388
389 #ifdef AFS_NT40_ENV
390     if (afs_winsockInit()<0)
391         return -1;
392 #endif
393
394 #ifndef KERNEL
395     /*
396      * Initialize anything necessary to provide a non-premptive threading
397      * environment.
398      */
399     rxi_InitializeThreadSupport();
400 #endif
401
402     /* Allocate and initialize a socket for client and perhaps server
403      * connections. */
404
405     rx_socket = rxi_GetUDPSocket((u_short)port); 
406     if (rx_socket == OSI_NULLSOCKET) {
407         UNLOCK_RX_INIT
408         return RX_ADDRINUSE;
409     }
410     
411
412 #ifdef  RX_ENABLE_LOCKS
413 #ifdef RX_LOCKS_DB
414     rxdb_init();
415 #endif /* RX_LOCKS_DB */
416     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);
417     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);
418     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);
419     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
420     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
421                MUTEX_DEFAULT,0);
422     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
423     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
424     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
425     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
426 #ifndef KERNEL
427     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
428 #endif /* !KERNEL */
429     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
430 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
431     if ( !uniprocessor )
432       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
433 #endif /* KERNEL && AFS_HPUX110_ENV */
434 #else /* RX_ENABLE_LOCKS */
435 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
436     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
437 #endif /* AFS_GLOBAL_SUNLOCK */
438 #endif /* RX_ENABLE_LOCKS */
439
440     rxi_nCalls = 0;
441     rx_connDeadTime = 12;
442     rx_tranquil     = 0;        /* reset flag */
443     memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
444     htable = (char *)
445         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
446     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
447     memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
448     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
449     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
450     memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
451
452     /* Malloc up a bunch of packets & buffers */
453     rx_nFreePackets = 0;
454     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
455     queue_Init(&rx_freePacketQueue);
456     rxi_NeedMorePackets = FALSE;
457     rxi_MorePackets(rx_nPackets);
458     rx_CheckPackets();
459
460     NETPRI;
461     AFS_RXGLOCK();
462
463     clock_Init();
464
465 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
466     tv.tv_sec = clock_now.sec;
467     tv.tv_usec = clock_now.usec;
468     srand((unsigned int) tv.tv_usec);
469 #else
470     osi_GetTime(&tv);
471 #endif
472     if (port) {
473         rx_port = port;
474     } else {
475 #if defined(KERNEL) && !defined(UKERNEL)
476         /* Really, this should never happen in a real kernel */
477         rx_port = 0;
478 #else
479         struct sockaddr_in addr;
480         int addrlen = sizeof(addr);
481         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
482             rx_Finalize();
483             return -1;
484         }
485         rx_port = addr.sin_port;
486 #endif
487     }
488     rx_stats.minRtt.sec = 9999999;
489 #ifdef  KERNEL
490     rx_SetEpoch (tv.tv_sec | 0x80000000);
491 #else
492     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
493                                          * will provide a randomer value. */
494 #endif
495     MUTEX_ENTER(&rx_stats_mutex);
496     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
497     MUTEX_EXIT(&rx_stats_mutex);
498     /* *Slightly* random start time for the cid.  This is just to help
499      * out with the hashing function at the peer */
500     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
501     rx_connHashTable = (struct rx_connection **) htable;
502     rx_peerHashTable = (struct rx_peer **) ptable;
503
504     rx_lastAckDelay.sec = 0;
505     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
506     rx_hardAckDelay.sec = 0;
507     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
508     rx_softAckDelay.sec = 0;
509     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
510
511     rxevent_Init(20, rxi_ReScheduleEvents);
512
513     /* Initialize various global queues */
514     queue_Init(&rx_idleServerQueue);
515     queue_Init(&rx_incomingCallQueue);
516     queue_Init(&rx_freeCallQueue);
517
518 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
519     /* Initialize our list of usable IP addresses. */
520     rx_GetIFInfo();
521 #endif
522
523     /* Start listener process (exact function is dependent on the
524      * implementation environment--kernel or user space) */
525     rxi_StartListener();
526
527     AFS_RXGUNLOCK();
528     USERPRI;
529     tmp_status = rxinit_status = 0;
530     UNLOCK_RX_INIT
531     return tmp_status;
532 }
533
534 /* called with unincremented nRequestsRunning to see if it is OK to start
535  * a new thread in this service.  Could be "no" for two reasons: over the
536  * max quota, or would prevent others from reaching their min quota.
537  */
538 #ifdef RX_ENABLE_LOCKS
539 /* This verion of QuotaOK reserves quota if it's ok while the
540  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
541  */
542 static int QuotaOK(register struct rx_service *aservice)
543 {
544     /* check if over max quota */
545     if (aservice->nRequestsRunning >= aservice->maxProcs) {
546         return 0;
547     }
548
549     /* under min quota, we're OK */
550     /* otherwise, can use only if there are enough to allow everyone
551      * to go to their min quota after this guy starts.
552      */
553     MUTEX_ENTER(&rx_stats_mutex);
554     if ((aservice->nRequestsRunning < aservice->minProcs) ||
555          (rxi_availProcs > rxi_minDeficit)) {
556         aservice->nRequestsRunning++;
557         /* just started call in minProcs pool, need fewer to maintain
558          * guarantee */
559         if (aservice->nRequestsRunning <= aservice->minProcs)
560             rxi_minDeficit--;
561         rxi_availProcs--;
562         MUTEX_EXIT(&rx_stats_mutex);
563         return 1;
564     }
565     MUTEX_EXIT(&rx_stats_mutex);
566
567     return 0;
568 }
569
570 static void ReturnToServerPool(register struct rx_service *aservice)
571 {
572     aservice->nRequestsRunning--;
573     MUTEX_ENTER(&rx_stats_mutex);
574     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
575     rxi_availProcs++;
576     MUTEX_EXIT(&rx_stats_mutex);
577 }
578
579 #else /* RX_ENABLE_LOCKS */
580 static int QuotaOK(register struct rx_service *aservice)
581 {
582     int rc=0;
583     /* under min quota, we're OK */
584     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
585
586     /* check if over max quota */
587     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
588
589     /* otherwise, can use only if there are enough to allow everyone
590      * to go to their min quota after this guy starts.
591      */
592     if (rxi_availProcs > rxi_minDeficit) rc = 1;
593     return rc;
594 }
595 #endif /* RX_ENABLE_LOCKS */
596
597 #ifndef KERNEL
598 /* Called by rx_StartServer to start up lwp's to service calls.
599    NExistingProcs gives the number of procs already existing, and which
600    therefore needn't be created. */
601 void rxi_StartServerProcs(int nExistingProcs)
602 {
603     register struct rx_service *service;
604     register int i;
605     int maxdiff = 0;
606     int nProcs = 0;
607
608     /* For each service, reserve N processes, where N is the "minimum"
609        number of processes that MUST be able to execute a request in parallel,
610        at any time, for that process.  Also compute the maximum difference
611        between any service's maximum number of processes that can run
612        (i.e. the maximum number that ever will be run, and a guarantee
613        that this number will run if other services aren't running), and its
614        minimum number.  The result is the extra number of processes that
615        we need in order to provide the latter guarantee */
616     for (i=0; i<RX_MAX_SERVICES; i++) {
617         int diff;
618         service = rx_services[i];
619         if (service == (struct rx_service *) 0) break;
620         nProcs += service->minProcs;
621         diff = service->maxProcs - service->minProcs;
622         if (diff > maxdiff) maxdiff = diff;
623     }
624     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
625     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
626     for (i = 0; i<nProcs; i++) {
627         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
628     }
629 }
630 #endif /* KERNEL */
631
632 /* This routine must be called if any services are exported.  If the
633  * donateMe flag is set, the calling process is donated to the server
634  * process pool */
635 void rx_StartServer(int donateMe)
636 {
637     register struct rx_service *service;
638     register int i, nProcs=0;
639     SPLVAR;
640     clock_NewTime();
641
642     NETPRI;
643     AFS_RXGLOCK();
644     /* Start server processes, if necessary (exact function is dependent
645      * on the implementation environment--kernel or user space).  DonateMe
646      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
647      * case, one less new proc will be created rx_StartServerProcs.
648      */
649     rxi_StartServerProcs(donateMe);
650
651     /* count up the # of threads in minProcs, and add set the min deficit to
652      * be that value, too.
653      */
654     for (i=0; i<RX_MAX_SERVICES; i++) {
655         service = rx_services[i];
656         if (service == (struct rx_service *) 0) break;
657         MUTEX_ENTER(&rx_stats_mutex);
658         rxi_totalMin += service->minProcs;
659         /* below works even if a thread is running, since minDeficit would
660          * still have been decremented and later re-incremented.
661          */
662         rxi_minDeficit += service->minProcs;
663         MUTEX_EXIT(&rx_stats_mutex);
664     }
665
666     /* Turn on reaping of idle server connections */
667     rxi_ReapConnections();
668
669     AFS_RXGUNLOCK();
670     USERPRI;
671
672     if (donateMe) {
673 #ifndef AFS_NT40_ENV
674 #ifndef KERNEL
675         char name[32];
676 #ifdef AFS_PTHREAD_ENV
677         pid_t pid;
678         pid = (pid_t) pthread_self();
679 #else /* AFS_PTHREAD_ENV */
680         PROCESS pid;
681         LWP_CurrentProcess(&pid);
682 #endif /* AFS_PTHREAD_ENV */
683
684         sprintf(name,"srv_%d", ++nProcs);
685         if (registerProgram)
686             (*registerProgram)(pid, name);
687 #endif /* KERNEL */
688 #endif /* AFS_NT40_ENV */
689         rx_ServerProc(); /* Never returns */
690     }
691     return;
692 }
693
694 /* Create a new client connection to the specified service, using the
695  * specified security object to implement the security model for this
696  * connection. */
697 struct rx_connection *rx_NewConnection(register afs_uint32 shost, 
698         u_short sport, u_short sservice, 
699         register struct rx_securityClass *securityObject, int serviceSecurityIndex)
700 {
701     int hashindex;
702     afs_int32 cid;
703     register struct rx_connection *conn;
704
705     SPLVAR;
706
707     clock_NewTime();
708     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
709           shost, sport, sservice, securityObject, serviceSecurityIndex));
710
711     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
712      * the case of kmem_alloc? */
713     conn = rxi_AllocConnection();
714 #ifdef  RX_ENABLE_LOCKS
715     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
716     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
717     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
718 #endif
719     NETPRI;
720     AFS_RXGLOCK();
721     MUTEX_ENTER(&rx_connHashTable_lock);
722     cid = (rx_nextCid += RX_MAXCALLS);
723     conn->type = RX_CLIENT_CONNECTION;
724     conn->cid = cid;
725     conn->epoch = rx_epoch;
726     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
727     conn->serviceId = sservice;
728     conn->securityObject = securityObject;
729     /* This doesn't work in all compilers with void (they're buggy), so fake it
730      * with VOID */
731     conn->securityData = (VOID *) 0;
732     conn->securityIndex = serviceSecurityIndex;
733     rx_SetConnDeadTime(conn, rx_connDeadTime);
734     conn->ackRate = RX_FAST_ACK_RATE;
735     conn->nSpecific = 0;
736     conn->specific = NULL;
737     conn->challengeEvent = NULL;
738     conn->delayedAbortEvent = NULL;
739     conn->abortCount = 0;
740     conn->error = 0;
741
742     RXS_NewConnection(securityObject, conn);
743     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
744     
745     conn->refCount++; /* no lock required since only this thread knows... */
746     conn->next = rx_connHashTable[hashindex];
747     rx_connHashTable[hashindex] = conn;
748     MUTEX_ENTER(&rx_stats_mutex);
749     rx_stats.nClientConns++;
750     MUTEX_EXIT(&rx_stats_mutex);
751
752     MUTEX_EXIT(&rx_connHashTable_lock);
753     AFS_RXGUNLOCK();
754     USERPRI;
755     return conn;
756 }
757
758 void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
759 {
760     /* The idea is to set the dead time to a value that allows several
761      * keepalives to be dropped without timing out the connection. */
762     conn->secondsUntilDead = MAX(seconds, 6);
763     conn->secondsUntilPing = conn->secondsUntilDead/6;
764 }
765
766 int rxi_lowPeerRefCount = 0;
767 int rxi_lowConnRefCount = 0;
768
769 /*
770  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
771  * NOTE: must not be called with rx_connHashTable_lock held.
772  */
773 void rxi_CleanupConnection(struct rx_connection *conn)
774 {
775     /* Notify the service exporter, if requested, that this connection
776      * is being destroyed */
777     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
778       (*conn->service->destroyConnProc)(conn);
779
780     /* Notify the security module that this connection is being destroyed */
781     RXS_DestroyConnection(conn->securityObject, conn);
782
783     /* If this is the last connection using the rx_peer struct, set its
784      * idle time to now. rxi_ReapConnections will reap it if it's still
785      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
786      */
787     MUTEX_ENTER(&rx_peerHashTable_lock);
788     if (--conn->peer->refCount <= 0) {
789         conn->peer->idleWhen = clock_Sec();
790         if (conn->peer->refCount < 0) {
791             conn->peer->refCount = 0; 
792             MUTEX_ENTER(&rx_stats_mutex);
793             rxi_lowPeerRefCount ++;
794             MUTEX_EXIT(&rx_stats_mutex);
795         }
796     }
797     MUTEX_EXIT(&rx_peerHashTable_lock);
798
799     MUTEX_ENTER(&rx_stats_mutex);
800     if (conn->type == RX_SERVER_CONNECTION)
801       rx_stats.nServerConns--;
802     else
803       rx_stats.nClientConns--;
804     MUTEX_EXIT(&rx_stats_mutex);
805
806 #ifndef KERNEL
807     if (conn->specific) {
808         int i;
809         for (i = 0 ; i < conn->nSpecific ; i++) {
810             if (conn->specific[i] && rxi_keyCreate_destructor[i])
811                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
812             conn->specific[i] = NULL;
813         }
814         free(conn->specific);
815     }
816     conn->specific = NULL;
817     conn->nSpecific = 0;
818 #endif /* !KERNEL */
819
820     MUTEX_DESTROY(&conn->conn_call_lock);
821     MUTEX_DESTROY(&conn->conn_data_lock);
822     CV_DESTROY(&conn->conn_call_cv);
823         
824     rxi_FreeConnection(conn);
825 }
826
827 /* Destroy the specified connection */
828 void rxi_DestroyConnection(register struct rx_connection *conn)
829 {
830     MUTEX_ENTER(&rx_connHashTable_lock);
831     rxi_DestroyConnectionNoLock(conn);
832     /* conn should be at the head of the cleanup list */
833     if (conn == rx_connCleanup_list) {
834         rx_connCleanup_list = rx_connCleanup_list->next;
835         MUTEX_EXIT(&rx_connHashTable_lock);
836         rxi_CleanupConnection(conn);
837     }
838 #ifdef RX_ENABLE_LOCKS
839     else {
840         MUTEX_EXIT(&rx_connHashTable_lock);
841     }
842 #endif /* RX_ENABLE_LOCKS */
843 }
844     
845 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
846 {
847     register struct rx_connection **conn_ptr;
848     register int havecalls = 0;
849     struct rx_packet *packet;
850     int i;
851     SPLVAR;
852
853     clock_NewTime();
854
855     NETPRI;
856     MUTEX_ENTER(&conn->conn_data_lock);
857     if (conn->refCount > 0)
858         conn->refCount--;
859     else {
860         MUTEX_ENTER(&rx_stats_mutex);
861         rxi_lowConnRefCount++;
862         MUTEX_EXIT(&rx_stats_mutex);
863     }
864
865     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
866         /* Busy; wait till the last guy before proceeding */
867         MUTEX_EXIT(&conn->conn_data_lock);
868         USERPRI;
869         return;
870     }
871
872     /* If the client previously called rx_NewCall, but it is still
873      * waiting, treat this as a running call, and wait to destroy the
874      * connection later when the call completes. */
875     if ((conn->type == RX_CLIENT_CONNECTION) &&
876         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
877         conn->flags |= RX_CONN_DESTROY_ME;
878         MUTEX_EXIT(&conn->conn_data_lock);
879         USERPRI;
880         return;
881     }
882     MUTEX_EXIT(&conn->conn_data_lock);
883
884     /* Check for extant references to this connection */
885     for (i = 0; i<RX_MAXCALLS; i++) {
886         register struct rx_call *call = conn->call[i];
887         if (call) {
888             havecalls = 1;
889             if (conn->type == RX_CLIENT_CONNECTION) {
890                 MUTEX_ENTER(&call->lock);
891                 if (call->delayedAckEvent) {
892                     /* Push the final acknowledgment out now--there
893                      * won't be a subsequent call to acknowledge the
894                      * last reply packets */
895                     rxevent_Cancel(call->delayedAckEvent, call,
896                                    RX_CALL_REFCOUNT_DELAY);
897                     if (call->state == RX_STATE_PRECALL ||
898                         call->state == RX_STATE_ACTIVE) {
899                         rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
900                     } else {
901                         rxi_AckAll(NULL, call, 0);
902                     }
903                 }
904                 MUTEX_EXIT(&call->lock);
905             }
906         }
907     }
908 #ifdef RX_ENABLE_LOCKS
909     if (!havecalls) {
910         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
911             MUTEX_EXIT(&conn->conn_data_lock);
912         }
913         else {
914             /* Someone is accessing a packet right now. */
915             havecalls = 1;
916         }
917     }
918 #endif /* RX_ENABLE_LOCKS */
919
920     if (havecalls) {
921         /* Don't destroy the connection if there are any call
922          * structures still in use */
923         MUTEX_ENTER(&conn->conn_data_lock);
924         conn->flags |= RX_CONN_DESTROY_ME;
925         MUTEX_EXIT(&conn->conn_data_lock);
926         USERPRI;
927         return;
928     }
929
930     if (conn->delayedAbortEvent) {
931         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
932         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
933         if (packet) {
934             MUTEX_ENTER(&conn->conn_data_lock);
935             rxi_SendConnectionAbort(conn, packet, 0, 1);
936             MUTEX_EXIT(&conn->conn_data_lock);
937             rxi_FreePacket(packet);
938         }
939     }
940
941     /* Remove from connection hash table before proceeding */
942     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
943                                              conn->epoch, conn->type) ];
944     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
945         if (*conn_ptr == conn) {
946             *conn_ptr = conn->next;
947             break;
948         }
949     }
950     /* if the conn that we are destroying was the last connection, then we
951     * clear rxLastConn as well */
952     if ( rxLastConn == conn )
953         rxLastConn = 0;
954
955     /* Make sure the connection is completely reset before deleting it. */
956     /* get rid of pending events that could zap us later */
957     if (conn->challengeEvent)
958         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
959     if (conn->checkReachEvent)
960         rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
961  
962     /* Add the connection to the list of destroyed connections that
963      * need to be cleaned up. This is necessary to avoid deadlocks
964      * in the routines we call to inform others that this connection is
965      * being destroyed. */
966     conn->next = rx_connCleanup_list;
967     rx_connCleanup_list = conn;
968 }
969
970 /* Externally available version */
971 void rx_DestroyConnection(register struct rx_connection *conn) 
972 {
973     SPLVAR;
974
975     NETPRI;
976     AFS_RXGLOCK();
977     rxi_DestroyConnection (conn);
978     AFS_RXGUNLOCK();
979     USERPRI;
980 }
981
982 /* Start a new rx remote procedure call, on the specified connection.
983  * If wait is set to 1, wait for a free call channel; otherwise return
984  * 0.  Maxtime gives the maximum number of seconds this call may take,
985  * after rx_MakeCall returns.  After this time interval, a call to any
986  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
987  * For fine grain locking, we hold the conn_call_lock in order to 
988  * to ensure that we don't get signalle after we found a call in an active
989  * state and before we go to sleep.
990  */
991 struct rx_call *rx_NewCall(register struct rx_connection *conn)
992 {
993     register int i;
994     register struct rx_call *call;
995     struct clock queueTime;
996     SPLVAR;
997
998     clock_NewTime();
999     dpf (("rx_MakeCall(conn %x)\n", conn));
1000
1001     NETPRI;
1002     clock_GetTime(&queueTime);
1003     AFS_RXGLOCK();
1004     MUTEX_ENTER(&conn->conn_call_lock);
1005
1006     /*
1007      * Check if there are others waiting for a new call.
1008      * If so, let them go first to avoid starving them.
1009      * This is a fairly simple scheme, and might not be
1010      * a complete solution for large numbers of waiters.
1011      */
1012     if (conn->makeCallWaiters) {
1013 #ifdef  RX_ENABLE_LOCKS
1014         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1015 #else
1016         osi_rxSleep(conn);
1017 #endif
1018     }
1019
1020     for (;;) {
1021         for (i=0; i<RX_MAXCALLS; i++) {
1022             call = conn->call[i];
1023             if (call) {
1024                 MUTEX_ENTER(&call->lock);
1025                 if (call->state == RX_STATE_DALLY) {
1026                     rxi_ResetCall(call, 0);
1027                     (*call->callNumber)++;
1028                     break;
1029                 }
1030                 MUTEX_EXIT(&call->lock);
1031             }
1032             else {
1033                 call = rxi_NewCall(conn, i);
1034                 break;
1035             }
1036         }
1037         if (i < RX_MAXCALLS) {
1038             break;
1039         }
1040         MUTEX_ENTER(&conn->conn_data_lock);
1041         conn->flags |= RX_CONN_MAKECALL_WAITING;
1042         MUTEX_EXIT(&conn->conn_data_lock);
1043
1044         conn->makeCallWaiters++;
1045 #ifdef  RX_ENABLE_LOCKS
1046         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1047 #else
1048         osi_rxSleep(conn);
1049 #endif
1050         conn->makeCallWaiters--;
1051     }
1052     /*
1053      * Wake up anyone else who might be giving us a chance to
1054      * run (see code above that avoids resource starvation).
1055      */
1056 #ifdef  RX_ENABLE_LOCKS
1057     CV_BROADCAST(&conn->conn_call_cv);
1058 #else
1059     osi_rxWakeup(conn);
1060 #endif
1061
1062     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1063
1064     /* Client is initially in send mode */
1065     call->state = RX_STATE_ACTIVE;
1066     call->mode = RX_MODE_SENDING;
1067
1068     /* remember start time for call in case we have hard dead time limit */
1069     call->queueTime = queueTime;
1070     clock_GetTime(&call->startTime);
1071     hzero(call->bytesSent);
1072     hzero(call->bytesRcvd);
1073
1074     /* Turn on busy protocol. */
1075     rxi_KeepAliveOn(call);
1076
1077     MUTEX_EXIT(&call->lock);
1078     MUTEX_EXIT(&conn->conn_call_lock);
1079     AFS_RXGUNLOCK();
1080     USERPRI;
1081
1082 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1083     /* Now, if TQ wasn't cleared earlier, do it now. */
1084     AFS_RXGLOCK();
1085     MUTEX_ENTER(&call->lock);
1086     while (call->flags & RX_CALL_TQ_BUSY) {
1087         call->flags |= RX_CALL_TQ_WAIT;
1088 #ifdef RX_ENABLE_LOCKS
1089         CV_WAIT(&call->cv_tq, &call->lock);
1090 #else /* RX_ENABLE_LOCKS */
1091         osi_rxSleep(&call->tq);
1092 #endif /* RX_ENABLE_LOCKS */
1093     }
1094     if (call->flags & RX_CALL_TQ_CLEARME) {
1095         rxi_ClearTransmitQueue(call, 0);
1096         queue_Init(&call->tq);
1097     }
1098     MUTEX_EXIT(&call->lock);
1099     AFS_RXGUNLOCK();
1100 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1101
1102     return call;
1103 }
1104
1105 int rxi_HasActiveCalls(register struct rx_connection *aconn)
1106 {
1107     register int i;
1108     register struct rx_call *tcall;
1109     SPLVAR;
1110
1111     NETPRI;
1112     for(i=0; i<RX_MAXCALLS; i++) {
1113       if ((tcall = aconn->call[i])) {
1114         if ((tcall->state == RX_STATE_ACTIVE) 
1115             || (tcall->state == RX_STATE_PRECALL)) {
1116           USERPRI;
1117           return 1;
1118         }
1119       }
1120     }
1121     USERPRI;
1122     return 0;
1123 }
1124
1125 int rxi_GetCallNumberVector(register struct rx_connection *aconn, 
1126         register afs_int32 *aint32s)
1127 {
1128     register int i;
1129     register struct rx_call *tcall;
1130     SPLVAR;
1131
1132     NETPRI;
1133     for(i=0; i<RX_MAXCALLS; i++) {
1134         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1135             aint32s[i] = aconn->callNumber[i]+1;
1136         else
1137             aint32s[i] = aconn->callNumber[i];
1138     }
1139     USERPRI;
1140     return 0;
1141 }
1142
1143 int rxi_SetCallNumberVector(register struct rx_connection *aconn, 
1144         register afs_int32 *aint32s)
1145 {
1146     register int i;
1147     register struct rx_call *tcall;
1148     SPLVAR;
1149
1150     NETPRI;
1151     for(i=0; i<RX_MAXCALLS; i++) {
1152         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1153             aconn->callNumber[i] = aint32s[i] - 1;
1154         else
1155             aconn->callNumber[i] = aint32s[i];
1156     }
1157     USERPRI;
1158     return 0;
1159 }
1160
1161 /* Advertise a new service.  A service is named locally by a UDP port
1162  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1163  * on a failure. 
1164  *
1165      char *serviceName;  Name for identification purposes (e.g. the
1166                          service name might be used for probing for
1167                          statistics) */
1168 struct rx_service *rx_NewService(u_short port, u_short serviceId, 
1169         char *serviceName, 
1170         struct rx_securityClass **securityObjects,
1171         int nSecurityObjects, afs_int32 (*serviceProc)(struct rx_call *acall))
1172 {    
1173     osi_socket socket = OSI_NULLSOCKET;
1174     register struct rx_service *tservice;    
1175     register int i;
1176     SPLVAR;
1177
1178     clock_NewTime();
1179
1180     if (serviceId == 0) {
1181         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1182          serviceName);
1183         return 0;
1184     }
1185     if (port == 0) {
1186         if (rx_port == 0) {
1187             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1188             return 0;
1189         }
1190         port = rx_port;
1191         socket = rx_socket;
1192     }
1193
1194     tservice = rxi_AllocService();
1195     NETPRI;
1196     AFS_RXGLOCK();
1197     for (i = 0; i<RX_MAX_SERVICES; i++) {
1198         register struct rx_service *service = rx_services[i];
1199         if (service) {
1200             if (port == service->servicePort) {
1201                 if (service->serviceId == serviceId) {
1202                     /* The identical service has already been
1203                      * installed; if the caller was intending to
1204                      * change the security classes used by this
1205                      * service, he/she loses. */
1206                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1207                     AFS_RXGUNLOCK();
1208                     USERPRI;
1209                     rxi_FreeService(tservice);
1210                     return service;
1211                 }
1212                 /* Different service, same port: re-use the socket
1213                  * which is bound to the same port */
1214                 socket = service->socket;
1215             }
1216         } else {
1217             if (socket == OSI_NULLSOCKET) {
1218                 /* If we don't already have a socket (from another
1219                  * service on same port) get a new one */
1220                 socket = rxi_GetUDPSocket(port);
1221                 if (socket == OSI_NULLSOCKET) {
1222                     AFS_RXGUNLOCK();
1223                     USERPRI;
1224                     rxi_FreeService(tservice);
1225                     return 0;
1226                 }
1227             }
1228             service = tservice;
1229             service->socket = socket;
1230             service->servicePort = port;
1231             service->serviceId = serviceId;
1232             service->serviceName = serviceName;
1233             service->nSecurityObjects = nSecurityObjects;
1234             service->securityObjects = securityObjects;
1235             service->minProcs = 0;
1236             service->maxProcs = 1;
1237             service->idleDeadTime = 60;
1238             service->connDeadTime = rx_connDeadTime;
1239             service->executeRequestProc = serviceProc;
1240             service->checkReach = 0;
1241             rx_services[i] = service;   /* not visible until now */
1242             AFS_RXGUNLOCK();
1243             USERPRI;
1244             return service;
1245         }
1246     }
1247     AFS_RXGUNLOCK();
1248     USERPRI;
1249     rxi_FreeService(tservice);
1250     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1251     return 0;
1252 }
1253
1254 /* Generic request processing loop. This routine should be called
1255  * by the implementation dependent rx_ServerProc. If socketp is
1256  * non-null, it will be set to the file descriptor that this thread
1257  * is now listening on. If socketp is null, this routine will never
1258  * returns. */
1259 void rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket *socketp)
1260 {
1261     register struct rx_call *call;
1262     register afs_int32 code;
1263     register struct rx_service *tservice = NULL;
1264
1265     for (;;) {
1266         if (newcall) {
1267             call = newcall;
1268             newcall = NULL;
1269         } else {
1270             call = rx_GetCall(threadID, tservice, socketp);
1271             if (socketp && *socketp != OSI_NULLSOCKET) {
1272                 /* We are now a listener thread */
1273                 return;
1274             }
1275         }
1276
1277         /* if server is restarting( typically smooth shutdown) then do not
1278          * allow any new calls.
1279          */
1280
1281         if ( rx_tranquil && (call != NULL) ) {
1282             SPLVAR;
1283
1284             NETPRI;
1285             AFS_RXGLOCK();
1286             MUTEX_ENTER(&call->lock);
1287
1288             rxi_CallError(call, RX_RESTARTING);
1289             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1290
1291             MUTEX_EXIT(&call->lock);
1292             AFS_RXGUNLOCK();
1293             USERPRI;
1294         }
1295
1296 #ifdef  KERNEL
1297         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1298 #ifdef RX_ENABLE_LOCKS
1299             AFS_GLOCK();
1300 #endif /* RX_ENABLE_LOCKS */
1301             afs_termState = AFSOP_STOP_AFS;
1302             afs_osi_Wakeup(&afs_termState);
1303 #ifdef RX_ENABLE_LOCKS
1304             AFS_GUNLOCK();
1305 #endif /* RX_ENABLE_LOCKS */
1306             return;
1307         }
1308 #endif
1309
1310         tservice = call->conn->service;
1311
1312         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1313
1314         code = call->conn->service->executeRequestProc(call);
1315
1316         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1317
1318         rx_EndCall(call, code);
1319         MUTEX_ENTER(&rx_stats_mutex);
1320         rxi_nCalls++;
1321         MUTEX_EXIT(&rx_stats_mutex);
1322     }
1323 }
1324
1325
1326 void rx_WakeupServerProcs(void)
1327 {
1328     struct rx_serverQueueEntry *np, *tqp;
1329     SPLVAR;
1330
1331     NETPRI;
1332     AFS_RXGLOCK();
1333     MUTEX_ENTER(&rx_serverPool_lock);
1334
1335 #ifdef RX_ENABLE_LOCKS
1336     if (rx_waitForPacket)
1337         CV_BROADCAST(&rx_waitForPacket->cv);
1338 #else /* RX_ENABLE_LOCKS */
1339     if (rx_waitForPacket)
1340         osi_rxWakeup(rx_waitForPacket);
1341 #endif /* RX_ENABLE_LOCKS */
1342     MUTEX_ENTER(&freeSQEList_lock);
1343     for (np = rx_FreeSQEList; np; np = tqp) {
1344       tqp = *(struct rx_serverQueueEntry **)np;
1345 #ifdef RX_ENABLE_LOCKS
1346       CV_BROADCAST(&np->cv);
1347 #else /* RX_ENABLE_LOCKS */
1348       osi_rxWakeup(np);
1349 #endif /* RX_ENABLE_LOCKS */
1350     }
1351     MUTEX_EXIT(&freeSQEList_lock);
1352     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1353 #ifdef RX_ENABLE_LOCKS
1354       CV_BROADCAST(&np->cv);
1355 #else /* RX_ENABLE_LOCKS */
1356       osi_rxWakeup(np);
1357 #endif /* RX_ENABLE_LOCKS */
1358     }
1359     MUTEX_EXIT(&rx_serverPool_lock);
1360     AFS_RXGUNLOCK();
1361     USERPRI;
1362 }
1363
1364 /* meltdown:
1365  * One thing that seems to happen is that all the server threads get
1366  * tied up on some empty or slow call, and then a whole bunch of calls
1367  * arrive at once, using up the packet pool, so now there are more 
1368  * empty calls.  The most critical resources here are server threads
1369  * and the free packet pool.  The "doreclaim" code seems to help in
1370  * general.  I think that eventually we arrive in this state: there
1371  * are lots of pending calls which do have all their packets present,
1372  * so they won't be reclaimed, are multi-packet calls, so they won't
1373  * be scheduled until later, and thus are tying up most of the free 
1374  * packet pool for a very long time.
1375  * future options:
1376  * 1.  schedule multi-packet calls if all the packets are present.  
1377  * Probably CPU-bound operation, useful to return packets to pool. 
1378  * Do what if there is a full window, but the last packet isn't here?
1379  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1380  * it sleeps and waits for that type of call.
1381  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1382  * the current dataquota business is badly broken.  The quota isn't adjusted
1383  * to reflect how many packets are presently queued for a running call.
1384  * So, when we schedule a queued call with a full window of packets queued
1385  * up for it, that *should* free up a window full of packets for other 2d-class
1386  * calls to be able to use from the packet pool.  But it doesn't.
1387  *
1388  * NB.  Most of the time, this code doesn't run -- since idle server threads
1389  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1390  * as a new call arrives.
1391  */
1392 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1393  * for an rx_Read. */
1394 #ifdef RX_ENABLE_LOCKS
1395 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1396 {
1397     struct rx_serverQueueEntry *sq;
1398     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1399     struct rx_service *service = NULL;
1400     SPLVAR;
1401
1402     MUTEX_ENTER(&freeSQEList_lock);
1403
1404     if ((sq = rx_FreeSQEList)) {
1405         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1406         MUTEX_EXIT(&freeSQEList_lock);
1407     } else {    /* otherwise allocate a new one and return that */
1408         MUTEX_EXIT(&freeSQEList_lock);
1409         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1410         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1411         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1412     }
1413
1414     MUTEX_ENTER(&rx_serverPool_lock);
1415     if (cur_service != NULL) {
1416         ReturnToServerPool(cur_service);
1417     }
1418     while (1) {
1419         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1420             register struct rx_call *tcall, *ncall;
1421             choice2 = (struct rx_call *) 0;
1422             /* Scan for eligible incoming calls.  A call is not eligible
1423              * if the maximum number of calls for its service type are
1424              * already executing */
1425             /* One thread will process calls FCFS (to prevent starvation),
1426              * while the other threads may run ahead looking for calls which
1427              * have all their input data available immediately.  This helps 
1428              * keep threads from blocking, waiting for data from the client. */
1429             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1430               service = tcall->conn->service;
1431               if (!QuotaOK(service)) {
1432                 continue;
1433               }
1434               if (!tno || !tcall->queue_item_header.next  ) {
1435                 /* If we're thread 0, then  we'll just use 
1436                  * this call. If we haven't been able to find an optimal 
1437                  * choice, and we're at the end of the list, then use a 
1438                  * 2d choice if one has been identified.  Otherwise... */
1439                 call = (choice2 ? choice2 : tcall);
1440                 service = call->conn->service;
1441               } else if (!queue_IsEmpty(&tcall->rq)) {
1442                 struct rx_packet *rp;
1443                 rp = queue_First(&tcall->rq, rx_packet);
1444                 if (rp->header.seq == 1) {
1445                   if (!meltdown_1pkt ||             
1446                       (rp->header.flags & RX_LAST_PACKET)) {
1447                     call = tcall;
1448                   } else if (rxi_2dchoice && !choice2 &&
1449                              !(tcall->flags & RX_CALL_CLEARED) &&
1450                              (tcall->rprev > rxi_HardAckRate)) {
1451                     choice2 = tcall;
1452                   } else rxi_md2cnt++;
1453                 }
1454               }
1455               if (call)  {
1456                 break;
1457               } else {
1458                   ReturnToServerPool(service);
1459               }
1460             }
1461           }
1462
1463         if (call) {
1464             queue_Remove(call);
1465             rxi_ServerThreadSelectingCall = 1;
1466             MUTEX_EXIT(&rx_serverPool_lock);
1467             MUTEX_ENTER(&call->lock);
1468             MUTEX_ENTER(&rx_serverPool_lock);
1469
1470             if (queue_IsEmpty(&call->rq) ||
1471                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1472               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1473
1474             CLEAR_CALL_QUEUE_LOCK(call);
1475             if (call->error) {
1476                 MUTEX_EXIT(&call->lock);
1477                 ReturnToServerPool(service);
1478                 rxi_ServerThreadSelectingCall = 0;
1479                 CV_SIGNAL(&rx_serverPool_cv);
1480                 call = (struct rx_call*)0;
1481                 continue;
1482             }
1483             call->flags &= (~RX_CALL_WAIT_PROC);
1484             MUTEX_ENTER(&rx_stats_mutex);
1485             rx_nWaiting--;
1486             MUTEX_EXIT(&rx_stats_mutex);
1487             rxi_ServerThreadSelectingCall = 0;
1488             CV_SIGNAL(&rx_serverPool_cv);
1489             MUTEX_EXIT(&rx_serverPool_lock);
1490             break;
1491         }
1492         else {
1493             /* If there are no eligible incoming calls, add this process
1494              * to the idle server queue, to wait for one */
1495             sq->newcall = 0;
1496             sq->tno = tno;
1497             if (socketp) {
1498                 *socketp = OSI_NULLSOCKET;
1499             }
1500             sq->socketp = socketp;
1501             queue_Append(&rx_idleServerQueue, sq);
1502 #ifndef AFS_AIX41_ENV
1503             rx_waitForPacket = sq;
1504 #endif /* AFS_AIX41_ENV */
1505             do {
1506                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1507 #ifdef  KERNEL
1508                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1509                     MUTEX_EXIT(&rx_serverPool_lock);
1510                     return (struct rx_call *)0;
1511                 }
1512 #endif
1513             } while (!(call = sq->newcall) &&
1514                      !(socketp && *socketp != OSI_NULLSOCKET));
1515             MUTEX_EXIT(&rx_serverPool_lock);
1516             if (call) {
1517                 MUTEX_ENTER(&call->lock);
1518             }
1519             break;
1520         }
1521     }
1522
1523     MUTEX_ENTER(&freeSQEList_lock);
1524     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1525     rx_FreeSQEList = sq;
1526     MUTEX_EXIT(&freeSQEList_lock);
1527
1528     if (call) {
1529         clock_GetTime(&call->startTime);
1530         call->state = RX_STATE_ACTIVE;
1531         call->mode = RX_MODE_RECEIVING;
1532
1533         rxi_calltrace(RX_CALL_START, call);
1534         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1535              call->conn->service->servicePort, 
1536              call->conn->service->serviceId, call));
1537
1538         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1539         MUTEX_EXIT(&call->lock);
1540     } else {
1541         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1542     }
1543
1544     return call;
1545 }
1546 #else /* RX_ENABLE_LOCKS */
1547 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
1548 {
1549     struct rx_serverQueueEntry *sq;
1550     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1551     struct rx_service *service = NULL;
1552     SPLVAR;
1553
1554     NETPRI;
1555     AFS_RXGLOCK();
1556     MUTEX_ENTER(&freeSQEList_lock);
1557
1558     if ((sq = rx_FreeSQEList)) {
1559         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1560         MUTEX_EXIT(&freeSQEList_lock);
1561     } else {    /* otherwise allocate a new one and return that */
1562         MUTEX_EXIT(&freeSQEList_lock);
1563         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1564         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);
1565         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1566     }
1567     MUTEX_ENTER(&sq->lock);
1568
1569     if (cur_service != NULL) {
1570         cur_service->nRequestsRunning--;
1571         if (cur_service->nRequestsRunning < cur_service->minProcs)
1572             rxi_minDeficit++;
1573         rxi_availProcs++;
1574     }
1575     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1576         register struct rx_call *tcall, *ncall;
1577         /* Scan for eligible incoming calls.  A call is not eligible
1578          * if the maximum number of calls for its service type are
1579          * already executing */
1580         /* One thread will process calls FCFS (to prevent starvation),
1581          * while the other threads may run ahead looking for calls which
1582          * have all their input data available immediately.  This helps 
1583          * keep threads from blocking, waiting for data from the client. */
1584         choice2 = (struct rx_call *) 0;
1585         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1586           service = tcall->conn->service;
1587           if (QuotaOK(service)) {
1588              if (!tno || !tcall->queue_item_header.next  ) {
1589                  /* If we're thread 0, then  we'll just use 
1590                   * this call. If we haven't been able to find an optimal 
1591                   * choice, and we're at the end of the list, then use a 
1592                   * 2d choice if one has been identified.  Otherwise... */
1593                  call = (choice2 ? choice2 : tcall);
1594                  service = call->conn->service;
1595              } else if (!queue_IsEmpty(&tcall->rq)) {
1596                  struct rx_packet *rp;
1597                  rp = queue_First(&tcall->rq, rx_packet);
1598                  if (rp->header.seq == 1
1599                      && (!meltdown_1pkt ||
1600                          (rp->header.flags & RX_LAST_PACKET))) {
1601                      call = tcall;
1602                  } else if (rxi_2dchoice && !choice2 &&
1603                             !(tcall->flags & RX_CALL_CLEARED) &&
1604                             (tcall->rprev > rxi_HardAckRate)) {
1605                      choice2 = tcall;
1606                  } else rxi_md2cnt++;
1607              }
1608           }
1609           if (call) 
1610              break;
1611         }
1612       }
1613
1614     if (call) {
1615         queue_Remove(call);
1616         /* we can't schedule a call if there's no data!!! */
1617         /* send an ack if there's no data, if we're missing the
1618          * first packet, or we're missing something between first 
1619          * and last -- there's a "hole" in the incoming data. */
1620         if (queue_IsEmpty(&call->rq) ||
1621             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1622             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1623           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1624
1625         call->flags &= (~RX_CALL_WAIT_PROC);
1626         service->nRequestsRunning++;
1627         /* just started call in minProcs pool, need fewer to maintain
1628          * guarantee */
1629         if (service->nRequestsRunning <= service->minProcs)
1630             rxi_minDeficit--;
1631         rxi_availProcs--;
1632         rx_nWaiting--;
1633         /* MUTEX_EXIT(&call->lock); */
1634     }
1635     else {
1636         /* If there are no eligible incoming calls, add this process
1637          * to the idle server queue, to wait for one */
1638         sq->newcall = 0;
1639         if (socketp) {
1640             *socketp = OSI_NULLSOCKET;
1641         }
1642         sq->socketp = socketp;
1643         queue_Append(&rx_idleServerQueue, sq);
1644         do {
1645             osi_rxSleep(sq);
1646 #ifdef  KERNEL
1647                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1648                     AFS_RXGUNLOCK();
1649                     USERPRI;
1650                     return (struct rx_call *)0;
1651                 }
1652 #endif
1653         } while (!(call = sq->newcall) &&
1654                  !(socketp && *socketp != OSI_NULLSOCKET));
1655     }
1656     MUTEX_EXIT(&sq->lock);
1657
1658     MUTEX_ENTER(&freeSQEList_lock);
1659     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1660     rx_FreeSQEList = sq;
1661     MUTEX_EXIT(&freeSQEList_lock);
1662
1663     if (call) {
1664         clock_GetTime(&call->startTime);
1665         call->state = RX_STATE_ACTIVE;
1666         call->mode = RX_MODE_RECEIVING;
1667
1668         rxi_calltrace(RX_CALL_START, call);
1669         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1670          call->conn->service->servicePort, 
1671          call->conn->service->serviceId, call));
1672     } else {
1673         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1674     }
1675
1676     AFS_RXGUNLOCK();
1677     USERPRI;
1678
1679     return call;
1680 }
1681 #endif /* RX_ENABLE_LOCKS */
1682
1683
1684
1685 /* Establish a procedure to be called when a packet arrives for a
1686  * call.  This routine will be called at most once after each call,
1687  * and will also be called if there is an error condition on the or
1688  * the call is complete.  Used by multi rx to build a selection
1689  * function which determines which of several calls is likely to be a
1690  * good one to read from.  
1691  * NOTE: the way this is currently implemented it is probably only a
1692  * good idea to (1) use it immediately after a newcall (clients only)
1693  * and (2) only use it once.  Other uses currently void your warranty
1694  */
1695 void rx_SetArrivalProc(register struct rx_call *call, 
1696         register VOID (*proc)(register struct rx_call *call,
1697         register struct multi_handle *mh, register int index),
1698         register VOID *handle, register VOID *arg)
1699 {
1700     call->arrivalProc = proc;
1701     call->arrivalProcHandle = handle;
1702     call->arrivalProcArg = arg;
1703 }
1704
1705 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1706  * appropriate, and return the final error code from the conversation
1707  * to the caller */
1708
1709 afs_int32 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1710 {
1711     register struct rx_connection *conn = call->conn;
1712     register struct rx_service *service;
1713     register struct rx_packet *tp; /* Temporary packet pointer */
1714     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1715     afs_int32 error;
1716     SPLVAR;
1717
1718     dpf(("rx_EndCall(call %x)\n", call));
1719
1720     NETPRI;
1721     AFS_RXGLOCK();
1722     MUTEX_ENTER(&call->lock);
1723
1724     if (rc == 0 && call->error == 0) {
1725         call->abortCode = 0;
1726         call->abortCount = 0;
1727     }
1728
1729     call->arrivalProc = (VOID (*)()) 0;
1730     if (rc && call->error == 0) {
1731         rxi_CallError(call, rc);
1732         /* Send an abort message to the peer if this error code has
1733          * only just been set.  If it was set previously, assume the
1734          * peer has already been sent the error code or will request it 
1735          */
1736         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1737     }
1738     if (conn->type == RX_SERVER_CONNECTION) {
1739         /* Make sure reply or at least dummy reply is sent */
1740         if (call->mode == RX_MODE_RECEIVING) {
1741             rxi_WriteProc(call, 0, 0);
1742         }
1743         if (call->mode == RX_MODE_SENDING) {
1744             rxi_FlushWrite(call);
1745         }
1746         service = conn->service;
1747         rxi_calltrace(RX_CALL_END, call);
1748         /* Call goes to hold state until reply packets are acknowledged */
1749         if (call->tfirst + call->nSoftAcked < call->tnext) {
1750             call->state = RX_STATE_HOLD;
1751         } else {
1752             call->state = RX_STATE_DALLY;
1753             rxi_ClearTransmitQueue(call, 0);
1754             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1755             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1756         }
1757     }
1758     else { /* Client connection */
1759         char dummy;
1760         /* Make sure server receives input packets, in the case where
1761          * no reply arguments are expected */
1762         if ((call->mode == RX_MODE_SENDING)
1763          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1764             (void) rxi_ReadProc(call, &dummy, 1);
1765         }
1766
1767         /* If we had an outstanding delayed ack, be nice to the server
1768          * and force-send it now.
1769          */
1770         if (call->delayedAckEvent) {
1771             rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
1772             call->delayedAckEvent = NULL;
1773             rxi_SendDelayedAck(NULL, call, NULL);
1774         }
1775
1776         /* We need to release the call lock since it's lower than the
1777          * conn_call_lock and we don't want to hold the conn_call_lock
1778          * over the rx_ReadProc call. The conn_call_lock needs to be held
1779          * here for the case where rx_NewCall is perusing the calls on
1780          * the connection structure. We don't want to signal until
1781          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1782          * have checked this call, found it active and by the time it
1783          * goes to sleep, will have missed the signal.
1784          */
1785         MUTEX_EXIT(&call->lock);
1786         MUTEX_ENTER(&conn->conn_call_lock);
1787         MUTEX_ENTER(&call->lock);
1788         MUTEX_ENTER(&conn->conn_data_lock);
1789         conn->flags |= RX_CONN_BUSY;
1790         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1791             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1792             MUTEX_EXIT(&conn->conn_data_lock);
1793 #ifdef  RX_ENABLE_LOCKS
1794             CV_BROADCAST(&conn->conn_call_cv);
1795 #else
1796             osi_rxWakeup(conn);
1797 #endif
1798         }
1799 #ifdef RX_ENABLE_LOCKS
1800         else {
1801             MUTEX_EXIT(&conn->conn_data_lock);
1802         }
1803 #endif /* RX_ENABLE_LOCKS */
1804         call->state = RX_STATE_DALLY;
1805     }
1806     error = call->error;
1807
1808     /* currentPacket, nLeft, and NFree must be zeroed here, because
1809      * ResetCall cannot: ResetCall may be called at splnet(), in the
1810      * kernel version, and may interrupt the macros rx_Read or
1811      * rx_Write, which run at normal priority for efficiency. */
1812     if (call->currentPacket) {
1813         rxi_FreePacket(call->currentPacket);
1814         call->currentPacket = (struct rx_packet *) 0;
1815         call->nLeft = call->nFree = call->curlen = 0;
1816     }
1817     else
1818         call->nLeft = call->nFree = call->curlen = 0;
1819
1820     /* Free any packets from the last call to ReadvProc/WritevProc */
1821     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1822         queue_Remove(tp);
1823         rxi_FreePacket(tp);
1824     }
1825
1826     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1827     MUTEX_EXIT(&call->lock);
1828     if (conn->type == RX_CLIENT_CONNECTION) {
1829         MUTEX_EXIT(&conn->conn_call_lock);
1830         conn->flags &= ~RX_CONN_BUSY;
1831     }
1832     AFS_RXGUNLOCK();
1833     USERPRI;
1834     /*
1835      * Map errors to the local host's errno.h format.
1836      */
1837     error = ntoh_syserr_conv(error);
1838     return error;
1839 }
1840
1841 #if !defined(KERNEL)
1842
1843 /* Call this routine when shutting down a server or client (especially
1844  * clients).  This will allow Rx to gracefully garbage collect server
1845  * connections, and reduce the number of retries that a server might
1846  * make to a dead client.
1847  * This is not quite right, since some calls may still be ongoing and
1848  * we can't lock them to destroy them. */
1849 void rx_Finalize(void)
1850 {
1851     register struct rx_connection **conn_ptr, **conn_end;
1852
1853     INIT_PTHREAD_LOCKS
1854     LOCK_RX_INIT
1855     if (rxinit_status == 1) {
1856         UNLOCK_RX_INIT
1857         return; /* Already shutdown. */
1858     }
1859     rxi_DeleteCachedConnections();
1860     if (rx_connHashTable) {
1861         MUTEX_ENTER(&rx_connHashTable_lock);
1862         for (conn_ptr = &rx_connHashTable[0], 
1863              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1864              conn_ptr < conn_end; conn_ptr++) {
1865             struct rx_connection *conn, *next;
1866             for (conn = *conn_ptr; conn; conn = next) {
1867                 next = conn->next;
1868                 if (conn->type == RX_CLIENT_CONNECTION) {
1869                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1870                     conn->refCount++;
1871                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1872 #ifdef RX_ENABLE_LOCKS
1873                     rxi_DestroyConnectionNoLock(conn);
1874 #else /* RX_ENABLE_LOCKS */
1875                     rxi_DestroyConnection(conn);
1876 #endif /* RX_ENABLE_LOCKS */
1877                 }
1878             }
1879         }
1880 #ifdef RX_ENABLE_LOCKS
1881         while (rx_connCleanup_list) {
1882             struct rx_connection *conn;
1883             conn = rx_connCleanup_list;
1884             rx_connCleanup_list = rx_connCleanup_list->next;
1885             MUTEX_EXIT(&rx_connHashTable_lock);
1886             rxi_CleanupConnection(conn);
1887             MUTEX_ENTER(&rx_connHashTable_lock);
1888         }
1889         MUTEX_EXIT(&rx_connHashTable_lock);
1890 #endif /* RX_ENABLE_LOCKS */
1891     }
1892     rxi_flushtrace();
1893
1894     rxinit_status = 1;
1895     UNLOCK_RX_INIT
1896 }
1897 #endif
1898
1899 /* if we wakeup packet waiter too often, can get in loop with two
1900     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1901 void rxi_PacketsUnWait(void)
1902 {
1903     if (!rx_waitingForPackets) {
1904         return;
1905     }
1906 #ifdef KERNEL
1907     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1908         return;                                     /* still over quota */
1909     }
1910 #endif /* KERNEL */
1911     rx_waitingForPackets = 0;
1912 #ifdef  RX_ENABLE_LOCKS
1913     CV_BROADCAST(&rx_waitingForPackets_cv);
1914 #else
1915     osi_rxWakeup(&rx_waitingForPackets);
1916 #endif
1917     return;
1918 }
1919
1920
1921 /* ------------------Internal interfaces------------------------- */
1922
1923 /* Return this process's service structure for the
1924  * specified socket and service */
1925 struct rx_service *rxi_FindService(register osi_socket socket, 
1926         register u_short serviceId)
1927 {
1928     register struct rx_service **sp;    
1929     for (sp = &rx_services[0]; *sp; sp++) {
1930         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1931           return *sp;
1932     }
1933     return 0;
1934 }
1935
1936 /* Allocate a call structure, for the indicated channel of the
1937  * supplied connection.  The mode and state of the call must be set by
1938  * the caller. Returns the call with mutex locked. */
1939 struct rx_call *rxi_NewCall(register struct rx_connection *conn,
1940         register int channel)
1941 {
1942     register struct rx_call *call;
1943 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1944     register struct rx_call *cp;        /* Call pointer temp */
1945     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1946 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1947
1948     /* Grab an existing call structure, or allocate a new one.
1949      * Existing call structures are assumed to have been left reset by
1950      * rxi_FreeCall */
1951     MUTEX_ENTER(&rx_freeCallQueue_lock);
1952
1953 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1954     /*
1955      * EXCEPT that the TQ might not yet be cleared out.
1956      * Skip over those with in-use TQs.
1957      */
1958     call = NULL;
1959     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1960         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1961             call = cp;
1962             break;
1963         }
1964     }
1965     if (call) {
1966 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1967     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1968         call = queue_First(&rx_freeCallQueue, rx_call);
1969 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1970         queue_Remove(call);
1971         MUTEX_ENTER(&rx_stats_mutex);
1972         rx_stats.nFreeCallStructs--;
1973         MUTEX_EXIT(&rx_stats_mutex);
1974         MUTEX_EXIT(&rx_freeCallQueue_lock);
1975         MUTEX_ENTER(&call->lock);
1976         CLEAR_CALL_QUEUE_LOCK(call);
1977 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1978         /* Now, if TQ wasn't cleared earlier, do it now. */
1979         if (call->flags & RX_CALL_TQ_CLEARME) {
1980             rxi_ClearTransmitQueue(call, 0);
1981             queue_Init(&call->tq);
1982         }
1983 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1984         /* Bind the call to its connection structure */
1985         call->conn = conn;
1986         rxi_ResetCall(call, 1);
1987     }
1988     else {
1989         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1990
1991         MUTEX_EXIT(&rx_freeCallQueue_lock);
1992         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1993         MUTEX_ENTER(&call->lock);
1994         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1995         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1996         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1997
1998         MUTEX_ENTER(&rx_stats_mutex);
1999         rx_stats.nCallStructs++;
2000         MUTEX_EXIT(&rx_stats_mutex);
2001         /* Initialize once-only items */
2002         queue_Init(&call->tq);
2003         queue_Init(&call->rq);
2004         queue_Init(&call->iovq);
2005         /* Bind the call to its connection structure (prereq for reset) */
2006         call->conn = conn;
2007         rxi_ResetCall(call, 1);
2008     }
2009     call->channel = channel;
2010     call->callNumber = &conn->callNumber[channel];
2011     /* Note that the next expected call number is retained (in
2012      * conn->callNumber[i]), even if we reallocate the call structure
2013      */
2014     conn->call[channel] = call;
2015     /* if the channel's never been used (== 0), we should start at 1, otherwise
2016         the call number is valid from the last time this channel was used */
2017     if (*call->callNumber == 0) *call->callNumber = 1;
2018
2019     return call;
2020 }
2021
2022 /* A call has been inactive long enough that so we can throw away
2023  * state, including the call structure, which is placed on the call
2024  * free list.
2025  * Call is locked upon entry.
2026  * haveCTLock set if called from rxi_ReapConnections
2027  */
2028 #ifdef RX_ENABLE_LOCKS
2029 void rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2030 #else /* RX_ENABLE_LOCKS */
2031 void rxi_FreeCall(register struct rx_call *call)
2032 #endif /* RX_ENABLE_LOCKS */
2033 {
2034     register int channel = call->channel;
2035     register struct rx_connection *conn = call->conn;
2036
2037
2038     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2039       (*call->callNumber)++;
2040     rxi_ResetCall(call, 0);
2041     call->conn->call[channel] = (struct rx_call *) 0;
2042
2043     MUTEX_ENTER(&rx_freeCallQueue_lock);
2044     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2045 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2046     /* A call may be free even though its transmit queue is still in use.
2047      * Since we search the call list from head to tail, put busy calls at
2048      * the head of the list, and idle calls at the tail.
2049      */
2050     if (call->flags & RX_CALL_TQ_BUSY)
2051         queue_Prepend(&rx_freeCallQueue, call);
2052     else
2053         queue_Append(&rx_freeCallQueue, call);
2054 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2055     queue_Append(&rx_freeCallQueue, call);
2056 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2057     MUTEX_ENTER(&rx_stats_mutex);
2058     rx_stats.nFreeCallStructs++;
2059     MUTEX_EXIT(&rx_stats_mutex);
2060
2061     MUTEX_EXIT(&rx_freeCallQueue_lock);
2062  
2063     /* Destroy the connection if it was previously slated for
2064      * destruction, i.e. the Rx client code previously called
2065      * rx_DestroyConnection (client connections), or
2066      * rxi_ReapConnections called the same routine (server
2067      * connections).  Only do this, however, if there are no
2068      * outstanding calls. Note that for fine grain locking, there appears
2069      * to be a deadlock in that rxi_FreeCall has a call locked and
2070      * DestroyConnectionNoLock locks each call in the conn. But note a
2071      * few lines up where we have removed this call from the conn.
2072      * If someone else destroys a connection, they either have no
2073      * call lock held or are going through this section of code.
2074      */
2075     if (conn->flags & RX_CONN_DESTROY_ME) {
2076         MUTEX_ENTER(&conn->conn_data_lock);
2077         conn->refCount++;
2078         MUTEX_EXIT(&conn->conn_data_lock);
2079 #ifdef RX_ENABLE_LOCKS
2080         if (haveCTLock)
2081             rxi_DestroyConnectionNoLock(conn);
2082         else
2083             rxi_DestroyConnection(conn);
2084 #else /* RX_ENABLE_LOCKS */
2085         rxi_DestroyConnection(conn);
2086 #endif /* RX_ENABLE_LOCKS */
2087     }
2088 }
2089
2090 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2091 char *rxi_Alloc(register size_t size)
2092 {
2093     register char *p;
2094
2095 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2096     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2097      * implementation.
2098      */
2099     int glockOwner = ISAFS_GLOCK();
2100     if (!glockOwner)
2101         AFS_GLOCK();
2102 #endif
2103     MUTEX_ENTER(&rx_stats_mutex);
2104     rxi_Alloccnt++; rxi_Allocsize += size;
2105     MUTEX_EXIT(&rx_stats_mutex);
2106 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2107     if (size > AFS_SMALLOCSIZ) {
2108         p = (char *) osi_AllocMediumSpace(size);
2109     } else
2110         p = (char *) osi_AllocSmall(size, 1);
2111 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2112     if (!glockOwner)
2113         AFS_GUNLOCK();
2114 #endif
2115 #else
2116     p = (char *) osi_Alloc(size);
2117 #endif
2118     if (!p) osi_Panic("rxi_Alloc error");
2119     memset(p, 0, size);
2120     return p;
2121 }
2122
2123 void rxi_Free(void *addr, register size_t size)
2124 {
2125 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2126     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2127      * implementation.
2128      */
2129     int glockOwner = ISAFS_GLOCK();
2130     if (!glockOwner)
2131         AFS_GLOCK();
2132 #endif
2133     MUTEX_ENTER(&rx_stats_mutex);
2134     rxi_Alloccnt--; rxi_Allocsize -= size;
2135     MUTEX_EXIT(&rx_stats_mutex);
2136 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2137     if (size > AFS_SMALLOCSIZ)
2138         osi_FreeMediumSpace(addr);
2139     else
2140         osi_FreeSmall(addr);
2141 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2142     if (!glockOwner)
2143         AFS_GUNLOCK();
2144 #endif
2145 #else
2146     osi_Free(addr, size);
2147 #endif    
2148 }
2149
2150 /* Find the peer process represented by the supplied (host,port)
2151  * combination.  If there is no appropriate active peer structure, a
2152  * new one will be allocated and initialized 
2153  * The origPeer, if set, is a pointer to a peer structure on which the
2154  * refcount will be be decremented. This is used to replace the peer
2155  * structure hanging off a connection structure */
2156 struct rx_peer *rxi_FindPeer(register afs_uint32 host, 
2157         register u_short port, struct rx_peer *origPeer, int create)
2158 {
2159     register struct rx_peer *pp;
2160     int hashIndex;
2161     hashIndex = PEER_HASH(host, port);
2162     MUTEX_ENTER(&rx_peerHashTable_lock);
2163     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2164         if ((pp->host == host) && (pp->port == port)) break;
2165     }
2166     if (!pp) {
2167         if (create) {
2168             pp = rxi_AllocPeer(); /* This bzero's *pp */
2169             pp->host = host;      /* set here or in InitPeerParams is zero */
2170             pp->port = port;
2171             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2172             queue_Init(&pp->congestionQueue);
2173             queue_Init(&pp->rpcStats);
2174             pp->next = rx_peerHashTable[hashIndex];
2175             rx_peerHashTable[hashIndex] = pp;
2176             rxi_InitPeerParams(pp);
2177             MUTEX_ENTER(&rx_stats_mutex);
2178             rx_stats.nPeerStructs++;
2179             MUTEX_EXIT(&rx_stats_mutex);
2180         }
2181     }
2182     if (pp && create) {
2183         pp->refCount++;
2184     }
2185     if ( origPeer)
2186         origPeer->refCount--;
2187     MUTEX_EXIT(&rx_peerHashTable_lock);
2188     return pp;
2189 }
2190
2191
2192 /* Find the connection at (host, port) started at epoch, and with the
2193  * given connection id.  Creates the server connection if necessary.
2194  * The type specifies whether a client connection or a server
2195  * connection is desired.  In both cases, (host, port) specify the
2196  * peer's (host, pair) pair.  Client connections are not made
2197  * automatically by this routine.  The parameter socket gives the
2198  * socket descriptor on which the packet was received.  This is used,
2199  * in the case of server connections, to check that *new* connections
2200  * come via a valid (port, serviceId).  Finally, the securityIndex
2201  * parameter must match the existing index for the connection.  If a
2202  * server connection is created, it will be created using the supplied
2203  * index, if the index is valid for this service */
2204 struct rx_connection *rxi_FindConnection(osi_socket socket, 
2205         register afs_int32 host, register u_short port, u_short serviceId, 
2206         afs_uint32 cid, afs_uint32 epoch, int type, u_int securityIndex)
2207 {
2208     int hashindex, flag;
2209     register struct rx_connection *conn;
2210     hashindex = CONN_HASH(host, port, cid, epoch, type);
2211     MUTEX_ENTER(&rx_connHashTable_lock);
2212     rxLastConn ? (conn = rxLastConn, flag = 0) :
2213                  (conn = rx_connHashTable[hashindex], flag = 1);
2214     for (; conn; ) {
2215       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2216           && (epoch == conn->epoch)) {
2217         register struct rx_peer *pp = conn->peer;
2218         if (securityIndex != conn->securityIndex) {
2219             /* this isn't supposed to happen, but someone could forge a packet
2220                like this, and there seems to be some CM bug that makes this
2221                happen from time to time -- in which case, the fileserver
2222                asserts. */  
2223             MUTEX_EXIT(&rx_connHashTable_lock);
2224             return (struct rx_connection *) 0;
2225         }
2226         if (pp->host == host && pp->port == port)
2227             break;
2228         if (type == RX_CLIENT_CONNECTION && pp->port == port)
2229             break;
2230         if (type == RX_CLIENT_CONNECTION && (conn->epoch & 0x80000000))
2231             break;
2232       }
2233       if ( !flag )
2234       {
2235         /* the connection rxLastConn that was used the last time is not the
2236         ** one we are looking for now. Hence, start searching in the hash */
2237         flag = 1;
2238         conn = rx_connHashTable[hashindex];
2239       }
2240       else
2241         conn = conn->next;
2242     }
2243     if (!conn) {
2244         struct rx_service *service;
2245         if (type == RX_CLIENT_CONNECTION) {
2246             MUTEX_EXIT(&rx_connHashTable_lock);
2247             return (struct rx_connection *) 0;
2248         }
2249         service = rxi_FindService(socket, serviceId);
2250         if (!service || (securityIndex >= service->nSecurityObjects) 
2251             || (service->securityObjects[securityIndex] == 0)) {
2252             MUTEX_EXIT(&rx_connHashTable_lock);
2253             return (struct rx_connection *) 0;
2254         }
2255         conn = rxi_AllocConnection(); /* This bzero's the connection */
2256         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2257                    MUTEX_DEFAULT,0);
2258         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2259                    MUTEX_DEFAULT,0);
2260         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2261         conn->next = rx_connHashTable[hashindex];
2262         rx_connHashTable[hashindex] = conn;
2263         conn->peer = rxi_FindPeer(host, port, 0, 1);
2264         conn->type = RX_SERVER_CONNECTION;
2265         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2266         conn->epoch = epoch;
2267         conn->cid = cid & RX_CIDMASK;
2268         /* conn->serial = conn->lastSerial = 0; */
2269         /* conn->timeout = 0; */
2270         conn->ackRate = RX_FAST_ACK_RATE;
2271         conn->service = service;
2272         conn->serviceId = serviceId;
2273         conn->securityIndex = securityIndex;
2274         conn->securityObject = service->securityObjects[securityIndex];
2275         conn->nSpecific = 0;
2276         conn->specific = NULL;
2277         rx_SetConnDeadTime(conn, service->connDeadTime);
2278         /* Notify security object of the new connection */
2279         RXS_NewConnection(conn->securityObject, conn);
2280         /* XXXX Connection timeout? */
2281         if (service->newConnProc) (*service->newConnProc)(conn);
2282         MUTEX_ENTER(&rx_stats_mutex);
2283         rx_stats.nServerConns++;
2284         MUTEX_EXIT(&rx_stats_mutex);
2285     }
2286
2287     MUTEX_ENTER(&conn->conn_data_lock);
2288     conn->refCount++;
2289     MUTEX_EXIT(&conn->conn_data_lock);
2290
2291     rxLastConn = conn;  /* store this connection as the last conn used */
2292     MUTEX_EXIT(&rx_connHashTable_lock);
2293     return conn;
2294 }
2295
2296 /* There are two packet tracing routines available for testing and monitoring
2297  * Rx.  One is called just after every packet is received and the other is
2298  * called just before every packet is sent.  Received packets, have had their
2299  * headers decoded, and packets to be sent have not yet had their headers
2300  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2301  * containing the network address.  Both can be modified.  The return value, if
2302  * non-zero, indicates that the packet should be dropped.  */
2303
2304 int (*rx_justReceived)() = 0;
2305 int (*rx_almostSent)() = 0;
2306
2307 /* A packet has been received off the interface.  Np is the packet, socket is
2308  * the socket number it was received from (useful in determining which service
2309  * this packet corresponds to), and (host, port) reflect the host,port of the
2310  * sender.  This call returns the packet to the caller if it is finished with
2311  * it, rather than de-allocating it, just as a small performance hack */
2312
2313 struct rx_packet *rxi_ReceivePacket(register struct rx_packet *np, 
2314         osi_socket socket, afs_uint32 host, u_short port, 
2315         int *tnop, struct rx_call **newcallp)
2316 {
2317     register struct rx_call *call;
2318     register struct rx_connection *conn;
2319     int channel;
2320     afs_uint32 currentCallNumber;
2321     int type;
2322     int skew;
2323 #ifdef RXDEBUG
2324     char *packetType;
2325 #endif
2326     struct rx_packet *tnp;
2327
2328 #ifdef RXDEBUG
2329 /* We don't print out the packet until now because (1) the time may not be
2330  * accurate enough until now in the lwp implementation (rx_Listener only gets
2331  * the time after the packet is read) and (2) from a protocol point of view,
2332  * this is the first time the packet has been seen */
2333     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2334         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2335     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2336          np->header.serial, packetType, host, port, np->header.serviceId,
2337          np->header.epoch, np->header.cid, np->header.callNumber, 
2338          np->header.seq, np->header.flags, np));
2339 #endif
2340
2341     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2342       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2343     }
2344
2345     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2346         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2347     }
2348 #ifdef RXDEBUG
2349     /* If an input tracer function is defined, call it with the packet and
2350      * network address.  Note this function may modify its arguments. */
2351     if (rx_justReceived) {
2352         struct sockaddr_in addr;
2353         int drop;
2354         addr.sin_family = AF_INET;
2355         addr.sin_port = port;
2356         addr.sin_addr.s_addr = host;
2357 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN 
2358         addr.sin_len = sizeof(addr);
2359 #endif  /* AFS_OSF_ENV */
2360         drop = (*rx_justReceived) (np, &addr);
2361         /* drop packet if return value is non-zero */
2362         if (drop) return np;
2363         port = addr.sin_port;           /* in case fcn changed addr */
2364         host = addr.sin_addr.s_addr;
2365     }
2366 #endif
2367
2368     /* If packet was not sent by the client, then *we* must be the client */
2369     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2370         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2371
2372     /* Find the connection (or fabricate one, if we're the server & if
2373      * necessary) associated with this packet */
2374     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2375                               np->header.cid, np->header.epoch, type, 
2376                               np->header.securityIndex);
2377
2378     if (!conn) {
2379       /* If no connection found or fabricated, just ignore the packet.
2380        * (An argument could be made for sending an abort packet for
2381        * the conn) */
2382       return np;
2383     }   
2384
2385     MUTEX_ENTER(&conn->conn_data_lock);
2386     if (conn->maxSerial < np->header.serial)
2387         conn->maxSerial = np->header.serial;
2388     MUTEX_EXIT(&conn->conn_data_lock);
2389
2390     /* If the connection is in an error state, send an abort packet and ignore
2391      * the incoming packet */
2392     if (conn->error) {
2393         /* Don't respond to an abort packet--we don't want loops! */
2394         MUTEX_ENTER(&conn->conn_data_lock);
2395         if (np->header.type != RX_PACKET_TYPE_ABORT)
2396             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2397         conn->refCount--;
2398         MUTEX_EXIT(&conn->conn_data_lock);
2399         return np;
2400     }
2401
2402     /* Check for connection-only requests (i.e. not call specific). */
2403     if (np->header.callNumber == 0) {
2404         switch (np->header.type) {
2405             case RX_PACKET_TYPE_ABORT:
2406                 /* What if the supplied error is zero? */
2407                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2408                 MUTEX_ENTER(&conn->conn_data_lock);
2409                 conn->refCount--;
2410                 MUTEX_EXIT(&conn->conn_data_lock);
2411                 return np;
2412             case RX_PACKET_TYPE_CHALLENGE:
2413                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2414                 MUTEX_ENTER(&conn->conn_data_lock);
2415                 conn->refCount--;
2416                 MUTEX_EXIT(&conn->conn_data_lock);
2417                 return tnp;
2418             case RX_PACKET_TYPE_RESPONSE:
2419                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2420                 MUTEX_ENTER(&conn->conn_data_lock);
2421                 conn->refCount--;
2422                 MUTEX_EXIT(&conn->conn_data_lock);
2423                 return tnp;
2424             case RX_PACKET_TYPE_PARAMS:
2425             case RX_PACKET_TYPE_PARAMS+1:
2426             case RX_PACKET_TYPE_PARAMS+2:
2427                 /* ignore these packet types for now */
2428                 MUTEX_ENTER(&conn->conn_data_lock);
2429                 conn->refCount--;
2430                 MUTEX_EXIT(&conn->conn_data_lock);
2431                 return np;
2432
2433
2434             default:
2435                 /* Should not reach here, unless the peer is broken: send an
2436                  * abort packet */
2437                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2438                 MUTEX_ENTER(&conn->conn_data_lock);
2439                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2440                 conn->refCount--;
2441                 MUTEX_EXIT(&conn->conn_data_lock);
2442                 return tnp;
2443         }
2444     }
2445
2446     channel = np->header.cid & RX_CHANNELMASK;
2447     call = conn->call[channel];
2448 #ifdef  RX_ENABLE_LOCKS
2449     if (call)
2450         MUTEX_ENTER(&call->lock);
2451     /* Test to see if call struct is still attached to conn. */
2452     if (call != conn->call[channel]) {
2453         if (call)
2454             MUTEX_EXIT(&call->lock);
2455         if (type == RX_SERVER_CONNECTION) {
2456             call = conn->call[channel];
2457             /* If we started with no call attached and there is one now,
2458              * another thread is also running this routine and has gotten
2459              * the connection channel. We should drop this packet in the tests
2460              * below. If there was a call on this connection and it's now
2461              * gone, then we'll be making a new call below.
2462              * If there was previously a call and it's now different then
2463              * the old call was freed and another thread running this routine
2464              * has created a call on this channel. One of these two threads
2465              * has a packet for the old call and the code below handles those
2466              * cases.
2467              */
2468             if (call)
2469                 MUTEX_ENTER(&call->lock);
2470         }
2471         else {
2472             /* This packet can't be for this call. If the new call address is
2473              * 0 then no call is running on this channel. If there is a call
2474              * then, since this is a client connection we're getting data for
2475              * it must be for the previous call.
2476              */
2477             MUTEX_ENTER(&rx_stats_mutex);
2478             rx_stats.spuriousPacketsRead++;
2479             MUTEX_EXIT(&rx_stats_mutex);
2480             MUTEX_ENTER(&conn->conn_data_lock);
2481             conn->refCount--;
2482             MUTEX_EXIT(&conn->conn_data_lock);
2483             return np;
2484         }
2485     }
2486 #endif
2487     currentCallNumber = conn->callNumber[channel];
2488
2489     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2490         if (np->header.callNumber < currentCallNumber) {
2491             MUTEX_ENTER(&rx_stats_mutex);
2492             rx_stats.spuriousPacketsRead++;
2493             MUTEX_EXIT(&rx_stats_mutex);
2494 #ifdef  RX_ENABLE_LOCKS
2495             if (call)
2496                 MUTEX_EXIT(&call->lock);
2497 #endif
2498             MUTEX_ENTER(&conn->conn_data_lock);
2499             conn->refCount--;
2500             MUTEX_EXIT(&conn->conn_data_lock);
2501             return np;
2502         }
2503         if (!call) {
2504             MUTEX_ENTER(&conn->conn_call_lock);
2505             call = rxi_NewCall(conn, channel);
2506             MUTEX_EXIT(&conn->conn_call_lock);
2507             *call->callNumber = np->header.callNumber;
2508             call->state = RX_STATE_PRECALL;
2509             clock_GetTime(&call->queueTime);
2510             hzero(call->bytesSent);
2511             hzero(call->bytesRcvd);
2512             rxi_KeepAliveOn(call);
2513         }
2514         else if (np->header.callNumber != currentCallNumber) {
2515             /* Wait until the transmit queue is idle before deciding
2516              * whether to reset the current call. Chances are that the
2517              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2518              * flag is cleared.
2519              */
2520 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2521             while ((call->state == RX_STATE_ACTIVE) &&
2522                    (call->flags & RX_CALL_TQ_BUSY)) {
2523                 call->flags |= RX_CALL_TQ_WAIT;
2524 #ifdef RX_ENABLE_LOCKS
2525                 CV_WAIT(&call->cv_tq, &call->lock);
2526 #else /* RX_ENABLE_LOCKS */
2527                 osi_rxSleep(&call->tq);
2528 #endif /* RX_ENABLE_LOCKS */
2529             }
2530 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2531             /* If the new call cannot be taken right now send a busy and set
2532              * the error condition in this call, so that it terminates as
2533              * quickly as possible */
2534             if (call->state == RX_STATE_ACTIVE) {
2535                 struct rx_packet *tp;
2536
2537                 rxi_CallError(call, RX_CALL_DEAD);
2538                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1);
2539                 MUTEX_EXIT(&call->lock);
2540                 MUTEX_ENTER(&conn->conn_data_lock);
2541                 conn->refCount--;
2542                 MUTEX_EXIT(&conn->conn_data_lock);
2543                 return tp;
2544             }
2545             rxi_ResetCall(call, 0);
2546             *call->callNumber = np->header.callNumber;
2547             call->state = RX_STATE_PRECALL;
2548             clock_GetTime(&call->queueTime);
2549             hzero(call->bytesSent);
2550             hzero(call->bytesRcvd);
2551             /*
2552              * If the number of queued calls exceeds the overload
2553              * threshold then abort this call.
2554              */
2555             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2556                 struct rx_packet *tp;
2557
2558                 rxi_CallError(call, rx_BusyError);
2559                 tp = rxi_SendCallAbort(call, np, 1, 0);
2560                 MUTEX_EXIT(&call->lock);
2561                 MUTEX_ENTER(&conn->conn_data_lock);
2562                 conn->refCount--;
2563                 MUTEX_EXIT(&conn->conn_data_lock);
2564                 return tp;
2565             }
2566             rxi_KeepAliveOn(call);
2567         }
2568         else {
2569             /* Continuing call; do nothing here. */
2570         }
2571     } else { /* we're the client */
2572         /* Ignore all incoming acknowledgements for calls in DALLY state */
2573         if ( call && (call->state == RX_STATE_DALLY) 
2574          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2575             MUTEX_ENTER(&rx_stats_mutex);
2576             rx_stats.ignorePacketDally++;
2577             MUTEX_EXIT(&rx_stats_mutex);
2578 #ifdef  RX_ENABLE_LOCKS
2579             if (call) {
2580                 MUTEX_EXIT(&call->lock);
2581             }
2582 #endif
2583             MUTEX_ENTER(&conn->conn_data_lock);
2584             conn->refCount--;
2585             MUTEX_EXIT(&conn->conn_data_lock);
2586             return np;
2587         }
2588         
2589         /* Ignore anything that's not relevant to the current call.  If there
2590          * isn't a current call, then no packet is relevant. */
2591         if (!call || (np->header.callNumber != currentCallNumber)) {
2592             MUTEX_ENTER(&rx_stats_mutex);
2593             rx_stats.spuriousPacketsRead++;
2594             MUTEX_EXIT(&rx_stats_mutex);
2595 #ifdef  RX_ENABLE_LOCKS
2596             if (call) {
2597                 MUTEX_EXIT(&call->lock);
2598             }
2599 #endif
2600             MUTEX_ENTER(&conn->conn_data_lock);
2601             conn->refCount--;
2602             MUTEX_EXIT(&conn->conn_data_lock);
2603             return np;  
2604         }
2605         /* If the service security object index stamped in the packet does not
2606          * match the connection's security index, ignore the packet */
2607         if (np->header.securityIndex != conn->securityIndex) {
2608 #ifdef  RX_ENABLE_LOCKS
2609             MUTEX_EXIT(&call->lock);
2610 #endif
2611             MUTEX_ENTER(&conn->conn_data_lock);
2612             conn->refCount--;       
2613             MUTEX_EXIT(&conn->conn_data_lock);
2614             return np;
2615         }
2616
2617         /* If we're receiving the response, then all transmit packets are
2618          * implicitly acknowledged.  Get rid of them. */
2619         if (np->header.type == RX_PACKET_TYPE_DATA) {
2620 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2621             /* XXX Hack. Because we must release the global rx lock when
2622              * sending packets (osi_NetSend) we drop all acks while we're
2623              * traversing the tq in rxi_Start sending packets out because
2624              * packets may move to the freePacketQueue as result of being here!
2625              * So we drop these packets until we're safely out of the
2626              * traversing. Really ugly! 
2627              * For fine grain RX locking, we set the acked field in the
2628              * packets and let rxi_Start remove them from the transmit queue.
2629              */
2630             if (call->flags & RX_CALL_TQ_BUSY) {
2631 #ifdef  RX_ENABLE_LOCKS
2632                 rxi_SetAcksInTransmitQueue(call);
2633 #else
2634                 conn->refCount--;
2635                 return np;              /* xmitting; drop packet */
2636 #endif
2637             }
2638             else {
2639                 rxi_ClearTransmitQueue(call, 0);
2640             }
2641 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2642             rxi_ClearTransmitQueue(call, 0);
2643 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2644         } else {
2645           if (np->header.type == RX_PACKET_TYPE_ACK) {
2646         /* now check to see if this is an ack packet acknowledging that the
2647          * server actually *lost* some hard-acked data.  If this happens we
2648          * ignore this packet, as it may indicate that the server restarted in
2649          * the middle of a call.  It is also possible that this is an old ack
2650          * packet.  We don't abort the connection in this case, because this
2651          * *might* just be an old ack packet.  The right way to detect a server
2652          * restart in the midst of a call is to notice that the server epoch
2653          * changed, btw.  */
2654         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2655          * XXX unacknowledged.  I think that this is off-by-one, but
2656          * XXX I don't dare change it just yet, since it will
2657          * XXX interact badly with the server-restart detection 
2658          * XXX code in receiveackpacket.  */
2659             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2660                 MUTEX_ENTER(&rx_stats_mutex);
2661                 rx_stats.spuriousPacketsRead++;
2662                 MUTEX_EXIT(&rx_stats_mutex);
2663                 MUTEX_EXIT(&call->lock);
2664                 MUTEX_ENTER(&conn->conn_data_lock);
2665                 conn->refCount--;
2666                 MUTEX_EXIT(&conn->conn_data_lock);
2667                 return np;
2668             }
2669           }
2670         } /* else not a data packet */
2671     }
2672
2673     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2674     /* Set remote user defined status from packet */
2675     call->remoteStatus = np->header.userStatus;
2676
2677     /* Note the gap between the expected next packet and the actual
2678      * packet that arrived, when the new packet has a smaller serial number
2679      * than expected.  Rioses frequently reorder packets all by themselves,
2680      * so this will be quite important with very large window sizes.
2681      * Skew is checked against 0 here to avoid any dependence on the type of
2682      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2683      * true! 
2684      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2685      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2686      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2687      */
2688     MUTEX_ENTER(&conn->conn_data_lock);
2689     skew = conn->lastSerial - np->header.serial;
2690     conn->lastSerial = np->header.serial;
2691     MUTEX_EXIT(&conn->conn_data_lock);
2692     if (skew > 0) {
2693       register struct rx_peer *peer;
2694       peer = conn->peer;
2695       if (skew > peer->inPacketSkew) {
2696         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2697         peer->inPacketSkew = skew;
2698       }
2699     }
2700
2701     /* Now do packet type-specific processing */
2702     switch (np->header.type) {
2703         case RX_PACKET_TYPE_DATA:
2704             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2705                                        tnop, newcallp);
2706             break;
2707         case RX_PACKET_TYPE_ACK:
2708             /* Respond immediately to ack packets requesting acknowledgement
2709              * (ping packets) */
2710             if (np->header.flags & RX_REQUEST_ACK) {
2711                 if (call->error)
2712                     (void) rxi_SendCallAbort(call, 0, 1, 0);
2713                 else
2714                     (void) rxi_SendAck(call, 0, 0, np->header.serial, 0,
2715                                        RX_ACK_PING_RESPONSE, 1);
2716             }
2717             np = rxi_ReceiveAckPacket(call, np, 1);
2718             break;
2719         case RX_PACKET_TYPE_ABORT:
2720             /* An abort packet: reset the connection, passing the error up to
2721              * the user */
2722             /* What if error is zero? */
2723             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2724             break;
2725         case RX_PACKET_TYPE_BUSY:
2726             /* XXXX */
2727             break;
2728         case RX_PACKET_TYPE_ACKALL:
2729             /* All packets acknowledged, so we can drop all packets previously
2730              * readied for sending */
2731 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2732             /* XXX Hack. We because we can't release the global rx lock when
2733              * sending packets (osi_NetSend) we drop all ack pkts while we're
2734              * traversing the tq in rxi_Start sending packets out because
2735              * packets may move to the freePacketQueue as result of being
2736              * here! So we drop these packets until we're safely out of the
2737              * traversing. Really ugly! 
2738              * For fine grain RX locking, we set the acked field in the packets
2739              * and let rxi_Start remove the packets from the transmit queue.
2740              */
2741             if (call->flags & RX_CALL_TQ_BUSY) {
2742 #ifdef  RX_ENABLE_LOCKS
2743                 rxi_SetAcksInTransmitQueue(call);
2744                 break;
2745 #else /* RX_ENABLE_LOCKS */
2746                 conn->refCount--;
2747                 return np;              /* xmitting; drop packet */
2748 #endif /* RX_ENABLE_LOCKS */
2749             }
2750 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2751             rxi_ClearTransmitQueue(call, 0);
2752             break;
2753         default:
2754             /* Should not reach here, unless the peer is broken: send an abort
2755              * packet */
2756             rxi_CallError(call, RX_PROTOCOL_ERROR);
2757             np = rxi_SendCallAbort(call, np, 1, 0);
2758             break;
2759     };
2760     /* Note when this last legitimate packet was received, for keep-alive
2761      * processing.  Note, we delay getting the time until now in the hope that
2762      * the packet will be delivered to the user before any get time is required
2763      * (if not, then the time won't actually be re-evaluated here). */
2764     call->lastReceiveTime = clock_Sec();
2765     MUTEX_EXIT(&call->lock);
2766     MUTEX_ENTER(&conn->conn_data_lock);
2767     conn->refCount--;
2768     MUTEX_EXIT(&conn->conn_data_lock);
2769     return np;
2770 }
2771
2772 /* return true if this is an "interesting" connection from the point of view
2773     of someone trying to debug the system */
2774 int rxi_IsConnInteresting(struct rx_connection *aconn)
2775 {
2776     register int i;
2777     register struct rx_call *tcall;
2778
2779     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2780         return 1;
2781     for(i=0;i<RX_MAXCALLS;i++) {
2782         tcall = aconn->call[i];
2783         if (tcall) {
2784             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2785                 return 1;
2786             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2787                 return 1;
2788         }
2789     }
2790     return 0;
2791 }
2792
2793 #ifdef KERNEL
2794 /* if this is one of the last few packets AND it wouldn't be used by the
2795    receiving call to immediately satisfy a read request, then drop it on
2796    the floor, since accepting it might prevent a lock-holding thread from
2797    making progress in its reading. If a call has been cleared while in
2798    the precall state then ignore all subsequent packets until the call
2799    is assigned to a thread. */
2800
2801 static int TooLow(struct rx_packet *ap, struct rx_call *acall)
2802 {
2803     int rc=0;
2804     MUTEX_ENTER(&rx_stats_mutex);
2805     if (((ap->header.seq != 1) &&
2806          (acall->flags & RX_CALL_CLEARED) &&
2807          (acall->state == RX_STATE_PRECALL)) ||
2808         ((rx_nFreePackets < rxi_dataQuota+2) &&
2809          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2810            && (acall->flags & RX_CALL_READER_WAIT)))) {
2811         rc = 1;
2812     }
2813     MUTEX_EXIT(&rx_stats_mutex);
2814     return rc;
2815 }
2816 #endif /* KERNEL */
2817
2818 static void rxi_CheckReachEvent(struct rxevent *event, 
2819         struct rx_connection *conn, struct rx_call *acall)
2820 {
2821     struct rx_call *call = acall;
2822     struct clock when;
2823     int i, waiting;
2824
2825     MUTEX_ENTER(&conn->conn_data_lock);
2826     conn->checkReachEvent = NULL;
2827     waiting = conn->flags & RX_CONN_ATTACHWAIT;
2828     if (event) conn->refCount--;
2829     MUTEX_EXIT(&conn->conn_data_lock);
2830
2831     if (waiting) {
2832         if (!call) {
2833             MUTEX_ENTER(&conn->conn_call_lock);
2834             MUTEX_ENTER(&conn->conn_data_lock);
2835             for (i=0; i<RX_MAXCALLS; i++) {
2836                 struct rx_call *tc = conn->call[i];
2837                 if (tc && tc->state == RX_STATE_PRECALL) {
2838                     call = tc;
2839                     break;
2840                 }
2841             }
2842             if (!call)
2843                 /* Indicate that rxi_CheckReachEvent is no longer running by
2844                  * clearing the flag.  Must be atomic under conn_data_lock to
2845                  * avoid a new call slipping by: rxi_CheckConnReach holds
2846                  * conn_data_lock while checking RX_CONN_ATTACHWAIT.
2847                  */
2848                 conn->flags &= ~RX_CONN_ATTACHWAIT;
2849             MUTEX_EXIT(&conn->conn_data_lock);
2850             MUTEX_EXIT(&conn->conn_call_lock);
2851         }
2852
2853         if (call) {
2854             if (call != acall) MUTEX_ENTER(&call->lock);
2855             rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
2856             if (call != acall) MUTEX_EXIT(&call->lock);
2857
2858             clock_GetTime(&when);
2859             when.sec += RX_CHECKREACH_TIMEOUT;
2860             MUTEX_ENTER(&conn->conn_data_lock);
2861             if (!conn->checkReachEvent) {
2862                 conn->refCount++;
2863                 conn->checkReachEvent =
2864                     rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
2865             }
2866             MUTEX_EXIT(&conn->conn_data_lock);
2867         }
2868     }
2869 }
2870
2871 static int rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
2872 {
2873     struct rx_service *service = conn->service;
2874     struct rx_peer *peer = conn->peer;
2875     afs_uint32 now, lastReach;
2876
2877     if (service->checkReach == 0)
2878         return 0;
2879
2880     now = clock_Sec();
2881     MUTEX_ENTER(&peer->peer_lock);
2882     lastReach = peer->lastReachTime;
2883     MUTEX_EXIT(&peer->peer_lock);
2884     if (now - lastReach < RX_CHECKREACH_TTL)
2885         return 0;
2886
2887     MUTEX_ENTER(&conn->conn_data_lock);
2888     if (conn->flags & RX_CONN_ATTACHWAIT) {
2889         MUTEX_EXIT(&conn->conn_data_lock);
2890         return 1;
2891     }
2892     conn->flags |= RX_CONN_ATTACHWAIT;
2893     MUTEX_EXIT(&conn->conn_data_lock);
2894     if (!conn->checkReachEvent)
2895         rxi_CheckReachEvent(NULL, conn, call);
2896
2897     return 1;
2898 }
2899
2900 /* try to attach call, if authentication is complete */
2901 static void TryAttach(register struct rx_call *acall, 
2902         register osi_socket socket, register int *tnop, 
2903         register struct rx_call **newcallp, int reachOverride)
2904 {
2905     struct rx_connection *conn = acall->conn;
2906
2907     if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
2908         /* Don't attach until we have any req'd. authentication. */
2909         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2910             if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
2911                 rxi_AttachServerProc(acall, socket, tnop, newcallp);
2912                 /* Note:  this does not necessarily succeed; there
2913                  * may not any proc available
2914                  */
2915         }
2916         else {
2917             rxi_ChallengeOn(acall->conn);
2918         }
2919     }
2920 }
2921
2922 /* A data packet has been received off the interface.  This packet is
2923  * appropriate to the call (the call is in the right state, etc.).  This
2924  * routine can return a packet to the caller, for re-use */
2925
2926 struct rx_packet *rxi_ReceiveDataPacket(register struct rx_call *call, 
2927         register struct rx_packet *np, int istack, osi_socket socket, 
2928         afs_uint32 host, u_short port, int *tnop, struct rx_call **newcallp)
2929 {
2930     int ackNeeded = 0;
2931     int newPackets = 0;
2932     int didHardAck = 0;
2933     int haveLast = 0;
2934     afs_uint32 seq, serial, flags;
2935     int isFirst;
2936     struct rx_packet *tnp;
2937     struct clock when;
2938     MUTEX_ENTER(&rx_stats_mutex);
2939     rx_stats.dataPacketsRead++;
2940     MUTEX_EXIT(&rx_stats_mutex);
2941
2942 #ifdef KERNEL
2943     /* If there are no packet buffers, drop this new packet, unless we can find
2944      * packet buffers from inactive calls */
2945     if (!call->error &&
2946         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2947         MUTEX_ENTER(&rx_freePktQ_lock);
2948         rxi_NeedMorePackets = TRUE;
2949         MUTEX_EXIT(&rx_freePktQ_lock);
2950         MUTEX_ENTER(&rx_stats_mutex);
2951         rx_stats.noPacketBuffersOnRead++;
2952         MUTEX_EXIT(&rx_stats_mutex);
2953         call->rprev = np->header.serial;
2954         rxi_calltrace(RX_TRACE_DROP, call);
2955         dpf (("packet %x dropped on receipt - quota problems", np));
2956         if (rxi_doreclaim)
2957             rxi_ClearReceiveQueue(call);
2958         clock_GetTime(&when);
2959         clock_Add(&when, &rx_softAckDelay);
2960         if (!call->delayedAckEvent ||
2961             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2962             rxevent_Cancel(call->delayedAckEvent, call,
2963                            RX_CALL_REFCOUNT_DELAY);
2964             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2965             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2966                                                  call, 0);
2967         }
2968         /* we've damaged this call already, might as well do it in. */
2969         return np;
2970     }
2971 #endif /* KERNEL */
2972
2973     /*
2974      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2975      * packet is one of several packets transmitted as a single
2976      * datagram. Do not send any soft or hard acks until all packets
2977      * in a jumbogram have been processed. Send negative acks right away.
2978      */
2979     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2980         /* tnp is non-null when there are more packets in the
2981          * current jumbo gram */
2982         if (tnp) {
2983             if (np)
2984                 rxi_FreePacket(np);
2985             np = tnp;
2986         }
2987
2988         seq = np->header.seq;
2989         serial = np->header.serial;
2990         flags = np->header.flags;
2991
2992         /* If the call is in an error state, send an abort message */
2993         if (call->error)
2994             return rxi_SendCallAbort(call, np, istack, 0);
2995
2996         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2997          * AFS 3.5 jumbogram. */
2998         if (flags & RX_JUMBO_PACKET) {
2999             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3000         } else {
3001             tnp = NULL;
3002         }
3003
3004         if (np->header.spare != 0) {
3005             MUTEX_ENTER(&call->conn->conn_data_lock);
3006             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3007             MUTEX_EXIT(&call->conn->conn_data_lock);
3008         }
3009
3010         /* The usual case is that this is the expected next packet */
3011         if (seq == call->rnext) {
3012
3013             /* Check to make sure it is not a duplicate of one already queued */
3014             if (queue_IsNotEmpty(&call->rq) 
3015                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3016                 MUTEX_ENTER(&rx_stats_mutex);
3017                 rx_stats.dupPacketsRead++;
3018                 MUTEX_EXIT(&rx_stats_mutex);
3019                 dpf (("packet %x dropped on receipt - duplicate", np));
3020                 rxevent_Cancel(call->delayedAckEvent, call,
3021                                RX_CALL_REFCOUNT_DELAY);
3022                 np = rxi_SendAck(call, np, seq, serial,
3023                                  flags, RX_ACK_DUPLICATE, istack);
3024                 ackNeeded = 0;
3025                 call->rprev = seq;
3026                 continue;
3027             }
3028
3029             /* It's the next packet. Stick it on the receive queue
3030              * for this call. Set newPackets to make sure we wake
3031              * the reader once all packets have been processed */
3032             queue_Prepend(&call->rq, np);
3033             call->nSoftAcks++;
3034             np = NULL; /* We can't use this anymore */
3035             newPackets = 1;
3036
3037             /* If an ack is requested then set a flag to make sure we
3038              * send an acknowledgement for this packet */
3039             if (flags & RX_REQUEST_ACK) {
3040                 ackNeeded = 1;
3041             }
3042
3043             /* Keep track of whether we have received the last packet */
3044             if (flags & RX_LAST_PACKET) {
3045                 call->flags |= RX_CALL_HAVE_LAST;
3046                 haveLast = 1;
3047             }
3048
3049             /* Check whether we have all of the packets for this call */
3050             if (call->flags & RX_CALL_HAVE_LAST) {
3051                 afs_uint32 tseq;                /* temporary sequence number */
3052                 struct rx_packet *tp;   /* Temporary packet pointer */
3053                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
3054
3055                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3056                     if (tseq != tp->header.seq)
3057                         break;
3058                     if (tp->header.flags & RX_LAST_PACKET) {
3059                         call->flags |= RX_CALL_RECEIVE_DONE;
3060                         break;
3061                     }
3062                     tseq++;
3063                 }
3064             }
3065
3066             /* Provide asynchronous notification for those who want it
3067              * (e.g. multi rx) */
3068             if (call->arrivalProc) {
3069                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3070                                      (int) call->arrivalProcArg);
3071                 call->arrivalProc = (VOID (*)()) 0;
3072             }
3073
3074             /* Update last packet received */
3075             call->rprev = seq;
3076
3077             /* If there is no server process serving this call, grab
3078              * one, if available. We only need to do this once. If a
3079              * server thread is available, this thread becomes a server
3080              * thread and the server thread becomes a listener thread. */
3081             if (isFirst) {
3082                 TryAttach(call, socket, tnop, newcallp, 0);
3083             }
3084         }       
3085         /* This is not the expected next packet. */
3086         else {
3087             /* Determine whether this is a new or old packet, and if it's
3088              * a new one, whether it fits into the current receive window.
3089              * Also figure out whether the packet was delivered in sequence.
3090              * We use the prev variable to determine whether the new packet
3091              * is the successor of its immediate predecessor in the
3092              * receive queue, and the missing flag to determine whether
3093              * any of this packets predecessors are missing.  */
3094
3095             afs_uint32 prev;            /* "Previous packet" sequence number */
3096             struct rx_packet *tp;       /* Temporary packet pointer */
3097             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3098             int missing;                /* Are any predecessors missing? */
3099
3100             /* If the new packet's sequence number has been sent to the
3101              * application already, then this is a duplicate */
3102             if (seq < call->rnext) {
3103                 MUTEX_ENTER(&rx_stats_mutex);
3104                 rx_stats.dupPacketsRead++;
3105                 MUTEX_EXIT(&rx_stats_mutex);
3106                 rxevent_Cancel(call->delayedAckEvent, call,
3107                                RX_CALL_REFCOUNT_DELAY);
3108                 np = rxi_SendAck(call, np, seq, serial,
3109                                  flags, RX_ACK_DUPLICATE, istack);
3110                 ackNeeded = 0;
3111                 call->rprev = seq;
3112                 continue;
3113             }
3114
3115             /* If the sequence number is greater than what can be
3116              * accomodated by the current window, then send a negative
3117              * acknowledge and drop the packet */
3118             if ((call->rnext + call->rwind) <= seq) {
3119                 rxevent_Cancel(call->delayedAckEvent, call,
3120                                RX_CALL_REFCOUNT_DELAY);
3121                 np = rxi_SendAck(call, np, seq, serial,
3122                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3123                 ackNeeded = 0;
3124                 call->rprev = seq;
3125                 continue;
3126             }
3127
3128             /* Look for the packet in the queue of old received packets */
3129             for (prev = call->rnext - 1, missing = 0,
3130                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3131                 /*Check for duplicate packet */
3132                 if (seq == tp->header.seq) {
3133                     MUTEX_ENTER(&rx_stats_mutex);
3134                     rx_stats.dupPacketsRead++;
3135                     MUTEX_EXIT(&rx_stats_mutex);
3136                     rxevent_Cancel(call->delayedAckEvent, call,
3137                                    RX_CALL_REFCOUNT_DELAY);
3138                     np = rxi_SendAck(call, np, seq, serial, 
3139                                      flags, RX_ACK_DUPLICATE, istack);
3140                     ackNeeded = 0;
3141                     call->rprev = seq;
3142                     goto nextloop;
3143                 }
3144                 /* If we find a higher sequence packet, break out and
3145                  * insert the new packet here. */
3146                 if (seq < tp->header.seq) break;
3147                 /* Check for missing packet */
3148                 if (tp->header.seq != prev+1) {
3149                     missing = 1;
3150                 }
3151
3152                 prev = tp->header.seq;
3153             }
3154
3155             /* Keep track of whether we have received the last packet. */
3156             if (flags & RX_LAST_PACKET) {
3157                 call->flags |= RX_CALL_HAVE_LAST;
3158             }
3159
3160             /* It's within the window: add it to the the receive queue.
3161              * tp is left by the previous loop either pointing at the
3162              * packet before which to insert the new packet, or at the
3163              * queue head if the queue is empty or the packet should be
3164              * appended. */
3165             queue_InsertBefore(tp, np);
3166             call->nSoftAcks++;
3167             np = NULL;
3168
3169             /* Check whether we have all of the packets for this call */
3170             if ((call->flags & RX_CALL_HAVE_LAST)
3171              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3172                 afs_uint32 tseq;                /* temporary sequence number */
3173
3174                 for (tseq = call->rnext,
3175                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3176                     if (tseq != tp->header.seq)
3177                         break;
3178                     if (tp->header.flags & RX_LAST_PACKET) {
3179                         call->flags |= RX_CALL_RECEIVE_DONE;
3180                         break;
3181                     }
3182                     tseq++;
3183                 }
3184             }
3185
3186             /* We need to send an ack of the packet is out of sequence, 
3187              * or if an ack was requested by the peer. */
3188             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3189                 ackNeeded = 1;
3190             }
3191
3192             /* Acknowledge the last packet for each call */
3193             if (flags & RX_LAST_PACKET) {
3194                 haveLast = 1;
3195             }
3196
3197             call->rprev = seq;
3198         }
3199 nextloop:;
3200     }
3201
3202     if (newPackets) {
3203         /*
3204          * If the receiver is waiting for an iovec, fill the iovec
3205          * using the data from the receive queue */
3206         if (call->flags & RX_CALL_IOVEC_WAIT) {
3207             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3208             /* the call may have been aborted */
3209             if (call->error) {
3210                 return NULL;
3211             }
3212             if (didHardAck) {
3213                 ackNeeded = 0;
3214             }
3215         }
3216
3217         /* Wakeup the reader if any */
3218         if ((call->flags & RX_CALL_READER_WAIT) &&
3219             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3220              (call->iovNext >= call->iovMax) ||
3221              (call->flags & RX_CALL_RECEIVE_DONE))) {
3222             call->flags &= ~RX_CALL_READER_WAIT;
3223 #ifdef  RX_ENABLE_LOCKS
3224             CV_BROADCAST(&call->cv_rq);
3225 #else
3226             osi_rxWakeup(&call->rq);
3227 #endif
3228         }
3229     }
3230
3231     /*
3232      * Send an ack when requested by the peer, or once every
3233      * rxi_SoftAckRate packets until the last packet has been
3234      * received. Always send a soft ack for the last packet in
3235      * the server's reply. */
3236     if (ackNeeded) {
3237         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3238         np = rxi_SendAck(call, np, seq, serial, flags,
3239                          RX_ACK_REQUESTED, istack);
3240     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3241         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3242         np = rxi_SendAck(call, np, seq, serial, flags,
3243                          RX_ACK_IDLE, istack);
3244     } else if (call->nSoftAcks) {
3245         clock_GetTime(&when);
3246         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3247             clock_Add(&when, &rx_lastAckDelay);
3248         } else {
3249             clock_Add(&when, &rx_softAckDelay);
3250         }
3251         if (!call->delayedAckEvent ||
3252             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3253             rxevent_Cancel(call->delayedAckEvent, call,
3254                            RX_CALL_REFCOUNT_DELAY);
3255             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3256             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3257                                                  call, 0);
3258         }
3259     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3260         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3261     }
3262
3263     return np;
3264 }
3265
3266 #ifdef  ADAPT_WINDOW
3267 static void rxi_ComputeRate();
3268 #endif
3269
3270 static void rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3271 {
3272     struct rx_peer *peer = conn->peer;
3273
3274     MUTEX_ENTER(&peer->peer_lock);
3275     peer->lastReachTime = clock_Sec();
3276     MUTEX_EXIT(&peer->peer_lock);
3277
3278     MUTEX_ENTER(&conn->conn_data_lock);
3279     if (conn->flags & RX_CONN_ATTACHWAIT) {
3280         int i;
3281
3282         conn->flags &= ~RX_CONN_ATTACHWAIT;
3283         MUTEX_EXIT(&conn->conn_data_lock);
3284
3285         for (i=0; i<RX_MAXCALLS; i++) {
3286             struct rx_call *call = conn->call[i];
3287             if (call) {
3288                 if (call != acall) MUTEX_ENTER(&call->lock);
3289                 TryAttach(call, (osi_socket) -1, NULL, NULL, 1);
3290                 if (call != acall) MUTEX_EXIT(&call->lock);
3291             }
3292         }
3293     } else
3294         MUTEX_EXIT(&conn->conn_data_lock);
3295 }
3296
3297 /* rxi_ComputePeerNetStats
3298  *
3299  * Called exclusively by rxi_ReceiveAckPacket to compute network link
3300  * estimates (like RTT and throughput) based on ack packets.  Caller
3301  * must ensure that the packet in question is the right one (i.e.
3302  * serial number matches).
3303  */
3304 static void
3305 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3306         struct rx_ackPacket *ap, struct rx_packet *np)
3307 {
3308     struct rx_peer *peer = call->conn->peer;
3309
3310     /* Use RTT if not delayed by client. */
3311     if (ap->reason != RX_ACK_DELAY)
3312         rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3313 #ifdef ADAPT_WINDOW
3314     rxi_ComputeRate(peer, call, p, np, ap->reason);
3315 #endif
3316 }
3317
3318 /* The real smarts of the whole thing.  */
3319 struct rx_packet *rxi_ReceiveAckPacket(register struct rx_call *call, 
3320         struct rx_packet *np, int istack)
3321 {
3322     struct rx_ackPacket *ap;
3323     int nAcks;
3324     register struct rx_packet *tp;
3325     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3326     register struct rx_connection *conn = call->conn;
3327     struct rx_peer *peer = conn->peer;
3328     afs_uint32 first;
3329     afs_uint32 serial;
3330     /* because there are CM's that are bogus, sending weird values for this. */
3331     afs_uint32 skew = 0;
3332     int nbytes;
3333     int missing;
3334     int acked;
3335     int nNacked = 0;
3336     int newAckCount = 0;
3337     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3338     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3339
3340     MUTEX_ENTER(&rx_stats_mutex);
3341     rx_stats.ackPacketsRead++;
3342     MUTEX_EXIT(&rx_stats_mutex);
3343     ap = (struct rx_ackPacket *) rx_DataOf(np);
3344     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3345     if (nbytes < 0)
3346       return np;       /* truncated ack packet */
3347
3348     /* depends on ack packet struct */
3349     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3350     first = ntohl(ap->firstPacket);
3351     serial = ntohl(ap->serial);
3352     /* temporarily disabled -- needs to degrade over time 
3353        skew = ntohs(ap->maxSkew); */
3354
3355     /* Ignore ack packets received out of order */
3356     if (first < call->tfirst) {
3357         return np;
3358     }
3359
3360     if (np->header.flags & RX_SLOW_START_OK) {
3361         call->flags |= RX_CALL_SLOW_START_OK;
3362     }
3363
3364     if (ap->reason == RX_ACK_PING_RESPONSE)
3365         rxi_UpdatePeerReach(conn, call);
3366     
3367 #ifdef RXDEBUG
3368     if (rx_Log) {
3369       fprintf( rx_Log, 
3370               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3371               ap->reason, ntohl(ap->previousPacket), 
3372               (unsigned int) np->header.seq, (unsigned int) serial, 
3373               (unsigned int) skew, ntohl(ap->firstPacket));
3374         if (nAcks) {
3375             int offset;
3376             for (offset = 0; offset < nAcks; offset++) 
3377                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3378         }
3379         putc('\n', rx_Log);
3380     }
3381 #endif
3382
3383     /* Update the outgoing packet skew value to the latest value of
3384      * the peer's incoming packet skew value.  The ack packet, of
3385      * course, could arrive out of order, but that won't affect things
3386      * much */
3387     MUTEX_ENTER(&peer->peer_lock);
3388     peer->outPacketSkew = skew;
3389
3390     /* Check for packets that no longer need to be transmitted, and
3391      * discard them.  This only applies to packets positively
3392      * acknowledged as having been sent to the peer's upper level.
3393      * All other packets must be retained.  So only packets with
3394      * sequence numbers < ap->firstPacket are candidates. */
3395     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3396         if (tp->header.seq >= first) break;
3397         call->tfirst = tp->header.seq + 1;
3398         if (serial && (tp->header.serial == serial ||
3399                        tp->firstSerial == serial))
3400             rxi_ComputePeerNetStats(call, tp, ap, np);
3401 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3402     /* XXX Hack. Because we have to release the global rx lock when sending
3403      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3404      * in rxi_Start sending packets out because packets may move to the
3405      * freePacketQueue as result of being here! So we drop these packets until
3406      * we're safely out of the traversing. Really ugly! 
3407      * To make it even uglier, if we're using fine grain locking, we can
3408      * set the ack bits in the packets and have rxi_Start remove the packets
3409      * when it's done transmitting.
3410      */
3411         if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3412             newAckCount++;
3413         }
3414         if (call->flags & RX_CALL_TQ_BUSY) {
3415 #ifdef RX_ENABLE_LOCKS
3416             tp->flags |= RX_PKTFLAG_ACKED;
3417             call->flags |= RX_CALL_TQ_SOME_ACKED;
3418 #else /* RX_ENABLE_LOCKS */
3419             break;
3420 #endif /* RX_ENABLE_LOCKS */
3421         } else
3422 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3423         {
3424         queue_Remove(tp);
3425         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3426         }
3427     }
3428
3429 #ifdef ADAPT_WINDOW
3430     /* Give rate detector a chance to respond to ping requests */
3431     if (ap->reason == RX_ACK_PING_RESPONSE) {
3432         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3433     }
3434 #endif
3435
3436     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3437    
3438    /* Now go through explicit acks/nacks and record the results in
3439     * the waiting packets.  These are packets that can't be released
3440     * yet, even with a positive acknowledge.  This positive
3441     * acknowledge only means the packet has been received by the
3442     * peer, not that it will be retained long enough to be sent to
3443     * the peer's upper level.  In addition, reset the transmit timers
3444     * of any missing packets (those packets that must be missing
3445     * because this packet was out of sequence) */
3446
3447     call->nSoftAcked = 0;
3448     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3449         /* Update round trip time if the ack was stimulated on receipt
3450          * of this packet */
3451 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3452 #ifdef RX_ENABLE_LOCKS
3453         if (tp->header.seq >= first)
3454 #endif /* RX_ENABLE_LOCKS */
3455 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3456             if (serial && (tp->header.serial == serial ||
3457                            tp->firstSerial == serial))
3458                 rxi_ComputePeerNetStats(call, tp, ap, np);
3459
3460         /* Set the acknowledge flag per packet based on the
3461          * information in the ack packet. An acknowlegded packet can
3462          * be downgraded when the server has discarded a packet it
3463          * soacked previously, or when an ack packet is received
3464          * out of sequence. */
3465         if (tp->header.seq < first) {
3466             /* Implicit ack information */
3467             if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3468                 newAckCount++;
3469             }
3470             tp->flags |= RX_PKTFLAG_ACKED;
3471         }
3472         else if (tp->header.seq < first + nAcks) {
3473             /* Explicit ack information:  set it in the packet appropriately */
3474             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3475                 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3476                     newAckCount++;
3477                     tp->flags |= RX_PKTFLAG_ACKED;
3478                 }
3479                 if (missing) {
3480                     nNacked++;
3481                 } else {
3482                     call->nSoftAcked++;
3483                 }
3484             } else {
3485                 tp->flags &= ~RX_PKTFLAG_ACKED;
3486                 missing = 1;
3487             }
3488         }
3489         else {
3490             tp->flags &= ~RX_PKTFLAG_ACKED;
3491             missing = 1;
3492         }
3493
3494         /* If packet isn't yet acked, and it has been transmitted at least 
3495          * once, reset retransmit time using latest timeout 
3496          * ie, this should readjust the retransmit timer for all outstanding 
3497          * packets...  So we don't just retransmit when we should know better*/
3498
3499         if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3500           tp->retryTime = tp->timeSent;
3501           clock_Add(&tp->retryTime, &peer->timeout);
3502           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3503           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3504         }
3505     }
3506
3507     /* If the window has been extended by this acknowledge packet,
3508      * then wakeup a sender waiting in alloc for window space, or try
3509      * sending packets now, if he's been sitting on packets due to
3510      * lack of window space */
3511     if (call->tnext < (call->tfirst + call->twind))  {
3512 #ifdef  RX_ENABLE_LOCKS
3513         CV_SIGNAL(&call->cv_twind);
3514 #else
3515         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3516             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3517             osi_rxWakeup(&call->twind);
3518         }
3519 #endif
3520         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3521             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3522         }
3523     }
3524
3525     /* if the ack packet has a receivelen field hanging off it,
3526      * update our state */
3527     if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3528       afs_uint32 tSize;
3529
3530       /* If the ack packet has a "recommended" size that is less than 
3531        * what I am using now, reduce my size to match */
3532       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3533                     sizeof(afs_int32), &tSize);
3534       tSize = (afs_uint32) ntohl(tSize);
3535       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3536
3537       /* Get the maximum packet size to send to this peer */
3538       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3539                     &tSize);
3540       tSize = (afs_uint32)ntohl(tSize);
3541       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3542       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3543
3544       /* sanity check - peer might have restarted with different params.
3545        * If peer says "send less", dammit, send less...  Peer should never 
3546        * be unable to accept packets of the size that prior AFS versions would
3547        * send without asking.  */
3548       if (peer->maxMTU != tSize) {
3549           peer->maxMTU = tSize;
3550           peer->MTU = MIN(tSize, peer->MTU);
3551           call->MTU = MIN(call->MTU, tSize);
3552           peer->congestSeq++;
3553       }
3554
3555       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3556           /* AFS 3.4a */
3557           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3558                         sizeof(afs_int32), &tSize);
3559           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3560           if (tSize < call->twind) {       /* smaller than our send */
3561               call->twind = tSize;         /* window, we must send less... */
3562               call->ssthresh = MIN(call->twind, call->ssthresh);
3563           }
3564
3565           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3566            * network MTU confused with the loopback MTU. Calculate the
3567            * maximum MTU here for use in the slow start code below.
3568            */
3569           maxMTU = peer->maxMTU;
3570           /* Did peer restart with older RX version? */
3571           if (peer->maxDgramPackets > 1) {
3572               peer->maxDgramPackets = 1;
3573           }
3574       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3575           /* AFS 3.5 */
3576           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3577                         sizeof(afs_int32), &tSize);
3578           tSize = (afs_uint32) ntohl(tSize);
3579           /*
3580            * As of AFS 3.5 we set the send window to match the receive window. 
3581            */
3582           if (tSize < call->twind) {
3583               call->twind = tSize;
3584               call->ssthresh = MIN(call->twind, call->ssthresh);
3585           } else if (tSize > call->twind) {
3586               call->twind = tSize;
3587           }
3588
3589           /*
3590            * As of AFS 3.5, a jumbogram is more than one fixed size
3591            * packet transmitted in a single UDP datagram. If the remote
3592            * MTU is smaller than our local MTU then never send a datagram
3593            * larger than the natural MTU.
3594            */
3595           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3596                         sizeof(afs_int32), &tSize);
3597           maxDgramPackets = (afs_uint32) ntohl(tSize);
3598           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3599           maxDgramPackets = MIN(maxDgramPackets,
3600                                 (int)(peer->ifDgramPackets));
3601           maxDgramPackets = MIN(maxDgramPackets, tSize);
3602           if (maxDgramPackets > 1) {
3603             peer->maxDgramPackets = maxDgramPackets;
3604             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3605           } else {
3606             peer->maxDgramPackets = 1;
3607             call->MTU = peer->natMTU;
3608           }
3609        } else if (peer->maxDgramPackets > 1) {
3610           /* Restarted with lower version of RX */
3611           peer->maxDgramPackets = 1;
3612        }
3613     } else if (peer->maxDgramPackets > 1 ||
3614                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3615         /* Restarted with lower version of RX */
3616         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3617         peer->natMTU = OLD_MAX_PACKET_SIZE;
3618         peer->MTU = OLD_MAX_PACKET_SIZE;
3619         peer->maxDgramPackets = 1;
3620         peer->nDgramPackets = 1;
3621         peer->congestSeq++;
3622         call->MTU = OLD_MAX_PACKET_SIZE;
3623     }
3624
3625     if (nNacked) {
3626         /*
3627          * Calculate how many datagrams were successfully received after
3628          * the first missing packet and adjust the negative ack counter
3629          * accordingly.
3630          */
3631         call->nAcks = 0;
3632         call->nNacks++;
3633         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3634         if (call->nNacks < nNacked) {
3635             call->nNacks = nNacked;
3636         }
3637     } else {
3638         if (newAckCount) {
3639             call->nAcks++;
3640         }
3641         call->nNacks = 0;
3642     }
3643
3644     if (call->flags & RX_CALL_FAST_RECOVER) {
3645         if (nNacked) {
3646             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3647         } else {
3648             call->flags &= ~RX_CALL_FAST_RECOVER;
3649             call->cwind = call->nextCwind;
3650             call->nextCwind = 0;
3651             call->nAcks = 0;
3652         }
3653         call->nCwindAcks = 0;
3654     }
3655     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3656         /* Three negative acks in a row trigger congestion recovery */
3657 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3658         MUTEX_EXIT(&peer->peer_lock);
3659         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3660                 /* someone else is waiting to start recovery */
3661                 return np;
3662         }
3663         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3664         while (call->flags & RX_CALL_TQ_BUSY) {
3665             call->flags |= RX_CALL_TQ_WAIT;
3666 #ifdef RX_ENABLE_LOCKS
3667             CV_WAIT(&call->cv_tq, &call->lock);
3668 #else /* RX_ENABLE_LOCKS */
3669             osi_rxSleep(&call->tq);
3670 #endif /* RX_ENABLE_LOCKS */
3671         }
3672         MUTEX_ENTER(&peer->peer_lock);
3673 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3674         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3675         call->flags |= RX_CALL_FAST_RECOVER;
3676         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3677         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3678                           rx_maxSendWindow);
3679         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3680         call->nextCwind = call->ssthresh;
3681         call->nAcks = 0;
3682         call->nNacks = 0;
3683         peer->MTU = call->MTU;
3684         peer->cwind = call->nextCwind;
3685         peer->nDgramPackets = call->nDgramPackets;
3686         peer->congestSeq++;
3687         call->congestSeq = peer->congestSeq;
3688         /* Reset the resend times on the packets that were nacked
3689          * so we will retransmit as soon as the window permits*/
3690         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3691             if (acked) {
3692                 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3693                     clock_Zero(&tp->retryTime);
3694                 }
3695             } else if (tp->flags & RX_PKTFLAG_ACKED) {
3696                 acked = 1;
3697             }
3698         }
3699     } else {
3700         /* If cwind is smaller than ssthresh, then increase
3701          * the window one packet for each ack we receive (exponential
3702          * growth).
3703          * If cwind is greater than or equal to ssthresh then increase
3704          * the congestion window by one packet for each cwind acks we
3705          * receive (linear growth).  */
3706         if (call->cwind < call->ssthresh) {
3707             call->cwind = MIN((int)call->ssthresh,
3708                               (int)(call->cwind + newAckCount));
3709             call->nCwindAcks = 0;
3710         } else {
3711             call->nCwindAcks += newAckCount;
3712             if (call->nCwindAcks >= call->cwind) {
3713                 call->nCwindAcks = 0;
3714                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3715             }
3716         }
3717         /*
3718          * If we have received several acknowledgements in a row then
3719          * it is time to increase the size of our datagrams
3720          */
3721         if ((int)call->nAcks > rx_nDgramThreshold) {
3722             if (peer->maxDgramPackets > 1) {
3723                 if (call->nDgramPackets < peer->maxDgramPackets) {
3724                     call->nDgramPackets++;
3725                 }
3726                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3727             } else if (call->MTU < peer->maxMTU) {
3728                 call->MTU += peer->natMTU;
3729                 call->MTU = MIN(call->MTU, peer->maxMTU);
3730             }
3731             call->nAcks = 0;
3732         }
3733     }
3734
3735     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3736
3737     /* Servers need to hold the call until all response packets have
3738      * been acknowledged. Soft acks are good enough since clients
3739      * are not allowed to clear their receive queues. */
3740     if (call->state == RX_STATE_HOLD &&
3741         call->tfirst + call->nSoftAcked >= call->tnext) {
3742         call->state = RX_STATE_DALLY;
3743         rxi_ClearTransmitQueue(call, 0);
3744     } else if (!queue_IsEmpty(&call->tq)) {
3745         rxi_Start(0, call, istack);
3746     }
3747     return np;
3748 }
3749
3750 /* Received a response to a challenge packet */
3751 struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn, 
3752         register struct rx_packet *np, int istack)
3753 {
3754     int error;
3755
3756     /* Ignore the packet if we're the client */
3757     if (conn->type == RX_CLIENT_CONNECTION) return np;
3758
3759     /* If already authenticated, ignore the packet (it's probably a retry) */
3760     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3761         return np;
3762
3763     /* Otherwise, have the security object evaluate the response packet */
3764     error = RXS_CheckResponse(conn->securityObject, conn, np);
3765     if (error) {
3766         /* If the response is invalid, reset the connection, sending
3767          * an abort to the peer */
3768 #ifndef KERNEL
3769         rxi_Delay(1);
3770 #endif
3771         rxi_ConnectionError(conn, error);
3772         MUTEX_ENTER(&conn->conn_data_lock);
3773         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3774         MUTEX_EXIT(&conn->conn_data_lock);
3775         return np;
3776     }
3777     else {
3778         /* If the response is valid, any calls waiting to attach
3779          * servers can now do so */
3780         int i;
3781
3782         for (i=0; i<RX_MAXCALLS; i++) {
3783             struct rx_call *call = conn->call[i];
3784             if (call) {
3785                 MUTEX_ENTER(&call->lock);
3786                  if (call->state == RX_STATE_PRECALL)
3787                      rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
3788                 MUTEX_EXIT(&call->lock);
3789             }
3790         }
3791
3792         /* Update the peer reachability information, just in case
3793          * some calls went into attach-wait while we were waiting
3794          * for authentication..
3795          */
3796         rxi_UpdatePeerReach(conn, NULL);
3797     }
3798     return np;
3799 }
3800
3801 /* A client has received an authentication challenge: the security
3802  * object is asked to cough up a respectable response packet to send
3803  * back to the server.  The server is responsible for retrying the
3804  * challenge if it fails to get a response. */
3805
3806 struct rx_packet *rxi_ReceiveChallengePacket(register struct rx_connection *conn, 
3807         register struct rx_packet *np, int istack)
3808 {
3809     int error;
3810
3811     /* Ignore the challenge if we're the server */
3812     if (conn->type == RX_SERVER_CONNECTION) return np;
3813
3814     /* Ignore the challenge if the connection is otherwise idle; someone's
3815      * trying to use us as an oracle. */
3816     if (!rxi_HasActiveCalls(conn)) return np;
3817
3818     /* Send the security object the challenge packet.  It is expected to fill
3819      * in the response. */
3820     error = RXS_GetResponse(conn->securityObject, conn, np);
3821
3822     /* If the security object is unable to return a valid response, reset the
3823      * connection and send an abort to the peer.  Otherwise send the response
3824      * packet to the peer connection. */
3825     if (error) {
3826         rxi_ConnectionError(conn, error);
3827         MUTEX_ENTER(&conn->conn_data_lock);
3828         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3829         MUTEX_EXIT(&conn->conn_data_lock);
3830     }
3831     else {
3832         np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3833                              RX_PACKET_TYPE_RESPONSE, NULL, -1, istack);
3834     }
3835     return np;
3836 }
3837
3838
3839 /* Find an available server process to service the current request in
3840  * the given call structure.  If one isn't available, queue up this
3841  * call so it eventually gets one */
3842 void rxi_AttachServerProc(register struct rx_call *call, 
3843         register osi_socket socket, register int *tnop, register struct rx_call **newcallp)
3844 {
3845     register struct rx_serverQueueEntry *sq;
3846     register struct rx_service *service = call->conn->service;
3847 #ifdef RX_ENABLE_LOCKS
3848     register int haveQuota = 0;
3849 #endif /* RX_ENABLE_LOCKS */
3850     /* May already be attached */
3851     if (call->state == RX_STATE_ACTIVE) return;
3852
3853     MUTEX_ENTER(&rx_serverPool_lock);
3854 #ifdef RX_ENABLE_LOCKS
3855     while(rxi_ServerThreadSelectingCall) {
3856         MUTEX_EXIT(&call->lock);
3857         CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3858         MUTEX_EXIT(&rx_serverPool_lock);
3859         MUTEX_ENTER(&call->lock);
3860         MUTEX_ENTER(&rx_serverPool_lock);
3861         /* Call may have been attached */
3862         if (call->state == RX_STATE_ACTIVE) return;
3863     }
3864
3865     haveQuota = QuotaOK(service);
3866     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3867         /* If there are no processes available to service this call,
3868          * put the call on the incoming call queue (unless it's
3869          * already on the queue).
3870          */
3871         if (haveQuota)
3872             ReturnToServerPool(service);
3873         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3874             call->flags |= RX_CALL_WAIT_PROC;
3875             MUTEX_ENTER(&rx_stats_mutex);
3876             rx_nWaiting++;
3877             MUTEX_EXIT(&rx_stats_mutex);
3878             rxi_calltrace(RX_CALL_ARRIVAL, call);
3879             SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3880             queue_Append(&rx_incomingCallQueue, call);
3881         }
3882     }
3883 #else /* RX_ENABLE_LOCKS */
3884     if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3885         /* If there are no processes available to service this call,
3886          * put the call on the incoming call queue (unless it's
3887          * already on the queue).
3888          */
3889         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3890             call->flags |= RX_CALL_WAIT_PROC;
3891             rx_nWaiting++;
3892             rxi_calltrace(RX_CALL_ARRIVAL, call);
3893             queue_Append(&rx_incomingCallQueue, call);
3894         }
3895     }
3896 #endif /* RX_ENABLE_LOCKS */
3897     else {
3898         sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3899
3900         /* If hot threads are enabled, and both newcallp and sq->socketp
3901          * are non-null, then this thread will process the call, and the
3902          * idle server thread will start listening on this threads socket.
3903          */
3904         queue_Remove(sq);
3905         if (rx_enable_hot_thread && newcallp && sq->socketp) {
3906             *newcallp = call;
3907             *tnop = sq->tno;
3908             *sq->socketp = socket;
3909             clock_GetTime(&call->startTime);
3910             CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3911         } else {
3912             sq->newcall = call;
3913         }
3914         if (call->flags & RX_CALL_WAIT_PROC) {
3915             /* Conservative:  I don't think this should happen */
3916             call->flags &= ~RX_CALL_WAIT_PROC;
3917             MUTEX_ENTER(&rx_stats_mutex);
3918             rx_nWaiting--;
3919             MUTEX_EXIT(&rx_stats_mutex);
3920             queue_Remove(call);
3921         }
3922         call->state = RX_STATE_ACTIVE;
3923         call->mode = RX_MODE_RECEIVING;
3924         if (call->flags & RX_CALL_CLEARED) {
3925             /* send an ack now to start the packet flow up again */
3926             call->flags &= ~RX_CALL_CLEARED;
3927             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3928         }
3929 #ifdef  RX_ENABLE_LOCKS
3930         CV_SIGNAL(&sq->cv);
3931 #else
3932         service->nRequestsRunning++;
3933         if (service->nRequestsRunning <= service->minProcs)
3934           rxi_minDeficit--;
3935         rxi_availProcs--;
3936         osi_rxWakeup(sq);
3937 #endif
3938     }
3939     MUTEX_EXIT(&rx_serverPool_lock);
3940 }
3941
3942 /* Delay the sending of an acknowledge event for a short while, while
3943  * a new call is being prepared (in the case of a client) or a reply
3944  * is being prepared (in the case of a server).  Rather than sending
3945  * an ack packet, an ACKALL packet is sent. */
3946 void rxi_AckAll(struct rxevent *event, register struct rx_call *call, char *dummy)
3947 {
3948 #ifdef RX_ENABLE_LOCKS
3949     if (event) {
3950         MUTEX_ENTER(&call->lock);
3951         call->delayedAckEvent = NULL;
3952         CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3953     }
3954     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3955                     RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3956     if (event)
3957         MUTEX_EXIT(&call->lock);
3958 #else /* RX_ENABLE_LOCKS */
3959     if (event) call->delayedAckEvent = NULL;
3960     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3961                     RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
3962 #endif /* RX_ENABLE_LOCKS */
3963 }
3964
3965 void rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call, char *dummy)
3966 {
3967 #ifdef RX_ENABLE_LOCKS
3968     if (event) {
3969         MUTEX_ENTER(&call->lock);
3970         if (event == call->delayedAckEvent)
3971             call->delayedAckEvent = NULL;
3972         CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3973     }
3974     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3975     if (event)
3976         MUTEX_EXIT(&call->lock);
3977 #else /* RX_ENABLE_LOCKS */
3978     if (event) call->delayedAckEvent = NULL;
3979     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3980 #endif /* RX_ENABLE_LOCKS */
3981 }
3982
3983
3984 #ifdef RX_ENABLE_LOCKS
3985 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3986  * clearing them out.
3987  */
3988 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call)
3989 {
3990     register struct rx_packet *p, *tp;
3991     int someAcked = 0;
3992
3993      for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3994          if (!p) 
3995              break;
3996          p->flags |= RX_PKTFLAG_ACKED;
3997          someAcked = 1;
3998      }
3999      if (someAcked) {
4000          call->flags |= RX_CALL_TQ_CLEARME;
4001          call->flags |= RX_CALL_TQ_SOME_ACKED;
4002      }
4003
4004      rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4005      rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4006      call->tfirst = call->tnext;
4007      call->nSoftAcked = 0;
4008
4009      if (call->flags & RX_CALL_FAST_RECOVER) {
4010         call->flags &= ~RX_CALL_FAST_RECOVER;
4011         call->cwind = call->nextCwind;
4012         call->nextCwind = 0;
4013      }
4014
4015      CV_SIGNAL(&call->cv_twind);
4016 }
4017 #endif /* RX_ENABLE_LOCKS */
4018
4019 /* Clear out the transmit queue for the current call (all packets have
4020  * been received by peer) */
4021 void rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
4022 {
4023     register struct rx_packet *p, *tp;
4024
4025 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4026     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
4027         int someAcked = 0;
4028         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4029           if (!p) 
4030              break;
4031           p->flags |= RX_PKTFLAG_ACKED;
4032           someAcked = 1;
4033         }
4034         if (someAcked) {
4035             call->flags |= RX_CALL_TQ_CLEARME;
4036             call->flags |= RX_CALL_TQ_SOME_ACKED;
4037         }
4038     } else {
4039 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4040         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
4041             if (!p) 
4042                 break;
4043             queue_Remove(p);
4044             rxi_FreePacket(p);
4045         }
4046 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4047         call->flags &= ~RX_CALL_TQ_CLEARME;
4048     }
4049 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4050
4051     rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
4052     rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
4053     call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
4054     call->nSoftAcked = 0;
4055
4056     if (call->flags & RX_CALL_FAST_RECOVER) {
4057         call->flags &= ~RX_CALL_FAST_RECOVER;
4058         call->cwind = call->nextCwind;
4059     }
4060
4061 #ifdef  RX_ENABLE_LOCKS
4062     CV_SIGNAL(&call->cv_twind);
4063 #else
4064     osi_rxWakeup(&call->twind);
4065 #endif
4066 }
4067
4068 void rxi_ClearReceiveQueue(register struct rx_call *call)
4069 {
4070     register struct rx_packet *p, *tp;
4071     if (queue_IsNotEmpty(&call->rq)) {
4072       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4073         if (!p)
4074           break;
4075         queue_Remove(p);
4076         rxi_FreePacket(p);
4077         rx_packetReclaims++;
4078       }
4079       call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4080     }
4081     if (call->state == RX_STATE_PRECALL) {
4082         call->flags |= RX_CALL_CLEARED;
4083     }
4084 }
4085
4086 /* Send an abort packet for the specified call */
4087 struct rx_packet *rxi_SendCallAbort(register struct rx_call *call, 
4088         struct rx_packet *packet, int istack, int force)
4089 {
4090   afs_int32 error;
4091   struct clock when;
4092
4093   if (!call->error)
4094     return packet;
4095
4096   /* Clients should never delay abort messages */
4097   if (rx_IsClientConn(call->conn))
4098     force = 1;
4099
4100   if (call->abortCode != call->error) {
4101     call->abortCode = call->error;
4102     call->abortCount = 0;
4103   }
4104
4105   if (force || rxi_callAbortThreshhold == 0 ||
4106       call->abortCount < rxi_callAbortThreshhold) {
4107     if (call->delayedAbortEvent) {
4108         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4109     }
4110     error = htonl(call->error);
4111     call->abortCount++;
4112     packet = rxi_SendSpecial(call, call->conn, packet,
4113                              RX_PACKET_TYPE_ABORT, (char *)&error,
4114                              sizeof(error), istack);
4115   } else if (!call->delayedAbortEvent) {
4116     clock_GetTime(&when);
4117     clock_Addmsec(&when, rxi_callAbortDelay);
4118     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4119     call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4120                                            call, 0);
4121   }
4122   return packet;
4123 }
4124
4125 /* Send an abort packet for the specified connection.  Packet is an
4126  * optional pointer to a packet that can be used to send the abort.
4127  * Once the number of abort messages reaches the threshhold, an
4128  * event is scheduled to send the abort. Setting the force flag
4129  * overrides sending delayed abort messages.
4130  *
4131  * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4132  *       to send the abort packet.
4133  */
4134 struct rx_packet *rxi_SendConnectionAbort(register struct rx_connection *conn,
4135         struct rx_packet *packet, int istack, int force)
4136 {
4137   afs_int32 error;
4138   struct clock when;
4139
4140   if (!conn->error)
4141     return packet;
4142
4143   /* Clients should never delay abort messages */
4144   if (rx_IsClientConn(conn))
4145     force = 1;
4146
4147   if (force || rxi_connAbortThreshhold == 0 ||
4148       conn->abortCount < rxi_connAbortThreshhold) {
4149     if (conn->delayedAbortEvent) {
4150         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4151     }
4152     error = htonl(conn->error);
4153     conn->abortCount++;
4154     MUTEX_EXIT(&conn->conn_data_lock);
4155     packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4156                              RX_PACKET_TYPE_ABORT, (char *)&error,
4157                              sizeof(error), istack);
4158     MUTEX_ENTER(&conn->conn_data_lock);
4159   } else if (!conn->delayedAbortEvent) {
4160     clock_GetTime(&when);
4161     clock_Addmsec(&when, rxi_connAbortDelay);
4162     conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4163                                            conn, 0);
4164   }
4165   return packet;
4166 }
4167
4168 /* Associate an error all of the calls owned by a connection.  Called
4169  * with error non-zero.  This is only for really fatal things, like
4170  * bad authentication responses.  The connection itself is set in
4171  * error at this point, so that future packets received will be
4172  * rejected. */
4173 void rxi_ConnectionError(register struct rx_connection *conn, 
4174         register afs_int32 error)
4175 {
4176     if (error) {
4177         register int i;
4178         MUTEX_ENTER(&conn->conn_data_lock);
4179         if (conn->challengeEvent)
4180             rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4181         if (conn->checkReachEvent) {
4182             rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
4183             conn->checkReachEvent = 0;
4184             conn->flags &= ~RX_CONN_ATTACHWAIT;
4185             conn->refCount--;
4186         }
4187         MUTEX_EXIT(&conn->conn_data_lock);
4188         for (i=0; i<RX_MAXCALLS; i++) {
4189             struct rx_call *call = conn->call[i];
4190             if (call) {
4191                 MUTEX_ENTER(&call->lock);
4192                 rxi_CallError(call, error);
4193                 MUTEX_EXIT(&call->lock);
4194             }
4195         }
4196         conn->error = error;
4197         MUTEX_ENTER(&rx_stats_mutex);
4198         rx_stats.fatalErrors++;
4199         MUTEX_EXIT(&rx_stats_mutex);
4200     }
4201 }
4202
4203 void rxi_CallError(register struct rx_call *call, afs_int32 error)
4204 {
4205     if (call->error) error = call->error;
4206 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4207     if (!(call->flags & RX_CALL_TQ_BUSY)) {
4208         rxi_ResetCall(call, 0);
4209     }
4210 #else
4211         rxi_ResetCall(call, 0);
4212 #endif
4213     call->error = error;
4214     call->mode = RX_MODE_ERROR;
4215 }
4216
4217 /* Reset various fields in a call structure, and wakeup waiting
4218  * processes.  Some fields aren't changed: state & mode are not
4219  * touched (these must be set by the caller), and bufptr, nLeft, and
4220  * nFree are not reset, since these fields are manipulated by
4221  * unprotected macros, and may only be reset by non-interrupting code.
4222  */
4223 #ifdef ADAPT_WINDOW
4224 /* this code requires that call->conn be set properly as a pre-condition. */
4225 #endif /* ADAPT_WINDOW */
4226
4227 void rxi_ResetCall(register struct rx_call *call, register int newcall)
4228 {
4229     register int flags;
4230     register struct rx_peer *peer;
4231     struct rx_packet *packet;
4232
4233     /* Notify anyone who is waiting for asynchronous packet arrival */
4234     if (call->arrivalProc) {
4235         (*call->arrivalProc)(call, call->arrivalProcHandle, (int) call->arrivalProcArg);
4236         call->arrivalProc = (VOID (*)()) 0;
4237     }
4238
4239     if (call->delayedAbortEvent) {
4240         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4241         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4242         if (packet) {
4243             rxi_SendCallAbort(call, packet, 0, 1);
4244             rxi_FreePacket(packet);
4245         }
4246     }
4247
4248     /*
4249      * Update the peer with the congestion information in this call
4250      * so other calls on this connection can pick up where this call
4251      * left off. If the congestion sequence numbers don't match then
4252      * another call experienced a retransmission.
4253      */
4254     peer = call->conn->peer;
4255     MUTEX_ENTER(&peer->peer_lock);
4256     if (!newcall) {
4257         if (call->congestSeq == peer->congestSeq) {
4258             peer->cwind = MAX(peer->cwind, call->cwind);
4259             peer->MTU = MAX(peer->MTU, call->MTU);
4260             peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4261         }
4262     } else {
4263         call->abortCode = 0;
4264         call->abortCount = 0;
4265     }
4266     if (peer->maxDgramPackets > 1) {
4267         call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4268     } else {
4269         call->MTU = peer->MTU;
4270     }
4271     call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4272     call->ssthresh = rx_maxSendWindow;
4273     call->nDgramPackets = peer->nDgramPackets;
4274     call->congestSeq = peer->congestSeq;
4275     MUTEX_EXIT(&peer->peer_lock);
4276
4277     flags = call->flags;
4278     rxi_ClearReceiveQueue(call);
4279 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4280     if (call->flags & RX_CALL_TQ_BUSY) {
4281         call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4282         call->flags |= (flags & RX_CALL_TQ_WAIT);
4283     } else
4284 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4285     {
4286         rxi_ClearTransmitQueue(call, 0);
4287         queue_Init(&call->tq);
4288         call->flags = 0;
4289     }
4290     queue_Init(&call->rq);
4291     call->error = 0;
4292     call->rwind = rx_initReceiveWindow; 
4293     call->twind = rx_initSendWindow; 
4294     call->nSoftAcked = 0;
4295     call->nextCwind = 0;
4296     call->nAcks = 0;
4297     call->nNacks = 0;
4298     call->nCwindAcks = 0;
4299     call->nSoftAcks = 0;
4300     call->nHardAcks = 0;
4301
4302     call->tfirst = call->rnext = call->tnext = 1;
4303     call->rprev = 0;
4304     call->lastAcked = 0;
4305     call->localStatus = call->remoteStatus = 0;
4306
4307     if (flags & RX_CALL_READER_WAIT)  {
4308 #ifdef  RX_ENABLE_LOCKS
4309         CV_BROADCAST(&call->cv_rq);
4310 #else
4311         osi_rxWakeup(&call->rq);
4312 #endif
4313     }
4314     if (flags & RX_CALL_WAIT_PACKETS) {
4315         MUTEX_ENTER(&rx_freePktQ_lock);
4316         rxi_PacketsUnWait();            /* XXX */
4317         MUTEX_EXIT(&rx_freePktQ_lock);
4318     }
4319
4320 #ifdef  RX_ENABLE_LOCKS
4321     CV_SIGNAL(&call->cv_twind);
4322 #else
4323     if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4324         osi_rxWakeup(&call->twind);
4325 #endif
4326
4327 #ifdef RX_ENABLE_LOCKS
4328     /* The following ensures that we don't mess with any queue while some
4329      * other thread might also be doing so. The call_queue_lock field is
4330      * is only modified under the call lock. If the call is in the process
4331      * of being removed from a queue, the call is not locked until the
4332      * the queue lock is dropped and only then is the call_queue_lock field
4333      * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4334      * Note that any other routine which removes a call from a queue has to
4335      * obtain the queue lock before examing the queue and removing the call.
4336      */
4337     if (call->call_queue_lock) {
4338         MUTEX_ENTER(call->call_queue_lock);
4339         if (queue_IsOnQueue(call)) {
4340             queue_Remove(call);
4341             if (flags & RX_CALL_WAIT_PROC) {
4342                 MUTEX_ENTER(&rx_stats_mutex);
4343                 rx_nWaiting--;
4344                 MUTEX_EXIT(&rx_stats_mutex);
4345             }
4346         }
4347         MUTEX_EXIT(call->call_queue_lock);
4348         CLEAR_CALL_QUEUE_LOCK(call);
4349     }
4350 #else /* RX_ENABLE_LOCKS */
4351     if (queue_IsOnQueue(call)) {
4352       queue_Remove(call);
4353       if (flags & RX_CALL_WAIT_PROC)
4354         rx_nWaiting--;
4355     }
4356 #endif /* RX_ENABLE_LOCKS */
4357
4358     rxi_KeepAliveOff(call);
4359     rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4360 }
4361
4362 /* Send an acknowledge for the indicated packet (seq,serial) of the
4363  * indicated call, for the indicated reason (reason).  This
4364  * acknowledge will specifically acknowledge receiving the packet, and
4365  * will also specify which other packets for this call have been
4366  * received.  This routine returns the packet that was used to the
4367  * caller.  The caller is responsible for freeing it or re-using it.
4368  * This acknowledgement also returns the highest sequence number
4369  * actually read out by the higher level to the sender; the sender
4370  * promises to keep around packets that have not been read by the
4371  * higher level yet (unless, of course, the sender decides to abort
4372  * the call altogether).  Any of p, seq, serial, pflags, or reason may
4373  * be set to zero without ill effect.  That is, if they are zero, they
4374  * will not convey any information.  
4375  * NOW there is a trailer field, after the ack where it will safely be
4376  * ignored by mundanes, which indicates the maximum size packet this 
4377  * host can swallow.  */  
4378 /*
4379     register struct rx_packet *optionalPacket;  use to send ack (or null) 
4380     int seq;                     Sequence number of the packet we are acking