rxi-newcall-avoid-race-20020115
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #include <afsconfig.h>
13 #ifdef  KERNEL
14 #include "../afs/param.h"
15 #else
16 #include <afs/param.h>
17 #endif
18
19 RCSID("$Header$");
20
21 #ifdef KERNEL
22 #include "../afs/sysincludes.h"
23 #include "../afs/afsincludes.h"
24 #ifndef UKERNEL
25 #include "../h/types.h"
26 #include "../h/time.h"
27 #include "../h/stat.h"
28 #ifdef  AFS_OSF_ENV
29 #include <net/net_globals.h>
30 #endif  /* AFS_OSF_ENV */
31 #ifdef AFS_LINUX20_ENV
32 #include "../h/socket.h"
33 #endif
34 #include "../netinet/in.h"
35 #include "../afs/afs_args.h"
36 #include "../afs/afs_osi.h"
37 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
38 #include "../h/systm.h"
39 #endif
40 #ifdef RXDEBUG
41 #undef RXDEBUG      /* turn off debugging */
42 #endif /* RXDEBUG */
43 #if defined(AFS_SGI_ENV)
44 #include "../sys/debug.h"
45 #endif
46 #include "../afsint/afsint.h"
47 #ifdef  AFS_ALPHA_ENV
48 #undef kmem_alloc
49 #undef kmem_free
50 #undef mem_alloc
51 #undef mem_free
52 #undef register
53 #endif  /* AFS_ALPHA_ENV */
54 #else /* !UKERNEL */
55 #include "../afs/sysincludes.h"
56 #include "../afs/afsincludes.h"
57 #endif /* !UKERNEL */
58 #include "../afs/lock.h"
59 #include "../rx/rx_kmutex.h"
60 #include "../rx/rx_kernel.h"
61 #include "../rx/rx_clock.h"
62 #include "../rx/rx_queue.h"
63 #include "../rx/rx.h"
64 #include "../rx/rx_globals.h"
65 #include "../rx/rx_trace.h"
66 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
67 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
68 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
69 #include "../afsint/afsint.h"
70 extern afs_int32 afs_termState;
71 #ifdef AFS_AIX41_ENV
72 #include "sys/lockl.h"
73 #include "sys/lock_def.h"
74 #endif /* AFS_AIX41_ENV */
75 # include "../afsint/rxgen_consts.h"
76 #else /* KERNEL */
77 # include <sys/types.h>
78 # include <errno.h>
79 #ifdef AFS_NT40_ENV
80 # include <stdlib.h>
81 # include <fcntl.h>
82 # include <afsutil.h>
83 #else
84 # include <sys/socket.h>
85 # include <sys/file.h>
86 # include <netdb.h>
87 # include <sys/stat.h>
88 # include <netinet/in.h>
89 # include <sys/time.h>
90 #endif
91 #ifdef HAVE_STRING_H
92 #include <string.h>
93 #else
94 #ifdef HAVE_STRINGS_H
95 #include <strings.h>
96 #endif
97 #endif
98 # include "rx.h"
99 # include "rx_user.h"
100 # include "rx_clock.h"
101 # include "rx_queue.h"
102 # include "rx_globals.h"
103 # include "rx_trace.h"
104 # include "rx_internal.h"
105 # include <afs/rxgen_consts.h>
106 #endif /* KERNEL */
107
108 int (*registerProgram)() = 0;
109 int (*swapNameProgram)() = 0;
110
111 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
112 struct rx_tq_debug {
113     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
114     afs_int32 rxi_start_in_error;
115 } rx_tq_debug;
116 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
117
118 /*
119  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
120  * currently allocated within rx.  This number is used to allocate the
121  * memory required to return the statistics when queried.
122  */
123
124 static unsigned int rxi_rpc_peer_stat_cnt;
125
126 /*
127  * rxi_rpc_process_stat_cnt counts the total number of local process stat
128  * structures currently allocated within rx.  The number is used to allocate
129  * the memory required to return the statistics when queried.
130  */
131
132 static unsigned int rxi_rpc_process_stat_cnt;
133
134 #if !defined(offsetof)
135 #include <stddef.h>     /* for definition of offsetof() */
136 #endif
137
138 #ifdef AFS_PTHREAD_ENV
139 #include <assert.h>
140
141 /*
142  * Use procedural initialization of mutexes/condition variables
143  * to ease NT porting
144  */
145
146 extern pthread_mutex_t rxkad_stats_mutex;
147 extern pthread_mutex_t des_init_mutex;
148 extern pthread_mutex_t des_random_mutex;
149 extern pthread_mutex_t rx_clock_mutex;
150 extern pthread_mutex_t rxi_connCacheMutex;
151 extern pthread_mutex_t rx_event_mutex;
152 extern pthread_mutex_t osi_malloc_mutex;
153 extern pthread_mutex_t event_handler_mutex;
154 extern pthread_mutex_t listener_mutex;
155 extern pthread_mutex_t rx_if_init_mutex;
156 extern pthread_mutex_t rx_if_mutex;
157 extern pthread_mutex_t rxkad_client_uid_mutex;
158 extern pthread_mutex_t rxkad_random_mutex;
159
160 extern pthread_cond_t rx_event_handler_cond;
161 extern pthread_cond_t rx_listener_cond;
162
163 static pthread_mutex_t epoch_mutex;
164 static pthread_mutex_t rx_init_mutex;
165 static pthread_mutex_t rx_debug_mutex;
166
167 static void rxi_InitPthread(void) {
168     assert(pthread_mutex_init(&rx_clock_mutex,
169                               (const pthread_mutexattr_t*)0)==0);
170     assert(pthread_mutex_init(&rxi_connCacheMutex,
171                               (const pthread_mutexattr_t*)0)==0);
172     assert(pthread_mutex_init(&rx_init_mutex,
173                               (const pthread_mutexattr_t*)0)==0);
174     assert(pthread_mutex_init(&epoch_mutex,
175                               (const pthread_mutexattr_t*)0)==0);
176     assert(pthread_mutex_init(&rx_event_mutex,
177                               (const pthread_mutexattr_t*)0)==0);
178     assert(pthread_mutex_init(&des_init_mutex,
179                               (const pthread_mutexattr_t*)0)==0);
180     assert(pthread_mutex_init(&des_random_mutex,
181                               (const pthread_mutexattr_t*)0)==0);
182     assert(pthread_mutex_init(&osi_malloc_mutex,
183                               (const pthread_mutexattr_t*)0)==0);
184     assert(pthread_mutex_init(&event_handler_mutex,
185                               (const pthread_mutexattr_t*)0)==0);
186     assert(pthread_mutex_init(&listener_mutex,
187                               (const pthread_mutexattr_t*)0)==0);
188     assert(pthread_mutex_init(&rx_if_init_mutex,
189                               (const pthread_mutexattr_t*)0)==0);
190     assert(pthread_mutex_init(&rx_if_mutex,
191                               (const pthread_mutexattr_t*)0)==0);
192     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
193                               (const pthread_mutexattr_t*)0)==0);
194     assert(pthread_mutex_init(&rxkad_random_mutex,
195                               (const pthread_mutexattr_t*)0)==0);
196     assert(pthread_mutex_init(&rxkad_stats_mutex,
197                               (const pthread_mutexattr_t*)0)==0);
198     assert(pthread_mutex_init(&rx_debug_mutex,
199                               (const pthread_mutexattr_t*)0)==0);
200
201     assert(pthread_cond_init(&rx_event_handler_cond,
202                               (const pthread_condattr_t*)0)==0);
203     assert(pthread_cond_init(&rx_listener_cond,
204                               (const pthread_condattr_t*)0)==0);
205     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
206 }
207
208 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
209 #define INIT_PTHREAD_LOCKS \
210 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
211 /*
212  * The rx_stats_mutex mutex protects the following global variables:
213  * rxi_dataQuota
214  * rxi_minDeficit
215  * rxi_availProcs
216  * rxi_totalMin
217  * rxi_lowConnRefCount
218  * rxi_lowPeerRefCount
219  * rxi_nCalls
220  * rxi_Alloccnt
221  * rxi_Allocsize
222  * rx_nFreePackets
223  * rx_tq_debug
224  * rx_stats
225  */
226 #else
227 #define INIT_PTHREAD_LOCKS
228 #endif
229
230
231 /* Variables for handling the minProcs implementation.  availProcs gives the
232  * number of threads available in the pool at this moment (not counting dudes
233  * executing right now).  totalMin gives the total number of procs required
234  * for handling all minProcs requests.  minDeficit is a dynamic variable
235  * tracking the # of procs required to satisfy all of the remaining minProcs
236  * demands.
237  * For fine grain locking to work, the quota check and the reservation of
238  * a server thread has to come while rxi_availProcs and rxi_minDeficit
239  * are locked. To this end, the code has been modified under #ifdef
240  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
241  * same time. A new function, ReturnToServerPool() returns the allocation.
242  * 
243  * A call can be on several queue's (but only one at a time). When
244  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
245  * that no one else is touching the queue. To this end, we store the address
246  * of the queue lock in the call structure (under the call lock) when we
247  * put the call on a queue, and we clear the call_queue_lock when the
248  * call is removed from a queue (once the call lock has been obtained).
249  * This allows rxi_ResetCall to safely synchronize with others wishing
250  * to manipulate the queue.
251  */
252
253 #ifdef RX_ENABLE_LOCKS
254 static int rxi_ServerThreadSelectingCall;
255 static afs_kmutex_t rx_rpc_stats;
256 void rxi_StartUnlocked();
257 #endif
258
259 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
260 ** pretty good that the next packet coming in is from the same connection 
261 ** as the last packet, since we're send multiple packets in a transmit window.
262 */
263 struct rx_connection *rxLastConn = 0; 
264
265 #ifdef RX_ENABLE_LOCKS
266 /* The locking hierarchy for rx fine grain locking is composed of five
267  * tiers:
268  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
269  * call->lock - locks call data fields.
270  * Most any other lock - these are all independent of each other.....
271  *      rx_freePktQ_lock
272  *      rx_freeCallQueue_lock
273  *      freeSQEList_lock
274  *      rx_connHashTable_lock
275  *      rx_serverPool_lock
276  *      rxi_keyCreate_lock
277  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
278
279  * lowest level:
280  *      peer_lock - locks peer data fields.
281  *      conn_data_lock - that more than one thread is not updating a conn data
282  *              field at the same time.
283  * Do we need a lock to protect the peer field in the conn structure?
284  *      conn->peer was previously a constant for all intents and so has no
285  *      lock protecting this field. The multihomed client delta introduced
286  *      a RX code change : change the peer field in the connection structure
287  *      to that remote inetrface from which the last packet for this
288  *      connection was sent out. This may become an issue if further changes
289  *      are made.
290  */
291 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
292 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
293 #ifdef RX_LOCKS_DB
294 /* rxdb_fileID is used to identify the lock location, along with line#. */
295 static int rxdb_fileID = RXDB_FILE_RX;
296 #endif /* RX_LOCKS_DB */
297 static void rxi_SetAcksInTransmitQueue();
298 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
299 #else /* RX_ENABLE_LOCKS */
300 #define SET_CALL_QUEUE_LOCK(C, L)
301 #define CLEAR_CALL_QUEUE_LOCK(C)
302 #endif /* RX_ENABLE_LOCKS */
303 static void rxi_DestroyConnectionNoLock();
304 struct rx_serverQueueEntry *rx_waitForPacket = 0;
305
306 /* ------------Exported Interfaces------------- */
307
308 /* This function allows rxkad to set the epoch to a suitably random number
309  * which rx_NewConnection will use in the future.  The principle purpose is to
310  * get rxnull connections to use the same epoch as the rxkad connections do, at
311  * least once the first rxkad connection is established.  This is important now
312  * that the host/port addresses aren't used in FindConnection: the uniqueness
313  * of epoch/cid matters and the start time won't do. */
314
315 #ifdef AFS_PTHREAD_ENV
316 /*
317  * This mutex protects the following global variables:
318  * rx_epoch
319  */
320
321 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
322 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
323 #else
324 #define LOCK_EPOCH
325 #define UNLOCK_EPOCH
326 #endif /* AFS_PTHREAD_ENV */
327
328 void rx_SetEpoch (epoch)
329   afs_uint32 epoch;
330 {
331     LOCK_EPOCH
332     rx_epoch = epoch;
333     UNLOCK_EPOCH
334 }
335
336 /* Initialize rx.  A port number may be mentioned, in which case this
337  * becomes the default port number for any service installed later.
338  * If 0 is provided for the port number, a random port will be chosen
339  * by the kernel.  Whether this will ever overlap anything in
340  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
341  * error. */
342 static int rxinit_status = 1;
343 #ifdef AFS_PTHREAD_ENV
344 /*
345  * This mutex protects the following global variables:
346  * rxinit_status
347  */
348
349 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
350 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
351 #else
352 #define LOCK_RX_INIT
353 #define UNLOCK_RX_INIT
354 #endif
355
356 int rx_Init(u_int port)
357 {
358 #ifdef KERNEL
359     osi_timeval_t tv;
360 #else /* KERNEL */
361     struct timeval tv;
362 #endif /* KERNEL */
363     char *htable, *ptable;
364     int tmp_status;
365
366 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
367     __djgpp_set_quiet_socket(1);
368 #endif
369
370     SPLVAR;
371
372     INIT_PTHREAD_LOCKS
373     LOCK_RX_INIT
374     if (rxinit_status == 0) {
375         tmp_status = rxinit_status;
376         UNLOCK_RX_INIT
377         return tmp_status; /* Already started; return previous error code. */
378     }
379
380 #ifdef AFS_NT40_ENV
381     if (afs_winsockInit()<0)
382         return -1;
383 #endif
384
385 #ifndef KERNEL
386     /*
387      * Initialize anything necessary to provide a non-premptive threading
388      * environment.
389      */
390     rxi_InitializeThreadSupport();
391 #endif
392
393     /* Allocate and initialize a socket for client and perhaps server
394      * connections. */
395
396     rx_socket = rxi_GetUDPSocket((u_short)port); 
397     if (rx_socket == OSI_NULLSOCKET) {
398         UNLOCK_RX_INIT
399         return RX_ADDRINUSE;
400     }
401     
402
403 #ifdef  RX_ENABLE_LOCKS
404 #ifdef RX_LOCKS_DB
405     rxdb_init();
406 #endif /* RX_LOCKS_DB */
407     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
408     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
409     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
410     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
411     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
412                MUTEX_DEFAULT,0);
413     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
414     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
415     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
416     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
417 #ifndef KERNEL
418     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
419 #endif /* !KERNEL */
420     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
421 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
422     if ( !uniprocessor )
423       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
424 #endif /* KERNEL && AFS_HPUX110_ENV */
425 #else /* RX_ENABLE_LOCKS */
426 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
427     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
428 #endif /* AFS_GLOBAL_SUNLOCK */
429 #endif /* RX_ENABLE_LOCKS */
430
431     rxi_nCalls = 0;
432     rx_connDeadTime = 12;
433     rx_tranquil     = 0;        /* reset flag */
434     memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
435     htable = (char *)
436         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
437     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
438     memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
439     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
440     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
441     memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
442
443     /* Malloc up a bunch of packets & buffers */
444     rx_nFreePackets = 0;
445     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
446     queue_Init(&rx_freePacketQueue);
447     rxi_NeedMorePackets = FALSE;
448     rxi_MorePackets(rx_nPackets);
449     rx_CheckPackets();
450
451     NETPRI;
452     AFS_RXGLOCK();
453
454     clock_Init();
455
456 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
457     tv.tv_sec = clock_now.sec;
458     tv.tv_usec = clock_now.usec;
459     srand((unsigned int) tv.tv_usec);
460 #else
461     osi_GetTime(&tv);
462 #endif
463     if (port) {
464         rx_port = port;
465     } else {
466 #if defined(KERNEL) && !defined(UKERNEL)
467         /* Really, this should never happen in a real kernel */
468         rx_port = 0;
469 #else
470         struct sockaddr_in addr;
471         int addrlen = sizeof(addr);
472         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
473             rx_Finalize();
474             return -1;
475         }
476         rx_port = addr.sin_port;
477 #endif
478     }
479     rx_stats.minRtt.sec = 9999999;
480 #ifdef  KERNEL
481     rx_SetEpoch (tv.tv_sec | 0x80000000);
482 #else
483     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
484                                          * will provide a randomer value. */
485 #endif
486     MUTEX_ENTER(&rx_stats_mutex);
487     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
488     MUTEX_EXIT(&rx_stats_mutex);
489     /* *Slightly* random start time for the cid.  This is just to help
490      * out with the hashing function at the peer */
491     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
492     rx_connHashTable = (struct rx_connection **) htable;
493     rx_peerHashTable = (struct rx_peer **) ptable;
494
495     rx_lastAckDelay.sec = 0;
496     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
497     rx_hardAckDelay.sec = 0;
498     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
499     rx_softAckDelay.sec = 0;
500     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
501
502     rxevent_Init(20, rxi_ReScheduleEvents);
503
504     /* Initialize various global queues */
505     queue_Init(&rx_idleServerQueue);
506     queue_Init(&rx_incomingCallQueue);
507     queue_Init(&rx_freeCallQueue);
508
509 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
510     /* Initialize our list of usable IP addresses. */
511     rx_GetIFInfo();
512 #endif
513
514     /* Start listener process (exact function is dependent on the
515      * implementation environment--kernel or user space) */
516     rxi_StartListener();
517
518     AFS_RXGUNLOCK();
519     USERPRI;
520     tmp_status = rxinit_status = 0;
521     UNLOCK_RX_INIT
522     return tmp_status;
523 }
524
525 /* called with unincremented nRequestsRunning to see if it is OK to start
526  * a new thread in this service.  Could be "no" for two reasons: over the
527  * max quota, or would prevent others from reaching their min quota.
528  */
529 #ifdef RX_ENABLE_LOCKS
530 /* This verion of QuotaOK reserves quota if it's ok while the
531  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
532  */
533 static int QuotaOK(aservice)
534 register struct rx_service *aservice;
535 {
536     /* check if over max quota */
537     if (aservice->nRequestsRunning >= aservice->maxProcs) {
538         return 0;
539     }
540
541     /* under min quota, we're OK */
542     /* otherwise, can use only if there are enough to allow everyone
543      * to go to their min quota after this guy starts.
544      */
545     MUTEX_ENTER(&rx_stats_mutex);
546     if ((aservice->nRequestsRunning < aservice->minProcs) ||
547          (rxi_availProcs > rxi_minDeficit)) {
548         aservice->nRequestsRunning++;
549         /* just started call in minProcs pool, need fewer to maintain
550          * guarantee */
551         if (aservice->nRequestsRunning <= aservice->minProcs)
552             rxi_minDeficit--;
553         rxi_availProcs--;
554         MUTEX_EXIT(&rx_stats_mutex);
555         return 1;
556     }
557     MUTEX_EXIT(&rx_stats_mutex);
558
559     return 0;
560 }
561 static void ReturnToServerPool(aservice)
562 register struct rx_service *aservice;
563 {
564     aservice->nRequestsRunning--;
565     MUTEX_ENTER(&rx_stats_mutex);
566     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
567     rxi_availProcs++;
568     MUTEX_EXIT(&rx_stats_mutex);
569 }
570
571 #else /* RX_ENABLE_LOCKS */
572 static int QuotaOK(aservice)
573 register struct rx_service *aservice; {
574     int rc=0;
575     /* under min quota, we're OK */
576     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
577
578     /* check if over max quota */
579     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
580
581     /* otherwise, can use only if there are enough to allow everyone
582      * to go to their min quota after this guy starts.
583      */
584     if (rxi_availProcs > rxi_minDeficit) rc = 1;
585     return rc;
586 }
587 #endif /* RX_ENABLE_LOCKS */
588
589 #ifndef KERNEL
590 /* Called by rx_StartServer to start up lwp's to service calls.
591    NExistingProcs gives the number of procs already existing, and which
592    therefore needn't be created. */
593 void rxi_StartServerProcs(nExistingProcs)
594     int nExistingProcs;
595 {
596     register struct rx_service *service;
597     register int i;
598     int maxdiff = 0;
599     int nProcs = 0;
600
601     /* For each service, reserve N processes, where N is the "minimum"
602        number of processes that MUST be able to execute a request in parallel,
603        at any time, for that process.  Also compute the maximum difference
604        between any service's maximum number of processes that can run
605        (i.e. the maximum number that ever will be run, and a guarantee
606        that this number will run if other services aren't running), and its
607        minimum number.  The result is the extra number of processes that
608        we need in order to provide the latter guarantee */
609     for (i=0; i<RX_MAX_SERVICES; i++) {
610         int diff;
611         service = rx_services[i];
612         if (service == (struct rx_service *) 0) break;
613         nProcs += service->minProcs;
614         diff = service->maxProcs - service->minProcs;
615         if (diff > maxdiff) maxdiff = diff;
616     }
617     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
618     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
619     for (i = 0; i<nProcs; i++) {
620         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
621     }
622 }
623 #endif /* KERNEL */
624
625 /* This routine must be called if any services are exported.  If the
626  * donateMe flag is set, the calling process is donated to the server
627  * process pool */
628 void rx_StartServer(donateMe)
629 {
630     register struct rx_service *service;
631     register int i, nProcs=0;
632     SPLVAR;
633     clock_NewTime();
634
635     NETPRI;
636     AFS_RXGLOCK();
637     /* Start server processes, if necessary (exact function is dependent
638      * on the implementation environment--kernel or user space).  DonateMe
639      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
640      * case, one less new proc will be created rx_StartServerProcs.
641      */
642     rxi_StartServerProcs(donateMe);
643
644     /* count up the # of threads in minProcs, and add set the min deficit to
645      * be that value, too.
646      */
647     for (i=0; i<RX_MAX_SERVICES; i++) {
648         service = rx_services[i];
649         if (service == (struct rx_service *) 0) break;
650         MUTEX_ENTER(&rx_stats_mutex);
651         rxi_totalMin += service->minProcs;
652         /* below works even if a thread is running, since minDeficit would
653          * still have been decremented and later re-incremented.
654          */
655         rxi_minDeficit += service->minProcs;
656         MUTEX_EXIT(&rx_stats_mutex);
657     }
658
659     /* Turn on reaping of idle server connections */
660     rxi_ReapConnections();
661
662     AFS_RXGUNLOCK();
663     USERPRI;
664
665     if (donateMe) {
666 #ifndef AFS_NT40_ENV
667 #ifndef KERNEL
668         char name[32];
669 #ifdef AFS_PTHREAD_ENV
670         pid_t pid;
671         pid = (pid_t) pthread_self();
672 #else /* AFS_PTHREAD_ENV */
673         PROCESS pid;
674         LWP_CurrentProcess(&pid);
675 #endif /* AFS_PTHREAD_ENV */
676
677         sprintf(name,"srv_%d", ++nProcs);
678         if (registerProgram)
679             (*registerProgram)(pid, name);
680 #endif /* KERNEL */
681 #endif /* AFS_NT40_ENV */
682         rx_ServerProc(); /* Never returns */
683     }
684     return;
685 }
686
687 /* Create a new client connection to the specified service, using the
688  * specified security object to implement the security model for this
689  * connection. */
690 struct rx_connection *
691 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
692     register afs_uint32 shost;      /* Server host */
693     u_short sport;                  /* Server port */
694     u_short sservice;               /* Server service id */
695     register struct rx_securityClass *securityObject;
696     int serviceSecurityIndex;
697 {
698     int hashindex;
699     afs_int32 cid;
700     register struct rx_connection *conn;
701
702     SPLVAR;
703
704     clock_NewTime();
705     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
706           shost, sport, sservice, securityObject, serviceSecurityIndex));
707
708     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
709      * the case of kmem_alloc? */
710     conn = rxi_AllocConnection();
711 #ifdef  RX_ENABLE_LOCKS
712     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
713     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
714     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
715 #endif
716     NETPRI;
717     AFS_RXGLOCK();
718     MUTEX_ENTER(&rx_connHashTable_lock);
719     cid = (rx_nextCid += RX_MAXCALLS);
720     conn->type = RX_CLIENT_CONNECTION;
721     conn->cid = cid;
722     conn->epoch = rx_epoch;
723     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
724     conn->serviceId = sservice;
725     conn->securityObject = securityObject;
726     /* This doesn't work in all compilers with void (they're buggy), so fake it
727      * with VOID */
728     conn->securityData = (VOID *) 0;
729     conn->securityIndex = serviceSecurityIndex;
730     rx_SetConnDeadTime(conn, rx_connDeadTime);
731     conn->ackRate = RX_FAST_ACK_RATE;
732     conn->nSpecific = 0;
733     conn->specific = NULL;
734     conn->challengeEvent = (struct rxevent *)0;
735     conn->delayedAbortEvent = (struct rxevent *)0;
736     conn->abortCount = 0;
737     conn->error = 0;
738
739     RXS_NewConnection(securityObject, conn);
740     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
741     
742     conn->refCount++; /* no lock required since only this thread knows... */
743     conn->next = rx_connHashTable[hashindex];
744     rx_connHashTable[hashindex] = conn;
745     MUTEX_ENTER(&rx_stats_mutex);
746     rx_stats.nClientConns++;
747     MUTEX_EXIT(&rx_stats_mutex);
748
749     MUTEX_EXIT(&rx_connHashTable_lock);
750     AFS_RXGUNLOCK();
751     USERPRI;
752     return conn;
753 }
754
755 void rx_SetConnDeadTime(conn, seconds)
756     register struct rx_connection *conn;
757     register int seconds;
758 {
759     /* The idea is to set the dead time to a value that allows several
760      * keepalives to be dropped without timing out the connection. */
761     conn->secondsUntilDead = MAX(seconds, 6);
762     conn->secondsUntilPing = conn->secondsUntilDead/6;
763 }
764
765 int rxi_lowPeerRefCount = 0;
766 int rxi_lowConnRefCount = 0;
767
768 /*
769  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
770  * NOTE: must not be called with rx_connHashTable_lock held.
771  */
772 void rxi_CleanupConnection(conn)
773     struct rx_connection *conn;
774 {
775     int i;
776
777     /* Notify the service exporter, if requested, that this connection
778      * is being destroyed */
779     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
780       (*conn->service->destroyConnProc)(conn);
781
782     /* Notify the security module that this connection is being destroyed */
783     RXS_DestroyConnection(conn->securityObject, conn);
784
785     /* If this is the last connection using the rx_peer struct, set its
786      * idle time to now. rxi_ReapConnections will reap it if it's still
787      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
788      */
789     MUTEX_ENTER(&rx_peerHashTable_lock);
790     if (--conn->peer->refCount <= 0) {
791         conn->peer->idleWhen = clock_Sec();
792         if (conn->peer->refCount < 0) {
793             conn->peer->refCount = 0; 
794             MUTEX_ENTER(&rx_stats_mutex);
795             rxi_lowPeerRefCount ++;
796             MUTEX_EXIT(&rx_stats_mutex);
797         }
798     }
799     MUTEX_EXIT(&rx_peerHashTable_lock);
800
801     MUTEX_ENTER(&rx_stats_mutex);
802     if (conn->type == RX_SERVER_CONNECTION)
803       rx_stats.nServerConns--;
804     else
805       rx_stats.nClientConns--;
806     MUTEX_EXIT(&rx_stats_mutex);
807
808 #ifndef KERNEL
809     if (conn->specific) {
810         for (i = 0 ; i < conn->nSpecific ; i++) {
811             if (conn->specific[i] && rxi_keyCreate_destructor[i])
812                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
813             conn->specific[i] = NULL;
814         }
815         free(conn->specific);
816     }
817     conn->specific = NULL;
818     conn->nSpecific = 0;
819 #endif /* !KERNEL */
820
821     MUTEX_DESTROY(&conn->conn_call_lock);
822     MUTEX_DESTROY(&conn->conn_data_lock);
823     CV_DESTROY(&conn->conn_call_cv);
824         
825     rxi_FreeConnection(conn);
826 }
827
828 /* Destroy the specified connection */
829 void rxi_DestroyConnection(conn)
830     register struct rx_connection *conn;
831 {
832     MUTEX_ENTER(&rx_connHashTable_lock);
833     rxi_DestroyConnectionNoLock(conn);
834     /* conn should be at the head of the cleanup list */
835     if (conn == rx_connCleanup_list) {
836         rx_connCleanup_list = rx_connCleanup_list->next;
837         MUTEX_EXIT(&rx_connHashTable_lock);
838         rxi_CleanupConnection(conn);
839     }
840 #ifdef RX_ENABLE_LOCKS
841     else {
842         MUTEX_EXIT(&rx_connHashTable_lock);
843     }
844 #endif /* RX_ENABLE_LOCKS */
845 }
846     
847 static void rxi_DestroyConnectionNoLock(conn)
848     register struct rx_connection *conn;
849 {
850     register struct rx_connection **conn_ptr;
851     register int havecalls = 0;
852     struct rx_packet *packet;
853     int i;
854     SPLVAR;
855
856     clock_NewTime();
857
858     NETPRI;
859     MUTEX_ENTER(&conn->conn_data_lock);
860     if (conn->refCount > 0)
861         conn->refCount--;
862     else {
863         MUTEX_ENTER(&rx_stats_mutex);
864         rxi_lowConnRefCount++;
865         MUTEX_EXIT(&rx_stats_mutex);
866     }
867
868     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
869         /* Busy; wait till the last guy before proceeding */
870         MUTEX_EXIT(&conn->conn_data_lock);
871         USERPRI;
872         return;
873     }
874
875     /* If the client previously called rx_NewCall, but it is still
876      * waiting, treat this as a running call, and wait to destroy the
877      * connection later when the call completes. */
878     if ((conn->type == RX_CLIENT_CONNECTION) &&
879         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
880         conn->flags |= RX_CONN_DESTROY_ME;
881         MUTEX_EXIT(&conn->conn_data_lock);
882         USERPRI;
883         return;
884     }
885     MUTEX_EXIT(&conn->conn_data_lock);
886
887     /* Check for extant references to this connection */
888     for (i = 0; i<RX_MAXCALLS; i++) {
889         register struct rx_call *call = conn->call[i];
890         if (call) {
891             havecalls = 1;
892             if (conn->type == RX_CLIENT_CONNECTION) {
893                 MUTEX_ENTER(&call->lock);
894                 if (call->delayedAckEvent) {
895                     /* Push the final acknowledgment out now--there
896                      * won't be a subsequent call to acknowledge the
897                      * last reply packets */
898                     rxevent_Cancel(call->delayedAckEvent, call,
899                                    RX_CALL_REFCOUNT_DELAY);
900                     if (call->state == RX_STATE_PRECALL ||
901                         call->state == RX_STATE_ACTIVE) {
902                         rxi_SendDelayedAck(call->delayedAckEvent, call, 0);
903                     } else {
904                         rxi_AckAll((struct rxevent *)0, call, 0);
905                     }
906                 }
907                 MUTEX_EXIT(&call->lock);
908             }
909         }
910     }
911 #ifdef RX_ENABLE_LOCKS
912     if (!havecalls) {
913         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
914             MUTEX_EXIT(&conn->conn_data_lock);
915         }
916         else {
917             /* Someone is accessing a packet right now. */
918             havecalls = 1;
919         }
920     }
921 #endif /* RX_ENABLE_LOCKS */
922
923     if (havecalls) {
924         /* Don't destroy the connection if there are any call
925          * structures still in use */
926         MUTEX_ENTER(&conn->conn_data_lock);
927         conn->flags |= RX_CONN_DESTROY_ME;
928         MUTEX_EXIT(&conn->conn_data_lock);
929         USERPRI;
930         return;
931     }
932
933     if (conn->delayedAbortEvent) {
934         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
935         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
936         if (packet) {
937             MUTEX_ENTER(&conn->conn_data_lock);
938             rxi_SendConnectionAbort(conn, packet, 0, 1);
939             MUTEX_EXIT(&conn->conn_data_lock);
940             rxi_FreePacket(packet);
941         }
942     }
943
944     /* Remove from connection hash table before proceeding */
945     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
946                                              conn->epoch, conn->type) ];
947     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
948         if (*conn_ptr == conn) {
949             *conn_ptr = conn->next;
950             break;
951         }
952     }
953     /* if the conn that we are destroying was the last connection, then we
954     * clear rxLastConn as well */
955     if ( rxLastConn == conn )
956         rxLastConn = 0;
957
958     /* Make sure the connection is completely reset before deleting it. */
959     /* get rid of pending events that could zap us later */
960     if (conn->challengeEvent) {
961         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
962     }
963  
964     /* Add the connection to the list of destroyed connections that
965      * need to be cleaned up. This is necessary to avoid deadlocks
966      * in the routines we call to inform others that this connection is
967      * being destroyed. */
968     conn->next = rx_connCleanup_list;
969     rx_connCleanup_list = conn;
970 }
971
972 /* Externally available version */
973 void rx_DestroyConnection(conn) 
974     register struct rx_connection *conn;
975 {
976     SPLVAR;
977
978     NETPRI;
979     AFS_RXGLOCK();
980     rxi_DestroyConnection (conn);
981     AFS_RXGUNLOCK();
982     USERPRI;
983 }
984
985 /* Start a new rx remote procedure call, on the specified connection.
986  * If wait is set to 1, wait for a free call channel; otherwise return
987  * 0.  Maxtime gives the maximum number of seconds this call may take,
988  * after rx_MakeCall returns.  After this time interval, a call to any
989  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
990  * For fine grain locking, we hold the conn_call_lock in order to 
991  * to ensure that we don't get signalle after we found a call in an active
992  * state and before we go to sleep.
993  */
994 struct rx_call *rx_NewCall(conn)
995     register struct rx_connection *conn;
996 {
997     register int i;
998     register struct rx_call *call;
999     struct clock queueTime;
1000     SPLVAR;
1001
1002     clock_NewTime();
1003     dpf (("rx_MakeCall(conn %x)\n", conn));
1004
1005     NETPRI;
1006     clock_GetTime(&queueTime);
1007     AFS_RXGLOCK();
1008     MUTEX_ENTER(&conn->conn_call_lock);
1009
1010     /*
1011      * Check if there are others waiting for a new call.
1012      * If so, let them go first to avoid starving them.
1013      * This is a fairly simple scheme, and might not be
1014      * a complete solution for large numbers of waiters.
1015      */
1016     if (conn->makeCallWaiters) {
1017 #ifdef  RX_ENABLE_LOCKS
1018         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1019 #else
1020         osi_rxSleep(conn);
1021 #endif
1022     }
1023
1024     for (;;) {
1025         for (i=0; i<RX_MAXCALLS; i++) {
1026             call = conn->call[i];
1027             if (call) {
1028                 MUTEX_ENTER(&call->lock);
1029                 if (call->state == RX_STATE_DALLY) {
1030                     rxi_ResetCall(call, 0);
1031                     (*call->callNumber)++;
1032                     break;
1033                 }
1034                 MUTEX_EXIT(&call->lock);
1035             }
1036             else {
1037                 call = rxi_NewCall(conn, i);
1038                 break;
1039             }
1040         }
1041         if (i < RX_MAXCALLS) {
1042             break;
1043         }
1044         MUTEX_ENTER(&conn->conn_data_lock);
1045         conn->flags |= RX_CONN_MAKECALL_WAITING;
1046         MUTEX_EXIT(&conn->conn_data_lock);
1047
1048         conn->makeCallWaiters++;
1049 #ifdef  RX_ENABLE_LOCKS
1050         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1051 #else
1052         osi_rxSleep(conn);
1053 #endif
1054         conn->makeCallWaiters--;
1055     }
1056     /*
1057      * Wake up anyone else who might be giving us a chance to
1058      * run (see code above that avoids resource starvation).
1059      */
1060 #ifdef  RX_ENABLE_LOCKS
1061     CV_BROADCAST(&conn->conn_call_cv);
1062 #else
1063     osi_rxWakeup(conn);
1064 #endif
1065
1066     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1067
1068     /* Client is initially in send mode */
1069     call->state = RX_STATE_ACTIVE;
1070     call->mode = RX_MODE_SENDING;
1071
1072     /* remember start time for call in case we have hard dead time limit */
1073     call->queueTime = queueTime;
1074     clock_GetTime(&call->startTime);
1075     hzero(call->bytesSent);
1076     hzero(call->bytesRcvd);
1077
1078     /* Turn on busy protocol. */
1079     rxi_KeepAliveOn(call);
1080
1081     MUTEX_EXIT(&call->lock);
1082     MUTEX_EXIT(&conn->conn_call_lock);
1083     AFS_RXGUNLOCK();
1084     USERPRI;
1085
1086 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1087     /* Now, if TQ wasn't cleared earlier, do it now. */
1088     AFS_RXGLOCK();
1089     MUTEX_ENTER(&call->lock);
1090     while (call->flags & RX_CALL_TQ_BUSY) {
1091         call->flags |= RX_CALL_TQ_WAIT;
1092 #ifdef RX_ENABLE_LOCKS
1093         CV_WAIT(&call->cv_tq, &call->lock);
1094 #else /* RX_ENABLE_LOCKS */
1095         osi_rxSleep(&call->tq);
1096 #endif /* RX_ENABLE_LOCKS */
1097     }
1098     if (call->flags & RX_CALL_TQ_CLEARME) {
1099         rxi_ClearTransmitQueue(call, 0);
1100         queue_Init(&call->tq);
1101     }
1102     MUTEX_EXIT(&call->lock);
1103     AFS_RXGUNLOCK();
1104 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1105
1106     return call;
1107 }
1108
1109 int
1110 rxi_HasActiveCalls(aconn)
1111 register struct rx_connection *aconn; {
1112     register int i;
1113     register struct rx_call *tcall;
1114     SPLVAR;
1115
1116     NETPRI;
1117     for(i=0; i<RX_MAXCALLS; i++) {
1118       if ((tcall = aconn->call[i])) {
1119         if ((tcall->state == RX_STATE_ACTIVE) 
1120             || (tcall->state == RX_STATE_PRECALL)) {
1121           USERPRI;
1122           return 1;
1123         }
1124       }
1125     }
1126     USERPRI;
1127     return 0;
1128 }
1129
1130 int
1131 rxi_GetCallNumberVector(aconn, aint32s)
1132 register struct rx_connection *aconn;
1133 register afs_int32 *aint32s; {
1134     register int i;
1135     register struct rx_call *tcall;
1136     SPLVAR;
1137
1138     NETPRI;
1139     for(i=0; i<RX_MAXCALLS; i++) {
1140         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1141             aint32s[i] = aconn->callNumber[i]+1;
1142         else
1143             aint32s[i] = aconn->callNumber[i];
1144     }
1145     USERPRI;
1146     return 0;
1147 }
1148
1149 int
1150 rxi_SetCallNumberVector(aconn, aint32s)
1151 register struct rx_connection *aconn;
1152 register afs_int32 *aint32s; {
1153     register int i;
1154     register struct rx_call *tcall;
1155     SPLVAR;
1156
1157     NETPRI;
1158     for(i=0; i<RX_MAXCALLS; i++) {
1159         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1160             aconn->callNumber[i] = aint32s[i] - 1;
1161         else
1162             aconn->callNumber[i] = aint32s[i];
1163     }
1164     USERPRI;
1165     return 0;
1166 }
1167
1168 /* Advertise a new service.  A service is named locally by a UDP port
1169  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1170  * on a failure. */
1171 struct rx_service *
1172 rx_NewService(port, serviceId, serviceName, securityObjects,
1173               nSecurityObjects, serviceProc)
1174     u_short port;
1175     u_short serviceId;
1176     char *serviceName;  /* Name for identification purposes (e.g. the
1177                          * service name might be used for probing for
1178                          * statistics) */
1179     struct rx_securityClass **securityObjects;
1180     int nSecurityObjects;
1181     afs_int32 (*serviceProc)();
1182 {    
1183     osi_socket socket = OSI_NULLSOCKET;
1184     register struct rx_service *tservice;    
1185     register int i;
1186     SPLVAR;
1187
1188     clock_NewTime();
1189
1190     if (serviceId == 0) {
1191         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1192          serviceName);
1193         return 0;
1194     }
1195     if (port == 0) {
1196         if (rx_port == 0) {
1197             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1198             return 0;
1199         }
1200         port = rx_port;
1201         socket = rx_socket;
1202     }
1203
1204     tservice = rxi_AllocService();
1205     NETPRI;
1206     AFS_RXGLOCK();
1207     for (i = 0; i<RX_MAX_SERVICES; i++) {
1208         register struct rx_service *service = rx_services[i];
1209         if (service) {
1210             if (port == service->servicePort) {
1211                 if (service->serviceId == serviceId) {
1212                     /* The identical service has already been
1213                      * installed; if the caller was intending to
1214                      * change the security classes used by this
1215                      * service, he/she loses. */
1216                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1217                     AFS_RXGUNLOCK();
1218                     USERPRI;
1219                     rxi_FreeService(tservice);
1220                     return service;
1221                 }
1222                 /* Different service, same port: re-use the socket
1223                  * which is bound to the same port */
1224                 socket = service->socket;
1225             }
1226         } else {
1227             if (socket == OSI_NULLSOCKET) {
1228                 /* If we don't already have a socket (from another
1229                  * service on same port) get a new one */
1230                 socket = rxi_GetUDPSocket(port);
1231                 if (socket == OSI_NULLSOCKET) {
1232                     AFS_RXGUNLOCK();
1233                     USERPRI;
1234                     rxi_FreeService(tservice);
1235                     return 0;
1236                 }
1237             }
1238             service = tservice;
1239             service->socket = socket;
1240             service->servicePort = port;
1241             service->serviceId = serviceId;
1242             service->serviceName = serviceName;
1243             service->nSecurityObjects = nSecurityObjects;
1244             service->securityObjects = securityObjects;
1245             service->minProcs = 0;
1246             service->maxProcs = 1;
1247             service->idleDeadTime = 60;
1248             service->connDeadTime = rx_connDeadTime;
1249             service->executeRequestProc = serviceProc;
1250             rx_services[i] = service;   /* not visible until now */
1251             AFS_RXGUNLOCK();
1252             USERPRI;
1253             return service;
1254         }
1255     }
1256     AFS_RXGUNLOCK();
1257     USERPRI;
1258     rxi_FreeService(tservice);
1259     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1260     return 0;
1261 }
1262
1263 /* Generic request processing loop. This routine should be called
1264  * by the implementation dependent rx_ServerProc. If socketp is
1265  * non-null, it will be set to the file descriptor that this thread
1266  * is now listening on. If socketp is null, this routine will never
1267  * returns. */
1268 void rxi_ServerProc(threadID, newcall, socketp)
1269 int threadID;
1270 struct rx_call *newcall;
1271 osi_socket *socketp;
1272 {
1273     register struct rx_call *call;
1274     register afs_int32 code;
1275     register struct rx_service *tservice = NULL;
1276
1277     for (;;) {
1278         if (newcall) {
1279             call = newcall;
1280             newcall = NULL;
1281         } else {
1282             call = rx_GetCall(threadID, tservice, socketp);
1283             if (socketp && *socketp != OSI_NULLSOCKET) {
1284                 /* We are now a listener thread */
1285                 return;
1286             }
1287         }
1288
1289         /* if server is restarting( typically smooth shutdown) then do not
1290          * allow any new calls.
1291          */
1292
1293         if ( rx_tranquil && (call != NULL) ) {
1294             SPLVAR;
1295
1296             NETPRI;
1297             AFS_RXGLOCK();
1298             MUTEX_ENTER(&call->lock);
1299
1300             rxi_CallError(call, RX_RESTARTING);
1301             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1302
1303             MUTEX_EXIT(&call->lock);
1304             AFS_RXGUNLOCK();
1305             USERPRI;
1306         }
1307
1308 #ifdef  KERNEL
1309         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1310 #ifdef RX_ENABLE_LOCKS
1311             AFS_GLOCK();
1312 #endif /* RX_ENABLE_LOCKS */
1313             afs_termState = AFSOP_STOP_AFS;
1314             afs_osi_Wakeup(&afs_termState);
1315 #ifdef RX_ENABLE_LOCKS
1316             AFS_GUNLOCK();
1317 #endif /* RX_ENABLE_LOCKS */
1318             return;
1319         }
1320 #endif
1321
1322         tservice = call->conn->service;
1323
1324         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1325
1326         code = call->conn->service->executeRequestProc(call);
1327
1328         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1329
1330         rx_EndCall(call, code);
1331         MUTEX_ENTER(&rx_stats_mutex);
1332         rxi_nCalls++;
1333         MUTEX_EXIT(&rx_stats_mutex);
1334     }
1335 }
1336
1337
1338 void rx_WakeupServerProcs()
1339 {
1340     struct rx_serverQueueEntry *np, *tqp;
1341     SPLVAR;
1342
1343     NETPRI;
1344     AFS_RXGLOCK();
1345     MUTEX_ENTER(&rx_serverPool_lock);
1346
1347 #ifdef RX_ENABLE_LOCKS
1348     if (rx_waitForPacket)
1349         CV_BROADCAST(&rx_waitForPacket->cv);
1350 #else /* RX_ENABLE_LOCKS */
1351     if (rx_waitForPacket)
1352         osi_rxWakeup(rx_waitForPacket);
1353 #endif /* RX_ENABLE_LOCKS */
1354     MUTEX_ENTER(&freeSQEList_lock);
1355     for (np = rx_FreeSQEList; np; np = tqp) {
1356       tqp = *(struct rx_serverQueueEntry **)np;
1357 #ifdef RX_ENABLE_LOCKS
1358       CV_BROADCAST(&np->cv);
1359 #else /* RX_ENABLE_LOCKS */
1360       osi_rxWakeup(np);
1361 #endif /* RX_ENABLE_LOCKS */
1362     }
1363     MUTEX_EXIT(&freeSQEList_lock);
1364     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1365 #ifdef RX_ENABLE_LOCKS
1366       CV_BROADCAST(&np->cv);
1367 #else /* RX_ENABLE_LOCKS */
1368       osi_rxWakeup(np);
1369 #endif /* RX_ENABLE_LOCKS */
1370     }
1371     MUTEX_EXIT(&rx_serverPool_lock);
1372     AFS_RXGUNLOCK();
1373     USERPRI;
1374 }
1375
1376 /* meltdown:
1377  * One thing that seems to happen is that all the server threads get
1378  * tied up on some empty or slow call, and then a whole bunch of calls
1379  * arrive at once, using up the packet pool, so now there are more 
1380  * empty calls.  The most critical resources here are server threads
1381  * and the free packet pool.  The "doreclaim" code seems to help in
1382  * general.  I think that eventually we arrive in this state: there
1383  * are lots of pending calls which do have all their packets present,
1384  * so they won't be reclaimed, are multi-packet calls, so they won't
1385  * be scheduled until later, and thus are tying up most of the free 
1386  * packet pool for a very long time.
1387  * future options:
1388  * 1.  schedule multi-packet calls if all the packets are present.  
1389  * Probably CPU-bound operation, useful to return packets to pool. 
1390  * Do what if there is a full window, but the last packet isn't here?
1391  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1392  * it sleeps and waits for that type of call.
1393  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1394  * the current dataquota business is badly broken.  The quota isn't adjusted
1395  * to reflect how many packets are presently queued for a running call.
1396  * So, when we schedule a queued call with a full window of packets queued
1397  * up for it, that *should* free up a window full of packets for other 2d-class
1398  * calls to be able to use from the packet pool.  But it doesn't.
1399  *
1400  * NB.  Most of the time, this code doesn't run -- since idle server threads
1401  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1402  * as a new call arrives.
1403  */
1404 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1405  * for an rx_Read. */
1406 #ifdef RX_ENABLE_LOCKS
1407 struct rx_call *
1408 rx_GetCall(tno, cur_service, socketp)
1409 int tno;
1410 struct rx_service *cur_service;
1411 osi_socket *socketp;
1412 {
1413     struct rx_serverQueueEntry *sq;
1414     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1415     struct rx_service *service = NULL;
1416     SPLVAR;
1417
1418     MUTEX_ENTER(&freeSQEList_lock);
1419
1420     if ((sq = rx_FreeSQEList)) {
1421         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1422         MUTEX_EXIT(&freeSQEList_lock);
1423     } else {    /* otherwise allocate a new one and return that */
1424         MUTEX_EXIT(&freeSQEList_lock);
1425         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1426         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1427         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1428     }
1429
1430     MUTEX_ENTER(&rx_serverPool_lock);
1431     if (cur_service != NULL) {
1432         ReturnToServerPool(cur_service);
1433     }
1434     while (1) {
1435         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1436             register struct rx_call *tcall, *ncall;
1437             choice2 = (struct rx_call *) 0;
1438             /* Scan for eligible incoming calls.  A call is not eligible
1439              * if the maximum number of calls for its service type are
1440              * already executing */
1441             /* One thread will process calls FCFS (to prevent starvation),
1442              * while the other threads may run ahead looking for calls which
1443              * have all their input data available immediately.  This helps 
1444              * keep threads from blocking, waiting for data from the client. */
1445             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1446               service = tcall->conn->service;
1447               if (!QuotaOK(service)) {
1448                 continue;
1449               }
1450               if (!tno || !tcall->queue_item_header.next  ) {
1451                 /* If we're thread 0, then  we'll just use 
1452                  * this call. If we haven't been able to find an optimal 
1453                  * choice, and we're at the end of the list, then use a 
1454                  * 2d choice if one has been identified.  Otherwise... */
1455                 call = (choice2 ? choice2 : tcall);
1456                 service = call->conn->service;
1457               } else if (!queue_IsEmpty(&tcall->rq)) {
1458                 struct rx_packet *rp;
1459                 rp = queue_First(&tcall->rq, rx_packet);
1460                 if (rp->header.seq == 1) {
1461                   if (!meltdown_1pkt ||             
1462                       (rp->header.flags & RX_LAST_PACKET)) {
1463                     call = tcall;
1464                   } else if (rxi_2dchoice && !choice2 &&
1465                              !(tcall->flags & RX_CALL_CLEARED) &&
1466                              (tcall->rprev > rxi_HardAckRate)) {
1467                     choice2 = tcall;
1468                   } else rxi_md2cnt++;
1469                 }
1470               }
1471               if (call)  {
1472                 break;
1473               } else {
1474                   ReturnToServerPool(service);
1475               }
1476             }
1477           }
1478
1479         if (call) {
1480             queue_Remove(call);
1481             rxi_ServerThreadSelectingCall = 1;
1482             MUTEX_EXIT(&rx_serverPool_lock);
1483             MUTEX_ENTER(&call->lock);
1484             MUTEX_ENTER(&rx_serverPool_lock);
1485
1486             if (queue_IsEmpty(&call->rq) ||
1487                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1488               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1489
1490             CLEAR_CALL_QUEUE_LOCK(call);
1491             if (call->error) {
1492                 MUTEX_EXIT(&call->lock);
1493                 ReturnToServerPool(service);
1494                 rxi_ServerThreadSelectingCall = 0;
1495                 CV_SIGNAL(&rx_serverPool_cv);
1496                 call = (struct rx_call*)0;
1497                 continue;
1498             }
1499             call->flags &= (~RX_CALL_WAIT_PROC);
1500             MUTEX_ENTER(&rx_stats_mutex);
1501             rx_nWaiting--;
1502             MUTEX_EXIT(&rx_stats_mutex);
1503             rxi_ServerThreadSelectingCall = 0;
1504             CV_SIGNAL(&rx_serverPool_cv);
1505             MUTEX_EXIT(&rx_serverPool_lock);
1506             break;
1507         }
1508         else {
1509             /* If there are no eligible incoming calls, add this process
1510              * to the idle server queue, to wait for one */
1511             sq->newcall = 0;
1512             sq->tno = tno;
1513             if (socketp) {
1514                 *socketp = OSI_NULLSOCKET;
1515             }
1516             sq->socketp = socketp;
1517             queue_Append(&rx_idleServerQueue, sq);
1518 #ifndef AFS_AIX41_ENV
1519             rx_waitForPacket = sq;
1520 #endif /* AFS_AIX41_ENV */
1521             do {
1522                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1523 #ifdef  KERNEL
1524                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1525                     MUTEX_EXIT(&rx_serverPool_lock);
1526                     return (struct rx_call *)0;
1527                 }
1528 #endif
1529             } while (!(call = sq->newcall) &&
1530                      !(socketp && *socketp != OSI_NULLSOCKET));
1531             MUTEX_EXIT(&rx_serverPool_lock);
1532             if (call) {
1533                 MUTEX_ENTER(&call->lock);
1534             }
1535             break;
1536         }
1537     }
1538
1539     MUTEX_ENTER(&freeSQEList_lock);
1540     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1541     rx_FreeSQEList = sq;
1542     MUTEX_EXIT(&freeSQEList_lock);
1543
1544     if (call) {
1545         clock_GetTime(&call->startTime);
1546         call->state = RX_STATE_ACTIVE;
1547         call->mode = RX_MODE_RECEIVING;
1548
1549         rxi_calltrace(RX_CALL_START, call);
1550         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1551              call->conn->service->servicePort, 
1552              call->conn->service->serviceId, call));
1553
1554         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1555         MUTEX_EXIT(&call->lock);
1556     } else {
1557         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1558     }
1559
1560     return call;
1561 }
1562 #else /* RX_ENABLE_LOCKS */
1563 struct rx_call *
1564 rx_GetCall(tno, cur_service, socketp)
1565   int tno;
1566   struct rx_service *cur_service;
1567   osi_socket *socketp;
1568 {
1569     struct rx_serverQueueEntry *sq;
1570     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1571     struct rx_service *service = NULL;
1572     SPLVAR;
1573
1574     NETPRI;
1575     AFS_RXGLOCK();
1576     MUTEX_ENTER(&freeSQEList_lock);
1577
1578     if ((sq = rx_FreeSQEList)) {
1579         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1580         MUTEX_EXIT(&freeSQEList_lock);
1581     } else {    /* otherwise allocate a new one and return that */
1582         MUTEX_EXIT(&freeSQEList_lock);
1583         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1584         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1585         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1586     }
1587     MUTEX_ENTER(&sq->lock);
1588
1589     if (cur_service != NULL) {
1590         cur_service->nRequestsRunning--;
1591         if (cur_service->nRequestsRunning < cur_service->minProcs)
1592             rxi_minDeficit++;
1593         rxi_availProcs++;
1594     }
1595     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1596         register struct rx_call *tcall, *ncall;
1597         /* Scan for eligible incoming calls.  A call is not eligible
1598          * if the maximum number of calls for its service type are
1599          * already executing */
1600         /* One thread will process calls FCFS (to prevent starvation),
1601          * while the other threads may run ahead looking for calls which
1602          * have all their input data available immediately.  This helps 
1603          * keep threads from blocking, waiting for data from the client. */
1604         choice2 = (struct rx_call *) 0;
1605         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1606           service = tcall->conn->service;
1607           if (QuotaOK(service)) {
1608              if (!tno || !tcall->queue_item_header.next  ) {
1609                  /* If we're thread 0, then  we'll just use 
1610                   * this call. If we haven't been able to find an optimal 
1611                   * choice, and we're at the end of the list, then use a 
1612                   * 2d choice if one has been identified.  Otherwise... */
1613                  call = (choice2 ? choice2 : tcall);
1614                  service = call->conn->service;
1615              } else if (!queue_IsEmpty(&tcall->rq)) {
1616                  struct rx_packet *rp;
1617                  rp = queue_First(&tcall->rq, rx_packet);
1618                  if (rp->header.seq == 1
1619                      && (!meltdown_1pkt ||
1620                          (rp->header.flags & RX_LAST_PACKET))) {
1621                      call = tcall;
1622                  } else if (rxi_2dchoice && !choice2 &&
1623                             !(tcall->flags & RX_CALL_CLEARED) &&
1624                             (tcall->rprev > rxi_HardAckRate)) {
1625                      choice2 = tcall;
1626                  } else rxi_md2cnt++;
1627              }
1628           }
1629           if (call) 
1630              break;
1631         }
1632       }
1633
1634     if (call) {
1635         queue_Remove(call);
1636         /* we can't schedule a call if there's no data!!! */
1637         /* send an ack if there's no data, if we're missing the
1638          * first packet, or we're missing something between first 
1639          * and last -- there's a "hole" in the incoming data. */
1640         if (queue_IsEmpty(&call->rq) ||
1641             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1642             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1643           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1644
1645         call->flags &= (~RX_CALL_WAIT_PROC);
1646         service->nRequestsRunning++;
1647         /* just started call in minProcs pool, need fewer to maintain
1648          * guarantee */
1649         if (service->nRequestsRunning <= service->minProcs)
1650             rxi_minDeficit--;
1651         rxi_availProcs--;
1652         rx_nWaiting--;
1653         /* MUTEX_EXIT(&call->lock); */
1654     }
1655     else {
1656         /* If there are no eligible incoming calls, add this process
1657          * to the idle server queue, to wait for one */
1658         sq->newcall = 0;
1659         if (socketp) {
1660             *socketp = OSI_NULLSOCKET;
1661         }
1662         sq->socketp = socketp;
1663         queue_Append(&rx_idleServerQueue, sq);
1664         do {
1665             osi_rxSleep(sq);
1666 #ifdef  KERNEL
1667                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1668                     AFS_RXGUNLOCK();
1669                     USERPRI;
1670                     return (struct rx_call *)0;
1671                 }
1672 #endif
1673         } while (!(call = sq->newcall) &&
1674                  !(socketp && *socketp != OSI_NULLSOCKET));
1675     }
1676     MUTEX_EXIT(&sq->lock);
1677
1678     MUTEX_ENTER(&freeSQEList_lock);
1679     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1680     rx_FreeSQEList = sq;
1681     MUTEX_EXIT(&freeSQEList_lock);
1682
1683     if (call) {
1684         clock_GetTime(&call->startTime);
1685         call->state = RX_STATE_ACTIVE;
1686         call->mode = RX_MODE_RECEIVING;
1687
1688         rxi_calltrace(RX_CALL_START, call);
1689         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1690          call->conn->service->servicePort, 
1691          call->conn->service->serviceId, call));
1692     } else {
1693         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1694     }
1695
1696     AFS_RXGUNLOCK();
1697     USERPRI;
1698
1699     return call;
1700 }
1701 #endif /* RX_ENABLE_LOCKS */
1702
1703
1704
1705 /* Establish a procedure to be called when a packet arrives for a
1706  * call.  This routine will be called at most once after each call,
1707  * and will also be called if there is an error condition on the or
1708  * the call is complete.  Used by multi rx to build a selection
1709  * function which determines which of several calls is likely to be a
1710  * good one to read from.  
1711  * NOTE: the way this is currently implemented it is probably only a
1712  * good idea to (1) use it immediately after a newcall (clients only)
1713  * and (2) only use it once.  Other uses currently void your warranty
1714  */
1715 void rx_SetArrivalProc(call, proc, handle, arg)
1716     register struct rx_call *call;
1717     register VOID (*proc)();
1718     register VOID *handle;
1719     register VOID *arg;
1720 {
1721     call->arrivalProc = proc;
1722     call->arrivalProcHandle = handle;
1723     call->arrivalProcArg = arg;
1724 }
1725
1726 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1727  * appropriate, and return the final error code from the conversation
1728  * to the caller */
1729
1730 afs_int32 rx_EndCall(call, rc)
1731     register struct rx_call *call;
1732     afs_int32 rc;
1733 {
1734     register struct rx_connection *conn = call->conn;
1735     register struct rx_service *service;
1736     register struct rx_packet *tp; /* Temporary packet pointer */
1737     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1738     afs_int32 error;
1739     SPLVAR;
1740
1741     dpf(("rx_EndCall(call %x)\n", call));
1742
1743     NETPRI;
1744     AFS_RXGLOCK();
1745     MUTEX_ENTER(&call->lock);
1746
1747     if (rc == 0 && call->error == 0) {
1748         call->abortCode = 0;
1749         call->abortCount = 0;
1750     }
1751
1752     call->arrivalProc = (VOID (*)()) 0;
1753     if (rc && call->error == 0) {
1754         rxi_CallError(call, rc);
1755         /* Send an abort message to the peer if this error code has
1756          * only just been set.  If it was set previously, assume the
1757          * peer has already been sent the error code or will request it 
1758          */
1759         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1760     }
1761     if (conn->type == RX_SERVER_CONNECTION) {
1762         /* Make sure reply or at least dummy reply is sent */
1763         if (call->mode == RX_MODE_RECEIVING) {
1764             rxi_WriteProc(call, 0, 0);
1765         }
1766         if (call->mode == RX_MODE_SENDING) {
1767             rxi_FlushWrite(call);
1768         }
1769         service = conn->service;
1770         rxi_calltrace(RX_CALL_END, call);
1771         /* Call goes to hold state until reply packets are acknowledged */
1772         if (call->tfirst + call->nSoftAcked < call->tnext) {
1773             call->state = RX_STATE_HOLD;
1774         } else {
1775             call->state = RX_STATE_DALLY;
1776             rxi_ClearTransmitQueue(call, 0);
1777             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1778             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1779         }
1780     }
1781     else { /* Client connection */
1782         char dummy;
1783         /* Make sure server receives input packets, in the case where
1784          * no reply arguments are expected */
1785         if ((call->mode == RX_MODE_SENDING)
1786          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1787             (void) rxi_ReadProc(call, &dummy, 1);
1788         }
1789         /* We need to release the call lock since it's lower than the
1790          * conn_call_lock and we don't want to hold the conn_call_lock
1791          * over the rx_ReadProc call. The conn_call_lock needs to be held
1792          * here for the case where rx_NewCall is perusing the calls on
1793          * the connection structure. We don't want to signal until
1794          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1795          * have checked this call, found it active and by the time it
1796          * goes to sleep, will have missed the signal.
1797          */
1798         MUTEX_EXIT(&call->lock);
1799         MUTEX_ENTER(&conn->conn_call_lock);
1800         MUTEX_ENTER(&call->lock);
1801         MUTEX_ENTER(&conn->conn_data_lock);
1802         conn->flags |= RX_CONN_BUSY;
1803         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1804             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1805             MUTEX_EXIT(&conn->conn_data_lock);
1806 #ifdef  RX_ENABLE_LOCKS
1807             CV_BROADCAST(&conn->conn_call_cv);
1808 #else
1809             osi_rxWakeup(conn);
1810 #endif
1811         }
1812 #ifdef RX_ENABLE_LOCKS
1813         else {
1814             MUTEX_EXIT(&conn->conn_data_lock);
1815         }
1816 #endif /* RX_ENABLE_LOCKS */
1817         call->state = RX_STATE_DALLY;
1818     }
1819     error = call->error;
1820
1821     /* currentPacket, nLeft, and NFree must be zeroed here, because
1822      * ResetCall cannot: ResetCall may be called at splnet(), in the
1823      * kernel version, and may interrupt the macros rx_Read or
1824      * rx_Write, which run at normal priority for efficiency. */
1825     if (call->currentPacket) {
1826         rxi_FreePacket(call->currentPacket);
1827         call->currentPacket = (struct rx_packet *) 0;
1828         call->nLeft = call->nFree = call->curlen = 0;
1829     }
1830     else
1831         call->nLeft = call->nFree = call->curlen = 0;
1832
1833     /* Free any packets from the last call to ReadvProc/WritevProc */
1834     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1835         queue_Remove(tp);
1836         rxi_FreePacket(tp);
1837     }
1838
1839     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1840     MUTEX_EXIT(&call->lock);
1841     if (conn->type == RX_CLIENT_CONNECTION) {
1842         MUTEX_EXIT(&conn->conn_call_lock);
1843         conn->flags &= ~RX_CONN_BUSY;
1844     }
1845     AFS_RXGUNLOCK();
1846     USERPRI;
1847     /*
1848      * Map errors to the local host's errno.h format.
1849      */
1850     error = ntoh_syserr_conv(error);
1851     return error;
1852 }
1853
1854 #if !defined(KERNEL)
1855
1856 /* Call this routine when shutting down a server or client (especially
1857  * clients).  This will allow Rx to gracefully garbage collect server
1858  * connections, and reduce the number of retries that a server might
1859  * make to a dead client.
1860  * This is not quite right, since some calls may still be ongoing and
1861  * we can't lock them to destroy them. */
1862 void rx_Finalize() {
1863     register struct rx_connection **conn_ptr, **conn_end;
1864
1865     INIT_PTHREAD_LOCKS
1866     LOCK_RX_INIT
1867     if (rxinit_status == 1) {
1868         UNLOCK_RX_INIT
1869         return; /* Already shutdown. */
1870     }
1871     rxi_DeleteCachedConnections();
1872     if (rx_connHashTable) {
1873         MUTEX_ENTER(&rx_connHashTable_lock);
1874         for (conn_ptr = &rx_connHashTable[0], 
1875              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1876              conn_ptr < conn_end; conn_ptr++) {
1877             struct rx_connection *conn, *next;
1878             for (conn = *conn_ptr; conn; conn = next) {
1879                 next = conn->next;
1880                 if (conn->type == RX_CLIENT_CONNECTION) {
1881                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1882                     conn->refCount++;
1883                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1884 #ifdef RX_ENABLE_LOCKS
1885                     rxi_DestroyConnectionNoLock(conn);
1886 #else /* RX_ENABLE_LOCKS */
1887                     rxi_DestroyConnection(conn);
1888 #endif /* RX_ENABLE_LOCKS */
1889                 }
1890             }
1891         }
1892 #ifdef RX_ENABLE_LOCKS
1893         while (rx_connCleanup_list) {
1894             struct rx_connection *conn;
1895             conn = rx_connCleanup_list;
1896             rx_connCleanup_list = rx_connCleanup_list->next;
1897             MUTEX_EXIT(&rx_connHashTable_lock);
1898             rxi_CleanupConnection(conn);
1899             MUTEX_ENTER(&rx_connHashTable_lock);
1900         }
1901         MUTEX_EXIT(&rx_connHashTable_lock);
1902 #endif /* RX_ENABLE_LOCKS */
1903     }
1904     rxi_flushtrace();
1905
1906     rxinit_status = 1;
1907     UNLOCK_RX_INIT
1908 }
1909 #endif
1910
1911 /* if we wakeup packet waiter too often, can get in loop with two
1912     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1913 void
1914 rxi_PacketsUnWait() {
1915
1916     if (!rx_waitingForPackets) {
1917         return;
1918     }
1919 #ifdef KERNEL
1920     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1921         return;                                     /* still over quota */
1922     }
1923 #endif /* KERNEL */
1924     rx_waitingForPackets = 0;
1925 #ifdef  RX_ENABLE_LOCKS
1926     CV_BROADCAST(&rx_waitingForPackets_cv);
1927 #else
1928     osi_rxWakeup(&rx_waitingForPackets);
1929 #endif
1930     return;
1931 }
1932
1933
1934 /* ------------------Internal interfaces------------------------- */
1935
1936 /* Return this process's service structure for the
1937  * specified socket and service */
1938 struct rx_service *rxi_FindService(socket, serviceId)
1939     register osi_socket socket;
1940     register u_short serviceId;
1941 {
1942     register struct rx_service **sp;    
1943     for (sp = &rx_services[0]; *sp; sp++) {
1944         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1945           return *sp;
1946     }
1947     return 0;
1948 }
1949
1950 /* Allocate a call structure, for the indicated channel of the
1951  * supplied connection.  The mode and state of the call must be set by
1952  * the caller. Returns the call with mutex locked. */
1953 struct rx_call *rxi_NewCall(conn, channel)
1954     register struct rx_connection *conn;
1955     register int channel;
1956 {
1957     register struct rx_call *call;
1958 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1959     register struct rx_call *cp;        /* Call pointer temp */
1960     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1961 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1962
1963     /* Grab an existing call structure, or allocate a new one.
1964      * Existing call structures are assumed to have been left reset by
1965      * rxi_FreeCall */
1966     MUTEX_ENTER(&rx_freeCallQueue_lock);
1967
1968 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1969     /*
1970      * EXCEPT that the TQ might not yet be cleared out.
1971      * Skip over those with in-use TQs.
1972      */
1973     call = NULL;
1974     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1975         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1976             call = cp;
1977             break;
1978         }
1979     }
1980     if (call) {
1981 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1982     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1983         call = queue_First(&rx_freeCallQueue, rx_call);
1984 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1985         queue_Remove(call);
1986         MUTEX_ENTER(&rx_stats_mutex);
1987         rx_stats.nFreeCallStructs--;
1988         MUTEX_EXIT(&rx_stats_mutex);
1989         MUTEX_EXIT(&rx_freeCallQueue_lock);
1990         MUTEX_ENTER(&call->lock);
1991         CLEAR_CALL_QUEUE_LOCK(call);
1992 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1993         /* Now, if TQ wasn't cleared earlier, do it now. */
1994         if (call->flags & RX_CALL_TQ_CLEARME) {
1995             rxi_ClearTransmitQueue(call, 0);
1996             queue_Init(&call->tq);
1997         }
1998 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1999         /* Bind the call to its connection structure */
2000         call->conn = conn;
2001         rxi_ResetCall(call, 1);
2002     }
2003     else {
2004         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
2005
2006         MUTEX_EXIT(&rx_freeCallQueue_lock);
2007         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
2008         MUTEX_ENTER(&call->lock);
2009         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
2010         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
2011         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
2012
2013         MUTEX_ENTER(&rx_stats_mutex);
2014         rx_stats.nCallStructs++;
2015         MUTEX_EXIT(&rx_stats_mutex);
2016         /* Initialize once-only items */
2017         queue_Init(&call->tq);
2018         queue_Init(&call->rq);
2019         queue_Init(&call->iovq);
2020         /* Bind the call to its connection structure (prereq for reset) */
2021         call->conn = conn;
2022         rxi_ResetCall(call, 1);
2023     }
2024     call->channel = channel;
2025     call->callNumber = &conn->callNumber[channel];
2026     /* Note that the next expected call number is retained (in
2027      * conn->callNumber[i]), even if we reallocate the call structure
2028      */
2029     conn->call[channel] = call;
2030     /* if the channel's never been used (== 0), we should start at 1, otherwise
2031         the call number is valid from the last time this channel was used */
2032     if (*call->callNumber == 0) *call->callNumber = 1;
2033
2034     return call;
2035 }
2036
2037 /* A call has been inactive long enough that so we can throw away
2038  * state, including the call structure, which is placed on the call
2039  * free list.
2040  * Call is locked upon entry.
2041  */
2042 #ifdef RX_ENABLE_LOCKS
2043 void rxi_FreeCall(call, haveCTLock)
2044     int haveCTLock; /* Set if called from rxi_ReapConnections */
2045 #else /* RX_ENABLE_LOCKS */
2046 void rxi_FreeCall(call)
2047 #endif /* RX_ENABLE_LOCKS */
2048     register struct rx_call *call;
2049 {
2050     register int channel = call->channel;
2051     register struct rx_connection *conn = call->conn;
2052
2053
2054     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2055       (*call->callNumber)++;
2056     rxi_ResetCall(call, 0);
2057     call->conn->call[channel] = (struct rx_call *) 0;
2058
2059     MUTEX_ENTER(&rx_freeCallQueue_lock);
2060     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2061 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2062     /* A call may be free even though its transmit queue is still in use.
2063      * Since we search the call list from head to tail, put busy calls at
2064      * the head of the list, and idle calls at the tail.
2065      */
2066     if (call->flags & RX_CALL_TQ_BUSY)
2067         queue_Prepend(&rx_freeCallQueue, call);
2068     else
2069         queue_Append(&rx_freeCallQueue, call);
2070 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2071     queue_Append(&rx_freeCallQueue, call);
2072 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2073     MUTEX_ENTER(&rx_stats_mutex);
2074     rx_stats.nFreeCallStructs++;
2075     MUTEX_EXIT(&rx_stats_mutex);
2076
2077     MUTEX_EXIT(&rx_freeCallQueue_lock);
2078  
2079     /* Destroy the connection if it was previously slated for
2080      * destruction, i.e. the Rx client code previously called
2081      * rx_DestroyConnection (client connections), or
2082      * rxi_ReapConnections called the same routine (server
2083      * connections).  Only do this, however, if there are no
2084      * outstanding calls. Note that for fine grain locking, there appears
2085      * to be a deadlock in that rxi_FreeCall has a call locked and
2086      * DestroyConnectionNoLock locks each call in the conn. But note a
2087      * few lines up where we have removed this call from the conn.
2088      * If someone else destroys a connection, they either have no
2089      * call lock held or are going through this section of code.
2090      */
2091     if (conn->flags & RX_CONN_DESTROY_ME) {
2092         MUTEX_ENTER(&conn->conn_data_lock);
2093         conn->refCount++;
2094         MUTEX_EXIT(&conn->conn_data_lock);
2095 #ifdef RX_ENABLE_LOCKS
2096         if (haveCTLock)
2097             rxi_DestroyConnectionNoLock(conn);
2098         else
2099             rxi_DestroyConnection(conn);
2100 #else /* RX_ENABLE_LOCKS */
2101         rxi_DestroyConnection(conn);
2102 #endif /* RX_ENABLE_LOCKS */
2103     }
2104 }
2105
2106 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2107 char *rxi_Alloc(size)
2108 register size_t size;
2109 {
2110     register char *p;
2111
2112 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2113     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2114      * implementation.
2115      */
2116     int glockOwner = ISAFS_GLOCK();
2117     if (!glockOwner)
2118         AFS_GLOCK();
2119 #endif
2120     MUTEX_ENTER(&rx_stats_mutex);
2121     rxi_Alloccnt++; rxi_Allocsize += size;
2122     MUTEX_EXIT(&rx_stats_mutex);
2123 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2124     if (size > AFS_SMALLOCSIZ) {
2125         p = (char *) osi_AllocMediumSpace(size);
2126     } else
2127         p = (char *) osi_AllocSmall(size, 1);
2128 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2129     if (!glockOwner)
2130         AFS_GUNLOCK();
2131 #endif
2132 #else
2133     p = (char *) osi_Alloc(size);
2134 #endif
2135     if (!p) osi_Panic("rxi_Alloc error");
2136     memset(p, 0, size);
2137     return p;
2138 }
2139
2140 void rxi_Free(addr, size)
2141 void *addr;
2142 register size_t size;
2143 {
2144 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2145     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2146      * implementation.
2147      */
2148     int glockOwner = ISAFS_GLOCK();
2149     if (!glockOwner)
2150         AFS_GLOCK();
2151 #endif
2152     MUTEX_ENTER(&rx_stats_mutex);
2153     rxi_Alloccnt--; rxi_Allocsize -= size;
2154     MUTEX_EXIT(&rx_stats_mutex);
2155 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2156     if (size > AFS_SMALLOCSIZ)
2157         osi_FreeMediumSpace(addr);
2158     else
2159         osi_FreeSmall(addr);
2160 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2161     if (!glockOwner)
2162         AFS_GUNLOCK();
2163 #endif
2164 #else
2165     osi_Free(addr, size);
2166 #endif    
2167 }
2168
2169 /* Find the peer process represented by the supplied (host,port)
2170  * combination.  If there is no appropriate active peer structure, a
2171  * new one will be allocated and initialized 
2172  * The origPeer, if set, is a pointer to a peer structure on which the
2173  * refcount will be be decremented. This is used to replace the peer
2174  * structure hanging off a connection structure */
2175 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2176     register afs_uint32 host;
2177     register u_short port;
2178     struct rx_peer *origPeer;
2179     int create;
2180 {
2181     register struct rx_peer *pp;
2182     int hashIndex;
2183     hashIndex = PEER_HASH(host, port);
2184     MUTEX_ENTER(&rx_peerHashTable_lock);
2185     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2186         if ((pp->host == host) && (pp->port == port)) break;
2187     }
2188     if (!pp) {
2189         if (create) {
2190             pp = rxi_AllocPeer(); /* This bzero's *pp */
2191             pp->host = host;      /* set here or in InitPeerParams is zero */
2192             pp->port = port;
2193             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2194             queue_Init(&pp->congestionQueue);
2195             queue_Init(&pp->rpcStats);
2196             pp->next = rx_peerHashTable[hashIndex];
2197             rx_peerHashTable[hashIndex] = pp;
2198             rxi_InitPeerParams(pp);
2199             MUTEX_ENTER(&rx_stats_mutex);
2200             rx_stats.nPeerStructs++;
2201             MUTEX_EXIT(&rx_stats_mutex);
2202         }
2203     }
2204     if (pp && create) {
2205         pp->refCount++;
2206     }
2207     if ( origPeer)
2208         origPeer->refCount--;
2209     MUTEX_EXIT(&rx_peerHashTable_lock);
2210     return pp;
2211 }
2212
2213
2214 /* Find the connection at (host, port) started at epoch, and with the
2215  * given connection id.  Creates the server connection if necessary.
2216  * The type specifies whether a client connection or a server
2217  * connection is desired.  In both cases, (host, port) specify the
2218  * peer's (host, pair) pair.  Client connections are not made
2219  * automatically by this routine.  The parameter socket gives the
2220  * socket descriptor on which the packet was received.  This is used,
2221  * in the case of server connections, to check that *new* connections
2222  * come via a valid (port, serviceId).  Finally, the securityIndex
2223  * parameter must match the existing index for the connection.  If a
2224  * server connection is created, it will be created using the supplied
2225  * index, if the index is valid for this service */
2226 struct rx_connection *
2227 rxi_FindConnection(socket, host, port, serviceId, cid, 
2228                    epoch, type, securityIndex)
2229     osi_socket socket;
2230     register afs_int32 host;
2231     register u_short port;
2232     u_short serviceId;
2233     afs_uint32 cid;
2234     afs_uint32 epoch;
2235     int type;
2236     u_int securityIndex;
2237 {
2238     int hashindex, flag;
2239     register struct rx_connection *conn;
2240     struct rx_peer *peer;
2241     hashindex = CONN_HASH(host, port, cid, epoch, type);
2242     MUTEX_ENTER(&rx_connHashTable_lock);
2243     rxLastConn ? (conn = rxLastConn, flag = 0) :
2244                  (conn = rx_connHashTable[hashindex], flag = 1);
2245     for (; conn; ) {
2246       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2247           && (epoch == conn->epoch)) {
2248         register struct rx_peer *pp = conn->peer;
2249         if (securityIndex != conn->securityIndex) {
2250             /* this isn't supposed to happen, but someone could forge a packet
2251                like this, and there seems to be some CM bug that makes this
2252                happen from time to time -- in which case, the fileserver
2253                asserts. */  
2254             MUTEX_EXIT(&rx_connHashTable_lock);
2255             return (struct rx_connection *) 0;
2256         }
2257         /* epoch's high order bits mean route for security reasons only on
2258          * the cid, not the host and port fields.
2259          */
2260         if (conn->epoch & 0x80000000) break;
2261         if (((type == RX_CLIENT_CONNECTION) 
2262              || (pp->host == host)) && (pp->port == port))
2263           break;
2264       }
2265       if ( !flag )
2266       {
2267         /* the connection rxLastConn that was used the last time is not the
2268         ** one we are looking for now. Hence, start searching in the hash */
2269         flag = 1;
2270         conn = rx_connHashTable[hashindex];
2271       }
2272       else
2273         conn = conn->next;
2274     }
2275     if (!conn) {
2276         struct rx_service *service;
2277         if (type == RX_CLIENT_CONNECTION) {
2278             MUTEX_EXIT(&rx_connHashTable_lock);
2279             return (struct rx_connection *) 0;
2280         }
2281         service = rxi_FindService(socket, serviceId);
2282         if (!service || (securityIndex >= service->nSecurityObjects) 
2283             || (service->securityObjects[securityIndex] == 0)) {
2284             MUTEX_EXIT(&rx_connHashTable_lock);
2285             return (struct rx_connection *) 0;
2286         }
2287         conn = rxi_AllocConnection(); /* This bzero's the connection */
2288         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2289                    MUTEX_DEFAULT,0);
2290         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2291                    MUTEX_DEFAULT,0);
2292         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2293         conn->next = rx_connHashTable[hashindex];
2294         rx_connHashTable[hashindex] = conn;
2295         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2296         conn->type = RX_SERVER_CONNECTION;
2297         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2298         conn->epoch = epoch;
2299         conn->cid = cid & RX_CIDMASK;
2300         /* conn->serial = conn->lastSerial = 0; */
2301         /* conn->timeout = 0; */
2302         conn->ackRate = RX_FAST_ACK_RATE;
2303         conn->service = service;
2304         conn->serviceId = serviceId;
2305         conn->securityIndex = securityIndex;
2306         conn->securityObject = service->securityObjects[securityIndex];
2307         conn->nSpecific = 0;
2308         conn->specific = NULL;
2309         rx_SetConnDeadTime(conn, service->connDeadTime);
2310         /* Notify security object of the new connection */
2311         RXS_NewConnection(conn->securityObject, conn);
2312         /* XXXX Connection timeout? */
2313         if (service->newConnProc) (*service->newConnProc)(conn);
2314         MUTEX_ENTER(&rx_stats_mutex);
2315         rx_stats.nServerConns++;
2316         MUTEX_EXIT(&rx_stats_mutex);
2317     }
2318     else
2319     {
2320     /* Ensure that the peer structure is set up in such a way that
2321     ** replies in this connection go back to that remote interface
2322     ** from which the last packet was sent out. In case, this packet's
2323     ** source IP address does not match the peer struct for this conn,
2324     ** then drop the refCount on conn->peer and get a new peer structure.
2325     ** We can check the host,port field in the peer structure without the
2326     ** rx_peerHashTable_lock because the peer structure has its refCount
2327     ** incremented and the only time the host,port in the peer struct gets
2328     ** updated is when the peer structure is created.
2329     */
2330         if (conn->peer->host == host )
2331                 peer = conn->peer; /* no change to the peer structure */
2332         else
2333                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2334     }
2335
2336     MUTEX_ENTER(&conn->conn_data_lock);
2337     conn->refCount++;
2338     conn->peer = peer;
2339     MUTEX_EXIT(&conn->conn_data_lock);
2340
2341     rxLastConn = conn;  /* store this connection as the last conn used */
2342     MUTEX_EXIT(&rx_connHashTable_lock);
2343     return conn;
2344 }
2345
2346 /* There are two packet tracing routines available for testing and monitoring
2347  * Rx.  One is called just after every packet is received and the other is
2348  * called just before every packet is sent.  Received packets, have had their
2349  * headers decoded, and packets to be sent have not yet had their headers
2350  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2351  * containing the network address.  Both can be modified.  The return value, if
2352  * non-zero, indicates that the packet should be dropped.  */
2353
2354 int (*rx_justReceived)() = 0;
2355 int (*rx_almostSent)() = 0;
2356
2357 /* A packet has been received off the interface.  Np is the packet, socket is
2358  * the socket number it was received from (useful in determining which service
2359  * this packet corresponds to), and (host, port) reflect the host,port of the
2360  * sender.  This call returns the packet to the caller if it is finished with
2361  * it, rather than de-allocating it, just as a small performance hack */
2362
2363 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2364     register struct rx_packet *np;
2365     osi_socket socket;
2366     afs_uint32 host;
2367     u_short port;
2368     int *tnop;
2369     struct rx_call **newcallp;
2370 {
2371     register struct rx_call *call;
2372     register struct rx_connection *conn;
2373     int channel;
2374     afs_uint32 currentCallNumber;
2375     int type;
2376     int skew;
2377 #ifdef RXDEBUG
2378     char *packetType;
2379 #endif
2380     struct rx_packet *tnp;
2381
2382 #ifdef RXDEBUG
2383 /* We don't print out the packet until now because (1) the time may not be
2384  * accurate enough until now in the lwp implementation (rx_Listener only gets
2385  * the time after the packet is read) and (2) from a protocol point of view,
2386  * this is the first time the packet has been seen */
2387     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2388         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2389     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2390          np->header.serial, packetType, host, port, np->header.serviceId,
2391          np->header.epoch, np->header.cid, np->header.callNumber, 
2392          np->header.seq, np->header.flags, np));
2393 #endif
2394
2395     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2396       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2397     }
2398
2399     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2400         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2401     }
2402 #ifdef RXDEBUG
2403     /* If an input tracer function is defined, call it with the packet and
2404      * network address.  Note this function may modify its arguments. */
2405     if (rx_justReceived) {
2406         struct sockaddr_in addr;
2407         int drop;
2408         addr.sin_family = AF_INET;
2409         addr.sin_port = port;
2410         addr.sin_addr.s_addr = host;
2411 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2412         addr.sin_len = sizeof(addr);
2413 #endif  /* AFS_OSF_ENV */
2414         drop = (*rx_justReceived) (np, &addr);
2415         /* drop packet if return value is non-zero */
2416         if (drop) return np;
2417         port = addr.sin_port;           /* in case fcn changed addr */
2418         host = addr.sin_addr.s_addr;
2419     }
2420 #endif
2421
2422     /* If packet was not sent by the client, then *we* must be the client */
2423     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2424         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2425
2426     /* Find the connection (or fabricate one, if we're the server & if
2427      * necessary) associated with this packet */
2428     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2429                               np->header.cid, np->header.epoch, type, 
2430                               np->header.securityIndex);
2431
2432     if (!conn) {
2433       /* If no connection found or fabricated, just ignore the packet.
2434        * (An argument could be made for sending an abort packet for
2435        * the conn) */
2436       return np;
2437     }   
2438
2439     MUTEX_ENTER(&conn->conn_data_lock);
2440     if (conn->maxSerial < np->header.serial)
2441       conn->maxSerial = np->header.serial;
2442     MUTEX_EXIT(&conn->conn_data_lock);
2443
2444     /* If the connection is in an error state, send an abort packet and ignore
2445      * the incoming packet */
2446     if (conn->error) {
2447         /* Don't respond to an abort packet--we don't want loops! */
2448         MUTEX_ENTER(&conn->conn_data_lock);
2449         if (np->header.type != RX_PACKET_TYPE_ABORT)
2450             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2451         conn->refCount--;
2452         MUTEX_EXIT(&conn->conn_data_lock);
2453         return np;
2454     }
2455
2456     /* Check for connection-only requests (i.e. not call specific). */
2457     if (np->header.callNumber == 0) {
2458         switch (np->header.type) {
2459             case RX_PACKET_TYPE_ABORT:
2460                 /* What if the supplied error is zero? */
2461                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2462                 MUTEX_ENTER(&conn->conn_data_lock);
2463                 conn->refCount--;
2464                 MUTEX_EXIT(&conn->conn_data_lock);
2465                 return np;
2466             case RX_PACKET_TYPE_CHALLENGE:
2467                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2468                 MUTEX_ENTER(&conn->conn_data_lock);
2469                 conn->refCount--;
2470                 MUTEX_EXIT(&conn->conn_data_lock);
2471                 return tnp;
2472             case RX_PACKET_TYPE_RESPONSE:
2473                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2474                 MUTEX_ENTER(&conn->conn_data_lock);
2475                 conn->refCount--;
2476                 MUTEX_EXIT(&conn->conn_data_lock);
2477                 return tnp;
2478             case RX_PACKET_TYPE_PARAMS:
2479             case RX_PACKET_TYPE_PARAMS+1:
2480             case RX_PACKET_TYPE_PARAMS+2:
2481                 /* ignore these packet types for now */
2482                 MUTEX_ENTER(&conn->conn_data_lock);
2483                 conn->refCount--;
2484                 MUTEX_EXIT(&conn->conn_data_lock);
2485                 return np;
2486
2487
2488             default:
2489                 /* Should not reach here, unless the peer is broken: send an
2490                  * abort packet */
2491                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2492                 MUTEX_ENTER(&conn->conn_data_lock);
2493                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2494                 conn->refCount--;
2495                 MUTEX_EXIT(&conn->conn_data_lock);
2496                 return tnp;
2497         }
2498     }
2499
2500     channel = np->header.cid & RX_CHANNELMASK;
2501     call = conn->call[channel];
2502 #ifdef  RX_ENABLE_LOCKS
2503     if (call)
2504         MUTEX_ENTER(&call->lock);
2505     /* Test to see if call struct is still attached to conn. */
2506     if (call != conn->call[channel]) {
2507         if (call)
2508             MUTEX_EXIT(&call->lock);
2509         if (type == RX_SERVER_CONNECTION) {
2510             call = conn->call[channel];
2511             /* If we started with no call attached and there is one now,
2512              * another thread is also running this routine and has gotten
2513              * the connection channel. We should drop this packet in the tests
2514              * below. If there was a call on this connection and it's now
2515              * gone, then we'll be making a new call below.
2516              * If there was previously a call and it's now different then
2517              * the old call was freed and another thread running this routine
2518              * has created a call on this channel. One of these two threads
2519              * has a packet for the old call and the code below handles those
2520              * cases.
2521              */
2522             if (call)
2523                 MUTEX_ENTER(&call->lock);
2524         }
2525         else {
2526             /* This packet can't be for this call. If the new call address is
2527              * 0 then no call is running on this channel. If there is a call
2528              * then, since this is a client connection we're getting data for
2529              * it must be for the previous call.
2530              */
2531             MUTEX_ENTER(&rx_stats_mutex);
2532             rx_stats.spuriousPacketsRead++;
2533             MUTEX_EXIT(&rx_stats_mutex);
2534             MUTEX_ENTER(&conn->conn_data_lock);
2535             conn->refCount--;
2536             MUTEX_EXIT(&conn->conn_data_lock);
2537             return np;
2538         }
2539     }
2540 #endif
2541     currentCallNumber = conn->callNumber[channel];
2542
2543     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2544         if (np->header.callNumber < currentCallNumber) {
2545             MUTEX_ENTER(&rx_stats_mutex);
2546             rx_stats.spuriousPacketsRead++;
2547             MUTEX_EXIT(&rx_stats_mutex);
2548 #ifdef  RX_ENABLE_LOCKS
2549             if (call)
2550                 MUTEX_EXIT(&call->lock);
2551 #endif
2552             MUTEX_ENTER(&conn->conn_data_lock);
2553             conn->refCount--;
2554             MUTEX_EXIT(&conn->conn_data_lock);
2555             return np;
2556         }
2557         if (!call) {
2558             call = rxi_NewCall(conn, channel);
2559             *call->callNumber = np->header.callNumber;
2560             call->state = RX_STATE_PRECALL;
2561             clock_GetTime(&call->queueTime);
2562             hzero(call->bytesSent);
2563             hzero(call->bytesRcvd);
2564             rxi_KeepAliveOn(call);
2565         }
2566         else if (np->header.callNumber != currentCallNumber) {
2567             /* Wait until the transmit queue is idle before deciding
2568              * whether to reset the current call. Chances are that the
2569              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2570              * flag is cleared.
2571              */
2572 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2573             while ((call->state == RX_STATE_ACTIVE) &&
2574                    (call->flags & RX_CALL_TQ_BUSY)) {
2575                 call->flags |= RX_CALL_TQ_WAIT;
2576 #ifdef RX_ENABLE_LOCKS
2577                 CV_WAIT(&call->cv_tq, &call->lock);
2578 #else /* RX_ENABLE_LOCKS */
2579                 osi_rxSleep(&call->tq);
2580 #endif /* RX_ENABLE_LOCKS */
2581             }
2582 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2583             /* If the new call cannot be taken right now send a busy and set
2584              * the error condition in this call, so that it terminates as
2585              * quickly as possible */
2586             if (call->state == RX_STATE_ACTIVE) {
2587                 struct rx_packet *tp;
2588
2589                 rxi_CallError(call, RX_CALL_DEAD);
2590                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2591                 MUTEX_EXIT(&call->lock);
2592                 MUTEX_ENTER(&conn->conn_data_lock);
2593                 conn->refCount--;
2594                 MUTEX_EXIT(&conn->conn_data_lock);
2595                 return tp;
2596             }
2597             rxi_ResetCall(call, 0);
2598             *call->callNumber = np->header.callNumber;
2599             call->state = RX_STATE_PRECALL;
2600             clock_GetTime(&call->queueTime);
2601             hzero(call->bytesSent);
2602             hzero(call->bytesRcvd);
2603             /*
2604              * If the number of queued calls exceeds the overload
2605              * threshold then abort this call.
2606              */
2607             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2608                 struct rx_packet *tp;
2609
2610                 rxi_CallError(call, rx_BusyError);
2611                 tp = rxi_SendCallAbort(call, np, 1, 0);
2612                 MUTEX_EXIT(&call->lock);
2613                 MUTEX_ENTER(&conn->conn_data_lock);
2614                 conn->refCount--;
2615                 MUTEX_EXIT(&conn->conn_data_lock);
2616                 return tp;
2617             }
2618             rxi_KeepAliveOn(call);
2619         }
2620         else {
2621             /* Continuing call; do nothing here. */
2622         }
2623     } else { /* we're the client */
2624         /* Ignore all incoming acknowledgements for calls in DALLY state */
2625         if ( call && (call->state == RX_STATE_DALLY) 
2626          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2627             MUTEX_ENTER(&rx_stats_mutex);
2628             rx_stats.ignorePacketDally++;
2629             MUTEX_EXIT(&rx_stats_mutex);
2630 #ifdef  RX_ENABLE_LOCKS
2631             if (call) {
2632                 MUTEX_EXIT(&call->lock);
2633             }
2634 #endif
2635             MUTEX_ENTER(&conn->conn_data_lock);
2636             conn->refCount--;
2637             MUTEX_EXIT(&conn->conn_data_lock);
2638             return np;
2639         }
2640         
2641         /* Ignore anything that's not relevant to the current call.  If there
2642          * isn't a current call, then no packet is relevant. */
2643         if (!call || (np->header.callNumber != currentCallNumber)) {
2644             MUTEX_ENTER(&rx_stats_mutex);
2645             rx_stats.spuriousPacketsRead++;
2646             MUTEX_EXIT(&rx_stats_mutex);
2647 #ifdef  RX_ENABLE_LOCKS
2648             if (call) {
2649                 MUTEX_EXIT(&call->lock);
2650             }
2651 #endif
2652             MUTEX_ENTER(&conn->conn_data_lock);
2653             conn->refCount--;
2654             MUTEX_EXIT(&conn->conn_data_lock);
2655             return np;  
2656         }
2657         /* If the service security object index stamped in the packet does not
2658          * match the connection's security index, ignore the packet */
2659         if (np->header.securityIndex != conn->securityIndex) {
2660 #ifdef  RX_ENABLE_LOCKS
2661             MUTEX_EXIT(&call->lock);
2662 #endif
2663             MUTEX_ENTER(&conn->conn_data_lock);
2664             conn->refCount--;       
2665             MUTEX_EXIT(&conn->conn_data_lock);
2666             return np;
2667         }
2668
2669         /* If we're receiving the response, then all transmit packets are
2670          * implicitly acknowledged.  Get rid of them. */
2671         if (np->header.type == RX_PACKET_TYPE_DATA) {
2672 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2673             /* XXX Hack. Because we must release the global rx lock when
2674              * sending packets (osi_NetSend) we drop all acks while we're
2675              * traversing the tq in rxi_Start sending packets out because
2676              * packets may move to the freePacketQueue as result of being here!
2677              * So we drop these packets until we're safely out of the
2678              * traversing. Really ugly! 
2679              * For fine grain RX locking, we set the acked field in the
2680              * packets and let rxi_Start remove them from the transmit queue.
2681              */
2682             if (call->flags & RX_CALL_TQ_BUSY) {
2683 #ifdef  RX_ENABLE_LOCKS
2684                 rxi_SetAcksInTransmitQueue(call);
2685 #else
2686                 conn->refCount--;
2687                 return np;              /* xmitting; drop packet */
2688 #endif
2689             }
2690             else {
2691                 rxi_ClearTransmitQueue(call, 0);
2692             }
2693 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2694             rxi_ClearTransmitQueue(call, 0);
2695 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2696         } else {
2697           if (np->header.type == RX_PACKET_TYPE_ACK) {
2698         /* now check to see if this is an ack packet acknowledging that the
2699          * server actually *lost* some hard-acked data.  If this happens we
2700          * ignore this packet, as it may indicate that the server restarted in
2701          * the middle of a call.  It is also possible that this is an old ack
2702          * packet.  We don't abort the connection in this case, because this
2703          * *might* just be an old ack packet.  The right way to detect a server
2704          * restart in the midst of a call is to notice that the server epoch
2705          * changed, btw.  */
2706         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2707          * XXX unacknowledged.  I think that this is off-by-one, but
2708          * XXX I don't dare change it just yet, since it will
2709          * XXX interact badly with the server-restart detection 
2710          * XXX code in receiveackpacket.  */
2711             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2712                 MUTEX_ENTER(&rx_stats_mutex);
2713                 rx_stats.spuriousPacketsRead++;
2714                 MUTEX_EXIT(&rx_stats_mutex);
2715                 MUTEX_EXIT(&call->lock);
2716                 MUTEX_ENTER(&conn->conn_data_lock);
2717                 conn->refCount--;
2718                 MUTEX_EXIT(&conn->conn_data_lock);
2719                 return np;
2720             }
2721           }
2722         } /* else not a data packet */
2723     }
2724
2725     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2726     /* Set remote user defined status from packet */
2727     call->remoteStatus = np->header.userStatus;
2728
2729     /* Note the gap between the expected next packet and the actual
2730      * packet that arrived, when the new packet has a smaller serial number
2731      * than expected.  Rioses frequently reorder packets all by themselves,
2732      * so this will be quite important with very large window sizes.
2733      * Skew is checked against 0 here to avoid any dependence on the type of
2734      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2735      * true! 
2736      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2737      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2738      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2739      */
2740     MUTEX_ENTER(&conn->conn_data_lock);
2741     skew = conn->lastSerial - np->header.serial;
2742     conn->lastSerial = np->header.serial;
2743     MUTEX_EXIT(&conn->conn_data_lock);
2744     if (skew > 0) {
2745       register struct rx_peer *peer;
2746       peer = conn->peer;
2747       if (skew > peer->inPacketSkew) {
2748         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2749         peer->inPacketSkew = skew;
2750       }
2751     }
2752
2753     /* Now do packet type-specific processing */
2754     switch (np->header.type) {
2755         case RX_PACKET_TYPE_DATA:
2756             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2757                                        tnop, newcallp);
2758             break;
2759         case RX_PACKET_TYPE_ACK:
2760             /* Respond immediately to ack packets requesting acknowledgement
2761              * (ping packets) */
2762             if (np->header.flags & RX_REQUEST_ACK) {
2763                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2764                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2765             }
2766             np = rxi_ReceiveAckPacket(call, np, 1);
2767             break;
2768         case RX_PACKET_TYPE_ABORT:
2769             /* An abort packet: reset the connection, passing the error up to
2770              * the user */
2771             /* What if error is zero? */
2772             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2773             break;
2774         case RX_PACKET_TYPE_BUSY:
2775             /* XXXX */
2776             break;
2777         case RX_PACKET_TYPE_ACKALL:
2778             /* All packets acknowledged, so we can drop all packets previously
2779              * readied for sending */
2780 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2781             /* XXX Hack. We because we can't release the global rx lock when
2782              * sending packets (osi_NetSend) we drop all ack pkts while we're
2783              * traversing the tq in rxi_Start sending packets out because
2784              * packets may move to the freePacketQueue as result of being
2785              * here! So we drop these packets until we're safely out of the
2786              * traversing. Really ugly! 
2787              * For fine grain RX locking, we set the acked field in the packets
2788              * and let rxi_Start remove the packets from the transmit queue.
2789              */
2790             if (call->flags & RX_CALL_TQ_BUSY) {
2791 #ifdef  RX_ENABLE_LOCKS
2792                 rxi_SetAcksInTransmitQueue(call);
2793                 break;
2794 #else /* RX_ENABLE_LOCKS */
2795                 conn->refCount--;
2796                 return np;              /* xmitting; drop packet */
2797 #endif /* RX_ENABLE_LOCKS */
2798             }
2799 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2800             rxi_ClearTransmitQueue(call, 0);
2801             break;
2802         default:
2803             /* Should not reach here, unless the peer is broken: send an abort
2804              * packet */
2805             rxi_CallError(call, RX_PROTOCOL_ERROR);
2806             np = rxi_SendCallAbort(call, np, 1, 0);
2807             break;
2808     };
2809     /* Note when this last legitimate packet was received, for keep-alive
2810      * processing.  Note, we delay getting the time until now in the hope that
2811      * the packet will be delivered to the user before any get time is required
2812      * (if not, then the time won't actually be re-evaluated here). */
2813     call->lastReceiveTime = clock_Sec();
2814     MUTEX_EXIT(&call->lock);
2815     MUTEX_ENTER(&conn->conn_data_lock);
2816     conn->refCount--;
2817     MUTEX_EXIT(&conn->conn_data_lock);
2818     return np;
2819 }
2820
2821 /* return true if this is an "interesting" connection from the point of view
2822     of someone trying to debug the system */
2823 int rxi_IsConnInteresting(struct rx_connection *aconn)
2824 {
2825     register int i;
2826     register struct rx_call *tcall;
2827
2828     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2829         return 1;
2830     for(i=0;i<RX_MAXCALLS;i++) {
2831         tcall = aconn->call[i];
2832         if (tcall) {
2833             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2834                 return 1;
2835             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2836                 return 1;
2837         }
2838     }
2839     return 0;
2840 }
2841
2842 #ifdef KERNEL
2843 /* if this is one of the last few packets AND it wouldn't be used by the
2844    receiving call to immediately satisfy a read request, then drop it on
2845    the floor, since accepting it might prevent a lock-holding thread from
2846    making progress in its reading. If a call has been cleared while in
2847    the precall state then ignore all subsequent packets until the call
2848    is assigned to a thread. */
2849
2850 static TooLow(ap, acall)
2851   struct rx_call *acall;
2852   struct rx_packet *ap; {
2853     int rc=0;
2854     MUTEX_ENTER(&rx_stats_mutex);
2855     if (((ap->header.seq != 1) &&
2856          (acall->flags & RX_CALL_CLEARED) &&
2857          (acall->state == RX_STATE_PRECALL)) ||
2858         ((rx_nFreePackets < rxi_dataQuota+2) &&
2859          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2860            && (acall->flags & RX_CALL_READER_WAIT)))) {
2861         rc = 1;
2862     }
2863     MUTEX_EXIT(&rx_stats_mutex);
2864     return rc;
2865 }
2866 #endif /* KERNEL */
2867
2868 /* try to attach call, if authentication is complete */
2869 static void TryAttach(acall, socket, tnop, newcallp)
2870 register struct rx_call *acall;
2871 register osi_socket socket;
2872 register int *tnop;
2873 register struct rx_call **newcallp; {
2874     register struct rx_connection *conn;
2875     conn = acall->conn;
2876     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2877         /* Don't attach until we have any req'd. authentication. */
2878         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2879             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2880             /* Note:  this does not necessarily succeed; there
2881                may not any proc available */
2882         }
2883         else {
2884             rxi_ChallengeOn(acall->conn);
2885         }
2886     }
2887 }
2888
2889 /* A data packet has been received off the interface.  This packet is
2890  * appropriate to the call (the call is in the right state, etc.).  This
2891  * routine can return a packet to the caller, for re-use */
2892
2893 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2894                                         port, tnop, newcallp)
2895     register struct rx_call *call;
2896     register struct rx_packet *np;
2897     int istack;
2898     osi_socket socket;
2899     afs_uint32 host;
2900     u_short port;
2901     int *tnop;
2902     struct rx_call **newcallp;
2903 {
2904     int ackNeeded = 0;
2905     int newPackets = 0;
2906     int didHardAck = 0;
2907     int haveLast = 0;
2908     afs_uint32 seq, serial, flags;
2909     int isFirst;
2910     struct rx_packet *tnp;
2911     struct clock when;
2912     MUTEX_ENTER(&rx_stats_mutex);
2913     rx_stats.dataPacketsRead++;
2914     MUTEX_EXIT(&rx_stats_mutex);
2915
2916 #ifdef KERNEL
2917     /* If there are no packet buffers, drop this new packet, unless we can find
2918      * packet buffers from inactive calls */
2919     if (!call->error &&
2920         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2921         MUTEX_ENTER(&rx_freePktQ_lock);
2922         rxi_NeedMorePackets = TRUE;
2923         MUTEX_EXIT(&rx_freePktQ_lock);
2924         MUTEX_ENTER(&rx_stats_mutex);
2925         rx_stats.noPacketBuffersOnRead++;
2926         MUTEX_EXIT(&rx_stats_mutex);
2927         call->rprev = np->header.serial;
2928         rxi_calltrace(RX_TRACE_DROP, call);
2929         dpf (("packet %x dropped on receipt - quota problems", np));
2930         if (rxi_doreclaim)
2931             rxi_ClearReceiveQueue(call);
2932         clock_GetTime(&when);
2933         clock_Add(&when, &rx_softAckDelay);
2934         if (!call->delayedAckEvent ||
2935             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2936             rxevent_Cancel(call->delayedAckEvent, call,
2937                            RX_CALL_REFCOUNT_DELAY);
2938             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2939             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2940                                                  call, 0);
2941         }
2942         /* we've damaged this call already, might as well do it in. */
2943         return np;
2944     }
2945 #endif /* KERNEL */
2946
2947     /*
2948      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2949      * packet is one of several packets transmitted as a single
2950      * datagram. Do not send any soft or hard acks until all packets
2951      * in a jumbogram have been processed. Send negative acks right away.
2952      */
2953     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2954         /* tnp is non-null when there are more packets in the
2955          * current jumbo gram */
2956         if (tnp) {
2957             if (np)
2958                 rxi_FreePacket(np);
2959             np = tnp;
2960         }
2961
2962         seq = np->header.seq;
2963         serial = np->header.serial;
2964         flags = np->header.flags;
2965
2966         /* If the call is in an error state, send an abort message */
2967         if (call->error)
2968             return rxi_SendCallAbort(call, np, istack, 0);
2969
2970         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2971          * AFS 3.5 jumbogram. */
2972         if (flags & RX_JUMBO_PACKET) {
2973             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2974         } else {
2975             tnp = NULL;
2976         }
2977
2978         if (np->header.spare != 0) {
2979             MUTEX_ENTER(&call->conn->conn_data_lock);
2980             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2981             MUTEX_EXIT(&call->conn->conn_data_lock);
2982         }
2983
2984         /* The usual case is that this is the expected next packet */
2985         if (seq == call->rnext) {
2986
2987             /* Check to make sure it is not a duplicate of one already queued */
2988             if (queue_IsNotEmpty(&call->rq) 
2989                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2990                 MUTEX_ENTER(&rx_stats_mutex);
2991                 rx_stats.dupPacketsRead++;
2992                 MUTEX_EXIT(&rx_stats_mutex);
2993                 dpf (("packet %x dropped on receipt - duplicate", np));
2994                 rxevent_Cancel(call->delayedAckEvent, call,
2995                                RX_CALL_REFCOUNT_DELAY);
2996                 np = rxi_SendAck(call, np, seq, serial,
2997                                  flags, RX_ACK_DUPLICATE, istack);
2998                 ackNeeded = 0;
2999                 call->rprev = seq;
3000                 continue;
3001             }
3002
3003             /* It's the next packet. Stick it on the receive queue
3004              * for this call. Set newPackets to make sure we wake
3005              * the reader once all packets have been processed */
3006             queue_Prepend(&call->rq, np);
3007             call->nSoftAcks++;
3008             np = NULL; /* We can't use this anymore */
3009             newPackets = 1;
3010
3011             /* If an ack is requested then set a flag to make sure we
3012              * send an acknowledgement for this packet */
3013             if (flags & RX_REQUEST_ACK) {
3014                 ackNeeded = 1;
3015             }
3016
3017             /* Keep track of whether we have received the last packet */
3018             if (flags & RX_LAST_PACKET) {
3019                 call->flags |= RX_CALL_HAVE_LAST;
3020                 haveLast = 1;
3021             }
3022
3023             /* Check whether we have all of the packets for this call */
3024             if (call->flags & RX_CALL_HAVE_LAST) {
3025                 afs_uint32 tseq;                /* temporary sequence number */
3026                 struct rx_packet *tp;   /* Temporary packet pointer */
3027                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
3028
3029                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3030                     if (tseq != tp->header.seq)
3031                         break;
3032                     if (tp->header.flags & RX_LAST_PACKET) {
3033                         call->flags |= RX_CALL_RECEIVE_DONE;
3034                         break;
3035                     }
3036                     tseq++;
3037                 }
3038             }
3039
3040             /* Provide asynchronous notification for those who want it
3041              * (e.g. multi rx) */
3042             if (call->arrivalProc) {
3043                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3044                                      call->arrivalProcArg);
3045                 call->arrivalProc = (VOID (*)()) 0;
3046             }
3047
3048             /* Update last packet received */
3049             call->rprev = seq;
3050
3051             /* If there is no server process serving this call, grab
3052              * one, if available. We only need to do this once. If a
3053              * server thread is available, this thread becomes a server
3054              * thread and the server thread becomes a listener thread. */
3055             if (isFirst) {
3056                 TryAttach(call, socket, tnop, newcallp);
3057             }
3058         }       
3059         /* This is not the expected next packet. */
3060         else {
3061             /* Determine whether this is a new or old packet, and if it's
3062              * a new one, whether it fits into the current receive window.
3063              * Also figure out whether the packet was delivered in sequence.
3064              * We use the prev variable to determine whether the new packet
3065              * is the successor of its immediate predecessor in the
3066              * receive queue, and the missing flag to determine whether
3067              * any of this packets predecessors are missing.  */
3068
3069             afs_uint32 prev;            /* "Previous packet" sequence number */
3070             struct rx_packet *tp;       /* Temporary packet pointer */
3071             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3072             int missing;                /* Are any predecessors missing? */
3073
3074             /* If the new packet's sequence number has been sent to the
3075              * application already, then this is a duplicate */
3076             if (seq < call->rnext) {
3077                 MUTEX_ENTER(&rx_stats_mutex);
3078                 rx_stats.dupPacketsRead++;
3079                 MUTEX_EXIT(&rx_stats_mutex);
3080                 rxevent_Cancel(call->delayedAckEvent, call,
3081                                RX_CALL_REFCOUNT_DELAY);
3082                 np = rxi_SendAck(call, np, seq, serial,
3083                                  flags, RX_ACK_DUPLICATE, istack);
3084                 ackNeeded = 0;
3085                 call->rprev = seq;
3086                 continue;
3087             }
3088
3089             /* If the sequence number is greater than what can be
3090              * accomodated by the current window, then send a negative
3091              * acknowledge and drop the packet */
3092             if ((call->rnext + call->rwind) <= seq) {
3093                 rxevent_Cancel(call->delayedAckEvent, call,
3094                                RX_CALL_REFCOUNT_DELAY);
3095                 np = rxi_SendAck(call, np, seq, serial,
3096                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3097                 ackNeeded = 0;
3098                 call->rprev = seq;
3099                 continue;
3100             }
3101
3102             /* Look for the packet in the queue of old received packets */
3103             for (prev = call->rnext - 1, missing = 0,
3104                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3105                 /*Check for duplicate packet */
3106                 if (seq == tp->header.seq) {
3107                     MUTEX_ENTER(&rx_stats_mutex);
3108                     rx_stats.dupPacketsRead++;
3109                     MUTEX_EXIT(&rx_stats_mutex);
3110                     rxevent_Cancel(call->delayedAckEvent, call,
3111                                    RX_CALL_REFCOUNT_DELAY);
3112                     np = rxi_SendAck(call, np, seq, serial, 
3113                                      flags, RX_ACK_DUPLICATE, istack);
3114                     ackNeeded = 0;
3115                     call->rprev = seq;
3116                     goto nextloop;
3117                 }
3118                 /* If we find a higher sequence packet, break out and
3119                  * insert the new packet here. */
3120                 if (seq < tp->header.seq) break;
3121                 /* Check for missing packet */
3122                 if (tp->header.seq != prev+1) {
3123                     missing = 1;
3124                 }
3125
3126                 prev = tp->header.seq;
3127             }
3128
3129             /* Keep track of whether we have received the last packet. */
3130             if (flags & RX_LAST_PACKET) {
3131                 call->flags |= RX_CALL_HAVE_LAST;
3132             }
3133
3134             /* It's within the window: add it to the the receive queue.
3135              * tp is left by the previous loop either pointing at the
3136              * packet before which to insert the new packet, or at the
3137              * queue head if the queue is empty or the packet should be
3138              * appended. */
3139             queue_InsertBefore(tp, np);
3140             call->nSoftAcks++;
3141             np = NULL;
3142
3143             /* Check whether we have all of the packets for this call */
3144             if ((call->flags & RX_CALL_HAVE_LAST)
3145              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3146                 afs_uint32 tseq;                /* temporary sequence number */
3147
3148                 for (tseq = call->rnext,
3149                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3150                     if (tseq != tp->header.seq)
3151                         break;
3152                     if (tp->header.flags & RX_LAST_PACKET) {
3153                         call->flags |= RX_CALL_RECEIVE_DONE;
3154                         break;
3155                     }
3156                     tseq++;
3157                 }
3158             }
3159
3160             /* We need to send an ack of the packet is out of sequence, 
3161              * or if an ack was requested by the peer. */
3162             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3163                 ackNeeded = 1;
3164             }
3165
3166             /* Acknowledge the last packet for each call */
3167             if (flags & RX_LAST_PACKET) {
3168                 haveLast = 1;
3169             }
3170
3171             call->rprev = seq;
3172         }
3173 nextloop:;
3174     }
3175
3176     if (newPackets) {
3177         /*
3178          * If the receiver is waiting for an iovec, fill the iovec
3179          * using the data from the receive queue */
3180         if (call->flags & RX_CALL_IOVEC_WAIT) {
3181             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3182             /* the call may have been aborted */
3183             if (call->error) {
3184                 return NULL;
3185             }
3186             if (didHardAck) {
3187                 ackNeeded = 0;
3188             }
3189         }
3190
3191         /* Wakeup the reader if any */
3192         if ((call->flags & RX_CALL_READER_WAIT) &&
3193             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3194              (call->iovNext >= call->iovMax) ||
3195              (call->flags & RX_CALL_RECEIVE_DONE))) {
3196             call->flags &= ~RX_CALL_READER_WAIT;
3197 #ifdef  RX_ENABLE_LOCKS
3198             CV_BROADCAST(&call->cv_rq);
3199 #else
3200             osi_rxWakeup(&call->rq);
3201 #endif
3202         }
3203     }
3204
3205     /*
3206      * Send an ack when requested by the peer, or once every
3207      * rxi_SoftAckRate packets until the last packet has been
3208      * received. Always send a soft ack for the last packet in
3209      * the server's reply. */
3210     if (ackNeeded) {
3211         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3212         np = rxi_SendAck(call, np, seq, serial, flags,
3213                          RX_ACK_REQUESTED, istack);
3214     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3215         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3216         np = rxi_SendAck(call, np, seq, serial, flags,
3217                          RX_ACK_IDLE, istack);
3218     } else if (call->nSoftAcks) {
3219         clock_GetTime(&when);
3220         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3221             clock_Add(&when, &rx_lastAckDelay);
3222         } else {
3223             clock_Add(&when, &rx_softAckDelay);
3224         }
3225         if (!call->delayedAckEvent ||
3226             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3227             rxevent_Cancel(call->delayedAckEvent, call,
3228                            RX_CALL_REFCOUNT_DELAY);
3229             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3230             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3231                                                  call, 0);
3232         }
3233     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3234         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3235     }
3236
3237     return np;
3238 }
3239
3240 #ifdef  ADAPT_WINDOW
3241 static void rxi_ComputeRate();
3242 #endif
3243
3244 /* The real smarts of the whole thing.  */
3245 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3246     register struct rx_call *call;
3247     struct rx_packet *np;
3248     int istack;
3249 {
3250     struct rx_ackPacket *ap;
3251     int nAcks;
3252     register struct rx_packet *tp;
3253     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3254     register struct rx_connection *conn = call->conn;
3255     struct rx_peer *peer = conn->peer;
3256     afs_uint32 first;
3257     afs_uint32 serial;
3258     /* because there are CM's that are bogus, sending weird values for this. */
3259     afs_uint32 skew = 0;
3260     int nbytes;
3261     int missing;
3262     int acked;
3263     int nNacked = 0;
3264     int newAckCount = 0;
3265     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3266     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3267
3268     MUTEX_ENTER(&rx_stats_mutex);
3269     rx_stats.ackPacketsRead++;
3270     MUTEX_EXIT(&rx_stats_mutex);
3271     ap = (struct rx_ackPacket *) rx_DataOf(np);
3272     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3273     if (nbytes < 0)
3274       return np;       /* truncated ack packet */
3275
3276     /* depends on ack packet struct */
3277     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3278     first = ntohl(ap->firstPacket);
3279     serial = ntohl(ap->serial);
3280     /* temporarily disabled -- needs to degrade over time 
3281        skew = ntohs(ap->maxSkew); */
3282
3283     /* Ignore ack packets received out of order */
3284     if (first < call->tfirst) {
3285         return np;
3286     }
3287
3288     if (np->header.flags & RX_SLOW_START_OK) {
3289         call->flags |= RX_CALL_SLOW_START_OK;
3290     }
3291     
3292 #ifdef RXDEBUG
3293     if (rx_Log) {
3294       fprintf( rx_Log, 
3295               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3296               ap->reason, ntohl(ap->previousPacket), 
3297               (unsigned int) np->header.seq, (unsigned int) serial, 
3298               (unsigned int) skew, ntohl(ap->firstPacket));
3299         if (nAcks) {
3300             int offset;
3301             for (offset = 0; offset < nAcks; offset++) 
3302                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3303         }
3304         putc('\n', rx_Log);
3305     }
3306 #endif
3307
3308     /* if a server connection has been re-created, it doesn't remember what
3309         serial # it was up to.  An ack will tell us, since the serial field
3310         contains the largest serial received by the other side */
3311     MUTEX_ENTER(&conn->conn_data_lock);
3312     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3313         conn->serial = serial+1;
3314     }
3315     MUTEX_EXIT(&conn->conn_data_lock);
3316
3317     /* Update the outgoing packet skew value to the latest value of
3318      * the peer's incoming packet skew value.  The ack packet, of
3319      * course, could arrive out of order, but that won't affect things
3320      * much */
3321     MUTEX_ENTER(&peer->peer_lock);
3322     peer->outPacketSkew = skew;
3323
3324     /* Check for packets that no longer need to be transmitted, and
3325      * discard them.  This only applies to packets positively
3326      * acknowledged as having been sent to the peer's upper level.
3327      * All other packets must be retained.  So only packets with
3328      * sequence numbers < ap->firstPacket are candidates. */
3329     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3330         if (tp->header.seq >= first) break;
3331         call->tfirst = tp->header.seq + 1;
3332         if (tp->header.serial == serial) {
3333           /* Use RTT if not delayed by client. */
3334           if (ap->reason != RX_ACK_DELAY)
3335               rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3336 #ifdef ADAPT_WINDOW
3337           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3338 #endif
3339         }
3340         else if (tp->firstSerial == serial) {
3341             /* Use RTT if not delayed by client. */
3342             if (ap->reason != RX_ACK_DELAY)
3343                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3344 #ifdef ADAPT_WINDOW
3345           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3346 #endif
3347         }
3348 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3349     /* XXX Hack. Because we have to release the global rx lock when sending
3350      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3351      * in rxi_Start sending packets out because packets may move to the
3352      * freePacketQueue as result of being here! So we drop these packets until
3353      * we're safely out of the traversing. Really ugly! 
3354      * To make it even uglier, if we're using fine grain locking, we can
3355      * set the ack bits in the packets and have rxi_Start remove the packets
3356      * when it's done transmitting.
3357      */
3358         if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3359             newAckCount++;
3360         }
3361         if (call->flags & RX_CALL_TQ_BUSY) {
3362 #ifdef RX_ENABLE_LOCKS
3363             tp->flags |= RX_PKTFLAG_ACKED;
3364             call->flags |= RX_CALL_TQ_SOME_ACKED;
3365 #else /* RX_ENABLE_LOCKS */
3366             break;
3367 #endif /* RX_ENABLE_LOCKS */
3368         } else
3369 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3370         {
3371         queue_Remove(tp);
3372         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3373         }
3374     }
3375
3376 #ifdef ADAPT_WINDOW
3377     /* Give rate detector a chance to respond to ping requests */
3378     if (ap->reason == RX_ACK_PING_RESPONSE) {
3379         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3380     }
3381 #endif
3382
3383     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3384    
3385    /* Now go through explicit acks/nacks and record the results in
3386     * the waiting packets.  These are packets that can't be released
3387     * yet, even with a positive acknowledge.  This positive
3388     * acknowledge only means the packet has been received by the
3389     * peer, not that it will be retained long enough to be sent to
3390     * the peer's upper level.  In addition, reset the transmit timers
3391     * of any missing packets (those packets that must be missing
3392     * because this packet was out of sequence) */
3393
3394     call->nSoftAcked = 0;
3395     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3396         /* Update round trip time if the ack was stimulated on receipt
3397          * of this packet */
3398 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3399 #ifdef RX_ENABLE_LOCKS
3400         if (tp->header.seq >= first) {
3401 #endif /* RX_ENABLE_LOCKS */
3402 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3403         if (tp->header.serial == serial) {
3404             /* Use RTT if not delayed by client. */
3405             if (ap->reason != RX_ACK_DELAY)
3406                 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3407 #ifdef ADAPT_WINDOW
3408           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3409 #endif
3410         }
3411         else if ((tp->firstSerial == serial)) {
3412             /* Use RTT if not delayed by client. */
3413             if (ap->reason != RX_ACK_DELAY)
3414                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3415 #ifdef ADAPT_WINDOW
3416           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3417 #endif
3418         }
3419 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3420 #ifdef RX_ENABLE_LOCKS
3421         }
3422 #endif /* RX_ENABLE_LOCKS */
3423 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3424
3425         /* Set the acknowledge flag per packet based on the
3426          * information in the ack packet. An acknowlegded packet can
3427          * be downgraded when the server has discarded a packet it
3428          * soacked previously, or when an ack packet is received
3429          * out of sequence. */
3430         if (tp->header.seq < first) {
3431             /* Implicit ack information */
3432             if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3433                 newAckCount++;
3434             }
3435             tp->flags |= RX_PKTFLAG_ACKED;
3436         }
3437         else if (tp->header.seq < first + nAcks) {
3438             /* Explicit ack information:  set it in the packet appropriately */
3439             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3440                 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3441                     newAckCount++;
3442                     tp->flags |= RX_PKTFLAG_ACKED;
3443                 }
3444                 if (missing) {
3445                     nNacked++;
3446                 } else {
3447                     call->nSoftAcked++;
3448                 }
3449             } else {
3450                 tp->flags &= ~RX_PKTFLAG_ACKED;
3451                 missing = 1;
3452             }
3453         }
3454         else {
3455             tp->flags &= ~RX_PKTFLAG_ACKED;
3456             missing = 1;
3457         }
3458
3459         /* If packet isn't yet acked, and it has been transmitted at least 
3460          * once, reset retransmit time using latest timeout 
3461          * ie, this should readjust the retransmit timer for all outstanding 
3462          * packets...  So we don't just retransmit when we should know better*/
3463
3464         if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
3465           tp->retryTime = tp->timeSent;
3466           clock_Add(&tp->retryTime, &peer->timeout);
3467           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3468           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3469         }
3470     }
3471
3472     /* If the window has been extended by this acknowledge packet,
3473      * then wakeup a sender waiting in alloc for window space, or try
3474      * sending packets now, if he's been sitting on packets due to
3475      * lack of window space */
3476     if (call->tnext < (call->tfirst + call->twind))  {
3477 #ifdef  RX_ENABLE_LOCKS
3478         CV_SIGNAL(&call->cv_twind);
3479 #else
3480         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3481             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3482             osi_rxWakeup(&call->twind);
3483         }
3484 #endif
3485         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3486             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3487         }
3488     }
3489
3490     /* if the ack packet has a receivelen field hanging off it,
3491      * update our state */
3492     if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
3493       afs_uint32 tSize;
3494
3495       /* If the ack packet has a "recommended" size that is less than 
3496        * what I am using now, reduce my size to match */
3497       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3498                     sizeof(afs_int32), &tSize);
3499       tSize = (afs_uint32) ntohl(tSize);
3500       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3501
3502       /* Get the maximum packet size to send to this peer */
3503       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3504                     &tSize);
3505       tSize = (afs_uint32)ntohl(tSize);
3506       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3507       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3508
3509       /* sanity check - peer might have restarted with different params.
3510        * If peer says "send less", dammit, send less...  Peer should never 
3511        * be unable to accept packets of the size that prior AFS versions would
3512        * send without asking.  */
3513       if (peer->maxMTU != tSize) {
3514           peer->maxMTU = tSize;
3515           peer->MTU = MIN(tSize, peer->MTU);
3516           call->MTU = MIN(call->MTU, tSize);
3517           peer->congestSeq++;
3518       }
3519
3520       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3521           /* AFS 3.4a */
3522           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3523                         sizeof(afs_int32), &tSize);
3524           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3525           if (tSize < call->twind) {       /* smaller than our send */
3526               call->twind = tSize;         /* window, we must send less... */
3527               call->ssthresh = MIN(call->twind, call->ssthresh);
3528           }
3529
3530           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3531            * network MTU confused with the loopback MTU. Calculate the
3532            * maximum MTU here for use in the slow start code below.
3533            */
3534           maxMTU = peer->maxMTU;
3535           /* Did peer restart with older RX version? */
3536           if (peer->maxDgramPackets > 1) {
3537               peer->maxDgramPackets = 1;
3538           }
3539       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3540           /* AFS 3.5 */
3541           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3542                         sizeof(afs_int32), &tSize);
3543           tSize = (afs_uint32) ntohl(tSize);
3544           /*
3545            * As of AFS 3.5 we set the send window to match the receive window. 
3546            */
3547           if (tSize < call->twind) {
3548               call->twind = tSize;
3549               call->ssthresh = MIN(call->twind, call->ssthresh);
3550           } else if (tSize > call->twind) {
3551               call->twind = tSize;
3552           }
3553
3554           /*
3555            * As of AFS 3.5, a jumbogram is more than one fixed size
3556            * packet transmitted in a single UDP datagram. If the remote
3557            * MTU is smaller than our local MTU then never send a datagram
3558            * larger than the natural MTU.
3559            */
3560           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3561                         sizeof(afs_int32), &tSize);
3562           maxDgramPackets = (afs_uint32) ntohl(tSize);
3563           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3564           maxDgramPackets = MIN(maxDgramPackets,
3565                                 (int)(peer->ifDgramPackets));
3566           maxDgramPackets = MIN(maxDgramPackets, tSize);
3567           if (maxDgramPackets > 1) {
3568             peer->maxDgramPackets = maxDgramPackets;
3569             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3570           } else {
3571             peer->maxDgramPackets = 1;
3572             call->MTU = peer->natMTU;
3573           }
3574        } else if (peer->maxDgramPackets > 1) {
3575           /* Restarted with lower version of RX */
3576           peer->maxDgramPackets = 1;
3577        }
3578     } else if (peer->maxDgramPackets > 1 ||
3579                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3580         /* Restarted with lower version of RX */
3581         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3582         peer->natMTU = OLD_MAX_PACKET_SIZE;
3583         peer->MTU = OLD_MAX_PACKET_SIZE;
3584         peer->maxDgramPackets = 1;
3585         peer->nDgramPackets = 1;
3586         peer->congestSeq++;
3587         call->MTU = OLD_MAX_PACKET_SIZE;
3588     }
3589
3590     if (nNacked) {
3591         /*
3592          * Calculate how many datagrams were successfully received after
3593          * the first missing packet and adjust the negative ack counter
3594          * accordingly.
3595          */
3596         call->nAcks = 0;
3597         call->nNacks++;
3598         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3599         if (call->nNacks < nNacked) {
3600             call->nNacks = nNacked;
3601         }
3602     } else {
3603         if (newAckCount) {
3604             call->nAcks++;
3605         }
3606         call->nNacks = 0;
3607     }
3608
3609     if (call->flags & RX_CALL_FAST_RECOVER) {
3610         if (nNacked) {
3611             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3612         } else {
3613             call->flags &= ~RX_CALL_FAST_RECOVER;
3614             call->cwind = call->nextCwind;
3615             call->nextCwind = 0;
3616             call->nAcks = 0;
3617         }
3618         call->nCwindAcks = 0;
3619     }
3620     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3621         /* Three negative acks in a row trigger congestion recovery */
3622 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3623         MUTEX_EXIT(&peer->peer_lock);
3624         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3625                 /* someone else is waiting to start recovery */
3626                 return np;
3627         }
3628         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3629         while (call->flags & RX_CALL_TQ_BUSY) {
3630             call->flags |= RX_CALL_TQ_WAIT;
3631 #ifdef RX_ENABLE_LOCKS
3632             CV_WAIT(&call->cv_tq, &call->lock);
3633 #else /* RX_ENABLE_LOCKS */
3634             osi_rxSleep(&call->tq);
3635 #endif /* RX_ENABLE_LOCKS */
3636         }
3637         MUTEX_ENTER(&peer->peer_lock);
3638 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3639         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3640         call->flags |= RX_CALL_FAST_RECOVER;
3641         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3642         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3643                           rx_maxSendWindow);
3644         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3645         call->nextCwind = call->ssthresh;
3646         call->nAcks = 0;
3647         call->nNacks = 0;
3648         peer->MTU = call->MTU;
3649         peer->cwind = call->nextCwind;
3650         peer->nDgramPackets = call->nDgramPackets;
3651         peer->congestSeq++;
3652         call->congestSeq = peer->congestSeq;
3653         /* Reset the resend times on the packets that were nacked
3654          * so we will retransmit as soon as the window permits*/
3655         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3656             if (acked) {
3657                 if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3658                     clock_Zero(&tp->retryTime);
3659                 }
3660             } else if (tp->flags & RX_PKTFLAG_ACKED) {
3661                 acked = 1;
3662             }
3663         }
3664     } else {
3665         /* If cwind is smaller than ssthresh, then increase
3666          * the window one packet for each ack we receive (exponential
3667          * growth).
3668          * If cwind is greater than or equal to ssthresh then increase
3669          * the congestion window by one packet for each cwind acks we
3670          * receive (linear growth).  */
3671         if (call->cwind < call->ssthresh) {
3672             call->cwind = MIN((int)call->ssthresh,
3673                               (int)(call->cwind + newAckCount));
3674             call->nCwindAcks = 0;
3675         } else {
3676             call->nCwindAcks += newAckCount;
3677             if (call->nCwindAcks >= call->cwind) {
3678                 call->nCwindAcks = 0;
3679                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3680             }
3681         }
3682         /*
3683          * If we have received several acknowledgements in a row then
3684