94fe774676cd454c52ad093fc40aa0ed65862db7
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include "../afs/sysincludes.h"
15 #include "../afs/afsincludes.h"
16 #ifndef UKERNEL
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #include "../afs/afs_args.h"
28 #include "../afs/afs_osi.h"
29 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
30 #include "../h/systm.h"
31 #endif
32 #ifdef RXDEBUG
33 #undef RXDEBUG      /* turn off debugging */
34 #endif /* RXDEBUG */
35 #if defined(AFS_SGI_ENV)
36 #include "../sys/debug.h"
37 #endif
38 #include "../afsint/afsint.h"
39 #ifdef  AFS_ALPHA_ENV
40 #undef kmem_alloc
41 #undef kmem_free
42 #undef mem_alloc
43 #undef mem_free
44 #undef register
45 #endif  /* AFS_ALPHA_ENV */
46 #else /* !UKERNEL */
47 #include "../afs/sysincludes.h"
48 #include "../afs/afsincludes.h"
49 #endif /* !UKERNEL */
50 #include "../afs/lock.h"
51 #include "../rx/rx_kmutex.h"
52 #include "../rx/rx_kernel.h"
53 #include "../rx/rx_clock.h"
54 #include "../rx/rx_queue.h"
55 #include "../rx/rx.h"
56 #include "../rx/rx_globals.h"
57 #include "../rx/rx_trace.h"
58 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
59 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
60 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
61 #include "../afsint/afsint.h"
62 extern afs_int32 afs_termState;
63 #ifdef AFS_AIX41_ENV
64 #include "sys/lockl.h"
65 #include "sys/lock_def.h"
66 #endif /* AFS_AIX41_ENV */
67 # include "../afsint/rxgen_consts.h"
68 #else /* KERNEL */
69 # include <afs/param.h>
70 # include <sys/types.h>
71 # include <errno.h>
72 #ifdef AFS_NT40_ENV
73 # include <stdlib.h>
74 # include <fcntl.h>
75 # include <afsutil.h>
76 #else
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <sys/stat.h>
81 # include <netinet/in.h>
82 # include <sys/time.h>
83 #endif
84 # include "rx.h"
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx_globals.h"
89 # include "rx_trace.h"
90 # include "rx_internal.h"
91 # include <afs/rxgen_consts.h>
92 #endif /* KERNEL */
93
94 #ifdef RXDEBUG
95 extern afs_uint32 LWP_ThreadId();
96 #endif /* RXDEBUG */
97
98 int (*registerProgram)() = 0;
99 int (*swapNameProgram)() = 0;
100
101 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
102 struct rx_tq_debug {
103     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
104     afs_int32 rxi_start_in_error;
105 } rx_tq_debug;
106 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
107
108 /*
109  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
110  * currently allocated within rx.  This number is used to allocate the
111  * memory required to return the statistics when queried.
112  */
113
114 static unsigned int rxi_rpc_peer_stat_cnt;
115
116 /*
117  * rxi_rpc_process_stat_cnt counts the total number of local process stat
118  * structures currently allocated within rx.  The number is used to allocate
119  * the memory required to return the statistics when queried.
120  */
121
122 static unsigned int rxi_rpc_process_stat_cnt;
123
124 #if !defined(offsetof)
125 #include <stddef.h>     /* for definition of offsetof() */
126 #endif
127
128 #ifdef AFS_PTHREAD_ENV
129 #include <assert.h>
130
131 /*
132  * Use procedural initialization of mutexes/condition variables
133  * to ease NT porting
134  */
135
136 extern pthread_mutex_t rxkad_stats_mutex;
137 extern pthread_mutex_t des_init_mutex;
138 extern pthread_mutex_t des_random_mutex;
139 extern pthread_mutex_t rx_clock_mutex;
140 extern pthread_mutex_t rxi_connCacheMutex;
141 extern pthread_mutex_t rx_event_mutex;
142 extern pthread_mutex_t osi_malloc_mutex;
143 extern pthread_mutex_t event_handler_mutex;
144 extern pthread_mutex_t listener_mutex;
145 extern pthread_mutex_t rx_if_init_mutex;
146 extern pthread_mutex_t rx_if_mutex;
147 extern pthread_mutex_t rxkad_client_uid_mutex;
148 extern pthread_mutex_t rxkad_random_mutex;
149
150 extern pthread_cond_t rx_event_handler_cond;
151 extern pthread_cond_t rx_listener_cond;
152
153 static pthread_mutex_t epoch_mutex;
154 static pthread_mutex_t rx_init_mutex;
155 static pthread_mutex_t rx_debug_mutex;
156
157 static void rxi_InitPthread(void) {
158     assert(pthread_mutex_init(&rx_clock_mutex,
159                               (const pthread_mutexattr_t*)0)==0);
160     assert(pthread_mutex_init(&rxi_connCacheMutex,
161                               (const pthread_mutexattr_t*)0)==0);
162     assert(pthread_mutex_init(&rx_init_mutex,
163                               (const pthread_mutexattr_t*)0)==0);
164     assert(pthread_mutex_init(&epoch_mutex,
165                               (const pthread_mutexattr_t*)0)==0);
166     assert(pthread_mutex_init(&rx_event_mutex,
167                               (const pthread_mutexattr_t*)0)==0);
168     assert(pthread_mutex_init(&des_init_mutex,
169                               (const pthread_mutexattr_t*)0)==0);
170     assert(pthread_mutex_init(&des_random_mutex,
171                               (const pthread_mutexattr_t*)0)==0);
172     assert(pthread_mutex_init(&osi_malloc_mutex,
173                               (const pthread_mutexattr_t*)0)==0);
174     assert(pthread_mutex_init(&event_handler_mutex,
175                               (const pthread_mutexattr_t*)0)==0);
176     assert(pthread_mutex_init(&listener_mutex,
177                               (const pthread_mutexattr_t*)0)==0);
178     assert(pthread_mutex_init(&rx_if_init_mutex,
179                               (const pthread_mutexattr_t*)0)==0);
180     assert(pthread_mutex_init(&rx_if_mutex,
181                               (const pthread_mutexattr_t*)0)==0);
182     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
183                               (const pthread_mutexattr_t*)0)==0);
184     assert(pthread_mutex_init(&rxkad_random_mutex,
185                               (const pthread_mutexattr_t*)0)==0);
186     assert(pthread_mutex_init(&rxkad_stats_mutex,
187                               (const pthread_mutexattr_t*)0)==0);
188     assert(pthread_mutex_init(&rx_debug_mutex,
189                               (const pthread_mutexattr_t*)0)==0);
190
191     assert(pthread_cond_init(&rx_event_handler_cond,
192                               (const pthread_condattr_t*)0)==0);
193     assert(pthread_cond_init(&rx_listener_cond,
194                               (const pthread_condattr_t*)0)==0);
195     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
196 }
197
198 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
199 #define INIT_PTHREAD_LOCKS \
200 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
201 /*
202  * The rx_stats_mutex mutex protects the following global variables:
203  * rxi_dataQuota
204  * rxi_minDeficit
205  * rxi_availProcs
206  * rxi_totalMin
207  * rxi_lowConnRefCount
208  * rxi_lowPeerRefCount
209  * rxi_nCalls
210  * rxi_Alloccnt
211  * rxi_Allocsize
212  * rx_nFreePackets
213  * rx_tq_debug
214  * rx_stats
215  */
216 #else
217 #define INIT_PTHREAD_LOCKS
218 #endif
219
220 extern void rxi_DeleteCachedConnections(void);
221
222
223 /* Variables for handling the minProcs implementation.  availProcs gives the
224  * number of threads available in the pool at this moment (not counting dudes
225  * executing right now).  totalMin gives the total number of procs required
226  * for handling all minProcs requests.  minDeficit is a dynamic variable
227  * tracking the # of procs required to satisfy all of the remaining minProcs
228  * demands.
229  * For fine grain locking to work, the quota check and the reservation of
230  * a server thread has to come while rxi_availProcs and rxi_minDeficit
231  * are locked. To this end, the code has been modified under #ifdef
232  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
233  * same time. A new function, ReturnToServerPool() returns the allocation.
234  * 
235  * A call can be on several queue's (but only one at a time). When
236  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
237  * that no one else is touching the queue. To this end, we store the address
238  * of the queue lock in the call structure (under the call lock) when we
239  * put the call on a queue, and we clear the call_queue_lock when the
240  * call is removed from a queue (once the call lock has been obtained).
241  * This allows rxi_ResetCall to safely synchronize with others wishing
242  * to manipulate the queue.
243  */
244
245 extern void rxi_Delay(int);
246
247 static int rxi_ServerThreadSelectingCall;
248
249 #ifdef RX_ENABLE_LOCKS
250 static afs_kmutex_t rx_rpc_stats;
251 void rxi_StartUnlocked();
252 #endif
253
254 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
255 ** pretty good that the next packet coming in is from the same connection 
256 ** as the last packet, since we're send multiple packets in a transmit window.
257 */
258 struct rx_connection *rxLastConn = 0; 
259
260 #ifdef RX_ENABLE_LOCKS
261 /* The locking hierarchy for rx fine grain locking is composed of five
262  * tiers:
263  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
264  * call->lock - locks call data fields.
265  * Most any other lock - these are all independent of each other.....
266  *      rx_freePktQ_lock
267  *      rx_freeCallQueue_lock
268  *      freeSQEList_lock
269  *      rx_connHashTable_lock
270  *      rx_serverPool_lock
271  *      rxi_keyCreate_lock
272  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
273
274  * lowest level:
275  *      peer_lock - locks peer data fields.
276  *      conn_data_lock - that more than one thread is not updating a conn data
277  *              field at the same time.
278  * Do we need a lock to protect the peer field in the conn structure?
279  *      conn->peer was previously a constant for all intents and so has no
280  *      lock protecting this field. The multihomed client delta introduced
281  *      a RX code change : change the peer field in the connection structure
282  *      to that remote inetrface from which the last packet for this
283  *      connection was sent out. This may become an issue if further changes
284  *      are made.
285  */
286 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
287 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
288 #ifdef RX_LOCKS_DB
289 /* rxdb_fileID is used to identify the lock location, along with line#. */
290 static int rxdb_fileID = RXDB_FILE_RX;
291 #endif /* RX_LOCKS_DB */
292 static void rxi_SetAcksInTransmitQueue();
293 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
294 #else /* RX_ENABLE_LOCKS */
295 #define SET_CALL_QUEUE_LOCK(C, L)
296 #define CLEAR_CALL_QUEUE_LOCK(C)
297 #endif /* RX_ENABLE_LOCKS */
298 static void rxi_DestroyConnectionNoLock();
299 void rxi_DestroyConnection();
300 void rxi_CleanupConnection();
301 struct rx_serverQueueEntry *rx_waitForPacket = 0;
302
303 /* ------------Exported Interfaces------------- */
304
305 /* This function allows rxkad to set the epoch to a suitably random number
306  * which rx_NewConnection will use in the future.  The principle purpose is to
307  * get rxnull connections to use the same epoch as the rxkad connections do, at
308  * least once the first rxkad connection is established.  This is important now
309  * that the host/port addresses aren't used in FindConnection: the uniqueness
310  * of epoch/cid matters and the start time won't do. */
311
312 #ifdef AFS_PTHREAD_ENV
313 /*
314  * This mutex protects the following global variables:
315  * rx_epoch
316  */
317
318 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
319 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
320 #else
321 #define LOCK_EPOCH
322 #define UNLOCK_EPOCH
323 #endif /* AFS_PTHREAD_ENV */
324
325 void rx_SetEpoch (epoch)
326   afs_uint32 epoch;
327 {
328     LOCK_EPOCH
329     rx_epoch = epoch;
330     UNLOCK_EPOCH
331 }
332
333 /* Initialize rx.  A port number may be mentioned, in which case this
334  * becomes the default port number for any service installed later.
335  * If 0 is provided for the port number, a random port will be chosen
336  * by the kernel.  Whether this will ever overlap anything in
337  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
338  * error. */
339 static int rxinit_status = 1;
340 #ifdef AFS_PTHREAD_ENV
341 /*
342  * This mutex protects the following global variables:
343  * rxinit_status
344  */
345
346 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
347 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
348 #else
349 #define LOCK_RX_INIT
350 #define UNLOCK_RX_INIT
351 #endif
352
353 int rx_Init(u_int port)
354 {
355 #ifdef KERNEL
356     osi_timeval_t tv;
357 #else /* KERNEL */
358     struct timeval tv;
359 #endif /* KERNEL */
360     char *htable, *ptable;
361     int tmp_status;
362
363 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
364     __djgpp_set_quiet_socket(1);
365 #endif
366
367     SPLVAR;
368
369     INIT_PTHREAD_LOCKS
370     LOCK_RX_INIT
371     if (rxinit_status == 0) {
372         tmp_status = rxinit_status;
373         UNLOCK_RX_INIT
374         return tmp_status; /* Already started; return previous error code. */
375     }
376
377 #ifdef AFS_NT40_ENV
378     if (afs_winsockInit()<0)
379         return -1;
380 #endif
381
382 #ifndef KERNEL
383     /*
384      * Initialize anything necessary to provide a non-premptive threading
385      * environment.
386      */
387     rxi_InitializeThreadSupport();
388 #endif
389
390     /* Allocate and initialize a socket for client and perhaps server
391      * connections. */
392
393     rx_socket = rxi_GetUDPSocket((u_short)port); 
394     if (rx_socket == OSI_NULLSOCKET) {
395         UNLOCK_RX_INIT
396         return RX_ADDRINUSE;
397     }
398     
399
400 #ifdef  RX_ENABLE_LOCKS
401 #ifdef RX_LOCKS_DB
402     rxdb_init();
403 #endif /* RX_LOCKS_DB */
404     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
405     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
406     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
407     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
408     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
409                MUTEX_DEFAULT,0);
410     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
411     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
412     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
413     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
414 #ifndef KERNEL
415     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
416 #endif /* !KERNEL */
417     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
418 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
419     if ( !uniprocessor )
420       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
421 #endif /* KERNEL && AFS_HPUX110_ENV */
422 #else /* RX_ENABLE_LOCKS */
423 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
424     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
425 #endif /* AFS_GLOBAL_SUNLOCK */
426 #endif /* RX_ENABLE_LOCKS */
427
428     rxi_nCalls = 0;
429     rx_connDeadTime = 12;
430     rx_tranquil     = 0;        /* reset flag */
431     bzero((char *)&rx_stats, sizeof(struct rx_stats));
432     htable = (char *)
433         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
434     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
435     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
436     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
437     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
438     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
439
440     /* Malloc up a bunch of packets & buffers */
441     rx_nFreePackets = 0;
442     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
443     queue_Init(&rx_freePacketQueue);
444     rxi_NeedMorePackets = FALSE;
445     rxi_MorePackets(rx_nPackets);
446     rx_CheckPackets();
447
448     NETPRI;
449     AFS_RXGLOCK();
450
451     clock_Init();
452
453 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
454     tv.tv_sec = clock_now.sec;
455     tv.tv_usec = clock_now.usec;
456     srand((unsigned int) tv.tv_usec);
457 #else
458     osi_GetTime(&tv);
459 #endif
460     if (port) {
461         rx_port = port;
462     } else {
463 #if defined(KERNEL) && !defined(UKERNEL)
464         /* Really, this should never happen in a real kernel */
465         rx_port = 0;
466 #else
467         struct sockaddr_in addr;
468         int addrlen = sizeof(addr);
469         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
470             rx_Finalize();
471             return -1;
472         }
473         rx_port = addr.sin_port;
474 #endif
475     }
476     rx_stats.minRtt.sec = 9999999;
477 #ifdef  KERNEL
478     rx_SetEpoch (tv.tv_sec | 0x80000000);
479 #else
480     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
481                                          * will provide a randomer value. */
482 #endif
483     MUTEX_ENTER(&rx_stats_mutex);
484     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
485     MUTEX_EXIT(&rx_stats_mutex);
486     /* *Slightly* random start time for the cid.  This is just to help
487      * out with the hashing function at the peer */
488     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
489     rx_connHashTable = (struct rx_connection **) htable;
490     rx_peerHashTable = (struct rx_peer **) ptable;
491
492     rx_lastAckDelay.sec = 0;
493     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
494     rx_hardAckDelay.sec = 0;
495     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
496     rx_softAckDelay.sec = 0;
497     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
498
499     rxevent_Init(20, rxi_ReScheduleEvents);
500
501     /* Initialize various global queues */
502     queue_Init(&rx_idleServerQueue);
503     queue_Init(&rx_incomingCallQueue);
504     queue_Init(&rx_freeCallQueue);
505
506 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
507     /* Initialize our list of usable IP addresses. */
508     rx_GetIFInfo();
509 #endif
510
511     /* Start listener process (exact function is dependent on the
512      * implementation environment--kernel or user space) */
513     rxi_StartListener();
514
515     AFS_RXGUNLOCK();
516     USERPRI;
517     tmp_status = rxinit_status = 0;
518     UNLOCK_RX_INIT
519     return tmp_status;
520 }
521
522 /* called with unincremented nRequestsRunning to see if it is OK to start
523  * a new thread in this service.  Could be "no" for two reasons: over the
524  * max quota, or would prevent others from reaching their min quota.
525  */
526 #ifdef RX_ENABLE_LOCKS
527 /* This verion of QuotaOK reserves quota if it's ok while the
528  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
529  */
530 static int QuotaOK(aservice)
531 register struct rx_service *aservice;
532 {
533     /* check if over max quota */
534     if (aservice->nRequestsRunning >= aservice->maxProcs) {
535         return 0;
536     }
537
538     /* under min quota, we're OK */
539     /* otherwise, can use only if there are enough to allow everyone
540      * to go to their min quota after this guy starts.
541      */
542     MUTEX_ENTER(&rx_stats_mutex);
543     if ((aservice->nRequestsRunning < aservice->minProcs) ||
544          (rxi_availProcs > rxi_minDeficit)) {
545         aservice->nRequestsRunning++;
546         /* just started call in minProcs pool, need fewer to maintain
547          * guarantee */
548         if (aservice->nRequestsRunning <= aservice->minProcs)
549             rxi_minDeficit--;
550         rxi_availProcs--;
551         MUTEX_EXIT(&rx_stats_mutex);
552         return 1;
553     }
554     MUTEX_EXIT(&rx_stats_mutex);
555
556     return 0;
557 }
558 static void ReturnToServerPool(aservice)
559 register struct rx_service *aservice;
560 {
561     aservice->nRequestsRunning--;
562     MUTEX_ENTER(&rx_stats_mutex);
563     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
564     rxi_availProcs++;
565     MUTEX_EXIT(&rx_stats_mutex);
566 }
567
568 #else /* RX_ENABLE_LOCKS */
569 static QuotaOK(aservice)
570 register struct rx_service *aservice; {
571     int rc=0;
572     /* under min quota, we're OK */
573     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
574
575     /* check if over max quota */
576     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
577
578     /* otherwise, can use only if there are enough to allow everyone
579      * to go to their min quota after this guy starts.
580      */
581     if (rxi_availProcs > rxi_minDeficit) rc = 1;
582     return rc;
583 }
584 #endif /* RX_ENABLE_LOCKS */
585
586 #ifndef KERNEL
587 /* Called by rx_StartServer to start up lwp's to service calls.
588    NExistingProcs gives the number of procs already existing, and which
589    therefore needn't be created. */
590 void rxi_StartServerProcs(nExistingProcs)
591     int nExistingProcs;
592 {
593     register struct rx_service *service;
594     register int i;
595     int maxdiff = 0;
596     int nProcs = 0;
597
598     /* For each service, reserve N processes, where N is the "minimum"
599        number of processes that MUST be able to execute a request in parallel,
600        at any time, for that process.  Also compute the maximum difference
601        between any service's maximum number of processes that can run
602        (i.e. the maximum number that ever will be run, and a guarantee
603        that this number will run if other services aren't running), and its
604        minimum number.  The result is the extra number of processes that
605        we need in order to provide the latter guarantee */
606     for (i=0; i<RX_MAX_SERVICES; i++) {
607         int diff;
608         service = rx_services[i];
609         if (service == (struct rx_service *) 0) break;
610         nProcs += service->minProcs;
611         diff = service->maxProcs - service->minProcs;
612         if (diff > maxdiff) maxdiff = diff;
613     }
614     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
615     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
616     for (i = 0; i<nProcs; i++) {
617         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
618     }
619 }
620 #endif /* KERNEL */
621
622 /* This routine must be called if any services are exported.  If the
623  * donateMe flag is set, the calling process is donated to the server
624  * process pool */
625 void rx_StartServer(donateMe)
626 {
627     register struct rx_service *service;
628     register int i, nProcs;
629     SPLVAR;
630     clock_NewTime();
631
632     NETPRI;
633     AFS_RXGLOCK();
634     /* Start server processes, if necessary (exact function is dependent
635      * on the implementation environment--kernel or user space).  DonateMe
636      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
637      * case, one less new proc will be created rx_StartServerProcs.
638      */
639     rxi_StartServerProcs(donateMe);
640
641     /* count up the # of threads in minProcs, and add set the min deficit to
642      * be that value, too.
643      */
644     for (i=0; i<RX_MAX_SERVICES; i++) {
645         service = rx_services[i];
646         if (service == (struct rx_service *) 0) break;
647         MUTEX_ENTER(&rx_stats_mutex);
648         rxi_totalMin += service->minProcs;
649         /* below works even if a thread is running, since minDeficit would
650          * still have been decremented and later re-incremented.
651          */
652         rxi_minDeficit += service->minProcs;
653         MUTEX_EXIT(&rx_stats_mutex);
654     }
655
656     /* Turn on reaping of idle server connections */
657     rxi_ReapConnections();
658
659     AFS_RXGUNLOCK();
660     USERPRI;
661
662     if (donateMe) {
663 #ifndef AFS_NT40_ENV
664 #ifndef KERNEL
665         int code;
666         char name[32];
667 #ifdef AFS_PTHREAD_ENV
668         pid_t pid;
669         pid = pthread_self();
670 #else /* AFS_PTHREAD_ENV */
671         PROCESS pid;
672         code = LWP_CurrentProcess(&pid);
673 #endif /* AFS_PTHREAD_ENV */
674
675         sprintf(name,"srv_%d", ++nProcs);
676         if (registerProgram)
677             (*registerProgram)(pid, name);
678 #endif /* KERNEL */
679 #endif /* AFS_NT40_ENV */
680         rx_ServerProc(); /* Never returns */
681     }
682     return;
683 }
684
685 /* Create a new client connection to the specified service, using the
686  * specified security object to implement the security model for this
687  * connection. */
688 struct rx_connection *
689 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
690     register afs_uint32 shost;      /* Server host */
691     u_short sport;                  /* Server port */
692     u_short sservice;               /* Server service id */
693     register struct rx_securityClass *securityObject;
694     int serviceSecurityIndex;
695 {
696     int hashindex;
697     afs_int32 cid;
698     register struct rx_connection *conn;
699
700     SPLVAR;
701
702     clock_NewTime();
703     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
704           shost, sport, sservice, securityObject, serviceSecurityIndex));
705
706     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
707      * the case of kmem_alloc? */
708     conn = rxi_AllocConnection();
709 #ifdef  RX_ENABLE_LOCKS
710     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
711     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
712     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
713 #endif
714     NETPRI;
715     AFS_RXGLOCK();
716     MUTEX_ENTER(&rx_connHashTable_lock);
717     cid = (rx_nextCid += RX_MAXCALLS);
718     conn->type = RX_CLIENT_CONNECTION;
719     conn->cid = cid;
720     conn->epoch = rx_epoch;
721     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
722     conn->serviceId = sservice;
723     conn->securityObject = securityObject;
724     /* This doesn't work in all compilers with void (they're buggy), so fake it
725      * with VOID */
726     conn->securityData = (VOID *) 0;
727     conn->securityIndex = serviceSecurityIndex;
728     rx_SetConnDeadTime(conn, rx_connDeadTime);
729     conn->ackRate = RX_FAST_ACK_RATE;
730     conn->nSpecific = 0;
731     conn->specific = NULL;
732     conn->challengeEvent = (struct rxevent *)0;
733     conn->delayedAbortEvent = (struct rxevent *)0;
734     conn->abortCount = 0;
735     conn->error = 0;
736
737     RXS_NewConnection(securityObject, conn);
738     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
739     
740     conn->refCount++; /* no lock required since only this thread knows... */
741     conn->next = rx_connHashTable[hashindex];
742     rx_connHashTable[hashindex] = conn;
743     MUTEX_ENTER(&rx_stats_mutex);
744     rx_stats.nClientConns++;
745     MUTEX_EXIT(&rx_stats_mutex);
746
747     MUTEX_EXIT(&rx_connHashTable_lock);
748     AFS_RXGUNLOCK();
749     USERPRI;
750     return conn;
751 }
752
753 void rx_SetConnDeadTime(conn, seconds)
754     register struct rx_connection *conn;
755     register int seconds;
756 {
757     /* The idea is to set the dead time to a value that allows several
758      * keepalives to be dropped without timing out the connection. */
759     conn->secondsUntilDead = MAX(seconds, 6);
760     conn->secondsUntilPing = conn->secondsUntilDead/6;
761 }
762
763 int rxi_lowPeerRefCount = 0;
764 int rxi_lowConnRefCount = 0;
765
766 /*
767  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
768  * NOTE: must not be called with rx_connHashTable_lock held.
769  */
770 void rxi_CleanupConnection(conn)
771     struct rx_connection *conn;
772 {
773     int i;
774
775     /* Notify the service exporter, if requested, that this connection
776      * is being destroyed */
777     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
778       (*conn->service->destroyConnProc)(conn);
779
780     /* Notify the security module that this connection is being destroyed */
781     RXS_DestroyConnection(conn->securityObject, conn);
782
783     /* If this is the last connection using the rx_peer struct, set its
784      * idle time to now. rxi_ReapConnections will reap it if it's still
785      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
786      */
787     MUTEX_ENTER(&rx_peerHashTable_lock);
788     if (--conn->peer->refCount <= 0) {
789         conn->peer->idleWhen = clock_Sec();
790         if (conn->peer->refCount < 0) {
791             conn->peer->refCount = 0; 
792             MUTEX_ENTER(&rx_stats_mutex);
793             rxi_lowPeerRefCount ++;
794             MUTEX_EXIT(&rx_stats_mutex);
795         }
796     }
797     MUTEX_EXIT(&rx_peerHashTable_lock);
798
799     MUTEX_ENTER(&rx_stats_mutex);
800     if (conn->type == RX_SERVER_CONNECTION)
801       rx_stats.nServerConns--;
802     else
803       rx_stats.nClientConns--;
804     MUTEX_EXIT(&rx_stats_mutex);
805
806 #ifndef KERNEL
807     if (conn->specific) {
808         for (i = 0 ; i < conn->nSpecific ; i++) {
809             if (conn->specific[i] && rxi_keyCreate_destructor[i])
810                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
811             conn->specific[i] = NULL;
812         }
813         free(conn->specific);
814     }
815     conn->specific = NULL;
816     conn->nSpecific = 0;
817 #endif /* !KERNEL */
818
819     MUTEX_DESTROY(&conn->conn_call_lock);
820     MUTEX_DESTROY(&conn->conn_data_lock);
821     CV_DESTROY(&conn->conn_call_cv);
822         
823     rxi_FreeConnection(conn);
824 }
825
826 /* Destroy the specified connection */
827 void rxi_DestroyConnection(conn)
828     register struct rx_connection *conn;
829 {
830     MUTEX_ENTER(&rx_connHashTable_lock);
831     rxi_DestroyConnectionNoLock(conn);
832     /* conn should be at the head of the cleanup list */
833     if (conn == rx_connCleanup_list) {
834         rx_connCleanup_list = rx_connCleanup_list->next;
835         MUTEX_EXIT(&rx_connHashTable_lock);
836         rxi_CleanupConnection(conn);
837     }
838 #ifdef RX_ENABLE_LOCKS
839     else {
840         MUTEX_EXIT(&rx_connHashTable_lock);
841     }
842 #endif /* RX_ENABLE_LOCKS */
843 }
844     
845 static void rxi_DestroyConnectionNoLock(conn)
846     register struct rx_connection *conn;
847 {
848     register struct rx_connection **conn_ptr;
849     register int havecalls = 0;
850     struct rx_packet *packet;
851     int i;
852     SPLVAR;
853
854     clock_NewTime();
855
856     NETPRI;
857     MUTEX_ENTER(&conn->conn_data_lock);
858     if (conn->refCount > 0)
859         conn->refCount--;
860     else {
861         MUTEX_ENTER(&rx_stats_mutex);
862         rxi_lowConnRefCount++;
863         MUTEX_EXIT(&rx_stats_mutex);
864     }
865
866     if (conn->refCount > 0) {
867         /* Busy; wait till the last guy before proceeding */
868         MUTEX_EXIT(&conn->conn_data_lock);
869         USERPRI;
870         return;
871     }
872
873     /* If the client previously called rx_NewCall, but it is still
874      * waiting, treat this as a running call, and wait to destroy the
875      * connection later when the call completes. */
876     if ((conn->type == RX_CLIENT_CONNECTION) &&
877         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
878         conn->flags |= RX_CONN_DESTROY_ME;
879         MUTEX_EXIT(&conn->conn_data_lock);
880         USERPRI;
881         return;
882     }
883     MUTEX_EXIT(&conn->conn_data_lock);
884
885     /* Check for extant references to this connection */
886     for (i = 0; i<RX_MAXCALLS; i++) {
887         register struct rx_call *call = conn->call[i];
888         if (call) {
889             havecalls = 1;
890             if (conn->type == RX_CLIENT_CONNECTION) {
891                 MUTEX_ENTER(&call->lock);
892                 if (call->delayedAckEvent) {
893                     /* Push the final acknowledgment out now--there
894                      * won't be a subsequent call to acknowledge the
895                      * last reply packets */
896                     rxevent_Cancel(call->delayedAckEvent, call,
897                                    RX_CALL_REFCOUNT_DELAY);
898                     rxi_AckAll((struct rxevent *)0, call, 0);
899                 }
900                 MUTEX_EXIT(&call->lock);
901             }
902         }
903     }
904 #ifdef RX_ENABLE_LOCKS
905     if (!havecalls) {
906         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
907             MUTEX_EXIT(&conn->conn_data_lock);
908         }
909         else {
910             /* Someone is accessing a packet right now. */
911             havecalls = 1;
912         }
913     }
914 #endif /* RX_ENABLE_LOCKS */
915
916     if (havecalls) {
917         /* Don't destroy the connection if there are any call
918          * structures still in use */
919         MUTEX_ENTER(&conn->conn_data_lock);
920         conn->flags |= RX_CONN_DESTROY_ME;
921         MUTEX_EXIT(&conn->conn_data_lock);
922         USERPRI;
923         return;
924     }
925
926     if (conn->delayedAbortEvent) {
927         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
928         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
929         if (packet) {
930             MUTEX_ENTER(&conn->conn_data_lock);
931             rxi_SendConnectionAbort(conn, packet, 0, 1);
932             MUTEX_EXIT(&conn->conn_data_lock);
933             rxi_FreePacket(packet);
934         }
935     }
936
937     /* Remove from connection hash table before proceeding */
938     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
939                                              conn->epoch, conn->type) ];
940     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
941         if (*conn_ptr == conn) {
942             *conn_ptr = conn->next;
943             break;
944         }
945     }
946     /* if the conn that we are destroying was the last connection, then we
947     * clear rxLastConn as well */
948     if ( rxLastConn == conn )
949         rxLastConn = 0;
950
951     /* Make sure the connection is completely reset before deleting it. */
952     /* get rid of pending events that could zap us later */
953     if (conn->challengeEvent) {
954         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
955     }
956  
957     /* Add the connection to the list of destroyed connections that
958      * need to be cleaned up. This is necessary to avoid deadlocks
959      * in the routines we call to inform others that this connection is
960      * being destroyed. */
961     conn->next = rx_connCleanup_list;
962     rx_connCleanup_list = conn;
963 }
964
965 /* Externally available version */
966 void rx_DestroyConnection(conn) 
967     register struct rx_connection *conn;
968 {
969     SPLVAR;
970
971     NETPRI;
972     AFS_RXGLOCK();
973     rxi_DestroyConnection (conn);
974     AFS_RXGUNLOCK();
975     USERPRI;
976 }
977
978 /* Start a new rx remote procedure call, on the specified connection.
979  * If wait is set to 1, wait for a free call channel; otherwise return
980  * 0.  Maxtime gives the maximum number of seconds this call may take,
981  * after rx_MakeCall returns.  After this time interval, a call to any
982  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
983  * For fine grain locking, we hold the conn_call_lock in order to 
984  * to ensure that we don't get signalle after we found a call in an active
985  * state and before we go to sleep.
986  */
987 struct rx_call *rx_NewCall(conn)
988     register struct rx_connection *conn;
989 {
990     register int i;
991     register struct rx_call *call;
992     struct clock queueTime;
993     SPLVAR;
994
995     clock_NewTime();
996     dpf (("rx_MakeCall(conn %x)\n", conn));
997
998     NETPRI;
999     clock_GetTime(&queueTime);
1000     AFS_RXGLOCK();
1001     MUTEX_ENTER(&conn->conn_call_lock);
1002     for (;;) {
1003         for (i=0; i<RX_MAXCALLS; i++) {
1004             call = conn->call[i];
1005             if (call) {
1006                 MUTEX_ENTER(&call->lock);
1007                 if (call->state == RX_STATE_DALLY) {
1008                     rxi_ResetCall(call, 0);
1009                     (*call->callNumber)++;
1010                     break;
1011                 }
1012                 MUTEX_EXIT(&call->lock);
1013             }
1014             else {
1015                 call = rxi_NewCall(conn, i);
1016                 MUTEX_ENTER(&call->lock);
1017                 break;
1018             }
1019         }
1020         if (i < RX_MAXCALLS) {
1021             break;
1022         }
1023         MUTEX_ENTER(&conn->conn_data_lock);
1024         conn->flags |= RX_CONN_MAKECALL_WAITING;
1025         MUTEX_EXIT(&conn->conn_data_lock);
1026 #ifdef  RX_ENABLE_LOCKS
1027         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1028 #else
1029         osi_rxSleep(conn);
1030 #endif
1031     }
1032
1033     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1034
1035     /* Client is initially in send mode */
1036     call->state = RX_STATE_ACTIVE;
1037     call->mode = RX_MODE_SENDING;
1038
1039     /* remember start time for call in case we have hard dead time limit */
1040     call->queueTime = queueTime;
1041     clock_GetTime(&call->startTime);
1042     hzero(call->bytesSent);
1043     hzero(call->bytesRcvd);
1044
1045     /* Turn on busy protocol. */
1046     rxi_KeepAliveOn(call);
1047
1048     MUTEX_EXIT(&call->lock);
1049     MUTEX_EXIT(&conn->conn_call_lock);
1050     AFS_RXGUNLOCK();
1051     USERPRI;
1052
1053 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1054     /* Now, if TQ wasn't cleared earlier, do it now. */
1055     AFS_RXGLOCK();
1056     MUTEX_ENTER(&call->lock);
1057     while (call->flags & RX_CALL_TQ_BUSY) {
1058         call->flags |= RX_CALL_TQ_WAIT;
1059 #ifdef RX_ENABLE_LOCKS
1060         CV_WAIT(&call->cv_tq, &call->lock);
1061 #else /* RX_ENABLE_LOCKS */
1062         osi_rxSleep(&call->tq);
1063 #endif /* RX_ENABLE_LOCKS */
1064     }
1065     if (call->flags & RX_CALL_TQ_CLEARME) {
1066         rxi_ClearTransmitQueue(call, 0);
1067         queue_Init(&call->tq);
1068     }
1069     MUTEX_EXIT(&call->lock);
1070     AFS_RXGUNLOCK();
1071 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1072
1073     return call;
1074 }
1075
1076 rxi_HasActiveCalls(aconn)
1077 register struct rx_connection *aconn; {
1078     register int i;
1079     register struct rx_call *tcall;
1080     SPLVAR;
1081
1082     NETPRI;
1083     for(i=0; i<RX_MAXCALLS; i++) {
1084       if (tcall = aconn->call[i]) {
1085         if ((tcall->state == RX_STATE_ACTIVE) 
1086             || (tcall->state == RX_STATE_PRECALL)) {
1087           USERPRI;
1088           return 1;
1089         }
1090       }
1091     }
1092     USERPRI;
1093     return 0;
1094 }
1095
1096 rxi_GetCallNumberVector(aconn, aint32s)
1097 register struct rx_connection *aconn;
1098 register afs_int32 *aint32s; {
1099     register int i;
1100     register struct rx_call *tcall;
1101     SPLVAR;
1102
1103     NETPRI;
1104     for(i=0; i<RX_MAXCALLS; i++) {
1105         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1106             aint32s[i] = aconn->callNumber[i]+1;
1107         else
1108             aint32s[i] = aconn->callNumber[i];
1109     }
1110     USERPRI;
1111     return 0;
1112 }
1113
1114 rxi_SetCallNumberVector(aconn, aint32s)
1115 register struct rx_connection *aconn;
1116 register afs_int32 *aint32s; {
1117     register int i;
1118     register struct rx_call *tcall;
1119     SPLVAR;
1120
1121     NETPRI;
1122     for(i=0; i<RX_MAXCALLS; i++) {
1123         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1124             aconn->callNumber[i] = aint32s[i] - 1;
1125         else
1126             aconn->callNumber[i] = aint32s[i];
1127     }
1128     USERPRI;
1129     return 0;
1130 }
1131
1132 /* Advertise a new service.  A service is named locally by a UDP port
1133  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1134  * on a failure. */
1135 struct rx_service *
1136 rx_NewService(port, serviceId, serviceName, securityObjects,
1137               nSecurityObjects, serviceProc)
1138     u_short port;
1139     u_short serviceId;
1140     char *serviceName;  /* Name for identification purposes (e.g. the
1141                          * service name might be used for probing for
1142                          * statistics) */
1143     struct rx_securityClass **securityObjects;
1144     int nSecurityObjects;
1145     afs_int32 (*serviceProc)();
1146 {    
1147     osi_socket socket = OSI_NULLSOCKET;
1148     register struct rx_service *tservice;    
1149     register int i;
1150     SPLVAR;
1151
1152     clock_NewTime();
1153
1154     if (serviceId == 0) {
1155         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1156          serviceName);
1157         return 0;
1158     }
1159     if (port == 0) {
1160         if (rx_port == 0) {
1161             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1162             return 0;
1163         }
1164         port = rx_port;
1165         socket = rx_socket;
1166     }
1167
1168     tservice = rxi_AllocService();
1169     NETPRI;
1170     AFS_RXGLOCK();
1171     for (i = 0; i<RX_MAX_SERVICES; i++) {
1172         register struct rx_service *service = rx_services[i];
1173         if (service) {
1174             if (port == service->servicePort) {
1175                 if (service->serviceId == serviceId) {
1176                     /* The identical service has already been
1177                      * installed; if the caller was intending to
1178                      * change the security classes used by this
1179                      * service, he/she loses. */
1180                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1181                     AFS_RXGUNLOCK();
1182                     USERPRI;
1183                     rxi_FreeService(tservice);
1184                     return service;
1185                 }
1186                 /* Different service, same port: re-use the socket
1187                  * which is bound to the same port */
1188                 socket = service->socket;
1189             }
1190         } else {
1191             if (socket == OSI_NULLSOCKET) {
1192                 /* If we don't already have a socket (from another
1193                  * service on same port) get a new one */
1194                 socket = rxi_GetUDPSocket(port);
1195                 if (socket == OSI_NULLSOCKET) {
1196                     AFS_RXGUNLOCK();
1197                     USERPRI;
1198                     rxi_FreeService(tservice);
1199                     return 0;
1200                 }
1201             }
1202             service = tservice;
1203             service->socket = socket;
1204             service->servicePort = port;
1205             service->serviceId = serviceId;
1206             service->serviceName = serviceName;
1207             service->nSecurityObjects = nSecurityObjects;
1208             service->securityObjects = securityObjects;
1209             service->minProcs = 0;
1210             service->maxProcs = 1;
1211             service->idleDeadTime = 60;
1212             service->connDeadTime = rx_connDeadTime;
1213             service->executeRequestProc = serviceProc;
1214             rx_services[i] = service;   /* not visible until now */
1215             AFS_RXGUNLOCK();
1216             USERPRI;
1217             return service;
1218         }
1219     }
1220     AFS_RXGUNLOCK();
1221     USERPRI;
1222     rxi_FreeService(tservice);
1223     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1224     return 0;
1225 }
1226
1227 /* Generic request processing loop. This routine should be called
1228  * by the implementation dependent rx_ServerProc. If socketp is
1229  * non-null, it will be set to the file descriptor that this thread
1230  * is now listening on. If socketp is null, this routine will never
1231  * returns. */
1232 void rxi_ServerProc(threadID, newcall, socketp)
1233 int threadID;
1234 struct rx_call *newcall;
1235 osi_socket *socketp;
1236 {
1237     register struct rx_call *call;
1238     register afs_int32 code;
1239     register struct rx_service *tservice = NULL;
1240
1241     for (;;) {
1242         if (newcall) {
1243             call = newcall;
1244             newcall = NULL;
1245         } else {
1246             call = rx_GetCall(threadID, tservice, socketp);
1247             if (socketp && *socketp != OSI_NULLSOCKET) {
1248                 /* We are now a listener thread */
1249                 return;
1250             }
1251         }
1252
1253         /* if server is restarting( typically smooth shutdown) then do not
1254          * allow any new calls.
1255          */
1256
1257         if ( rx_tranquil && (call != NULL) ) {
1258             SPLVAR;
1259
1260             NETPRI;
1261             AFS_RXGLOCK();
1262             MUTEX_ENTER(&call->lock);
1263
1264             rxi_CallError(call, RX_RESTARTING);
1265             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1266
1267             MUTEX_EXIT(&call->lock);
1268             AFS_RXGUNLOCK();
1269             USERPRI;
1270         }
1271
1272 #ifdef  KERNEL
1273         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1274 #ifdef RX_ENABLE_LOCKS
1275             AFS_GLOCK();
1276 #endif /* RX_ENABLE_LOCKS */
1277             afs_termState = AFSOP_STOP_AFS;
1278             afs_osi_Wakeup(&afs_termState);
1279 #ifdef RX_ENABLE_LOCKS
1280             AFS_GUNLOCK();
1281 #endif /* RX_ENABLE_LOCKS */
1282             return;
1283         }
1284 #endif
1285
1286         tservice = call->conn->service;
1287
1288         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1289
1290         code = call->conn->service->executeRequestProc(call);
1291
1292         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1293
1294         rx_EndCall(call, code);
1295         MUTEX_ENTER(&rx_stats_mutex);
1296         rxi_nCalls++;
1297         MUTEX_EXIT(&rx_stats_mutex);
1298     }
1299 }
1300
1301
1302 void rx_WakeupServerProcs()
1303 {
1304     struct rx_serverQueueEntry *np, *tqp;
1305     SPLVAR;
1306
1307     NETPRI;
1308     AFS_RXGLOCK();
1309     MUTEX_ENTER(&rx_serverPool_lock);
1310
1311 #ifdef RX_ENABLE_LOCKS
1312     if (rx_waitForPacket)
1313         CV_BROADCAST(&rx_waitForPacket->cv);
1314 #else /* RX_ENABLE_LOCKS */
1315     if (rx_waitForPacket)
1316         osi_rxWakeup(rx_waitForPacket);
1317 #endif /* RX_ENABLE_LOCKS */
1318     MUTEX_ENTER(&freeSQEList_lock);
1319     for (np = rx_FreeSQEList; np; np = tqp) {
1320       tqp = *(struct rx_serverQueueEntry **)np;
1321 #ifdef RX_ENABLE_LOCKS
1322       CV_BROADCAST(&np->cv);
1323 #else /* RX_ENABLE_LOCKS */
1324       osi_rxWakeup(np);
1325 #endif /* RX_ENABLE_LOCKS */
1326     }
1327     MUTEX_EXIT(&freeSQEList_lock);
1328     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1329 #ifdef RX_ENABLE_LOCKS
1330       CV_BROADCAST(&np->cv);
1331 #else /* RX_ENABLE_LOCKS */
1332       osi_rxWakeup(np);
1333 #endif /* RX_ENABLE_LOCKS */
1334     }
1335     MUTEX_EXIT(&rx_serverPool_lock);
1336     AFS_RXGUNLOCK();
1337     USERPRI;
1338 }
1339
1340 /* meltdown:
1341  * One thing that seems to happen is that all the server threads get
1342  * tied up on some empty or slow call, and then a whole bunch of calls
1343  * arrive at once, using up the packet pool, so now there are more 
1344  * empty calls.  The most critical resources here are server threads
1345  * and the free packet pool.  The "doreclaim" code seems to help in
1346  * general.  I think that eventually we arrive in this state: there
1347  * are lots of pending calls which do have all their packets present,
1348  * so they won't be reclaimed, are multi-packet calls, so they won't
1349  * be scheduled until later, and thus are tying up most of the free 
1350  * packet pool for a very long time.
1351  * future options:
1352  * 1.  schedule multi-packet calls if all the packets are present.  
1353  * Probably CPU-bound operation, useful to return packets to pool. 
1354  * Do what if there is a full window, but the last packet isn't here?
1355  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1356  * it sleeps and waits for that type of call.
1357  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1358  * the current dataquota business is badly broken.  The quota isn't adjusted
1359  * to reflect how many packets are presently queued for a running call.
1360  * So, when we schedule a queued call with a full window of packets queued
1361  * up for it, that *should* free up a window full of packets for other 2d-class
1362  * calls to be able to use from the packet pool.  But it doesn't.
1363  *
1364  * NB.  Most of the time, this code doesn't run -- since idle server threads
1365  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1366  * as a new call arrives.
1367  */
1368 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1369  * for an rx_Read. */
1370 #ifdef RX_ENABLE_LOCKS
1371 struct rx_call *
1372 rx_GetCall(tno, cur_service, socketp)
1373 int tno;
1374 struct rx_service *cur_service;
1375 osi_socket *socketp;
1376 {
1377     struct rx_serverQueueEntry *sq;
1378     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1379     struct rx_service *service;
1380     SPLVAR;
1381
1382     MUTEX_ENTER(&freeSQEList_lock);
1383
1384     if (sq = rx_FreeSQEList) {
1385         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1386         MUTEX_EXIT(&freeSQEList_lock);
1387     } else {    /* otherwise allocate a new one and return that */
1388         MUTEX_EXIT(&freeSQEList_lock);
1389         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1390         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1391         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1392     }
1393
1394     MUTEX_ENTER(&rx_serverPool_lock);
1395     if (cur_service != NULL) {
1396         ReturnToServerPool(cur_service);
1397     }
1398     while (1) {
1399         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1400             register struct rx_call *tcall, *ncall;
1401             choice2 = (struct rx_call *) 0;
1402             /* Scan for eligible incoming calls.  A call is not eligible
1403              * if the maximum number of calls for its service type are
1404              * already executing */
1405             /* One thread will process calls FCFS (to prevent starvation),
1406              * while the other threads may run ahead looking for calls which
1407              * have all their input data available immediately.  This helps 
1408              * keep threads from blocking, waiting for data from the client. */
1409             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1410               service = tcall->conn->service;
1411               if (!QuotaOK(service)) {
1412                 continue;
1413               }
1414               if (!tno || !tcall->queue_item_header.next  ) {
1415                 /* If we're thread 0, then  we'll just use 
1416                  * this call. If we haven't been able to find an optimal 
1417                  * choice, and we're at the end of the list, then use a 
1418                  * 2d choice if one has been identified.  Otherwise... */
1419                 call = (choice2 ? choice2 : tcall);
1420                 service = call->conn->service;
1421               } else if (!queue_IsEmpty(&tcall->rq)) {
1422                 struct rx_packet *rp;
1423                 rp = queue_First(&tcall->rq, rx_packet);
1424                 if (rp->header.seq == 1) {
1425                   if (!meltdown_1pkt ||             
1426                       (rp->header.flags & RX_LAST_PACKET)) {
1427                     call = tcall;
1428                   } else if (rxi_2dchoice && !choice2 &&
1429                              !(tcall->flags & RX_CALL_CLEARED) &&
1430                              (tcall->rprev > rxi_HardAckRate)) {
1431                     choice2 = tcall;
1432                   } else rxi_md2cnt++;
1433                 }
1434               }
1435               if (call)  {
1436                 break;
1437               } else {
1438                   ReturnToServerPool(service);
1439               }
1440             }
1441           }
1442
1443         if (call) {
1444             queue_Remove(call);
1445             rxi_ServerThreadSelectingCall = 1;
1446             MUTEX_EXIT(&rx_serverPool_lock);
1447             MUTEX_ENTER(&call->lock);
1448             MUTEX_ENTER(&rx_serverPool_lock);
1449
1450             if (queue_IsEmpty(&call->rq) ||
1451                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1452               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1453
1454             CLEAR_CALL_QUEUE_LOCK(call);
1455             if (call->error) {
1456                 MUTEX_EXIT(&call->lock);
1457                 ReturnToServerPool(service);
1458                 rxi_ServerThreadSelectingCall = 0;
1459                 CV_SIGNAL(&rx_serverPool_cv);
1460                 call = (struct rx_call*)0;
1461                 continue;
1462             }
1463             call->flags &= (~RX_CALL_WAIT_PROC);
1464             MUTEX_ENTER(&rx_stats_mutex);
1465             rx_nWaiting--;
1466             MUTEX_EXIT(&rx_stats_mutex);
1467             rxi_ServerThreadSelectingCall = 0;
1468             CV_SIGNAL(&rx_serverPool_cv);
1469             MUTEX_EXIT(&rx_serverPool_lock);
1470             break;
1471         }
1472         else {
1473             /* If there are no eligible incoming calls, add this process
1474              * to the idle server queue, to wait for one */
1475             sq->newcall = 0;
1476             sq->tno = tno;
1477             if (socketp) {
1478                 *socketp = OSI_NULLSOCKET;
1479             }
1480             sq->socketp = socketp;
1481             queue_Append(&rx_idleServerQueue, sq);
1482 #ifndef AFS_AIX41_ENV
1483             rx_waitForPacket = sq;
1484 #endif /* AFS_AIX41_ENV */
1485             do {
1486                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1487 #ifdef  KERNEL
1488                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1489                     MUTEX_EXIT(&rx_serverPool_lock);
1490                     return (struct rx_call *)0;
1491                 }
1492 #endif
1493             } while (!(call = sq->newcall) &&
1494                      !(socketp && *socketp != OSI_NULLSOCKET));
1495             MUTEX_EXIT(&rx_serverPool_lock);
1496             if (call) {
1497                 MUTEX_ENTER(&call->lock);
1498             }
1499             break;
1500         }
1501     }
1502
1503     MUTEX_ENTER(&freeSQEList_lock);
1504     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1505     rx_FreeSQEList = sq;
1506     MUTEX_EXIT(&freeSQEList_lock);
1507
1508     if (call) {
1509         clock_GetTime(&call->startTime);
1510         call->state = RX_STATE_ACTIVE;
1511         call->mode = RX_MODE_RECEIVING;
1512
1513         rxi_calltrace(RX_CALL_START, call);
1514         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1515              call->conn->service->servicePort, 
1516              call->conn->service->serviceId, call));
1517
1518         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1519         MUTEX_EXIT(&call->lock);
1520     } else {
1521         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1522     }
1523
1524     return call;
1525 }
1526 #else /* RX_ENABLE_LOCKS */
1527 struct rx_call *
1528 rx_GetCall(tno, cur_service, socketp)
1529   int tno;
1530   struct rx_service *cur_service;
1531   osi_socket *socketp;
1532 {
1533     struct rx_serverQueueEntry *sq;
1534     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1535     struct rx_service *service;
1536     SPLVAR;
1537
1538     NETPRI;
1539     AFS_RXGLOCK();
1540     MUTEX_ENTER(&freeSQEList_lock);
1541
1542     if (sq = rx_FreeSQEList) {
1543         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1544         MUTEX_EXIT(&freeSQEList_lock);
1545     } else {    /* otherwise allocate a new one and return that */
1546         MUTEX_EXIT(&freeSQEList_lock);
1547         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1548         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1549         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1550     }
1551     MUTEX_ENTER(&sq->lock);
1552
1553     if (cur_service != NULL) {
1554         cur_service->nRequestsRunning--;
1555         if (cur_service->nRequestsRunning < cur_service->minProcs)
1556             rxi_minDeficit++;
1557         rxi_availProcs++;
1558     }
1559     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1560         register struct rx_call *tcall, *ncall;
1561         /* Scan for eligible incoming calls.  A call is not eligible
1562          * if the maximum number of calls for its service type are
1563          * already executing */
1564         /* One thread will process calls FCFS (to prevent starvation),
1565          * while the other threads may run ahead looking for calls which
1566          * have all their input data available immediately.  This helps 
1567          * keep threads from blocking, waiting for data from the client. */
1568         choice2 = (struct rx_call *) 0;
1569         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1570           service = tcall->conn->service;
1571           if (QuotaOK(service)) {
1572              if (!tno || !tcall->queue_item_header.next  ) {
1573                  /* If we're thread 0, then  we'll just use 
1574                   * this call. If we haven't been able to find an optimal 
1575                   * choice, and we're at the end of the list, then use a 
1576                   * 2d choice if one has been identified.  Otherwise... */
1577                  call = (choice2 ? choice2 : tcall);
1578                  service = call->conn->service;
1579              } else if (!queue_IsEmpty(&tcall->rq)) {
1580                  struct rx_packet *rp;
1581                  rp = queue_First(&tcall->rq, rx_packet);
1582                  if (rp->header.seq == 1
1583                      && (!meltdown_1pkt ||
1584                          (rp->header.flags & RX_LAST_PACKET))) {
1585                      call = tcall;
1586                  } else if (rxi_2dchoice && !choice2 &&
1587                             !(tcall->flags & RX_CALL_CLEARED) &&
1588                             (tcall->rprev > rxi_HardAckRate)) {
1589                      choice2 = tcall;
1590                  } else rxi_md2cnt++;
1591              }
1592           }
1593           if (call) 
1594              break;
1595         }
1596       }
1597
1598     if (call) {
1599         queue_Remove(call);
1600         /* we can't schedule a call if there's no data!!! */
1601         /* send an ack if there's no data, if we're missing the
1602          * first packet, or we're missing something between first 
1603          * and last -- there's a "hole" in the incoming data. */
1604         if (queue_IsEmpty(&call->rq) ||
1605             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1606             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1607           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1608
1609         call->flags &= (~RX_CALL_WAIT_PROC);
1610         service->nRequestsRunning++;
1611         /* just started call in minProcs pool, need fewer to maintain
1612          * guarantee */
1613         if (service->nRequestsRunning <= service->minProcs)
1614             rxi_minDeficit--;
1615         rxi_availProcs--;
1616         rx_nWaiting--;
1617         /* MUTEX_EXIT(&call->lock); */
1618     }
1619     else {
1620         /* If there are no eligible incoming calls, add this process
1621          * to the idle server queue, to wait for one */
1622         sq->newcall = 0;
1623         if (socketp) {
1624             *socketp = OSI_NULLSOCKET;
1625         }
1626         sq->socketp = socketp;
1627         queue_Append(&rx_idleServerQueue, sq);
1628         do {
1629             osi_rxSleep(sq);
1630 #ifdef  KERNEL
1631                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1632                     AFS_RXGUNLOCK();
1633                     USERPRI;
1634                     return (struct rx_call *)0;
1635                 }
1636 #endif
1637         } while (!(call = sq->newcall) &&
1638                  !(socketp && *socketp != OSI_NULLSOCKET));
1639     }
1640     MUTEX_EXIT(&sq->lock);
1641
1642     MUTEX_ENTER(&freeSQEList_lock);
1643     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1644     rx_FreeSQEList = sq;
1645     MUTEX_EXIT(&freeSQEList_lock);
1646
1647     if (call) {
1648         clock_GetTime(&call->startTime);
1649         call->state = RX_STATE_ACTIVE;
1650         call->mode = RX_MODE_RECEIVING;
1651
1652         rxi_calltrace(RX_CALL_START, call);
1653         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1654          call->conn->service->servicePort, 
1655          call->conn->service->serviceId, call));
1656     } else {
1657         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1658     }
1659
1660     AFS_RXGUNLOCK();
1661     USERPRI;
1662
1663     return call;
1664 }
1665 #endif /* RX_ENABLE_LOCKS */
1666
1667
1668
1669 /* Establish a procedure to be called when a packet arrives for a
1670  * call.  This routine will be called at most once after each call,
1671  * and will also be called if there is an error condition on the or
1672  * the call is complete.  Used by multi rx to build a selection
1673  * function which determines which of several calls is likely to be a
1674  * good one to read from.  
1675  * NOTE: the way this is currently implemented it is probably only a
1676  * good idea to (1) use it immediately after a newcall (clients only)
1677  * and (2) only use it once.  Other uses currently void your warranty
1678  */
1679 void rx_SetArrivalProc(call, proc, handle, arg)
1680     register struct rx_call *call;
1681     register VOID (*proc)();
1682     register VOID *handle;
1683     register VOID *arg;
1684 {
1685     call->arrivalProc = proc;
1686     call->arrivalProcHandle = handle;
1687     call->arrivalProcArg = arg;
1688 }
1689
1690 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1691  * appropriate, and return the final error code from the conversation
1692  * to the caller */
1693
1694 afs_int32 rx_EndCall(call, rc)
1695     register struct rx_call *call;
1696     afs_int32 rc;
1697 {
1698     register struct rx_connection *conn = call->conn;
1699     register struct rx_service *service;
1700     register struct rx_packet *tp; /* Temporary packet pointer */
1701     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1702     afs_int32 error;
1703     SPLVAR;
1704
1705     dpf(("rx_EndCall(call %x)\n", call));
1706
1707     NETPRI;
1708     AFS_RXGLOCK();
1709     MUTEX_ENTER(&call->lock);
1710
1711     if (rc == 0 && call->error == 0) {
1712         call->abortCode = 0;
1713         call->abortCount = 0;
1714     }
1715
1716     call->arrivalProc = (VOID (*)()) 0;
1717     if (rc && call->error == 0) {
1718         rxi_CallError(call, rc);
1719         /* Send an abort message to the peer if this error code has
1720          * only just been set.  If it was set previously, assume the
1721          * peer has already been sent the error code or will request it 
1722          */
1723         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1724     }
1725     if (conn->type == RX_SERVER_CONNECTION) {
1726         /* Make sure reply or at least dummy reply is sent */
1727         if (call->mode == RX_MODE_RECEIVING) {
1728             rxi_WriteProc(call, 0, 0);
1729         }
1730         if (call->mode == RX_MODE_SENDING) {
1731             rxi_FlushWrite(call);
1732         }
1733         service = conn->service;
1734         rxi_calltrace(RX_CALL_END, call);
1735         /* Call goes to hold state until reply packets are acknowledged */
1736         if (call->tfirst + call->nSoftAcked < call->tnext) {
1737             call->state = RX_STATE_HOLD;
1738         } else {
1739             call->state = RX_STATE_DALLY;
1740             rxi_ClearTransmitQueue(call, 0);
1741             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1742             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1743         }
1744     }
1745     else { /* Client connection */
1746         char dummy;
1747         /* Make sure server receives input packets, in the case where
1748          * no reply arguments are expected */
1749         if ((call->mode == RX_MODE_SENDING)
1750          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1751             (void) rxi_ReadProc(call, &dummy, 1);
1752         }
1753         /* We need to release the call lock since it's lower than the
1754          * conn_call_lock and we don't want to hold the conn_call_lock
1755          * over the rx_ReadProc call. The conn_call_lock needs to be held
1756          * here for the case where rx_NewCall is perusing the calls on
1757          * the connection structure. We don't want to signal until
1758          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1759          * have checked this call, found it active and by the time it
1760          * goes to sleep, will have missed the signal.
1761          */
1762         MUTEX_EXIT(&call->lock);
1763         MUTEX_ENTER(&conn->conn_call_lock);
1764         MUTEX_ENTER(&call->lock);
1765         MUTEX_ENTER(&conn->conn_data_lock);
1766         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1767             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1768             MUTEX_EXIT(&conn->conn_data_lock);
1769 #ifdef  RX_ENABLE_LOCKS
1770             CV_BROADCAST(&conn->conn_call_cv);
1771 #else
1772             osi_rxWakeup(conn);
1773 #endif
1774         }
1775 #ifdef RX_ENABLE_LOCKS
1776         else {
1777             MUTEX_EXIT(&conn->conn_data_lock);
1778         }
1779 #endif /* RX_ENABLE_LOCKS */
1780         call->state = RX_STATE_DALLY;
1781     }
1782     error = call->error;
1783
1784     /* currentPacket, nLeft, and NFree must be zeroed here, because
1785      * ResetCall cannot: ResetCall may be called at splnet(), in the
1786      * kernel version, and may interrupt the macros rx_Read or
1787      * rx_Write, which run at normal priority for efficiency. */
1788     if (call->currentPacket) {
1789         rxi_FreePacket(call->currentPacket);
1790         call->currentPacket = (struct rx_packet *) 0;
1791         call->nLeft = call->nFree = call->curlen = 0;
1792     }
1793     else
1794         call->nLeft = call->nFree = call->curlen = 0;
1795
1796     /* Free any packets from the last call to ReadvProc/WritevProc */
1797     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1798         queue_Remove(tp);
1799         rxi_FreePacket(tp);
1800     }
1801
1802     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1803     MUTEX_EXIT(&call->lock);
1804     if (conn->type == RX_CLIENT_CONNECTION)
1805         MUTEX_EXIT(&conn->conn_call_lock);
1806     AFS_RXGUNLOCK();
1807     USERPRI;
1808     /*
1809      * Map errors to the local host's errno.h format.
1810      */
1811     error = ntoh_syserr_conv(error);
1812     return error;
1813 }
1814
1815 #if !defined(KERNEL)
1816
1817 /* Call this routine when shutting down a server or client (especially
1818  * clients).  This will allow Rx to gracefully garbage collect server
1819  * connections, and reduce the number of retries that a server might
1820  * make to a dead client.
1821  * This is not quite right, since some calls may still be ongoing and
1822  * we can't lock them to destroy them. */
1823 void rx_Finalize() {
1824     register struct rx_connection **conn_ptr, **conn_end;
1825
1826     INIT_PTHREAD_LOCKS
1827     LOCK_RX_INIT
1828     if (rxinit_status == 1) {
1829         UNLOCK_RX_INIT
1830         return; /* Already shutdown. */
1831     }
1832     rxi_DeleteCachedConnections();
1833     if (rx_connHashTable) {
1834         MUTEX_ENTER(&rx_connHashTable_lock);
1835         for (conn_ptr = &rx_connHashTable[0], 
1836              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1837              conn_ptr < conn_end; conn_ptr++) {
1838             struct rx_connection *conn, *next;
1839             for (conn = *conn_ptr; conn; conn = next) {
1840                 next = conn->next;
1841                 if (conn->type == RX_CLIENT_CONNECTION) {
1842                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1843                     conn->refCount++;
1844                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1845 #ifdef RX_ENABLE_LOCKS
1846                     rxi_DestroyConnectionNoLock(conn);
1847 #else /* RX_ENABLE_LOCKS */
1848                     rxi_DestroyConnection(conn);
1849 #endif /* RX_ENABLE_LOCKS */
1850                 }
1851             }
1852         }
1853 #ifdef RX_ENABLE_LOCKS
1854         while (rx_connCleanup_list) {
1855             struct rx_connection *conn;
1856             conn = rx_connCleanup_list;
1857             rx_connCleanup_list = rx_connCleanup_list->next;
1858             MUTEX_EXIT(&rx_connHashTable_lock);
1859             rxi_CleanupConnection(conn);
1860             MUTEX_ENTER(&rx_connHashTable_lock);
1861         }
1862         MUTEX_EXIT(&rx_connHashTable_lock);
1863 #endif /* RX_ENABLE_LOCKS */
1864     }
1865     rxi_flushtrace();
1866
1867     rxinit_status = 1;
1868     UNLOCK_RX_INIT
1869 }
1870 #endif
1871
1872 /* if we wakeup packet waiter too often, can get in loop with two
1873     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1874 void
1875 rxi_PacketsUnWait() {
1876
1877     if (!rx_waitingForPackets) {
1878         return;
1879     }
1880 #ifdef KERNEL
1881     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1882         return;                                     /* still over quota */
1883     }
1884 #endif /* KERNEL */
1885     rx_waitingForPackets = 0;
1886 #ifdef  RX_ENABLE_LOCKS
1887     CV_BROADCAST(&rx_waitingForPackets_cv);
1888 #else
1889     osi_rxWakeup(&rx_waitingForPackets);
1890 #endif
1891     return;
1892 }
1893
1894
1895 /* ------------------Internal interfaces------------------------- */
1896
1897 /* Return this process's service structure for the
1898  * specified socket and service */
1899 struct rx_service *rxi_FindService(socket, serviceId)
1900     register osi_socket socket;
1901     register u_short serviceId;
1902 {
1903     register struct rx_service **sp;    
1904     for (sp = &rx_services[0]; *sp; sp++) {
1905         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1906           return *sp;
1907     }
1908     return 0;
1909 }
1910
1911 /* Allocate a call structure, for the indicated channel of the
1912  * supplied connection.  The mode and state of the call must be set by
1913  * the caller. */
1914 struct rx_call *rxi_NewCall(conn, channel)
1915     register struct rx_connection *conn;
1916     register int channel;
1917 {
1918     register struct rx_call *call;
1919 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1920     register struct rx_call *cp;        /* Call pointer temp */
1921     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1922 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1923
1924     /* Grab an existing call structure, or allocate a new one.
1925      * Existing call structures are assumed to have been left reset by
1926      * rxi_FreeCall */
1927     MUTEX_ENTER(&rx_freeCallQueue_lock);
1928
1929 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1930     /*
1931      * EXCEPT that the TQ might not yet be cleared out.
1932      * Skip over those with in-use TQs.
1933      */
1934     call = NULL;
1935     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1936         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1937             call = cp;
1938             break;
1939         }
1940     }
1941     if (call) {
1942 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1943     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1944         call = queue_First(&rx_freeCallQueue, rx_call);
1945 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1946         queue_Remove(call);
1947         MUTEX_ENTER(&rx_stats_mutex);
1948         rx_stats.nFreeCallStructs--;
1949         MUTEX_EXIT(&rx_stats_mutex);
1950         MUTEX_EXIT(&rx_freeCallQueue_lock);
1951         MUTEX_ENTER(&call->lock);
1952         CLEAR_CALL_QUEUE_LOCK(call);
1953 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1954         /* Now, if TQ wasn't cleared earlier, do it now. */
1955         if (call->flags & RX_CALL_TQ_CLEARME) {
1956             rxi_ClearTransmitQueue(call, 0);
1957             queue_Init(&call->tq);
1958         }
1959 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1960         /* Bind the call to its connection structure */
1961         call->conn = conn;
1962         rxi_ResetCall(call, 1);
1963     }
1964     else {
1965         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1966
1967         MUTEX_EXIT(&rx_freeCallQueue_lock);
1968         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1969         MUTEX_ENTER(&call->lock);
1970         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1971         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1972         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1973
1974         MUTEX_ENTER(&rx_stats_mutex);
1975         rx_stats.nCallStructs++;
1976         MUTEX_EXIT(&rx_stats_mutex);
1977         /* Initialize once-only items */
1978         queue_Init(&call->tq);
1979         queue_Init(&call->rq);
1980         queue_Init(&call->iovq);
1981         /* Bind the call to its connection structure (prereq for reset) */
1982         call->conn = conn;
1983         rxi_ResetCall(call, 1);
1984     }
1985     call->channel = channel;
1986     call->callNumber = &conn->callNumber[channel];
1987     /* Note that the next expected call number is retained (in
1988      * conn->callNumber[i]), even if we reallocate the call structure
1989      */
1990     conn->call[channel] = call;
1991     /* if the channel's never been used (== 0), we should start at 1, otherwise
1992         the call number is valid from the last time this channel was used */
1993     if (*call->callNumber == 0) *call->callNumber = 1;
1994
1995     MUTEX_EXIT(&call->lock);
1996     return call;
1997 }
1998
1999 /* A call has been inactive long enough that so we can throw away
2000  * state, including the call structure, which is placed on the call
2001  * free list.
2002  * Call is locked upon entry.
2003  */
2004 #ifdef RX_ENABLE_LOCKS
2005 void rxi_FreeCall(call, haveCTLock)
2006     int haveCTLock; /* Set if called from rxi_ReapConnections */
2007 #else /* RX_ENABLE_LOCKS */
2008 void rxi_FreeCall(call)
2009 #endif /* RX_ENABLE_LOCKS */
2010     register struct rx_call *call;
2011 {
2012     register int channel = call->channel;
2013     register struct rx_connection *conn = call->conn;
2014
2015
2016     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2017       (*call->callNumber)++;
2018     rxi_ResetCall(call, 0);
2019     call->conn->call[channel] = (struct rx_call *) 0;
2020
2021     MUTEX_ENTER(&rx_freeCallQueue_lock);
2022     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2023 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2024     /* A call may be free even though its transmit queue is still in use.
2025      * Since we search the call list from head to tail, put busy calls at
2026      * the head of the list, and idle calls at the tail.
2027      */
2028     if (call->flags & RX_CALL_TQ_BUSY)
2029         queue_Prepend(&rx_freeCallQueue, call);
2030     else
2031         queue_Append(&rx_freeCallQueue, call);
2032 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2033     queue_Append(&rx_freeCallQueue, call);
2034 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2035     MUTEX_ENTER(&rx_stats_mutex);
2036     rx_stats.nFreeCallStructs++;
2037     MUTEX_EXIT(&rx_stats_mutex);
2038
2039     MUTEX_EXIT(&rx_freeCallQueue_lock);
2040  
2041     /* Destroy the connection if it was previously slated for
2042      * destruction, i.e. the Rx client code previously called
2043      * rx_DestroyConnection (client connections), or
2044      * rxi_ReapConnections called the same routine (server
2045      * connections).  Only do this, however, if there are no
2046      * outstanding calls. Note that for fine grain locking, there appears
2047      * to be a deadlock in that rxi_FreeCall has a call locked and
2048      * DestroyConnectionNoLock locks each call in the conn. But note a
2049      * few lines up where we have removed this call from the conn.
2050      * If someone else destroys a connection, they either have no
2051      * call lock held or are going through this section of code.
2052      */
2053     if (conn->flags & RX_CONN_DESTROY_ME) {
2054         MUTEX_ENTER(&conn->conn_data_lock);
2055         conn->refCount++;
2056         MUTEX_EXIT(&conn->conn_data_lock);
2057 #ifdef RX_ENABLE_LOCKS
2058         if (haveCTLock)
2059             rxi_DestroyConnectionNoLock(conn);
2060         else
2061             rxi_DestroyConnection(conn);
2062 #else /* RX_ENABLE_LOCKS */
2063         rxi_DestroyConnection(conn);
2064 #endif /* RX_ENABLE_LOCKS */
2065     }
2066 }
2067
2068 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2069 char *rxi_Alloc(size)
2070 register size_t size;
2071 {
2072     register char *p;
2073
2074 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2075     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2076      * implementation.
2077      */
2078     int glockOwner = ISAFS_GLOCK();
2079     if (!glockOwner)
2080         AFS_GLOCK();
2081 #endif
2082     MUTEX_ENTER(&rx_stats_mutex);
2083     rxi_Alloccnt++; rxi_Allocsize += size;
2084     MUTEX_EXIT(&rx_stats_mutex);
2085 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2086     if (size > AFS_SMALLOCSIZ) {
2087         p = (char *) osi_AllocMediumSpace(size);
2088     } else
2089         p = (char *) osi_AllocSmall(size, 1);
2090 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2091     if (!glockOwner)
2092         AFS_GUNLOCK();
2093 #endif
2094 #else
2095     p = (char *) osi_Alloc(size);
2096 #endif
2097     if (!p) osi_Panic("rxi_Alloc error");
2098     bzero(p, size);
2099     return p;
2100 }
2101
2102 void rxi_Free(addr, size)
2103 void *addr;
2104 register size_t size;
2105 {
2106 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2107     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2108      * implementation.
2109      */
2110     int glockOwner = ISAFS_GLOCK();
2111     if (!glockOwner)
2112         AFS_GLOCK();
2113 #endif
2114     MUTEX_ENTER(&rx_stats_mutex);
2115     rxi_Alloccnt--; rxi_Allocsize -= size;
2116     MUTEX_EXIT(&rx_stats_mutex);
2117 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2118     if (size > AFS_SMALLOCSIZ)
2119         osi_FreeMediumSpace(addr);
2120     else
2121         osi_FreeSmall(addr);
2122 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2123     if (!glockOwner)
2124         AFS_GUNLOCK();
2125 #endif
2126 #else
2127     osi_Free(addr, size);
2128 #endif    
2129 }
2130
2131 /* Find the peer process represented by the supplied (host,port)
2132  * combination.  If there is no appropriate active peer structure, a
2133  * new one will be allocated and initialized 
2134  * The origPeer, if set, is a pointer to a peer structure on which the
2135  * refcount will be be decremented. This is used to replace the peer
2136  * structure hanging off a connection structure */
2137 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2138     register afs_uint32 host;
2139     register u_short port;
2140     struct rx_peer *origPeer;
2141     int create;
2142 {
2143     register struct rx_peer *pp;
2144     int hashIndex;
2145     hashIndex = PEER_HASH(host, port);
2146     MUTEX_ENTER(&rx_peerHashTable_lock);
2147     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2148         if ((pp->host == host) && (pp->port == port)) break;
2149     }
2150     if (!pp) {
2151         if (create) {
2152             pp = rxi_AllocPeer(); /* This bzero's *pp */
2153             pp->host = host;      /* set here or in InitPeerParams is zero */
2154             pp->port = port;
2155             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2156             queue_Init(&pp->congestionQueue);
2157             queue_Init(&pp->rpcStats);
2158             pp->next = rx_peerHashTable[hashIndex];
2159             rx_peerHashTable[hashIndex] = pp;
2160             rxi_InitPeerParams(pp);
2161             MUTEX_ENTER(&rx_stats_mutex);
2162             rx_stats.nPeerStructs++;
2163             MUTEX_EXIT(&rx_stats_mutex);
2164         }
2165     }
2166     if (pp && create) {
2167         pp->refCount++;
2168     }
2169     if ( origPeer)
2170         origPeer->refCount--;
2171     MUTEX_EXIT(&rx_peerHashTable_lock);
2172     return pp;
2173 }
2174
2175
2176 /* Find the connection at (host, port) started at epoch, and with the
2177  * given connection id.  Creates the server connection if necessary.
2178  * The type specifies whether a client connection or a server
2179  * connection is desired.  In both cases, (host, port) specify the
2180  * peer's (host, pair) pair.  Client connections are not made
2181  * automatically by this routine.  The parameter socket gives the
2182  * socket descriptor on which the packet was received.  This is used,
2183  * in the case of server connections, to check that *new* connections
2184  * come via a valid (port, serviceId).  Finally, the securityIndex
2185  * parameter must match the existing index for the connection.  If a
2186  * server connection is created, it will be created using the supplied
2187  * index, if the index is valid for this service */
2188 struct rx_connection *
2189 rxi_FindConnection(socket, host, port, serviceId, cid, 
2190                    epoch, type, securityIndex)
2191     osi_socket socket;
2192     register afs_int32 host;
2193     register u_short port;
2194     u_short serviceId;
2195     afs_uint32 cid;
2196     afs_uint32 epoch;
2197     int type;
2198     u_int securityIndex;
2199 {
2200     int hashindex, flag;
2201     register struct rx_connection *conn;
2202     struct rx_peer *peer;
2203     hashindex = CONN_HASH(host, port, cid, epoch, type);
2204     MUTEX_ENTER(&rx_connHashTable_lock);
2205     rxLastConn ? (conn = rxLastConn, flag = 0) :
2206                  (conn = rx_connHashTable[hashindex], flag = 1);
2207     for (; conn; ) {
2208       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2209           && (epoch == conn->epoch)) {
2210         register struct rx_peer *pp = conn->peer;
2211         if (securityIndex != conn->securityIndex) {
2212             /* this isn't supposed to happen, but someone could forge a packet
2213                like this, and there seems to be some CM bug that makes this
2214                happen from time to time -- in which case, the fileserver
2215                asserts. */  
2216             MUTEX_EXIT(&rx_connHashTable_lock);
2217             return (struct rx_connection *) 0;
2218         }
2219         /* epoch's high order bits mean route for security reasons only on
2220          * the cid, not the host and port fields.
2221          */
2222         if (conn->epoch & 0x80000000) break;
2223         if (((type == RX_CLIENT_CONNECTION) 
2224              || (pp->host == host)) && (pp->port == port))
2225           break;
2226       }
2227       if ( !flag )
2228       {
2229         /* the connection rxLastConn that was used the last time is not the
2230         ** one we are looking for now. Hence, start searching in the hash */
2231         flag = 1;
2232         conn = rx_connHashTable[hashindex];
2233       }
2234       else
2235         conn = conn->next;
2236     }
2237     if (!conn) {
2238         struct rx_service *service;
2239         if (type == RX_CLIENT_CONNECTION) {
2240             MUTEX_EXIT(&rx_connHashTable_lock);
2241             return (struct rx_connection *) 0;
2242         }
2243         service = rxi_FindService(socket, serviceId);
2244         if (!service || (securityIndex >= service->nSecurityObjects) 
2245             || (service->securityObjects[securityIndex] == 0)) {
2246             MUTEX_EXIT(&rx_connHashTable_lock);
2247             return (struct rx_connection *) 0;
2248         }
2249         conn = rxi_AllocConnection(); /* This bzero's the connection */
2250         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2251                    MUTEX_DEFAULT,0);
2252         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2253                    MUTEX_DEFAULT,0);
2254         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2255         conn->next = rx_connHashTable[hashindex];
2256         rx_connHashTable[hashindex] = conn;
2257         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2258         conn->type = RX_SERVER_CONNECTION;
2259         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2260         conn->epoch = epoch;
2261         conn->cid = cid & RX_CIDMASK;
2262         /* conn->serial = conn->lastSerial = 0; */
2263         /* conn->timeout = 0; */
2264         conn->ackRate = RX_FAST_ACK_RATE;
2265         conn->service = service;
2266         conn->serviceId = serviceId;
2267         conn->securityIndex = securityIndex;
2268         conn->securityObject = service->securityObjects[securityIndex];
2269         conn->nSpecific = 0;
2270         conn->specific = NULL;
2271         rx_SetConnDeadTime(conn, service->connDeadTime);
2272         /* Notify security object of the new connection */
2273         RXS_NewConnection(conn->securityObject, conn);
2274         /* XXXX Connection timeout? */
2275         if (service->newConnProc) (*service->newConnProc)(conn);
2276         MUTEX_ENTER(&rx_stats_mutex);
2277         rx_stats.nServerConns++;
2278         MUTEX_EXIT(&rx_stats_mutex);
2279     }
2280     else
2281     {
2282     /* Ensure that the peer structure is set up in such a way that
2283     ** replies in this connection go back to that remote interface
2284     ** from which the last packet was sent out. In case, this packet's
2285     ** source IP address does not match the peer struct for this conn,
2286     ** then drop the refCount on conn->peer and get a new peer structure.
2287     ** We can check the host,port field in the peer structure without the
2288     ** rx_peerHashTable_lock because the peer structure has its refCount
2289     ** incremented and the only time the host,port in the peer struct gets
2290     ** updated is when the peer structure is created.
2291     */
2292         if (conn->peer->host == host )
2293                 peer = conn->peer; /* no change to the peer structure */
2294         else
2295                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2296     }
2297
2298     MUTEX_ENTER(&conn->conn_data_lock);
2299     conn->refCount++;
2300     conn->peer = peer;
2301     MUTEX_EXIT(&conn->conn_data_lock);
2302
2303     rxLastConn = conn;  /* store this connection as the last conn used */
2304     MUTEX_EXIT(&rx_connHashTable_lock);
2305     return conn;
2306 }
2307
2308 /* There are two packet tracing routines available for testing and monitoring
2309  * Rx.  One is called just after every packet is received and the other is
2310  * called just before every packet is sent.  Received packets, have had their
2311  * headers decoded, and packets to be sent have not yet had their headers
2312  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2313  * containing the network address.  Both can be modified.  The return value, if
2314  * non-zero, indicates that the packet should be dropped.  */
2315
2316 int (*rx_justReceived)() = 0;
2317 int (*rx_almostSent)() = 0;
2318
2319 /* A packet has been received off the interface.  Np is the packet, socket is
2320  * the socket number it was received from (useful in determining which service
2321  * this packet corresponds to), and (host, port) reflect the host,port of the
2322  * sender.  This call returns the packet to the caller if it is finished with
2323  * it, rather than de-allocating it, just as a small performance hack */
2324
2325 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2326     register struct rx_packet *np;
2327     osi_socket socket;
2328     afs_uint32 host;
2329     u_short port;
2330     int *tnop;
2331     struct rx_call **newcallp;
2332 {
2333     register struct rx_call *call;
2334     register struct rx_connection *conn;
2335     int channel;
2336     afs_uint32 currentCallNumber;
2337     int type;
2338     int skew;
2339 #ifdef RXDEBUG
2340     char *packetType;
2341 #endif
2342     struct rx_packet *tnp;
2343
2344 #ifdef RXDEBUG
2345 /* We don't print out the packet until now because (1) the time may not be
2346  * accurate enough until now in the lwp implementation (rx_Listener only gets
2347  * the time after the packet is read) and (2) from a protocol point of view,
2348  * this is the first time the packet has been seen */
2349     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2350         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2351     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2352          np->header.serial, packetType, host, port, np->header.serviceId,
2353          np->header.epoch, np->header.cid, np->header.callNumber, 
2354          np->header.seq, np->header.flags, np));
2355 #endif
2356
2357     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2358       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2359     }
2360
2361     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2362         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2363     }
2364 #ifdef RXDEBUG
2365     /* If an input tracer function is defined, call it with the packet and
2366      * network address.  Note this function may modify its arguments. */
2367     if (rx_justReceived) {
2368         struct sockaddr_in addr;
2369         int drop;
2370         addr.sin_family = AF_INET;
2371         addr.sin_port = port;
2372         addr.sin_addr.s_addr = host;
2373 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2374         addr.sin_len = sizeof(addr);
2375 #endif  /* AFS_OSF_ENV */
2376         drop = (*rx_justReceived) (np, &addr);
2377         /* drop packet if return value is non-zero */
2378         if (drop) return np;
2379         port = addr.sin_port;           /* in case fcn changed addr */
2380         host = addr.sin_addr.s_addr;
2381     }
2382 #endif
2383
2384     /* If packet was not sent by the client, then *we* must be the client */
2385     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2386         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2387
2388     /* Find the connection (or fabricate one, if we're the server & if
2389      * necessary) associated with this packet */
2390     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2391                               np->header.cid, np->header.epoch, type, 
2392                               np->header.securityIndex);
2393
2394     if (!conn) {
2395       /* If no connection found or fabricated, just ignore the packet.
2396        * (An argument could be made for sending an abort packet for
2397        * the conn) */
2398       return np;
2399     }   
2400
2401     MUTEX_ENTER(&conn->conn_data_lock);
2402     if (conn->maxSerial < np->header.serial)
2403       conn->maxSerial = np->header.serial;
2404     MUTEX_EXIT(&conn->conn_data_lock);
2405
2406     /* If the connection is in an error state, send an abort packet and ignore
2407      * the incoming packet */
2408     if (conn->error) {
2409         /* Don't respond to an abort packet--we don't want loops! */
2410         MUTEX_ENTER(&conn->conn_data_lock);
2411         if (np->header.type != RX_PACKET_TYPE_ABORT)
2412             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2413         conn->refCount--;
2414         MUTEX_EXIT(&conn->conn_data_lock);
2415         return np;
2416     }
2417
2418     /* Check for connection-only requests (i.e. not call specific). */
2419     if (np->header.callNumber == 0) {
2420         switch (np->header.type) {
2421             case RX_PACKET_TYPE_ABORT:
2422                 /* What if the supplied error is zero? */
2423                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2424                 MUTEX_ENTER(&conn->conn_data_lock);
2425                 conn->refCount--;
2426                 MUTEX_EXIT(&conn->conn_data_lock);
2427                 return np;
2428             case RX_PACKET_TYPE_CHALLENGE:
2429                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2430                 MUTEX_ENTER(&conn->conn_data_lock);
2431                 conn->refCount--;
2432                 MUTEX_EXIT(&conn->conn_data_lock);
2433                 return tnp;
2434             case RX_PACKET_TYPE_RESPONSE:
2435                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2436                 MUTEX_ENTER(&conn->conn_data_lock);
2437                 conn->refCount--;
2438                 MUTEX_EXIT(&conn->conn_data_lock);
2439                 return tnp;
2440             case RX_PACKET_TYPE_PARAMS:
2441             case RX_PACKET_TYPE_PARAMS+1:
2442             case RX_PACKET_TYPE_PARAMS+2:
2443                 /* ignore these packet types for now */
2444                 MUTEX_ENTER(&conn->conn_data_lock);
2445                 conn->refCount--;
2446                 MUTEX_EXIT(&conn->conn_data_lock);
2447                 return np;
2448
2449
2450             default:
2451                 /* Should not reach here, unless the peer is broken: send an
2452                  * abort packet */
2453                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2454                 MUTEX_ENTER(&conn->conn_data_lock);
2455                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2456                 conn->refCount--;
2457                 MUTEX_EXIT(&conn->conn_data_lock);
2458                 return tnp;
2459         }
2460     }
2461
2462     channel = np->header.cid & RX_CHANNELMASK;
2463     call = conn->call[channel];
2464 #ifdef  RX_ENABLE_LOCKS
2465     if (call)
2466         MUTEX_ENTER(&call->lock);
2467     /* Test to see if call struct is still attached to conn. */
2468     if (call != conn->call[channel]) {
2469         if (call)
2470             MUTEX_EXIT(&call->lock);
2471         if (type == RX_SERVER_CONNECTION) {
2472             call = conn->call[channel];
2473             /* If we started with no call attached and there is one now,
2474              * another thread is also running this routine and has gotten
2475              * the connection channel. We should drop this packet in the tests
2476              * below. If there was a call on this connection and it's now
2477              * gone, then we'll be making a new call below.
2478              * If there was previously a call and it's now different then
2479              * the old call was freed and another thread running this routine
2480              * has created a call on this channel. One of these two threads
2481              * has a packet for the old call and the code below handles those
2482              * cases.
2483              */
2484             if (call)
2485                 MUTEX_ENTER(&call->lock);
2486         }
2487         else {
2488             /* This packet can't be for this call. If the new call address is
2489              * 0 then no call is running on this channel. If there is a call
2490              * then, since this is a client connection we're getting data for
2491              * it must be for the previous call.
2492              */
2493             MUTEX_ENTER(&rx_stats_mutex);
2494             rx_stats.spuriousPacketsRead++;
2495             MUTEX_EXIT(&rx_stats_mutex);
2496             MUTEX_ENTER(&conn->conn_data_lock);
2497             conn->refCount--;
2498             MUTEX_EXIT(&conn->conn_data_lock);
2499             return np;
2500         }
2501     }
2502 #endif
2503     currentCallNumber = conn->callNumber[channel];
2504
2505     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2506         if (np->header.callNumber < currentCallNumber) {
2507             MUTEX_ENTER(&rx_stats_mutex);
2508             rx_stats.spuriousPacketsRead++;
2509             MUTEX_EXIT(&rx_stats_mutex);
2510 #ifdef  RX_ENABLE_LOCKS
2511             if (call)
2512                 MUTEX_EXIT(&call->lock);
2513 #endif
2514             MUTEX_ENTER(&conn->conn_data_lock);
2515             conn->refCount--;
2516             MUTEX_EXIT(&conn->conn_data_lock);
2517             return np;
2518         }
2519         if (!call) {
2520             call = rxi_NewCall(conn, channel);
2521             MUTEX_ENTER(&call->lock);
2522             *call->callNumber = np->header.callNumber;
2523             call->state = RX_STATE_PRECALL;
2524             clock_GetTime(&call->queueTime);
2525             hzero(call->bytesSent);
2526             hzero(call->bytesRcvd);
2527             rxi_KeepAliveOn(call);
2528         }
2529         else if (np->header.callNumber != currentCallNumber) {
2530             /* Wait until the transmit queue is idle before deciding
2531              * whether to reset the current call. Chances are that the
2532              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2533              * flag is cleared.
2534              */
2535 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2536             while ((call->state == RX_STATE_ACTIVE) &&
2537                    (call->flags & RX_CALL_TQ_BUSY)) {
2538                 call->flags |= RX_CALL_TQ_WAIT;
2539 #ifdef RX_ENABLE_LOCKS
2540                 CV_WAIT(&call->cv_tq, &call->lock);
2541 #else /* RX_ENABLE_LOCKS */
2542                 osi_rxSleep(&call->tq);
2543 #endif /* RX_ENABLE_LOCKS */
2544             }
2545 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2546             /* If the new call cannot be taken right now send a busy and set
2547              * the error condition in this call, so that it terminates as
2548              * quickly as possible */
2549             if (call->state == RX_STATE_ACTIVE) {
2550                 struct rx_packet *tp;
2551
2552                 rxi_CallError(call, RX_CALL_DEAD);
2553                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2554                 MUTEX_EXIT(&call->lock);
2555                 MUTEX_ENTER(&conn->conn_data_lock);
2556                 conn->refCount--;
2557                 MUTEX_EXIT(&conn->conn_data_lock);
2558                 return tp;
2559             }
2560             rxi_ResetCall(call, 0);
2561             *call->callNumber = np->header.callNumber;
2562             call->state = RX_STATE_PRECALL;
2563             clock_GetTime(&call->queueTime);
2564             hzero(call->bytesSent);
2565             hzero(call->bytesRcvd);
2566             /*
2567              * If the number of queued calls exceeds the overload
2568              * threshold then abort this call.
2569              */
2570             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2571                 struct rx_packet *tp;
2572
2573                 rxi_CallError(call, rx_BusyError);
2574                 tp = rxi_SendCallAbort(call, np, 1, 0);
2575                 MUTEX_EXIT(&call->lock);
2576                 MUTEX_ENTER(&conn->conn_data_lock);
2577                 conn->refCount--;
2578                 MUTEX_EXIT(&conn->conn_data_lock);
2579                 return tp;
2580             }
2581             rxi_KeepAliveOn(call);
2582         }
2583         else {
2584             /* Continuing call; do nothing here. */
2585         }
2586     } else { /* we're the client */
2587         /* Ignore all incoming acknowledgements for calls in DALLY state */
2588         if ( call && (call->state == RX_STATE_DALLY) 
2589          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2590             MUTEX_ENTER(&rx_stats_mutex);
2591             rx_stats.ignorePacketDally++;
2592             MUTEX_EXIT(&rx_stats_mutex);
2593 #ifdef  RX_ENABLE_LOCKS
2594             if (call) {
2595                 MUTEX_EXIT(&call->lock);
2596             }
2597 #endif
2598             MUTEX_ENTER(&conn->conn_data_lock);
2599             conn->refCount--;
2600             MUTEX_EXIT(&conn->conn_data_lock);
2601             return np;
2602         }
2603         
2604         /* Ignore anything that's not relevant to the current call.  If there
2605          * isn't a current call, then no packet is relevant. */
2606         if (!call || (np->header.callNumber != currentCallNumber)) {
2607             MUTEX_ENTER(&rx_stats_mutex);
2608             rx_stats.spuriousPacketsRead++;
2609             MUTEX_EXIT(&rx_stats_mutex);
2610 #ifdef  RX_ENABLE_LOCKS
2611             if (call) {
2612                 MUTEX_EXIT(&call->lock);
2613             }
2614 #endif
2615             MUTEX_ENTER(&conn->conn_data_lock);
2616             conn->refCount--;
2617             MUTEX_EXIT(&conn->conn_data_lock);
2618             return np;  
2619         }
2620         /* If the service security object index stamped in the packet does not
2621          * match the connection's security index, ignore the packet */
2622         if (np->header.securityIndex != conn->securityIndex) {
2623 #ifdef  RX_ENABLE_LOCKS
2624             MUTEX_EXIT(&call->lock);
2625 #endif
2626             MUTEX_ENTER(&conn->conn_data_lock);
2627             conn->refCount--;       
2628             MUTEX_EXIT(&conn->conn_data_lock);
2629             return np;
2630         }
2631
2632         /* If we're receiving the response, then all transmit packets are
2633          * implicitly acknowledged.  Get rid of them. */
2634         if (np->header.type == RX_PACKET_TYPE_DATA) {
2635 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2636             /* XXX Hack. Because we must release the global rx lock when
2637              * sending packets (osi_NetSend) we drop all acks while we're
2638              * traversing the tq in rxi_Start sending packets out because
2639              * packets may move to the freePacketQueue as result of being here!
2640              * So we drop these packets until we're safely out of the
2641              * traversing. Really ugly! 
2642              * For fine grain RX locking, we set the acked field in the
2643              * packets and let rxi_Start remove them from the transmit queue.
2644              */
2645             if (call->flags & RX_CALL_TQ_BUSY) {
2646 #ifdef  RX_ENABLE_LOCKS
2647                 rxi_SetAcksInTransmitQueue(call);
2648 #else
2649                 conn->refCount--;
2650                 return np;              /* xmitting; drop packet */
2651 #endif
2652             }
2653             else {
2654                 rxi_ClearTransmitQueue(call, 0);
2655             }
2656 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2657             rxi_ClearTransmitQueue(call, 0);
2658 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2659         } else {
2660           if (np->header.type == RX_PACKET_TYPE_ACK) {
2661         /* now check to see if this is an ack packet acknowledging that the
2662          * server actually *lost* some hard-acked data.  If this happens we
2663          * ignore this packet, as it may indicate that the server restarted in
2664          * the middle of a call.  It is also possible that this is an old ack
2665          * packet.  We don't abort the connection in this case, because this
2666          * *might* just be an old ack packet.  The right way to detect a server
2667          * restart in the midst of a call is to notice that the server epoch
2668          * changed, btw.  */
2669         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2670          * XXX unacknowledged.  I think that this is off-by-one, but
2671          * XXX I don't dare change it just yet, since it will
2672          * XXX interact badly with the server-restart detection 
2673          * XXX code in receiveackpacket.  */
2674             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2675                 MUTEX_ENTER(&rx_stats_mutex);
2676                 rx_stats.spuriousPacketsRead++;
2677                 MUTEX_EXIT(&rx_stats_mutex);
2678                 MUTEX_EXIT(&call->lock);
2679                 MUTEX_ENTER(&conn->conn_data_lock);
2680                 conn->refCount--;
2681                 MUTEX_EXIT(&conn->conn_data_lock);
2682                 return np;
2683             }
2684           }
2685         } /* else not a data packet */
2686     }
2687
2688     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2689     /* Set remote user defined status from packet */
2690     call->remoteStatus = np->header.userStatus;
2691
2692     /* Note the gap between the expected next packet and the actual
2693      * packet that arrived, when the new packet has a smaller serial number
2694      * than expected.  Rioses frequently reorder packets all by themselves,
2695      * so this will be quite important with very large window sizes.
2696      * Skew is checked against 0 here to avoid any dependence on the type of
2697      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2698      * true! 
2699      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2700      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2701      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2702      */
2703     MUTEX_ENTER(&conn->conn_data_lock);
2704     skew = conn->lastSerial - np->header.serial;
2705     conn->lastSerial = np->header.serial;
2706     MUTEX_EXIT(&conn->conn_data_lock);
2707     if (skew > 0) {
2708       register struct rx_peer *peer;
2709       peer = conn->peer;
2710       if (skew > peer->inPacketSkew) {
2711         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2712         peer->inPacketSkew = skew;
2713       }
2714     }
2715
2716     /* Now do packet type-specific processing */
2717     switch (np->header.type) {
2718         case RX_PACKET_TYPE_DATA:
2719             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2720                                        tnop, newcallp);
2721             break;
2722         case RX_PACKET_TYPE_ACK:
2723             /* Respond immediately to ack packets requesting acknowledgement
2724              * (ping packets) */
2725             if (np->header.flags & RX_REQUEST_ACK) {
2726                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2727                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2728             }
2729             np = rxi_ReceiveAckPacket(call, np, 1);
2730             break;
2731         case RX_PACKET_TYPE_ABORT:
2732             /* An abort packet: reset the connection, passing the error up to
2733              * the user */
2734             /* What if error is zero? */
2735             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2736             break;
2737         case RX_PACKET_TYPE_BUSY:
2738             /* XXXX */
2739             break;
2740         case RX_PACKET_TYPE_ACKALL:
2741             /* All packets acknowledged, so we can drop all packets previously
2742              * readied for sending */
2743 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2744             /* XXX Hack. We because we can't release the global rx lock when
2745              * sending packets (osi_NetSend) we drop all ack pkts while we're
2746              * traversing the tq in rxi_Start sending packets out because
2747              * packets may move to the freePacketQueue as result of being
2748              * here! So we drop these packets until we're safely out of the
2749              * traversing. Really ugly! 
2750              * For fine grain RX locking, we set the acked field in the packets
2751              * and let rxi_Start remove the packets from the transmit queue.
2752              */
2753             if (call->flags & RX_CALL_TQ_BUSY) {
2754 #ifdef  RX_ENABLE_LOCKS
2755                 rxi_SetAcksInTransmitQueue(call);
2756                 break;
2757 #else /* RX_ENABLE_LOCKS */
2758                 conn->refCount--;
2759                 return np;              /* xmitting; drop packet */
2760 #endif /* RX_ENABLE_LOCKS */
2761             }
2762 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2763             rxi_ClearTransmitQueue(call, 0);
2764             break;
2765         default:
2766             /* Should not reach here, unless the peer is broken: send an abort
2767              * packet */
2768             rxi_CallError(call, RX_PROTOCOL_ERROR);
2769             np = rxi_SendCallAbort(call, np, 1, 0);
2770             break;
2771     };
2772     /* Note when this last legitimate packet was received, for keep-alive
2773      * processing.  Note, we delay getting the time until now in the hope that
2774      * the packet will be delivered to the user before any get time is required
2775      * (if not, then the time won't actually be re-evaluated here). */
2776     call->lastReceiveTime = clock_Sec();
2777     MUTEX_EXIT(&call->lock);
2778     MUTEX_ENTER(&conn->conn_data_lock);
2779     conn->refCount--;
2780     MUTEX_EXIT(&conn->conn_data_lock);
2781     return np;
2782 }
2783
2784 /* return true if this is an "interesting" connection from the point of view
2785     of someone trying to debug the system */
2786 int rxi_IsConnInteresting(struct rx_connection *aconn)
2787 {
2788     register int i;
2789     register struct rx_call *tcall;
2790
2791     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2792         return 1;
2793     for(i=0;i<RX_MAXCALLS;i++) {
2794         tcall = aconn->call[i];
2795         if (tcall) {
2796             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2797                 return 1;
2798             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2799                 return 1;
2800         }
2801     }
2802     return 0;
2803 }
2804
2805 #ifdef KERNEL
2806 /* if this is one of the last few packets AND it wouldn't be used by the
2807    receiving call to immediately satisfy a read request, then drop it on
2808    the floor, since accepting it might prevent a lock-holding thread from
2809    making progress in its reading. If a call has been cleared while in
2810    the precall state then ignore all subsequent packets until the call
2811    is assigned to a thread. */
2812
2813 static TooLow(ap, acall)
2814   struct rx_call *acall;
2815   struct rx_packet *ap; {
2816     int rc=0;
2817     MUTEX_ENTER(&rx_stats_mutex);
2818     if (((ap->header.seq != 1) &&
2819          (acall->flags & RX_CALL_CLEARED) &&
2820          (acall->state == RX_STATE_PRECALL)) ||
2821         ((rx_nFreePackets < rxi_dataQuota+2) &&
2822          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2823            && (acall->flags & RX_CALL_READER_WAIT)))) {
2824         rc = 1;
2825     }
2826     MUTEX_EXIT(&rx_stats_mutex);
2827     return rc;
2828 }
2829 #endif /* KERNEL */
2830
2831 /* try to attach call, if authentication is complete */
2832 static void TryAttach(acall, socket, tnop, newcallp)
2833 register struct rx_call *acall;
2834 register osi_socket socket;
2835 register int *tnop;
2836 register struct rx_call **newcallp; {
2837     register struct rx_connection *conn;
2838     conn = acall->conn;
2839     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2840         /* Don't attach until we have any req'd. authentication. */
2841         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2842             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2843             /* Note:  this does not necessarily succeed; there
2844                may not any proc available */
2845         }
2846         else {
2847             rxi_ChallengeOn(acall->conn);
2848         }
2849     }
2850 }
2851
2852 /* A data packet has been received off the interface.  This packet is
2853  * appropriate to the call (the call is in the right state, etc.).  This
2854  * routine can return a packet to the caller, for re-use */
2855
2856 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2857                                         port, tnop, newcallp)
2858     register struct rx_call *call;
2859     register struct rx_packet *np;
2860     int istack;
2861     osi_socket socket;
2862     afs_uint32 host;
2863     u_short port;
2864     int *tnop;
2865     struct rx_call **newcallp;
2866 {
2867     int ackNeeded = 0;
2868     int newPackets = 0;
2869     int didHardAck = 0;
2870     int haveLast = 0;
2871     afs_uint32 seq, serial, flags;
2872     int isFirst;
2873     struct rx_packet *tnp;
2874     struct clock when;
2875     MUTEX_ENTER(&rx_stats_mutex);
2876     rx_stats.dataPacketsRead++;
2877     MUTEX_EXIT(&rx_stats_mutex);
2878
2879 #ifdef KERNEL
2880     /* If there are no packet buffers, drop this new packet, unless we can find
2881      * packet buffers from inactive calls */
2882     if (!call->error &&
2883         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2884         MUTEX_ENTER(&rx_freePktQ_lock);
2885         rxi_NeedMorePackets = TRUE;
2886         MUTEX_EXIT(&rx_freePktQ_lock);
2887         MUTEX_ENTER(&rx_stats_mutex);
2888         rx_stats.noPacketBuffersOnRead++;
2889         MUTEX_EXIT(&rx_stats_mutex);
2890         call->rprev = np->header.serial;
2891         rxi_calltrace(RX_TRACE_DROP, call);
2892         dpf (("packet %x dropped on receipt - quota problems", np));
2893         if (rxi_doreclaim)
2894             rxi_ClearReceiveQueue(call);
2895         clock_GetTime(&when);
2896         clock_Add(&when, &rx_softAckDelay);
2897         if (!call->delayedAckEvent ||
2898             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2899             rxevent_Cancel(call->delayedAckEvent, call,
2900                            RX_CALL_REFCOUNT_DELAY);
2901             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2902             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2903                                                  call, 0);
2904         }
2905         /* we've damaged this call already, might as well do it in. */
2906         return np;
2907     }
2908 #endif /* KERNEL */
2909
2910     /*
2911      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2912      * packet is one of several packets transmitted as a single
2913      * datagram. Do not send any soft or hard acks until all packets
2914      * in a jumbogram have been processed. Send negative acks right away.
2915      */
2916     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2917         /* tnp is non-null when there are more packets in the
2918          * current jumbo gram */
2919         if (tnp) {
2920             if (np)
2921                 rxi_FreePacket(np);
2922             np = tnp;
2923         }
2924
2925         seq = np->header.seq;
2926         serial = np->header.serial;
2927         flags = np->header.flags;
2928
2929         /* If the call is in an error state, send an abort message */
2930         if (call->error)
2931             return rxi_SendCallAbort(call, np, istack, 0);
2932
2933         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2934          * AFS 3.5 jumbogram. */
2935         if (flags & RX_JUMBO_PACKET) {
2936             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2937         } else {
2938             tnp = NULL;
2939         }
2940
2941         if (np->header.spare != 0) {
2942             MUTEX_ENTER(&call->conn->conn_data_lock);
2943             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2944             MUTEX_EXIT(&call->conn->conn_data_lock);
2945         }
2946
2947         /* The usual case is that this is the expected next packet */
2948         if (seq == call->rnext) {
2949
2950             /* Check to make sure it is not a duplicate of one already queued */
2951             if (queue_IsNotEmpty(&call->rq) 
2952                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2953                 MUTEX_ENTER(&rx_stats_mutex);
2954                 rx_stats.dupPacketsRead++;
2955                 MUTEX_EXIT(&rx_stats_mutex);
2956                 dpf (("packet %x dropped on receipt - duplicate", np));
2957                 rxevent_Cancel(call->delayedAckEvent, call,
2958                                RX_CALL_REFCOUNT_DELAY);
2959                 np = rxi_SendAck(call, np, seq, serial,
2960                                  flags, RX_ACK_DUPLICATE, istack);
2961                 ackNeeded = 0;
2962                 call->rprev = seq;
2963                 continue;
2964             }
2965
2966             /* It's the next packet. Stick it on the receive queue
2967              * for this call. Set newPackets to make sure we wake
2968              * the reader once all packets have been processed */
2969             queue_Prepend(&call->rq, np);
2970             call->nSoftAcks++;
2971             np = NULL; /* We can't use this anymore */
2972             newPackets = 1;
2973
2974             /* If an ack is requested then set a flag to make sure we
2975              * send an acknowledgement for this packet */
2976             if (flags & RX_REQUEST_ACK) {
2977                 ackNeeded = 1;
2978             }
2979
2980             /* Keep track of whether we have received the last packet */
2981             if (flags & RX_LAST_PACKET) {
2982                 call->flags |= RX_CALL_HAVE_LAST;
2983                 haveLast = 1;
2984             }
2985
2986             /* Check whether we have all of the packets for this call */
2987             if (call->flags & RX_CALL_HAVE_LAST) {
2988                 afs_uint32 tseq;                /* temporary sequence number */
2989                 struct rx_packet *tp;   /* Temporary packet pointer */
2990                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2991
2992                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2993                     if (tseq != tp->header.seq)
2994                         break;
2995                     if (tp->header.flags & RX_LAST_PACKET) {
2996                         call->flags |= RX_CALL_RECEIVE_DONE;
2997                         break;
2998                     }
2999                     tseq++;
3000                 }
3001             }
3002
3003             /* Provide asynchronous notification for those who want it
3004              * (e.g. multi rx) */
3005             if (call->arrivalProc) {
3006                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3007                                      call->arrivalProcArg);
3008                 call->arrivalProc = (VOID (*)()) 0;
3009             }
3010
3011             /* Update last packet received */
3012             call->rprev = seq;
3013
3014             /* If there is no server process serving this call, grab
3015              * one, if available. We only need to do this once. If a
3016              * server thread is available, this thread becomes a server
3017              * thread and the server thread becomes a listener thread. */
3018             if (isFirst) {
3019                 TryAttach(call, socket, tnop, newcallp);
3020             }
3021         }       
3022         /* This is not the expected next packet. */
3023         else {
3024             /* Determine whether this is a new or old packet, and if it's
3025              * a new one, whether it fits into the current receive window.
3026              * Also figure out whether the packet was delivered in sequence.
3027              * We use the prev variable to determine whether the new packet
3028              * is the successor of its immediate predecessor in the
3029              * receive queue, and the missing flag to determine whether
3030              * any of this packets predecessors are missing.  */
3031
3032             afs_uint32 prev;            /* "Previous packet" sequence number */
3033             struct rx_packet *tp;       /* Temporary packet pointer */
3034             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3035             int missing;                /* Are any predecessors missing? */
3036
3037             /* If the new packet's sequence number has been sent to the
3038              * application already, then this is a duplicate */
3039             if (seq < call->rnext) {
3040                 MUTEX_ENTER(&rx_stats_mutex);
3041                 rx_stats.dupPacketsRead++;
3042                 MUTEX_EXIT(&rx_stats_mutex);
3043                 rxevent_Cancel(call->delayedAckEvent, call,
3044                                RX_CALL_REFCOUNT_DELAY);
3045                 np = rxi_SendAck(call, np, seq, serial,
3046                                  flags, RX_ACK_DUPLICATE, istack);
3047                 ackNeeded = 0;
3048                 call->rprev = seq;
3049                 continue;
3050             }
3051
3052             /* If the sequence number is greater than what can be
3053              * accomodated by the current window, then send a negative
3054              * acknowledge and drop the packet */
3055             if ((call->rnext + call->rwind) <= seq) {
3056                 rxevent_Cancel(call->delayedAckEvent, call,
3057                                RX_CALL_REFCOUNT_DELAY);
3058                 np = rxi_SendAck(call, np, seq, serial,
3059                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3060                 ackNeeded = 0;
3061                 call->rprev = seq;
3062                 continue;
3063             }
3064
3065             /* Look for the packet in the queue of old received packets */
3066             for (prev = call->rnext - 1, missing = 0,
3067                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3068                 /*Check for duplicate packet */
3069                 if (seq == tp->header.seq) {
3070                     MUTEX_ENTER(&rx_stats_mutex);
3071                     rx_stats.dupPacketsRead++;
3072                     MUTEX_EXIT(&rx_stats_mutex);
3073                     rxevent_Cancel(call->delayedAckEvent, call,
3074                                    RX_CALL_REFCOUNT_DELAY);
3075                     np = rxi_SendAck(call, np, seq, serial, 
3076                                      flags, RX_ACK_DUPLICATE, istack);
3077                     ackNeeded = 0;
3078                     call->rprev = seq;
3079                     goto nextloop;
3080                 }
3081                 /* If we find a higher sequence packet, break out and
3082                  * insert the new packet here. */
3083                 if (seq < tp->header.seq) break;
3084                 /* Check for missing packet */
3085                 if (tp->header.seq != prev+1) {
3086                     missing = 1;
3087                 }
3088
3089                 prev = tp->header.seq;
3090             }
3091
3092             /* Keep track of whether we have received the last packet. */
3093             if (flags & RX_LAST_PACKET) {
3094                 call->flags |= RX_CALL_HAVE_LAST;
3095             }
3096
3097             /* It's within the window: add it to the the receive queue.
3098              * tp is left by the previous loop either pointing at the
3099              * packet before which to insert the new packet, or at the
3100              * queue head if the queue is empty or the packet should be
3101              * appended. */
3102             queue_InsertBefore(tp, np);
3103             call->nSoftAcks++;
3104             np = NULL;
3105
3106             /* Check whether we have all of the packets for this call */
3107             if ((call->flags & RX_CALL_HAVE_LAST)
3108              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3109                 afs_uint32 tseq;                /* temporary sequence number */
3110
3111                 for (tseq = call->rnext,
3112                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3113                     if (tseq != tp->header.seq)
3114                         break;
3115                     if (tp->header.flags & RX_LAST_PACKET) {
3116                         call->flags |= RX_CALL_RECEIVE_DONE;
3117                         break;
3118                     }
3119                     tseq++;
3120                 }
3121             }
3122
3123             /* We need to send an ack of the packet is out of sequence, 
3124              * or if an ack was requested by the peer. */
3125             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3126                 ackNeeded = 1;
3127             }
3128
3129             /* Acknowledge the last packet for each call */
3130             if (flags & RX_LAST_PACKET) {
3131                 haveLast = 1;
3132             }
3133
3134             call->rprev = seq;
3135         }
3136 nextloop:;
3137     }
3138
3139     if (newPackets) {
3140         /*
3141          * If the receiver is waiting for an iovec, fill the iovec
3142          * using the data from the receive queue */
3143         if (call->flags & RX_CALL_IOVEC_WAIT) {
3144             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3145             /* the call may have been aborted */
3146             if (call->error) {
3147                 return NULL;
3148             }
3149             if (didHardAck) {
3150                 ackNeeded = 0;
3151             }
3152         }
3153
3154         /* Wakeup the reader if any */
3155         if ((call->flags & RX_CALL_READER_WAIT) &&
3156             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3157              (call->iovNext >= call->iovMax) ||
3158              (call->flags & RX_CALL_RECEIVE_DONE))) {
3159             call->flags &= ~RX_CALL_READER_WAIT;
3160 #ifdef  RX_ENABLE_LOCKS
3161             CV_BROADCAST(&call->cv_rq);
3162 #else
3163             osi_rxWakeup(&call->rq);
3164 #endif
3165         }
3166     }
3167
3168     /*
3169      * Send an ack when requested by the peer, or once every
3170      * rxi_SoftAckRate packets until the last packet has been
3171      * received. Always send a soft ack for the last packet in
3172      * the server's reply. */
3173     if (ackNeeded) {
3174         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3175         np = rxi_SendAck(call, np, seq, serial, flags,
3176                          RX_ACK_REQUESTED, istack);
3177     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3178         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3179         np = rxi_SendAck(call, np, seq, serial, flags,
3180                          RX_ACK_IDLE, istack);
3181     } else if (call->nSoftAcks) {
3182         clock_GetTime(&when);
3183         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3184             clock_Add(&when, &rx_lastAckDelay);
3185         } else {
3186             clock_Add(&when, &rx_softAckDelay);
3187         }
3188         if (!call->delayedAckEvent ||
3189             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3190             rxevent_Cancel(call->delayedAckEvent, call,
3191                            RX_CALL_REFCOUNT_DELAY);
3192             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3193             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3194                                                  call, 0);
3195         }
3196     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3197         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3198     }
3199
3200     return np;
3201 }
3202
3203 #ifdef  ADAPT_WINDOW
3204 static void rxi_ComputeRate();
3205 #endif
3206
3207 /* The real smarts of the whole thing.  */
3208 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3209     register struct rx_call *call;
3210     struct rx_packet *np;
3211     int istack;
3212 {
3213     struct rx_ackPacket *ap;
3214     int nAcks;
3215     register struct rx_packet *tp;
3216     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3217     register struct rx_connection *conn = call->conn;
3218     struct rx_peer *peer = conn->peer;
3219     afs_uint32 first;
3220     afs_uint32 serial;
3221     /* because there are CM's that are bogus, sending weird values for this. */
3222     afs_uint32 skew = 0;
3223     int needRxStart = 0;
3224     int nbytes;
3225     int missing;
3226     int acked;
3227     int nNacked = 0;
3228     int newAckCount = 0;
3229     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3230     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3231
3232     MUTEX_ENTER(&rx_stats_mutex);
3233     rx_stats.ackPacketsRead++;
3234     MUTEX_EXIT(&rx_stats_mutex);
3235     ap = (struct rx_ackPacket *) rx_DataOf(np);
3236     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3237     if (nbytes < 0)
3238       return np;       /* truncated ack packet */
3239
3240     /* depends on ack packet struct */
3241     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3242     first = ntohl(ap->firstPacket);
3243     serial = ntohl(ap->serial);
3244     /* temporarily disabled -- needs to degrade over time 
3245        skew = ntohs(ap->maxSkew); */
3246
3247     /* Ignore ack packets received out of order */
3248     if (first < call->tfirst) {
3249         return np;
3250     }
3251
3252     if (np->header.flags & RX_SLOW_START_OK) {
3253         call->flags |= RX_CALL_SLOW_START_OK;
3254     }
3255     
3256 #ifdef RXDEBUG
3257     if (rx_Log) {
3258       fprintf( rx_Log, 
3259               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3260               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3261               skew, ntohl(ap->firstPacket));
3262         if (nAcks) {
3263             int offset;
3264             for (offset = 0; offset < nAcks; offset++) 
3265                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3266         }
3267         putc('\n', rx_Log);
3268     }
3269 #endif
3270
3271     /* if a server connection has been re-created, it doesn't remember what
3272         serial # it was up to.  An ack will tell us, since the serial field
3273         contains the largest serial received by the other side */
3274     MUTEX_ENTER(&conn->conn_data_lock);
3275     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3276         conn->serial = serial+1;
3277     }
3278     MUTEX_EXIT(&conn->conn_data_lock);
3279
3280     /* Update the outgoing packet skew value to the latest value of
3281      * the peer's incoming packet skew value.  The ack packet, of
3282      * course, could arrive out of order, but that won't affect things
3283      * much */
3284     MUTEX_ENTER(&peer->peer_lock);
3285     peer->outPacketSkew = skew;
3286
3287     /* Check for packets that no longer need to be transmitted, and
3288      * discard them.  This only applies to packets positively
3289      * acknowledged as having been sent to the peer's upper level.
3290      * All other packets must be retained.  So only packets with
3291      * sequence numbers < ap->firstPacket are candidates. */
3292     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3293         if (tp->header.seq >= first) break;
3294         call->tfirst = tp->header.seq + 1;
3295         if (tp->header.serial == serial) {
3296           /* Use RTT if not delayed by client. */
3297           if (ap->reason != RX_ACK_DELAY)
3298               rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3299 #ifdef ADAPT_WINDOW
3300           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3301 #endif
3302         }
3303         else if (tp->firstSerial == serial) {
3304             /* Use RTT if not delayed by client. */
3305             if (ap->reason != RX_ACK_DELAY)
3306                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3307 #ifdef ADAPT_WINDOW
3308           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3309 #endif
3310         }
3311 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3312     /* XXX Hack. Because we have to release the global rx lock when sending
3313      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3314      * in rxi_Start sending packets out because packets may move to the
3315      * freePacketQueue as result of being here! So we drop these packets until
3316      * we're safely out of the traversing. Really ugly! 
3317      * To make it even uglier, if we're using fine grain locking, we can
3318      * set the ack bits in the packets and have rxi_Start remove the packets
3319      * when it's done transmitting.
3320      */
3321         if (!tp->acked) {
3322             newAckCount++;
3323         }
3324         if (call->flags & RX_CALL_TQ_BUSY) {
3325 #ifdef RX_ENABLE_LOCKS
3326             tp->acked = 1;
3327             call->flags |= RX_CALL_TQ_SOME_ACKED;
3328 #else /* RX_ENABLE_LOCKS */
3329             break;
3330 #endif /* RX_ENABLE_LOCKS */
3331         } else
3332 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3333         {
3334         queue_Remove(tp);
3335         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3336         }
3337     }
3338
3339 #ifdef ADAPT_WINDOW
3340     /* Give rate detector a chance to respond to ping requests */
3341     if (ap->reason == RX_ACK_PING_RESPONSE) {
3342         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3343     }
3344 #endif
3345
3346     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3347    
3348    /* Now go through explicit acks/nacks and record the results in
3349     * the waiting packets.  These are packets that can't be released
3350     * yet, even with a positive acknowledge.  This positive
3351     * acknowledge only means the packet has been received by the
3352     * peer, not that it will be retained long enough to be sent to
3353     * the peer's upper level.  In addition, reset the transmit timers
3354     * of any missing packets (those packets that must be missing
3355     * because this packet was out of sequence) */
3356
3357     call->nSoftAcked = 0;
3358     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3359         /* Update round trip time if the ack was stimulated on receipt
3360          * of this packet */
3361 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3362 #ifdef RX_ENABLE_LOCKS
3363         if (tp->header.seq >= first) {
3364 #endif /* RX_ENABLE_LOCKS */
3365 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3366         if (tp->header.serial == serial) {
3367             /* Use RTT if not delayed by client. */
3368             if (ap->reason != RX_ACK_DELAY)
3369                 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3370 #ifdef ADAPT_WINDOW
3371           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3372 #endif
3373         }
3374         else if ((tp->firstSerial == serial)) {
3375             /* Use RTT if not delayed by client. */
3376             if (ap->reason != RX_ACK_DELAY)
3377                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3378 #ifdef ADAPT_WINDOW
3379           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3380 #endif
3381         }
3382 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3383 #ifdef RX_ENABLE_LOCKS
3384         }
3385 #endif /* RX_ENABLE_LOCKS */
3386 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3387
3388         /* Set the acknowledge flag per packet based on the
3389          * information in the ack packet. An acknowlegded packet can
3390          * be downgraded when the server has discarded a packet it
3391          * soacked previously, or when an ack packet is received
3392          * out of sequence. */
3393         if (tp->header.seq < first) {
3394             /* Implicit ack information */
3395             if (!tp->acked) {
3396                 newAckCount++;
3397             }
3398             tp->acked = 1;
3399         }
3400         else if (tp->header.seq < first + nAcks) {
3401             /* Explicit ack information:  set it in the packet appropriately */
3402             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3403                 if (!tp->acked) {
3404                     newAckCount++;
3405                     tp->acked = 1;
3406                 }
3407                 if (missing) {
3408                     nNacked++;
3409                 } else {
3410                     call->nSoftAcked++;
3411                 }
3412             } else {
3413                 tp->acked = 0;
3414                 missing = 1;
3415             }
3416         }
3417         else {
3418             tp->acked = 0;
3419             missing = 1;
3420         }
3421
3422         /* If packet isn't yet acked, and it has been transmitted at least 
3423          * once, reset retransmit time using latest timeout 
3424          * ie, this should readjust the retransmit timer for all outstanding 
3425          * packets...  So we don't just retransmit when we should know better*/
3426
3427         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3428           tp->retryTime = tp->timeSent;
3429           clock_Add(&tp->retryTime, &peer->timeout);
3430           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3431           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3432         }
3433     }
3434
3435     /* If the window has been extended by this acknowledge packet,
3436      * then wakeup a sender waiting in alloc for window space, or try
3437      * sending packets now, if he's been sitting on packets due to
3438      * lack of window space */
3439     if (call->tnext < (call->tfirst + call->twind))  {
3440 #ifdef  RX_ENABLE_LOCKS
3441         CV_SIGNAL(&call->cv_twind);
3442 #else
3443         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3444             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3445             osi_rxWakeup(&call->twind);
3446         }
3447 #endif
3448         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3449             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3450         }
3451     }
3452
3453     /* if the ack packet has a receivelen field hanging off it,
3454      * update our state */
3455     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3456       afs_uint32 tSize;
3457
3458       /* If the ack packet has a "recommended" size that is less than 
3459        * what I am using now, reduce my size to match */
3460       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3461                     sizeof(afs_int32), &tSize);
3462       tSize = (afs_uint32) ntohl(tSize);
3463       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3464
3465       /* Get the maximum packet size to send to this peer */
3466       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3467                     &tSize);
3468       tSize = (afs_uint32)ntohl(tSize);
3469       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3470       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3471
3472       /* sanity check - peer might have restarted with different params.
3473        * If peer says "send less", dammit, send less...  Peer should never 
3474        * be unable to accept packets of the size that prior AFS versions would
3475        * send without asking.  */
3476       if (peer->maxMTU != tSize) {
3477           peer->maxMTU = tSize;
3478           peer->MTU = MIN(tSize, peer->MTU);
3479           call->MTU = MIN(call->MTU, tSize);
3480           peer->congestSeq++;
3481       }
3482
3483       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3484           /* AFS 3.4a */
3485           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3486                         sizeof(afs_int32), &tSize);
3487           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3488           if (tSize < call->twind) {       /* smaller than our send */
3489               call->twind = tSize;         /* window, we must send less... */
3490               call->ssthresh = MIN(call->twind, call->ssthresh);
3491           }
3492
3493           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3494            * network MTU confused with the loopback MTU. Calculate the
3495            * maximum MTU here for use in the slow start code below.
3496            */
3497           maxMTU = peer->maxMTU;
3498           /* Did peer restart with older RX version? */
3499           if (peer->maxDgramPackets > 1) {
3500               peer->maxDgramPackets = 1;
3501           }
3502       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3503           /* AFS 3.5 */
3504           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3505                         sizeof(afs_int32), &tSize);
3506           tSize = (afs_uint32) ntohl(tSize);
3507           /*
3508            * As of AFS 3.5 we set the send window to match the receive window. 
3509            */
3510           if (tSize < call->twind) {
3511               call->twind = tSize;
3512               call->ssthresh = MIN(call->twind, call->ssthresh);
3513           } else if (tSize > call->twind) {
3514               call->twind = tSize;
3515           }
3516
3517           /*
3518            * As of AFS 3.5, a jumbogram is more than one fixed size
3519            * packet transmitted in a single UDP datagram. If the remote
3520            * MTU is smaller than our local MTU then never send a datagram
3521            * larger than the natural MTU.
3522            */
3523           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3524                         sizeof(afs_int32), &tSize);
3525           maxDgramPackets = (afs_uint32) ntohl(tSize);
3526           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3527           maxDgramPackets = MIN(maxDgramPackets,
3528                                 (int)(peer->ifDgramPackets));
3529           maxDgramPackets = MIN(maxDgramPackets, tSize);
3530           if (maxDgramPackets > 1) {
3531             peer->maxDgramPackets = maxDgramPackets;
3532             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3533           } else {
3534             peer->maxDgramPackets = 1;
3535             call->MTU = peer->natMTU;
3536           }
3537        } else if (peer->maxDgramPackets > 1) {
3538           /* Restarted with lower version of RX */
3539           peer->maxDgramPackets = 1;
3540        }
3541     } else if (peer->maxDgramPackets > 1 ||
3542                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3543         /* Restarted with lower version of RX */
3544         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3545         peer->natMTU = OLD_MAX_PACKET_SIZE;
3546         peer->MTU = OLD_MAX_PACKET_SIZE;
3547         peer->maxDgramPackets = 1;
3548         peer->nDgramPackets = 1;
3549         peer->congestSeq++;
3550         call->MTU = OLD_MAX_PACKET_SIZE;
3551     }
3552
3553     if (nNacked) {
3554         /*
3555          * Calculate how many datagrams were successfully received after
3556          * the first missing packet and adjust the negative ack counter
3557          * accordingly.
3558          */
3559         call->nAcks = 0;
3560         call->nNacks++;
3561         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3562         if (call->nNacks < nNacked) {
3563             call->nNacks = nNacked;
3564         }
3565     } else {
3566         if (newAckCount) {
3567             call->nAcks++;
3568         }
3569         call->nNacks = 0;
3570     }
3571
3572     if (call->flags & RX_CALL_FAST_RECOVER) {
3573         if (nNacked) {
3574             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3575         } else {
3576             call->flags &= ~RX_CALL_FAST_RECOVER;
3577             call->cwind = call->nextCwind;
3578             call->nextCwind = 0;
3579             call->nAcks = 0;
3580         }
3581         call->nCwindAcks = 0;
3582     }
3583     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3584         /* Three negative acks in a row trigger congestion recovery */
3585 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3586         MUTEX_EXIT(&peer->peer_lock);
3587         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3588                 /* someone else is waiting to start recovery */
3589                 return np;
3590         }
3591         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3592         while (call->flags & RX_CALL_TQ_BUSY) {
3593             call->flags |= RX_CALL_TQ_WAIT;
3594 #ifdef RX_ENABLE_LOCKS
3595             CV_WAIT(&call->cv_tq, &call->lock);
3596 #else /* RX_ENABLE_LOCKS */
3597             osi_rxSleep(&call->tq);
3598 #endif /* RX_ENABLE_LOCKS */
3599         }
3600         MUTEX_ENTER(&peer->peer_lock);
3601 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3602         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3603         call->flags |= RX_CALL_FAST_RECOVER;
3604         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3605         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3606                           rx_maxSendWindow);
3607         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3608         call->nextCwind = call->ssthresh;
3609         call->nAcks = 0;
3610         call->nNacks = 0;
3611         peer->MTU = call->MTU;
3612         peer->cwind = call->nextCwind;
3613         peer->nDgramPackets = call->nDgramPackets;
3614         peer->congestSeq++;
3615         call->congestSeq = peer->congestSeq;
3616         /* Reset the resend times on the packets that were nacked
3617          * so we will retransmit as soon as the window permits*/
3618         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3619             if (acked) {
3620                 if (!tp->acked) {
3621                     clock_Zero(&tp->retryTime);
3622                 }
3623             } else if (tp->acked) {
3624                 acked = 1;
3625             }
3626         }
3627     } else {
3628         /* If cwind is smaller than ssthresh, then increase
3629          * the window one packet for each ack we receive (exponential
3630          * growth).
3631          * If cwind is greater than or equal to ssthresh then increase
3632          * the congestion window by one packet for each cwind acks we
3633          * receive (linear growth).  */
3634         if (call->cwind < call->ssthresh) {
3635             call->cwind = MIN((int)call->ssthresh,
3636                               (int)(call->cwind + newAckCount));
3637             call->nCwindAcks = 0;
3638         } else {
3639             call->nCwindAcks += newAckCount;
3640             if (call->nCwindAcks >= call->cwind) {
3641                 call->nCwindAcks = 0;
3642                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3643             }
3644         }
3645         /*
3646          * If we have received several acknowledgements in a row then
3647          * it is time to increase the size of our datagrams
3648          */
3649         if ((int)call->nAcks > rx_nDgramThreshold) {
3650             if (peer->maxDgramPackets > 1) {
3651                 if (call->nDgramPackets < peer->maxDgramPackets) {
3652                     call->nDgramPackets++;
3653                 }
3654                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3655             } else if (call->MTU < peer->maxMTU) {
3656                 call->MTU += peer->natMTU;
3657                 call->MTU = MIN(call->MTU, peer->maxMTU);
3658             }
3659             call->nAcks = 0;
3660         }
3661     }
3662
3663     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3664
3665     /* Servers need to hold the call until all response packets have
3666      * been acknowledged. Soft acks are good enough since clients
3667      * are not allowed to clear their receive queues. */
3668     if (call->state == RX_STATE_HOLD &&
3669         call->tfirst + call->nSoftAcked >= call->tnext) {
3670         call->state = RX_STATE_DALLY;
3671         rxi_ClearTransmitQueue(call, 0);
3672     } else if (!queue_IsEmpty(&call->tq)) {
3673         rxi_Start(0, call, istack);
3674     }
3675     return np;
3676 }
3677
3678 /* Received a response to a challenge packet */
3679 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3680     register struct rx_connection *conn;
3681     register struct rx_packet *np;
3682     int istack;
3683 {
3684     int error;
3685
3686     /* Ignore the packet if we're the client */
3687     if (conn->type == RX_CLIENT_CONNECTION) return np;
3688
3689     /* If already authenticated, ignore the packet (it's probably a retry) */
3690     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3691         return np;
3692
3693     /* Otherwise, have the security object evaluate the response packet */
3694     error = RXS_CheckResponse(conn->securityObject, conn, np);
3695     if (error) {
3696         /* If the response is invalid, reset the connection, sending
3697          * an abort to the peer */
3698 #ifndef KERNEL
3699         rxi_Delay(1);
3700 #endif
3701         rxi_ConnectionError(conn, error);
3702         MUTEX_ENTER(&conn->conn_data_lock);
3703         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3704         MUTEX_EXIT(&conn->conn_data_lock);
3705         return np;
3706     }
3707     else {
3708         /* If the response is valid, any calls waiting to attach
3709          * servers can now do so */
3710         int i;
3711         for (i=0; i<RX_MAXCALLS; i++) {
3712             struct rx_call *call = conn->call[i];
3713             if (call) {
3714                 MUTEX_ENTER(&call->lock);
3715                  if (call->state == RX_STATE_PRECALL)
3716                      rxi_AttachServerProc(call, -1, NULL, NULL);
3717                 MUTEX_EXIT(&call->lock);
3718             }
3719         }
3720     }
3721     return np;
3722 }
3723
3724 /* A client has received an authentication challenge: the security
3725  * object is asked to cough up a respectable response packet to send
3726  * back to the server.  The server is responsible for retrying the
3727  * challenge if it fails to get a response. */
3728
3729 struct rx_packet *
3730 rxi_ReceiveChallengePacket(conn, np, istack)
3731     register struct rx_connection *conn;
3732     register struct rx_packet *np;
3733     int istack;
3734 {
3735     int error;
3736
3737     /* Ignore the challenge if we're the server */
3738     if (conn->type == RX_SERVER_CONNECTION) return np;
3739
3740     /* Ignore the challenge if the connection is otherwise idle; someone's
3741      * trying to use us as an oracle. */
3742     if (!rxi_HasActiveCalls(conn)) return np;
3743
3744     /* Send the security object the challenge packet.  It is expected to fill
3745      * in the response. */
3746     error = RXS_GetResponse(conn->securityObject, conn, np);
3747
3748     /* If the security object is unable to return a valid response, reset the
3749      * connection and send an abort to the peer.  Otherwise send the response
3750      * packet to the peer connection. */
3751     if (error) {
3752         rxi_ConnectionError(conn, error);
3753         MUTEX_ENTER(&conn->conn_data_lock);
3754         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3755         MUTEX_EXIT(&conn->conn_data_lock);
3756     }
3757     else {
3758         np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3759                              RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3760     }
3761     return np;
3762 }
3763
3764
3765 /* Find an available server process to service the current request in
3766  * the given call structure.  If one isn't available, queue up this
3767  * call so it eventually gets one */
3768 void 
3769 rxi_AttachServerProc(call, socket, tnop, newcallp)
3770 register struct rx_call *call;
3771 register osi_socket socket;
3772 register int *tnop;
3773 register struct rx_call **newcallp;
3774 {
3775     register struct rx_serverQueueEntry *sq;
3776     register struct rx_service *service = call->conn->service;
3777 #ifdef RX_ENABLE_LOCKS
3778     register int haveQuota = 0;
3779 #endif /* RX_ENABLE_LOCKS */
3780     /* May already be attached */
3781     if (call->state == RX_STATE_ACTIVE) return;
3782
3783     MUTEX_ENTER(&rx_serverPool_lock);
3784 #ifdef RX_ENABLE_LOCKS
3785     while(rxi_ServerThreadSelectingCall) {
3786         MUTEX_EXIT(&call->lock);
3787         CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3788         MUTEX_EXIT(&rx_serverPool_lock);
3789         MUTEX_ENTER(&call->lock);
3790         MUTEX_ENTER(&rx_serverPool_lock);
3791         /* Call may have been attached */
3792         if (call->state == RX_STATE_ACTIVE) return;
3793     }
3794
3795     haveQuota = QuotaOK(service);
3796     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3797         /* If there are no processes available to service this call,
3798          * put the call on the incoming call queue (unless it's
3799          * already on the queue).
3800          */
3801         if (haveQuota)
3802             ReturnToServerPool(service);
3803         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3804             call->flags |= RX_CALL_WAIT_PROC;
3805             MUTEX_ENTER(&rx_stats_mutex);
3806             rx_nWaiting++;
3807             MUTEX_EXIT(&rx_stats_mutex);
3808             rxi_calltrace(RX_CALL_ARRIVAL, call);
3809             SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3810             queue_Append(&rx_incomingCallQueue, call);
3811         }
3812     }
3813 #else /* RX_ENABLE_LOCKS */
3814     if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3815         /* If there are no processes available to service this call,
3816          * put the call on the incoming call queue (unless it's
3817          * already on the queue).
3818          */
3819         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3820             call->flags |= RX_CALL_WAIT_PROC;
3821             rx_nWaiting++;
3822             rxi_calltrace(RX_CALL_ARRIVAL, call);
3823             queue_Append(&rx_incomingCallQueue, call);
3824         }
3825     }
3826 #endif /* RX_ENABLE_LOCKS */
3827     else {
3828         sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3829
3830         /* If hot threads are enabled, and both newcallp and sq->socketp
3831          * are non-null, then this thread will process the call, and the
3832          * idle server thread will start listening on this threads socket.
3833          */
3834         queue_Remove(sq);
3835         if (rx_enable_hot_thread && newcallp && sq->socketp) {
3836             *newcallp = call;
3837             *tnop = sq->tno;
3838             *sq->socketp = socket;
3839             clock_GetTime(&call->startTime);
3840             CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3841         } else {
3842             sq->newcall = call;
3843         }
3844         if (call->flags & RX_CALL_WAIT_PROC) {
3845             /* Conservative:  I don't think this should happen */
3846             call->flags &= ~RX_CALL_WAIT_PROC;
3847             MUTEX_ENTER(&rx_stats_mutex);
3848             rx_nWaiting--;
3849             MUTEX_EXIT(&rx_stats_mutex);
3850             queue_Remove(call);
3851         }
3852         call->state = RX_STATE_ACTIVE;
3853         call->mode = RX_MODE_RECEIVING;
3854         if (call->flags & RX_CALL_CLEARED) {
3855             /* send an ack now to start the packet flow up again */
3856             call->flags &= ~RX_CALL_CLEARED;
3857             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3858         }
3859 #ifdef  RX_ENABLE_LOCKS
3860         CV_SIGNAL(&sq->cv);
3861 #else
3862         service->nRequestsRunning++;
3863         if (service->nRequestsRunning <= service->minProcs)
3864           rxi_minDeficit--;
3865         rxi_availProcs--;
3866         osi_rxWakeup(sq);
3867 #endif
3868     }
3869     MUTEX_EXIT(&rx_serverPool_lock);
3870 }
3871
3872 /* Delay the sending of an acknowledge event for a short while, while
3873  * a new call is being prepared (in the case of a client) or a reply
3874  * is being prepared (in the case of a server).  Rather than sending
3875  * an ack packet, an ACKALL packet is sent. */
3876 void rxi_AckAll(event, call, dummy)
3877 struct rxevent *event;
3878 register struct rx_call *call;
3879 char *dummy;
3880 {
3881 #ifdef RX_ENABLE_LOCKS
3882     if (event) {
3883         MUTEX_ENTER(&call->lock);
3884         call->delayedAckEvent = (struct rxevent *) 0;
3885         CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3886     }
3887     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3888                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3889     if (event)
3890         MUTEX_EXIT(&call->lock);
3891 #else /* RX_ENABLE_LOCKS */
3892     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3893     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3894                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3895 #endif /* RX_ENABLE_LOCKS */
3896 }
3897
3898 void rxi_SendDelayedAck(event, call, dummy)     
3899 struct rxevent *event;
3900 register struct rx_call *call;
3901 char *dummy;
3902 {
3903 #ifdef RX_ENABLE_LOCKS
3904     if (event) {
3905         MUTEX_ENTER(&call->lock);
3906         if (event == call->delayedAckEvent)
3907             call->delayedAckEvent = (struct rxevent *) 0;
3908         CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3909     }
3910     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3911     if (event)
3912         MUTEX_EXIT(&call->lock);
3913 #else /* RX_ENABLE_LOCKS */
3914     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3915     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3916 #endif /* RX_ENABLE_LOCKS */
3917 }
3918
3919
3920 #ifdef RX_ENABLE_LOCKS
3921 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3922  * clearing them out.
3923  */
3924 static void rxi_SetAcksInTransmitQueue(call)
3925       register struct rx_call *call;
3926 {
3927     register struct rx_packet *p, *tp;
3928     int someAcked = 0;
3929
3930      for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3931          if (!p) 
3932              break;
3933          p->acked = 1;
3934          someAcked = 1;
3935      }
3936      if (someAcked) {
3937          call->flags |= RX_CALL_TQ_CLEARME;
3938          call->flags |= RX_CALL_TQ_SOME_ACKED;
3939      }
3940
3941      rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3942      rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3943      call->tfirst = call->tnext;
3944      call->nSoftAcked = 0;
3945
3946      if (call->flags & RX_CALL_FAST_RECOVER) {
3947         call->flags &= ~RX_CALL_FAST_RECOVER;
3948         call->cwind = call->nextCwind;
3949         call->nextCwind = 0;
3950      }
3951
3952      CV_SIGNAL(&call->cv_twind);
3953 }
3954 #endif /* RX_ENABLE_LOCKS */
3955
3956 /* Clear out the transmit queue for the current call (all packets have
3957  * been received by peer) */
3958 void rxi_ClearTransmitQueue(call, force)
3959     register struct rx_call *call;
3960     register int force;
3961 {
3962     register struct rx_packet *p, *tp;
3963
3964 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3965     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3966         int someAcked = 0;
3967         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3968           if (!p) 
3969              break;
3970           p->acked = 1;
3971           someAcked = 1;
3972         }
3973         if (someAcked) {
3974             call->flags |= RX_CALL_TQ_CLEARME;
3975             call->flags |= RX_CALL_TQ_SOME_ACKED;
3976         }
3977     } else {
3978 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3979         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3980             if (!p) 
3981                 break;
3982             queue_Remove(p);
3983             rxi_FreePacket(p);
3984         }
3985 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3986         call->flags &= ~RX_CALL_TQ_CLEARME;
3987     }
3988 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3989
3990     rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3991     rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3992     call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3993     call->nSoftAcked = 0;
3994
3995     if (call->flags & RX_CALL_FAST_RECOVER) {
3996         call->flags &= ~RX_CALL_FAST_RECOVER;
3997         call->cwind = call->nextCwind;
3998     }
3999
4000 #ifdef  RX_ENABLE_LOCKS
4001     CV_SIGNAL(&call->cv_twind);
4002 #else
4003     osi_rxWakeup(&call->twind);
4004 #endif
4005 }
4006
4007 void rxi_ClearReceiveQueue(call)
4008     register struct rx_call *call;
4009 {
4010     register struct rx_packet *p, *tp;
4011     if (queue_IsNotEmpty(&call->rq)) {
4012       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4013         if (!p)
4014           break;
4015         queue_Remove(p);
4016         rxi_FreePacket(p);
4017         rx_packetReclaims++;
4018       }
4019       call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4020     }
4021     if (call->state == RX_STATE_PRECALL) {
4022         call->flags |= RX_CALL_CLEARED;
4023     }
4024 }
4025
4026 /* Send an abort packet for the specified call */
4027 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4028     register struct rx_call *call;
4029     struct rx_packet *packet;
4030     int istack;
4031     int force;
4032 {
4033   afs_int32 error;
4034   struct clock when;
4035
4036   if (!call->error)
4037     return packet;
4038
4039   /* Clients should never delay abort messages */
4040   if (rx_IsClientConn(call->conn))
4041     force = 1;
4042
4043   if (call->abortCode != call->error) {
4044     call->abortCode = call->error;
4045     call->abortCount = 0;
4046   }
4047
4048   if (force || rxi_callAbortThreshhold == 0 ||
4049       call->abortCount < rxi_callAbortThreshhold) {
4050     if (call->delayedAbortEvent) {
4051         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4052     }
4053     error = htonl(call->error);
4054     call->abortCount++;
4055     packet = rxi_SendSpecial(call, call->conn, packet,
4056                              RX_PACKET_TYPE_ABORT, (char *)&error,
4057                              sizeof(error), istack);
4058   } else if (!call->delayedAbortEvent) {
4059     clock_GetTime(&when);
4060     clock_Addmsec(&when, rxi_callAbortDelay);
4061     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4062     call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4063                                            call, 0);
4064   }
4065   return packet;
4066 }
4067
4068 /* Send an abort packet for the specified connection.  Packet is an
4069  * optional pointer to a packet that can be used to send the abort.
4070  * Once the number of abort messages reaches the threshhold, an
4071  * event is scheduled to send the abort. Setting the force flag
4072  * overrides sending delayed abort messages.
4073  *
4074  * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4075  *       to send the abort packet.
4076  */
4077 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4078     register struct rx_connection *conn;
4079     struct rx_packet *packet;
4080     int istack;
4081     int force;
4082 {
4083   afs_int32 error;
4084   struct clock when;
4085
4086   if (!conn->error)
4087     return packet;
4088
4089   /* Clients should never delay abort messages */
4090   if (rx_IsClientConn(conn))
4091     force = 1;
4092
4093   if (force || rxi_connAbortThreshhold == 0 ||
4094       conn->abortCount < rxi_connAbortThreshhold) {
4095     if (conn->delayedAbortEvent) {
4096         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4097     }
4098     error = htonl(conn->error);
4099     conn->abortCount++;
4100     MUTEX_EXIT(&conn->conn_data_lock);
4101     packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4102                              RX_PACKET_TYPE_ABORT, (char *)&error,
4103                              sizeof(error), istack);
4104     MUTEX_ENTER(&conn->conn_data_lock);
4105   } else if (!conn->delayedAbortEvent) {
4106     clock_GetTime(&when);
4107     clock_Addmsec(&when, rxi_connAbortDelay);
4108     conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4109                                            conn, 0);
4110   }
4111   return packet;
4112 }
4113
4114 /* Associate an error all of the calls owned by a connection.  Called
4115  * with error non-zero.  This is only for really fatal things, like
4116  * bad authentication responses.  The connection itself is set in
4117  * error at this point, so that future packets received will be
4118  * rejected. */
4119 void rxi_ConnectionError(conn, error)
4120     register struct rx_connection *conn;
4121     register afs_int32 error;
4122 {
4123     if (error) {
4124         register int i;
4125         if (conn->challengeEvent)
4126             rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4127         for (i=0; i<RX_MAXCALLS; i++) {
4128             struct rx_call *call = conn->call[i];
4129             if (call) {
4130                 MUTEX_ENTER(&call->lock);
4131                 rxi_CallError(call, error);
4132                 MUTEX_EXIT(&call->lock);
4133             }
4134         }
4135         conn->error = error;
4136         MUTEX_ENTER(&rx_stats_mutex);
4137         rx_stats.fatalErrors++;
4138         MUTEX_EXIT(&rx_stats_mutex);
4139     }
4140 }
4141
4142 void rxi_CallError(call, error)
4143     register struct rx_call *call;
4144     afs_int32 error;
4145 {
4146     if (call->error) error = call->error;
4147 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4148     if (!(call->flags & RX_CALL_TQ_BUSY)) {
4149         rxi_ResetCall(call, 0);
4150     }
4151 #else
4152         rxi_ResetCall(call, 0);
4153 #endif
4154     call->error = error;
4155     call->mode = RX_MODE_ERROR;
4156 }
4157
4158 /* Reset various fields in a call structure, and wakeup waiting
4159  * processes.  Some fields aren't changed: state & mode are not
4160  * touched (these must be set by the caller), and bufptr, nLeft, and
4161  * nFree are not reset, since these fields are manipulated by
4162  * unprotected macros, and may only be reset by non-interrupting code.
4163  */
4164 #ifdef ADAPT_WINDOW
4165 /* this code requires that call->conn be set properly as a pre-condition. */
4166 #endif /* ADAPT_WINDOW */
4167
4168 void rxi_ResetCall(call, newcall)
4169     register struct rx_call *call;
4170     register int newcall;
4171 {
4172     register int flags;
4173     register struct rx_peer *peer;
4174     struct rx_packet *packet;
4175
4176     /* Notify anyone who is waiting for asynchronous packet arrival */
4177     if (call->arrivalProc) {
4178         (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4179         call->arrivalProc = (VOID (*)()) 0;
4180     }
4181
4182     if (call->delayedAbortEvent) {
4183         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4184         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4185         if (packet) {
4186             rxi_SendCallAbort(call, packet, 0, 1);
4187             rxi_FreePacket(packet);
4188         }
4189     }
4190
4191     /*
4192      * Update the peer with the congestion information in this call
4193      * so other calls on this connection can pick up where this call
4194      * left off. If the congestion sequence numbers don't match then
4195      * another call experienced a retransmission.
4196      */
4197     peer = call->conn->peer;
4198     MUTEX_ENTER(&peer->peer_lock);
4199     if (!newcall) {
4200         if (call->congestSeq == peer->congestSeq) {
4201             peer->cwind = MAX(peer->cwind, call->cwind);
4202             peer->MTU = MAX(peer->MTU, call->MTU);
4203             peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4204         }
4205     } else {
4206         call->abortCode = 0;
4207         call->abortCount = 0;
4208     }
4209     if (peer->maxDgramPackets > 1) {
4210         call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4211     } else {
4212         call->MTU = peer->MTU;
4213     }
4214     call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4215     call->ssthresh = rx_maxSendWindow;
4216     call->nDgramPackets = peer->nDgramPackets;
4217     call->congestSeq = peer->congestSeq;
4218     MUTEX_EXIT(&peer->peer_lock);
4219
4220     flags = call->flags;
4221     rxi_ClearReceiveQueue(call);
4222 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4223     if (call->flags & RX_CALL_TQ_BUSY) {
4224         call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4225         call->flags |= (flags & RX_CALL_TQ_WAIT);
4226     } else
4227 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4228     {
4229         rxi_ClearTransmitQueue(call, 0);
4230         queue_Init(&call->tq);
4231         call->flags = 0;
4232     }
4233     queue_Init(&call->rq);
4234     call->error = 0;
4235     call->rwind = rx_initReceiveWindow; 
4236     call->twind = rx_initSendWindow; 
4237     call->nSoftAcked = 0;
4238     call->nextCwind = 0;
4239     call->nAcks = 0;
4240     call->nNacks = 0;
4241     call->nCwindAcks = 0;
4242     call->nSoftAcks = 0;
4243     call->nHardAcks = 0;
4244
4245     call->tfirst = call->rnext = call->tnext = 1;
4246     call->rprev = 0;
4247     call->lastAcked = 0;
4248     call->localStatus = call->remoteStatus = 0;
4249
4250     if (flags & RX_CALL_READER_WAIT)  {
4251 #ifdef  RX_ENABLE_LOCKS
4252         CV_BROADCAST(&call->cv_rq);
4253 #else
4254         osi_rxWakeup(&call->rq);
4255 #endif
4256     }
4257     if (flags & RX_CALL_WAIT_PACKETS) {
4258         MUTEX_ENTER(&rx_freePktQ_lock);
4259         rxi_PacketsUnWait();            /* XXX */
4260         MUTEX_EXIT(&rx_freePktQ_lock);
4261     }
4262
4263 #ifdef  RX_ENABLE_LOCKS
4264     CV_SIGNAL(&call->cv_twind);
4265 #else
4266     if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4267         osi_rxWakeup(&call->twind);
4268 #endif
4269
4270 #ifdef RX_ENABLE_LOCKS
4271     /* The following ensures that we don't mess with any queue while some
4272      * other thread might also be doing so. The call_queue_lock field is
4273      * is only modified under the call lock. If the call is in the process
4274      * of being removed from a queue, the call is not locked until the
4275      * the queue lock is dropped and only then is the call_queue_lock field
4276      * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4277      * Note that any other routine which removes a call from a queue has to
4278      * obtain the queue lock before examing the queue and removing the call.
4279      */
4280     if (call->call_queue_lock) {
4281         MUTEX_ENTER(call->call_queue_lock);
4282         if (queue_IsOnQueue(call)) {
4283             queue_Remove(call);
4284             if (flags & RX_CALL_WAIT_PROC) {
4285                 MUTEX_ENTER(&rx_stats_mutex);
4286                 rx_nWaiting--;
4287                 MUTEX_EXIT(&rx_stats_mutex);
4288             }
4289         }
4290         MUTEX_EXIT(call->call_queue_lock);
4291         CLEAR_CALL_QUEUE_LOCK(call);
4292     }
4293 #else /* RX_ENABLE_LOCKS */
4294     if (queue_IsOnQueue(call)) {
4295       queue_Remove(call);
4296       if (flags & RX_CALL_WAIT_PROC)
4297         rx_nWaiting--;
4298     }
4299 #endif /* RX_ENABLE_LOCKS */
4300
4301     rxi_KeepAliveOff(call);
4302     rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4303 }
4304
4305 /* Send an acknowledge for the indicated packet (seq,serial) of the
4306  * indicated call, for the indicated reason (reason).  This
4307  * acknowledge will specifically acknowledge receiving the packet, and
4308  * will also specify which other packets for this call have been
4309  * received.  This routine returns the packet that was used to the
4310  * caller.  The caller is responsible for freeing it or re-using it.
4311  * This acknowledgement also returns the highest sequence number
4312  * actually read out by the higher level to the sender; the sender
4313  * promises to keep around packets that have not been read by the
4314  * higher level yet (unless, of course, the sender decides to abort
4315  * the call altogether).  Any of p, seq, serial, pflags, or reason may
4316  * be set to zero without ill effect.  That is, if they are zero, they
4317  * will not convey any information.  
4318  * NOW there is a trailer field, after the ack where it will safely be
4319  * ignored by mundanes, which indicates the maximum size packet this 
4320  * host can swallow.  */  
4321 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4322     register struct rx_call *call;
4323     register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4324     int seq;                    /* Sequence number of the packet we are acking */
4325     int serial;                 /* Serial number of the packet */
4326     int pflags;                 /* Flags field from packet header */
4327     int reason;                 /* Reason an acknowledge was prompted */
4328     int istack;
4329 {
4330     struct rx_ackPacket *ap;
4331     register struct rx_packet *rqp;
4332     register struct rx_packet *nxp;  /* For queue_Scan */
4333     register struct rx_packet *p;
4334     u_char offset;
4335     afs_int32 templ;
4336
4337     /*
4338      * Open the receive window once a thread starts reading packets
4339      */
4340     if (call->rnext > 1) {
4341         call->rwind = rx_maxReceiveWindow;
4342     }
4343
4344     call->nHardAcks = 0;
4345     call->nSoftAcks = 0;
4346     if (call->rnext > call->lastAcked)
4347       call->lastAcked = call->rnext;
4348     p = optionalPacket;
4349
4350     if (p) {
4351       rx_computelen(p, p->length);  /* reset length, you never know */
4352     }                               /* where that's been...         */
4353     else
4354       if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4355           /* We won't send the ack, but don't panic. */
4356           return optionalPacket;
4357       }
4358
4359     templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4360     if (templ > 0) {
4361       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4362           if (!optionalPacket) rxi_FreePacket(p);
4363           return optionalPacket;
4364       }
4365       templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32); 
4366       if (rx_Contiguous(p)<templ) {
4367           if (!optionalPacket) rxi_FreePacket(p);
4368           return optionalPacket;
4369       }
4370     }    /* MTUXXX failing to send an ack is very serious.  We should */
4371          /* try as hard as possible to send even a partial ack; it's */
4372          /* better than nothing. */
4373
4374     ap = (struct rx_ackPacket *) rx_DataOf(p);
4375     ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4376     ap->reason = reason;
4377
4378     /* The skew computation used to be bogus, I think it's better now. */
4379     /* We should start paying attention to skew.    XXX  */
4380     ap->serial = htonl(call->conn->maxSerial);
4381     ap->maxSkew = 0;    /* used to be peer->inPacketSkew */
4382
4383     ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4384     ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4385
4386     /* No fear of running out of ack packet here because there can only be at most
4387      * one window full of unacknowledged packets.  The window size must be constrained 
4388      * to be less than the maximum ack size, of course.  Also, an ack should always
4389      * fit into a single packet -- it should not ever be fragmented.  */
4390     for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4391         if (!rqp || !call->rq.next 
4392             || (rqp->header.seq > (call->rnext + call->rwind))) {