rx-keep-track-of-resent-packets-20010406
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include "../afs/sysincludes.h"
15 #include "../afs/afsincludes.h"
16 #ifndef UKERNEL
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #include "../afs/afs_args.h"
28 #include "../afs/afs_osi.h"
29 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
30 #include "../h/systm.h"
31 #endif
32 #ifdef RXDEBUG
33 #undef RXDEBUG      /* turn off debugging */
34 #endif /* RXDEBUG */
35 #if defined(AFS_SGI_ENV)
36 #include "../sys/debug.h"
37 #endif
38 #include "../afsint/afsint.h"
39 #ifdef  AFS_ALPHA_ENV
40 #undef kmem_alloc
41 #undef kmem_free
42 #undef mem_alloc
43 #undef mem_free
44 #undef register
45 #endif  /* AFS_ALPHA_ENV */
46 #else /* !UKERNEL */
47 #include "../afs/sysincludes.h"
48 #include "../afs/afsincludes.h"
49 #endif /* !UKERNEL */
50 #include "../afs/lock.h"
51 #include "../rx/rx_kmutex.h"
52 #include "../rx/rx_kernel.h"
53 #include "../rx/rx_clock.h"
54 #include "../rx/rx_queue.h"
55 #include "../rx/rx.h"
56 #include "../rx/rx_globals.h"
57 #include "../rx/rx_trace.h"
58 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
59 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
60 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
61 #include "../afsint/afsint.h"
62 extern afs_int32 afs_termState;
63 #ifdef AFS_AIX41_ENV
64 #include "sys/lockl.h"
65 #include "sys/lock_def.h"
66 #endif /* AFS_AIX41_ENV */
67 # include "../afsint/rxgen_consts.h"
68 #else /* KERNEL */
69 # include <afs/param.h>
70 # include <sys/types.h>
71 # include <errno.h>
72 #ifdef AFS_NT40_ENV
73 # include <stdlib.h>
74 # include <fcntl.h>
75 # include <afsutil.h>
76 #else
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <sys/stat.h>
81 # include <netinet/in.h>
82 # include <sys/time.h>
83 #endif
84 # include "rx.h"
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx_globals.h"
89 # include "rx_trace.h"
90 # include "rx_internal.h"
91 # include <afs/rxgen_consts.h>
92 #endif /* KERNEL */
93
94 #ifdef RXDEBUG
95 extern afs_uint32 LWP_ThreadId();
96 #endif /* RXDEBUG */
97
98 int (*registerProgram)() = 0;
99 int (*swapNameProgram)() = 0;
100
101 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
102 struct rx_tq_debug {
103     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
104     afs_int32 rxi_start_in_error;
105 } rx_tq_debug;
106 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
107
108 /*
109  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
110  * currently allocated within rx.  This number is used to allocate the
111  * memory required to return the statistics when queried.
112  */
113
114 static unsigned int rxi_rpc_peer_stat_cnt;
115
116 /*
117  * rxi_rpc_process_stat_cnt counts the total number of local process stat
118  * structures currently allocated within rx.  The number is used to allocate
119  * the memory required to return the statistics when queried.
120  */
121
122 static unsigned int rxi_rpc_process_stat_cnt;
123
124 #if !defined(offsetof)
125 #include <stddef.h>     /* for definition of offsetof() */
126 #endif
127
128 #ifdef AFS_PTHREAD_ENV
129 #include <assert.h>
130
131 /*
132  * Use procedural initialization of mutexes/condition variables
133  * to ease NT porting
134  */
135
136 extern pthread_mutex_t rxkad_stats_mutex;
137 extern pthread_mutex_t des_init_mutex;
138 extern pthread_mutex_t des_random_mutex;
139 extern pthread_mutex_t rx_clock_mutex;
140 extern pthread_mutex_t rxi_connCacheMutex;
141 extern pthread_mutex_t rx_event_mutex;
142 extern pthread_mutex_t osi_malloc_mutex;
143 extern pthread_mutex_t event_handler_mutex;
144 extern pthread_mutex_t listener_mutex;
145 extern pthread_mutex_t rx_if_init_mutex;
146 extern pthread_mutex_t rx_if_mutex;
147 extern pthread_mutex_t rxkad_client_uid_mutex;
148 extern pthread_mutex_t rxkad_random_mutex;
149
150 extern pthread_cond_t rx_event_handler_cond;
151 extern pthread_cond_t rx_listener_cond;
152
153 static pthread_mutex_t epoch_mutex;
154 static pthread_mutex_t rx_init_mutex;
155 static pthread_mutex_t rx_debug_mutex;
156
157 static void rxi_InitPthread(void) {
158     assert(pthread_mutex_init(&rx_clock_mutex,
159                               (const pthread_mutexattr_t*)0)==0);
160     assert(pthread_mutex_init(&rxi_connCacheMutex,
161                               (const pthread_mutexattr_t*)0)==0);
162     assert(pthread_mutex_init(&rx_init_mutex,
163                               (const pthread_mutexattr_t*)0)==0);
164     assert(pthread_mutex_init(&epoch_mutex,
165                               (const pthread_mutexattr_t*)0)==0);
166     assert(pthread_mutex_init(&rx_event_mutex,
167                               (const pthread_mutexattr_t*)0)==0);
168     assert(pthread_mutex_init(&des_init_mutex,
169                               (const pthread_mutexattr_t*)0)==0);
170     assert(pthread_mutex_init(&des_random_mutex,
171                               (const pthread_mutexattr_t*)0)==0);
172     assert(pthread_mutex_init(&osi_malloc_mutex,
173                               (const pthread_mutexattr_t*)0)==0);
174     assert(pthread_mutex_init(&event_handler_mutex,
175                               (const pthread_mutexattr_t*)0)==0);
176     assert(pthread_mutex_init(&listener_mutex,
177                               (const pthread_mutexattr_t*)0)==0);
178     assert(pthread_mutex_init(&rx_if_init_mutex,
179                               (const pthread_mutexattr_t*)0)==0);
180     assert(pthread_mutex_init(&rx_if_mutex,
181                               (const pthread_mutexattr_t*)0)==0);
182     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
183                               (const pthread_mutexattr_t*)0)==0);
184     assert(pthread_mutex_init(&rxkad_random_mutex,
185                               (const pthread_mutexattr_t*)0)==0);
186     assert(pthread_mutex_init(&rxkad_stats_mutex,
187                               (const pthread_mutexattr_t*)0)==0);
188     assert(pthread_mutex_init(&rx_debug_mutex,
189                               (const pthread_mutexattr_t*)0)==0);
190
191     assert(pthread_cond_init(&rx_event_handler_cond,
192                               (const pthread_condattr_t*)0)==0);
193     assert(pthread_cond_init(&rx_listener_cond,
194                               (const pthread_condattr_t*)0)==0);
195     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
196 }
197
198 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
199 #define INIT_PTHREAD_LOCKS \
200 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
201 /*
202  * The rx_stats_mutex mutex protects the following global variables:
203  * rxi_dataQuota
204  * rxi_minDeficit
205  * rxi_availProcs
206  * rxi_totalMin
207  * rxi_lowConnRefCount
208  * rxi_lowPeerRefCount
209  * rxi_nCalls
210  * rxi_Alloccnt
211  * rxi_Allocsize
212  * rx_nFreePackets
213  * rx_tq_debug
214  * rx_stats
215  */
216 #else
217 #define INIT_PTHREAD_LOCKS
218 #endif
219
220 extern void rxi_DeleteCachedConnections(void);
221
222
223 /* Variables for handling the minProcs implementation.  availProcs gives the
224  * number of threads available in the pool at this moment (not counting dudes
225  * executing right now).  totalMin gives the total number of procs required
226  * for handling all minProcs requests.  minDeficit is a dynamic variable
227  * tracking the # of procs required to satisfy all of the remaining minProcs
228  * demands.
229  * For fine grain locking to work, the quota check and the reservation of
230  * a server thread has to come while rxi_availProcs and rxi_minDeficit
231  * are locked. To this end, the code has been modified under #ifdef
232  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
233  * same time. A new function, ReturnToServerPool() returns the allocation.
234  * 
235  * A call can be on several queue's (but only one at a time). When
236  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
237  * that no one else is touching the queue. To this end, we store the address
238  * of the queue lock in the call structure (under the call lock) when we
239  * put the call on a queue, and we clear the call_queue_lock when the
240  * call is removed from a queue (once the call lock has been obtained).
241  * This allows rxi_ResetCall to safely synchronize with others wishing
242  * to manipulate the queue.
243  */
244
245 extern void rxi_Delay(int);
246
247 static int rxi_ServerThreadSelectingCall;
248
249 #ifdef RX_ENABLE_LOCKS
250 static afs_kmutex_t rx_rpc_stats;
251 void rxi_StartUnlocked();
252 #endif
253
254 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
255 ** pretty good that the next packet coming in is from the same connection 
256 ** as the last packet, since we're send multiple packets in a transmit window.
257 */
258 struct rx_connection *rxLastConn = 0; 
259
260 #ifdef RX_ENABLE_LOCKS
261 /* The locking hierarchy for rx fine grain locking is composed of five
262  * tiers:
263  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
264  * call->lock - locks call data fields.
265  * Most any other lock - these are all independent of each other.....
266  *      rx_freePktQ_lock
267  *      rx_freeCallQueue_lock
268  *      freeSQEList_lock
269  *      rx_connHashTable_lock
270  *      rx_serverPool_lock
271  *      rxi_keyCreate_lock
272  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
273
274  * lowest level:
275  *      peer_lock - locks peer data fields.
276  *      conn_data_lock - that more than one thread is not updating a conn data
277  *              field at the same time.
278  * Do we need a lock to protect the peer field in the conn structure?
279  *      conn->peer was previously a constant for all intents and so has no
280  *      lock protecting this field. The multihomed client delta introduced
281  *      a RX code change : change the peer field in the connection structure
282  *      to that remote inetrface from which the last packet for this
283  *      connection was sent out. This may become an issue if further changes
284  *      are made.
285  */
286 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
287 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
288 #ifdef RX_LOCKS_DB
289 /* rxdb_fileID is used to identify the lock location, along with line#. */
290 static int rxdb_fileID = RXDB_FILE_RX;
291 #endif /* RX_LOCKS_DB */
292 static void rxi_SetAcksInTransmitQueue();
293 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
294 #else /* RX_ENABLE_LOCKS */
295 #define SET_CALL_QUEUE_LOCK(C, L)
296 #define CLEAR_CALL_QUEUE_LOCK(C)
297 #endif /* RX_ENABLE_LOCKS */
298 static void rxi_DestroyConnectionNoLock();
299 void rxi_DestroyConnection();
300 void rxi_CleanupConnection();
301 struct rx_serverQueueEntry *rx_waitForPacket = 0;
302
303 /* ------------Exported Interfaces------------- */
304
305 /* This function allows rxkad to set the epoch to a suitably random number
306  * which rx_NewConnection will use in the future.  The principle purpose is to
307  * get rxnull connections to use the same epoch as the rxkad connections do, at
308  * least once the first rxkad connection is established.  This is important now
309  * that the host/port addresses aren't used in FindConnection: the uniqueness
310  * of epoch/cid matters and the start time won't do. */
311
312 #ifdef AFS_PTHREAD_ENV
313 /*
314  * This mutex protects the following global variables:
315  * rx_epoch
316  */
317
318 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
319 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
320 #else
321 #define LOCK_EPOCH
322 #define UNLOCK_EPOCH
323 #endif /* AFS_PTHREAD_ENV */
324
325 void rx_SetEpoch (epoch)
326   afs_uint32 epoch;
327 {
328     LOCK_EPOCH
329     rx_epoch = epoch;
330     UNLOCK_EPOCH
331 }
332
333 /* Initialize rx.  A port number may be mentioned, in which case this
334  * becomes the default port number for any service installed later.
335  * If 0 is provided for the port number, a random port will be chosen
336  * by the kernel.  Whether this will ever overlap anything in
337  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
338  * error. */
339 static int rxinit_status = 1;
340 #ifdef AFS_PTHREAD_ENV
341 /*
342  * This mutex protects the following global variables:
343  * rxinit_status
344  */
345
346 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
347 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
348 #else
349 #define LOCK_RX_INIT
350 #define UNLOCK_RX_INIT
351 #endif
352
353 int rx_Init(u_int port)
354 {
355 #ifdef KERNEL
356     osi_timeval_t tv;
357 #else /* KERNEL */
358     struct timeval tv;
359 #endif /* KERNEL */
360     char *htable, *ptable;
361     int tmp_status;
362
363     SPLVAR;
364
365     INIT_PTHREAD_LOCKS
366     LOCK_RX_INIT
367     if (rxinit_status == 0) {
368         tmp_status = rxinit_status;
369         UNLOCK_RX_INIT
370         return tmp_status; /* Already started; return previous error code. */
371     }
372
373 #ifdef AFS_NT40_ENV
374     if (afs_winsockInit()<0)
375         return -1;
376 #endif
377
378 #ifndef KERNEL
379     /*
380      * Initialize anything necessary to provide a non-premptive threading
381      * environment.
382      */
383     rxi_InitializeThreadSupport();
384 #endif
385
386     /* Allocate and initialize a socket for client and perhaps server
387      * connections. */
388
389     rx_socket = rxi_GetUDPSocket((u_short)port); 
390     if (rx_socket == OSI_NULLSOCKET) {
391         UNLOCK_RX_INIT
392         return RX_ADDRINUSE;
393     }
394     
395
396 #ifdef  RX_ENABLE_LOCKS
397 #ifdef RX_LOCKS_DB
398     rxdb_init();
399 #endif /* RX_LOCKS_DB */
400     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
401     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
402     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
403     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
404     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
405                MUTEX_DEFAULT,0);
406     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
407     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
408     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
409     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
410 #ifndef KERNEL
411     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
412 #endif /* !KERNEL */
413     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
414 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
415     if ( !uniprocessor )
416       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
417 #endif /* KERNEL && AFS_HPUX110_ENV */
418 #else /* RX_ENABLE_LOCKS */
419 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
420     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
421 #endif /* AFS_GLOBAL_SUNLOCK */
422 #endif /* RX_ENABLE_LOCKS */
423
424     rxi_nCalls = 0;
425     rx_connDeadTime = 12;
426     rx_tranquil     = 0;        /* reset flag */
427     bzero((char *)&rx_stats, sizeof(struct rx_stats));
428     htable = (char *)
429         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
430     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
431     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
432     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
433     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
434     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
435
436     /* Malloc up a bunch of packets & buffers */
437     rx_nFreePackets = 0;
438     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
439     queue_Init(&rx_freePacketQueue);
440     rxi_NeedMorePackets = FALSE;
441     rxi_MorePackets(rx_nPackets);
442     rx_CheckPackets();
443
444     NETPRI;
445     AFS_RXGLOCK();
446
447     clock_Init();
448
449 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
450     tv.tv_sec = clock_now.sec;
451     tv.tv_usec = clock_now.usec;
452     srand((unsigned int) tv.tv_usec);
453 #else
454     osi_GetTime(&tv);
455 #endif
456     if (port) {
457         rx_port = port;
458     } else {
459 #if defined(KERNEL) && !defined(UKERNEL)
460         /* Really, this should never happen in a real kernel */
461         rx_port = 0;
462 #else
463         struct sockaddr_in addr;
464         int addrlen = sizeof(addr);
465         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
466             rx_Finalize();
467             return -1;
468         }
469         rx_port = addr.sin_port;
470 #endif
471     }
472     rx_stats.minRtt.sec = 9999999;
473 #ifdef  KERNEL
474     rx_SetEpoch (tv.tv_sec | 0x80000000);
475 #else
476     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
477                                          * will provide a randomer value. */
478 #endif
479     MUTEX_ENTER(&rx_stats_mutex);
480     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
481     MUTEX_EXIT(&rx_stats_mutex);
482     /* *Slightly* random start time for the cid.  This is just to help
483      * out with the hashing function at the peer */
484     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
485     rx_connHashTable = (struct rx_connection **) htable;
486     rx_peerHashTable = (struct rx_peer **) ptable;
487
488     rx_lastAckDelay.sec = 0;
489     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
490     rx_hardAckDelay.sec = 0;
491     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
492     rx_softAckDelay.sec = 0;
493     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
494
495     rxevent_Init(20, rxi_ReScheduleEvents);
496
497     /* Initialize various global queues */
498     queue_Init(&rx_idleServerQueue);
499     queue_Init(&rx_incomingCallQueue);
500     queue_Init(&rx_freeCallQueue);
501
502 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
503     /* Initialize our list of usable IP addresses. */
504     rx_GetIFInfo();
505 #endif
506
507     /* Start listener process (exact function is dependent on the
508      * implementation environment--kernel or user space) */
509     rxi_StartListener();
510
511     AFS_RXGUNLOCK();
512     USERPRI;
513     tmp_status = rxinit_status = 0;
514     UNLOCK_RX_INIT
515     return tmp_status;
516 }
517
518 /* called with unincremented nRequestsRunning to see if it is OK to start
519  * a new thread in this service.  Could be "no" for two reasons: over the
520  * max quota, or would prevent others from reaching their min quota.
521  */
522 #ifdef RX_ENABLE_LOCKS
523 /* This verion of QuotaOK reserves quota if it's ok while the
524  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
525  */
526 static int QuotaOK(aservice)
527 register struct rx_service *aservice;
528 {
529     /* check if over max quota */
530     if (aservice->nRequestsRunning >= aservice->maxProcs) {
531         return 0;
532     }
533
534     /* under min quota, we're OK */
535     /* otherwise, can use only if there are enough to allow everyone
536      * to go to their min quota after this guy starts.
537      */
538     MUTEX_ENTER(&rx_stats_mutex);
539     if ((aservice->nRequestsRunning < aservice->minProcs) ||
540          (rxi_availProcs > rxi_minDeficit)) {
541         aservice->nRequestsRunning++;
542         /* just started call in minProcs pool, need fewer to maintain
543          * guarantee */
544         if (aservice->nRequestsRunning <= aservice->minProcs)
545             rxi_minDeficit--;
546         rxi_availProcs--;
547         MUTEX_EXIT(&rx_stats_mutex);
548         return 1;
549     }
550     MUTEX_EXIT(&rx_stats_mutex);
551
552     return 0;
553 }
554 static void ReturnToServerPool(aservice)
555 register struct rx_service *aservice;
556 {
557     aservice->nRequestsRunning--;
558     MUTEX_ENTER(&rx_stats_mutex);
559     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
560     rxi_availProcs++;
561     MUTEX_EXIT(&rx_stats_mutex);
562 }
563
564 #else /* RX_ENABLE_LOCKS */
565 static QuotaOK(aservice)
566 register struct rx_service *aservice; {
567     int rc=0;
568     /* under min quota, we're OK */
569     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
570
571     /* check if over max quota */
572     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
573
574     /* otherwise, can use only if there are enough to allow everyone
575      * to go to their min quota after this guy starts.
576      */
577     if (rxi_availProcs > rxi_minDeficit) rc = 1;
578     return rc;
579 }
580 #endif /* RX_ENABLE_LOCKS */
581
582 #ifndef KERNEL
583 /* Called by rx_StartServer to start up lwp's to service calls.
584    NExistingProcs gives the number of procs already existing, and which
585    therefore needn't be created. */
586 void rxi_StartServerProcs(nExistingProcs)
587     int nExistingProcs;
588 {
589     register struct rx_service *service;
590     register int i;
591     int maxdiff = 0;
592     int nProcs = 0;
593
594     /* For each service, reserve N processes, where N is the "minimum"
595        number of processes that MUST be able to execute a request in parallel,
596        at any time, for that process.  Also compute the maximum difference
597        between any service's maximum number of processes that can run
598        (i.e. the maximum number that ever will be run, and a guarantee
599        that this number will run if other services aren't running), and its
600        minimum number.  The result is the extra number of processes that
601        we need in order to provide the latter guarantee */
602     for (i=0; i<RX_MAX_SERVICES; i++) {
603         int diff;
604         service = rx_services[i];
605         if (service == (struct rx_service *) 0) break;
606         nProcs += service->minProcs;
607         diff = service->maxProcs - service->minProcs;
608         if (diff > maxdiff) maxdiff = diff;
609     }
610     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
611     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
612     for (i = 0; i<nProcs; i++) {
613         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
614     }
615 }
616 #endif /* KERNEL */
617
618 /* This routine must be called if any services are exported.  If the
619  * donateMe flag is set, the calling process is donated to the server
620  * process pool */
621 void rx_StartServer(donateMe)
622 {
623     register struct rx_service *service;
624     register int i, nProcs;
625     SPLVAR;
626     clock_NewTime();
627
628     NETPRI;
629     AFS_RXGLOCK();
630     /* Start server processes, if necessary (exact function is dependent
631      * on the implementation environment--kernel or user space).  DonateMe
632      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
633      * case, one less new proc will be created rx_StartServerProcs.
634      */
635     rxi_StartServerProcs(donateMe);
636
637     /* count up the # of threads in minProcs, and add set the min deficit to
638      * be that value, too.
639      */
640     for (i=0; i<RX_MAX_SERVICES; i++) {
641         service = rx_services[i];
642         if (service == (struct rx_service *) 0) break;
643         MUTEX_ENTER(&rx_stats_mutex);
644         rxi_totalMin += service->minProcs;
645         /* below works even if a thread is running, since minDeficit would
646          * still have been decremented and later re-incremented.
647          */
648         rxi_minDeficit += service->minProcs;
649         MUTEX_EXIT(&rx_stats_mutex);
650     }
651
652     /* Turn on reaping of idle server connections */
653     rxi_ReapConnections();
654
655     AFS_RXGUNLOCK();
656     USERPRI;
657
658     if (donateMe) {
659 #ifndef AFS_NT40_ENV
660 #ifndef KERNEL
661         int code;
662         char name[32];
663 #ifdef AFS_PTHREAD_ENV
664         pid_t pid;
665         pid = pthread_self();
666 #else /* AFS_PTHREAD_ENV */
667         PROCESS pid;
668         code = LWP_CurrentProcess(&pid);
669 #endif /* AFS_PTHREAD_ENV */
670
671         sprintf(name,"srv_%d", ++nProcs);
672         if (registerProgram)
673             (*registerProgram)(pid, name);
674 #endif /* KERNEL */
675 #endif /* AFS_NT40_ENV */
676         rx_ServerProc(); /* Never returns */
677     }
678     return;
679 }
680
681 /* Create a new client connection to the specified service, using the
682  * specified security object to implement the security model for this
683  * connection. */
684 struct rx_connection *
685 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
686     register afs_uint32 shost;      /* Server host */
687     u_short sport;                  /* Server port */
688     u_short sservice;               /* Server service id */
689     register struct rx_securityClass *securityObject;
690     int serviceSecurityIndex;
691 {
692     int hashindex;
693     afs_int32 cid;
694     register struct rx_connection *conn;
695
696     SPLVAR;
697
698     clock_NewTime();
699     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
700           shost, sport, sservice, securityObject, serviceSecurityIndex));
701
702     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
703      * the case of kmem_alloc? */
704     conn = rxi_AllocConnection();
705 #ifdef  RX_ENABLE_LOCKS
706     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
707     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
708     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
709 #endif
710     NETPRI;
711     AFS_RXGLOCK();
712     MUTEX_ENTER(&rx_connHashTable_lock);
713     cid = (rx_nextCid += RX_MAXCALLS);
714     conn->type = RX_CLIENT_CONNECTION;
715     conn->cid = cid;
716     conn->epoch = rx_epoch;
717     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
718     conn->serviceId = sservice;
719     conn->securityObject = securityObject;
720     /* This doesn't work in all compilers with void (they're buggy), so fake it
721      * with VOID */
722     conn->securityData = (VOID *) 0;
723     conn->securityIndex = serviceSecurityIndex;
724     rx_SetConnDeadTime(conn, rx_connDeadTime);
725     conn->ackRate = RX_FAST_ACK_RATE;
726     conn->nSpecific = 0;
727     conn->specific = NULL;
728     conn->challengeEvent = (struct rxevent *)0;
729     conn->delayedAbortEvent = (struct rxevent *)0;
730     conn->abortCount = 0;
731     conn->error = 0;
732
733     RXS_NewConnection(securityObject, conn);
734     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
735     
736     conn->refCount++; /* no lock required since only this thread knows... */
737     conn->next = rx_connHashTable[hashindex];
738     rx_connHashTable[hashindex] = conn;
739     MUTEX_ENTER(&rx_stats_mutex);
740     rx_stats.nClientConns++;
741     MUTEX_EXIT(&rx_stats_mutex);
742
743     MUTEX_EXIT(&rx_connHashTable_lock);
744     AFS_RXGUNLOCK();
745     USERPRI;
746     return conn;
747 }
748
749 void rx_SetConnDeadTime(conn, seconds)
750     register struct rx_connection *conn;
751     register int seconds;
752 {
753     /* The idea is to set the dead time to a value that allows several
754      * keepalives to be dropped without timing out the connection. */
755     conn->secondsUntilDead = MAX(seconds, 6);
756     conn->secondsUntilPing = conn->secondsUntilDead/6;
757 }
758
759 int rxi_lowPeerRefCount = 0;
760 int rxi_lowConnRefCount = 0;
761
762 /*
763  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
764  * NOTE: must not be called with rx_connHashTable_lock held.
765  */
766 void rxi_CleanupConnection(conn)
767     struct rx_connection *conn;
768 {
769     int i;
770
771     /* Notify the service exporter, if requested, that this connection
772      * is being destroyed */
773     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
774       (*conn->service->destroyConnProc)(conn);
775
776     /* Notify the security module that this connection is being destroyed */
777     RXS_DestroyConnection(conn->securityObject, conn);
778
779     /* If this is the last connection using the rx_peer struct, set its
780      * idle time to now. rxi_ReapConnections will reap it if it's still
781      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
782      */
783     MUTEX_ENTER(&rx_peerHashTable_lock);
784     if (--conn->peer->refCount <= 0) {
785         conn->peer->idleWhen = clock_Sec();
786         if (conn->peer->refCount < 0) {
787             conn->peer->refCount = 0; 
788             MUTEX_ENTER(&rx_stats_mutex);
789             rxi_lowPeerRefCount ++;
790             MUTEX_EXIT(&rx_stats_mutex);
791         }
792     }
793     MUTEX_EXIT(&rx_peerHashTable_lock);
794
795     MUTEX_ENTER(&rx_stats_mutex);
796     if (conn->type == RX_SERVER_CONNECTION)
797       rx_stats.nServerConns--;
798     else
799       rx_stats.nClientConns--;
800     MUTEX_EXIT(&rx_stats_mutex);
801
802 #ifndef KERNEL
803     if (conn->specific) {
804         for (i = 0 ; i < conn->nSpecific ; i++) {
805             if (conn->specific[i] && rxi_keyCreate_destructor[i])
806                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
807             conn->specific[i] = NULL;
808         }
809         free(conn->specific);
810     }
811     conn->specific = NULL;
812     conn->nSpecific = 0;
813 #endif /* !KERNEL */
814
815     MUTEX_DESTROY(&conn->conn_call_lock);
816     MUTEX_DESTROY(&conn->conn_data_lock);
817     CV_DESTROY(&conn->conn_call_cv);
818         
819     rxi_FreeConnection(conn);
820 }
821
822 /* Destroy the specified connection */
823 void rxi_DestroyConnection(conn)
824     register struct rx_connection *conn;
825 {
826     MUTEX_ENTER(&rx_connHashTable_lock);
827     rxi_DestroyConnectionNoLock(conn);
828     /* conn should be at the head of the cleanup list */
829     if (conn == rx_connCleanup_list) {
830         rx_connCleanup_list = rx_connCleanup_list->next;
831         MUTEX_EXIT(&rx_connHashTable_lock);
832         rxi_CleanupConnection(conn);
833     }
834 #ifdef RX_ENABLE_LOCKS
835     else {
836         MUTEX_EXIT(&rx_connHashTable_lock);
837     }
838 #endif /* RX_ENABLE_LOCKS */
839 }
840     
841 static void rxi_DestroyConnectionNoLock(conn)
842     register struct rx_connection *conn;
843 {
844     register struct rx_connection **conn_ptr;
845     register int havecalls = 0;
846     struct rx_packet *packet;
847     int i;
848     SPLVAR;
849
850     clock_NewTime();
851
852     NETPRI;
853     MUTEX_ENTER(&conn->conn_data_lock);
854     if (conn->refCount > 0)
855         conn->refCount--;
856     else {
857         MUTEX_ENTER(&rx_stats_mutex);
858         rxi_lowConnRefCount++;
859         MUTEX_EXIT(&rx_stats_mutex);
860     }
861
862     if (conn->refCount > 0) {
863         /* Busy; wait till the last guy before proceeding */
864         MUTEX_EXIT(&conn->conn_data_lock);
865         USERPRI;
866         return;
867     }
868
869     /* If the client previously called rx_NewCall, but it is still
870      * waiting, treat this as a running call, and wait to destroy the
871      * connection later when the call completes. */
872     if ((conn->type == RX_CLIENT_CONNECTION) &&
873         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
874         conn->flags |= RX_CONN_DESTROY_ME;
875         MUTEX_EXIT(&conn->conn_data_lock);
876         USERPRI;
877         return;
878     }
879     MUTEX_EXIT(&conn->conn_data_lock);
880
881     /* Check for extant references to this connection */
882     for (i = 0; i<RX_MAXCALLS; i++) {
883         register struct rx_call *call = conn->call[i];
884         if (call) {
885             havecalls = 1;
886             if (conn->type == RX_CLIENT_CONNECTION) {
887                 MUTEX_ENTER(&call->lock);
888                 if (call->delayedAckEvent) {
889                     /* Push the final acknowledgment out now--there
890                      * won't be a subsequent call to acknowledge the
891                      * last reply packets */
892                     rxevent_Cancel(call->delayedAckEvent, call,
893                                    RX_CALL_REFCOUNT_DELAY);
894                     rxi_AckAll((struct rxevent *)0, call, 0);
895                 }
896                 MUTEX_EXIT(&call->lock);
897             }
898         }
899     }
900 #ifdef RX_ENABLE_LOCKS
901     if (!havecalls) {
902         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
903             MUTEX_EXIT(&conn->conn_data_lock);
904         }
905         else {
906             /* Someone is accessing a packet right now. */
907             havecalls = 1;
908         }
909     }
910 #endif /* RX_ENABLE_LOCKS */
911
912     if (havecalls) {
913         /* Don't destroy the connection if there are any call
914          * structures still in use */
915         MUTEX_ENTER(&conn->conn_data_lock);
916         conn->flags |= RX_CONN_DESTROY_ME;
917         MUTEX_EXIT(&conn->conn_data_lock);
918         USERPRI;
919         return;
920     }
921
922     if (conn->delayedAbortEvent) {
923         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
924         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
925         if (packet) {
926             MUTEX_ENTER(&conn->conn_data_lock);
927             rxi_SendConnectionAbort(conn, packet, 0, 1);
928             MUTEX_EXIT(&conn->conn_data_lock);
929             rxi_FreePacket(packet);
930         }
931     }
932
933     /* Remove from connection hash table before proceeding */
934     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
935                                              conn->epoch, conn->type) ];
936     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
937         if (*conn_ptr == conn) {
938             *conn_ptr = conn->next;
939             break;
940         }
941     }
942     /* if the conn that we are destroying was the last connection, then we
943     * clear rxLastConn as well */
944     if ( rxLastConn == conn )
945         rxLastConn = 0;
946
947     /* Make sure the connection is completely reset before deleting it. */
948     /* get rid of pending events that could zap us later */
949     if (conn->challengeEvent) {
950         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
951     }
952  
953     /* Add the connection to the list of destroyed connections that
954      * need to be cleaned up. This is necessary to avoid deadlocks
955      * in the routines we call to inform others that this connection is
956      * being destroyed. */
957     conn->next = rx_connCleanup_list;
958     rx_connCleanup_list = conn;
959 }
960
961 /* Externally available version */
962 void rx_DestroyConnection(conn) 
963     register struct rx_connection *conn;
964 {
965     SPLVAR;
966
967     NETPRI;
968     AFS_RXGLOCK();
969     rxi_DestroyConnection (conn);
970     AFS_RXGUNLOCK();
971     USERPRI;
972 }
973
974 /* Start a new rx remote procedure call, on the specified connection.
975  * If wait is set to 1, wait for a free call channel; otherwise return
976  * 0.  Maxtime gives the maximum number of seconds this call may take,
977  * after rx_MakeCall returns.  After this time interval, a call to any
978  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
979  * For fine grain locking, we hold the conn_call_lock in order to 
980  * to ensure that we don't get signalle after we found a call in an active
981  * state and before we go to sleep.
982  */
983 struct rx_call *rx_NewCall(conn)
984     register struct rx_connection *conn;
985 {
986     register int i;
987     register struct rx_call *call;
988     struct clock queueTime;
989     SPLVAR;
990
991     clock_NewTime();
992     dpf (("rx_MakeCall(conn %x)\n", conn));
993
994     NETPRI;
995     clock_GetTime(&queueTime);
996     AFS_RXGLOCK();
997     MUTEX_ENTER(&conn->conn_call_lock);
998     for (;;) {
999         for (i=0; i<RX_MAXCALLS; i++) {
1000             call = conn->call[i];
1001             if (call) {
1002                 MUTEX_ENTER(&call->lock);
1003                 if (call->state == RX_STATE_DALLY) {
1004                     rxi_ResetCall(call, 0);
1005                     (*call->callNumber)++;
1006                     break;
1007                 }
1008                 MUTEX_EXIT(&call->lock);
1009             }
1010             else {
1011                 call = rxi_NewCall(conn, i);
1012                 MUTEX_ENTER(&call->lock);
1013                 break;
1014             }
1015         }
1016         if (i < RX_MAXCALLS) {
1017             break;
1018         }
1019         MUTEX_ENTER(&conn->conn_data_lock);
1020         conn->flags |= RX_CONN_MAKECALL_WAITING;
1021         MUTEX_EXIT(&conn->conn_data_lock);
1022 #ifdef  RX_ENABLE_LOCKS
1023         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1024 #else
1025         osi_rxSleep(conn);
1026 #endif
1027     }
1028
1029     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1030
1031     /* Client is initially in send mode */
1032     call->state = RX_STATE_ACTIVE;
1033     call->mode = RX_MODE_SENDING;
1034
1035     /* remember start time for call in case we have hard dead time limit */
1036     call->queueTime = queueTime;
1037     clock_GetTime(&call->startTime);
1038     hzero(call->bytesSent);
1039     hzero(call->bytesRcvd);
1040
1041     /* Turn on busy protocol. */
1042     rxi_KeepAliveOn(call);
1043
1044     MUTEX_EXIT(&call->lock);
1045     MUTEX_EXIT(&conn->conn_call_lock);
1046     AFS_RXGUNLOCK();
1047     USERPRI;
1048
1049 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1050     /* Now, if TQ wasn't cleared earlier, do it now. */
1051     AFS_RXGLOCK();
1052     MUTEX_ENTER(&call->lock);
1053     while (call->flags & RX_CALL_TQ_BUSY) {
1054         call->flags |= RX_CALL_TQ_WAIT;
1055 #ifdef RX_ENABLE_LOCKS
1056         CV_WAIT(&call->cv_tq, &call->lock);
1057 #else /* RX_ENABLE_LOCKS */
1058         osi_rxSleep(&call->tq);
1059 #endif /* RX_ENABLE_LOCKS */
1060     }
1061     if (call->flags & RX_CALL_TQ_CLEARME) {
1062         rxi_ClearTransmitQueue(call, 0);
1063         queue_Init(&call->tq);
1064     }
1065     MUTEX_EXIT(&call->lock);
1066     AFS_RXGUNLOCK();
1067 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1068
1069     return call;
1070 }
1071
1072 rxi_HasActiveCalls(aconn)
1073 register struct rx_connection *aconn; {
1074     register int i;
1075     register struct rx_call *tcall;
1076     SPLVAR;
1077
1078     NETPRI;
1079     for(i=0; i<RX_MAXCALLS; i++) {
1080       if (tcall = aconn->call[i]) {
1081         if ((tcall->state == RX_STATE_ACTIVE) 
1082             || (tcall->state == RX_STATE_PRECALL)) {
1083           USERPRI;
1084           return 1;
1085         }
1086       }
1087     }
1088     USERPRI;
1089     return 0;
1090 }
1091
1092 rxi_GetCallNumberVector(aconn, aint32s)
1093 register struct rx_connection *aconn;
1094 register afs_int32 *aint32s; {
1095     register int i;
1096     register struct rx_call *tcall;
1097     SPLVAR;
1098
1099     NETPRI;
1100     for(i=0; i<RX_MAXCALLS; i++) {
1101         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1102             aint32s[i] = aconn->callNumber[i]+1;
1103         else
1104             aint32s[i] = aconn->callNumber[i];
1105     }
1106     USERPRI;
1107     return 0;
1108 }
1109
1110 rxi_SetCallNumberVector(aconn, aint32s)
1111 register struct rx_connection *aconn;
1112 register afs_int32 *aint32s; {
1113     register int i;
1114     register struct rx_call *tcall;
1115     SPLVAR;
1116
1117     NETPRI;
1118     for(i=0; i<RX_MAXCALLS; i++) {
1119         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1120             aconn->callNumber[i] = aint32s[i] - 1;
1121         else
1122             aconn->callNumber[i] = aint32s[i];
1123     }
1124     USERPRI;
1125     return 0;
1126 }
1127
1128 /* Advertise a new service.  A service is named locally by a UDP port
1129  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1130  * on a failure. */
1131 struct rx_service *
1132 rx_NewService(port, serviceId, serviceName, securityObjects,
1133               nSecurityObjects, serviceProc)
1134     u_short port;
1135     u_short serviceId;
1136     char *serviceName;  /* Name for identification purposes (e.g. the
1137                          * service name might be used for probing for
1138                          * statistics) */
1139     struct rx_securityClass **securityObjects;
1140     int nSecurityObjects;
1141     afs_int32 (*serviceProc)();
1142 {    
1143     osi_socket socket = OSI_NULLSOCKET;
1144     register struct rx_service *tservice;    
1145     register int i;
1146     SPLVAR;
1147
1148     clock_NewTime();
1149
1150     if (serviceId == 0) {
1151         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1152          serviceName);
1153         return 0;
1154     }
1155     if (port == 0) {
1156         if (rx_port == 0) {
1157             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1158             return 0;
1159         }
1160         port = rx_port;
1161         socket = rx_socket;
1162     }
1163
1164     tservice = rxi_AllocService();
1165     NETPRI;
1166     AFS_RXGLOCK();
1167     for (i = 0; i<RX_MAX_SERVICES; i++) {
1168         register struct rx_service *service = rx_services[i];
1169         if (service) {
1170             if (port == service->servicePort) {
1171                 if (service->serviceId == serviceId) {
1172                     /* The identical service has already been
1173                      * installed; if the caller was intending to
1174                      * change the security classes used by this
1175                      * service, he/she loses. */
1176                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1177                     AFS_RXGUNLOCK();
1178                     USERPRI;
1179                     rxi_FreeService(tservice);
1180                     return service;
1181                 }
1182                 /* Different service, same port: re-use the socket
1183                  * which is bound to the same port */
1184                 socket = service->socket;
1185             }
1186         } else {
1187             if (socket == OSI_NULLSOCKET) {
1188                 /* If we don't already have a socket (from another
1189                  * service on same port) get a new one */
1190                 socket = rxi_GetUDPSocket(port);
1191                 if (socket == OSI_NULLSOCKET) {
1192                     AFS_RXGUNLOCK();
1193                     USERPRI;
1194                     rxi_FreeService(tservice);
1195                     return 0;
1196                 }
1197             }
1198             service = tservice;
1199             service->socket = socket;
1200             service->servicePort = port;
1201             service->serviceId = serviceId;
1202             service->serviceName = serviceName;
1203             service->nSecurityObjects = nSecurityObjects;
1204             service->securityObjects = securityObjects;
1205             service->minProcs = 0;
1206             service->maxProcs = 1;
1207             service->idleDeadTime = 60;
1208             service->connDeadTime = rx_connDeadTime;
1209             service->executeRequestProc = serviceProc;
1210             rx_services[i] = service;   /* not visible until now */
1211             AFS_RXGUNLOCK();
1212             USERPRI;
1213             return service;
1214         }
1215     }
1216     AFS_RXGUNLOCK();
1217     USERPRI;
1218     rxi_FreeService(tservice);
1219     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1220     return 0;
1221 }
1222
1223 /* Generic request processing loop. This routine should be called
1224  * by the implementation dependent rx_ServerProc. If socketp is
1225  * non-null, it will be set to the file descriptor that this thread
1226  * is now listening on. If socketp is null, this routine will never
1227  * returns. */
1228 void rxi_ServerProc(threadID, newcall, socketp)
1229 int threadID;
1230 struct rx_call *newcall;
1231 osi_socket *socketp;
1232 {
1233     register struct rx_call *call;
1234     register afs_int32 code;
1235     register struct rx_service *tservice = NULL;
1236
1237     for (;;) {
1238         if (newcall) {
1239             call = newcall;
1240             newcall = NULL;
1241         } else {
1242             call = rx_GetCall(threadID, tservice, socketp);
1243             if (socketp && *socketp != OSI_NULLSOCKET) {
1244                 /* We are now a listener thread */
1245                 return;
1246             }
1247         }
1248
1249         /* if server is restarting( typically smooth shutdown) then do not
1250          * allow any new calls.
1251          */
1252
1253         if ( rx_tranquil && (call != NULL) ) {
1254             SPLVAR;
1255
1256             NETPRI;
1257             AFS_RXGLOCK();
1258             MUTEX_ENTER(&call->lock);
1259
1260             rxi_CallError(call, RX_RESTARTING);
1261             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1262
1263             MUTEX_EXIT(&call->lock);
1264             AFS_RXGUNLOCK();
1265             USERPRI;
1266         }
1267
1268 #ifdef  KERNEL
1269         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1270 #ifdef RX_ENABLE_LOCKS
1271             AFS_GLOCK();
1272 #endif /* RX_ENABLE_LOCKS */
1273             afs_termState = AFSOP_STOP_AFS;
1274             afs_osi_Wakeup(&afs_termState);
1275 #ifdef RX_ENABLE_LOCKS
1276             AFS_GUNLOCK();
1277 #endif /* RX_ENABLE_LOCKS */
1278             return;
1279         }
1280 #endif
1281
1282         tservice = call->conn->service;
1283
1284         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1285
1286         code = call->conn->service->executeRequestProc(call);
1287
1288         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1289
1290         rx_EndCall(call, code);
1291         MUTEX_ENTER(&rx_stats_mutex);
1292         rxi_nCalls++;
1293         MUTEX_EXIT(&rx_stats_mutex);
1294     }
1295 }
1296
1297
1298 void rx_WakeupServerProcs()
1299 {
1300     struct rx_serverQueueEntry *np, *tqp;
1301     SPLVAR;
1302
1303     NETPRI;
1304     AFS_RXGLOCK();
1305     MUTEX_ENTER(&rx_serverPool_lock);
1306
1307 #ifdef RX_ENABLE_LOCKS
1308     if (rx_waitForPacket)
1309         CV_BROADCAST(&rx_waitForPacket->cv);
1310 #else /* RX_ENABLE_LOCKS */
1311     if (rx_waitForPacket)
1312         osi_rxWakeup(rx_waitForPacket);
1313 #endif /* RX_ENABLE_LOCKS */
1314     MUTEX_ENTER(&freeSQEList_lock);
1315     for (np = rx_FreeSQEList; np; np = tqp) {
1316       tqp = *(struct rx_serverQueueEntry **)np;
1317 #ifdef RX_ENABLE_LOCKS
1318       CV_BROADCAST(&np->cv);
1319 #else /* RX_ENABLE_LOCKS */
1320       osi_rxWakeup(np);
1321 #endif /* RX_ENABLE_LOCKS */
1322     }
1323     MUTEX_EXIT(&freeSQEList_lock);
1324     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1325 #ifdef RX_ENABLE_LOCKS
1326       CV_BROADCAST(&np->cv);
1327 #else /* RX_ENABLE_LOCKS */
1328       osi_rxWakeup(np);
1329 #endif /* RX_ENABLE_LOCKS */
1330     }
1331     MUTEX_EXIT(&rx_serverPool_lock);
1332     AFS_RXGUNLOCK();
1333     USERPRI;
1334 }
1335
1336 /* meltdown:
1337  * One thing that seems to happen is that all the server threads get
1338  * tied up on some empty or slow call, and then a whole bunch of calls
1339  * arrive at once, using up the packet pool, so now there are more 
1340  * empty calls.  The most critical resources here are server threads
1341  * and the free packet pool.  The "doreclaim" code seems to help in
1342  * general.  I think that eventually we arrive in this state: there
1343  * are lots of pending calls which do have all their packets present,
1344  * so they won't be reclaimed, are multi-packet calls, so they won't
1345  * be scheduled until later, and thus are tying up most of the free 
1346  * packet pool for a very long time.
1347  * future options:
1348  * 1.  schedule multi-packet calls if all the packets are present.  
1349  * Probably CPU-bound operation, useful to return packets to pool. 
1350  * Do what if there is a full window, but the last packet isn't here?
1351  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1352  * it sleeps and waits for that type of call.
1353  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1354  * the current dataquota business is badly broken.  The quota isn't adjusted
1355  * to reflect how many packets are presently queued for a running call.
1356  * So, when we schedule a queued call with a full window of packets queued
1357  * up for it, that *should* free up a window full of packets for other 2d-class
1358  * calls to be able to use from the packet pool.  But it doesn't.
1359  *
1360  * NB.  Most of the time, this code doesn't run -- since idle server threads
1361  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1362  * as a new call arrives.
1363  */
1364 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1365  * for an rx_Read. */
1366 #ifdef RX_ENABLE_LOCKS
1367 struct rx_call *
1368 rx_GetCall(tno, cur_service, socketp)
1369 int tno;
1370 struct rx_service *cur_service;
1371 osi_socket *socketp;
1372 {
1373     struct rx_serverQueueEntry *sq;
1374     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1375     struct rx_service *service;
1376     SPLVAR;
1377
1378     MUTEX_ENTER(&freeSQEList_lock);
1379
1380     if (sq = rx_FreeSQEList) {
1381         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1382         MUTEX_EXIT(&freeSQEList_lock);
1383     } else {    /* otherwise allocate a new one and return that */
1384         MUTEX_EXIT(&freeSQEList_lock);
1385         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1386         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1387         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1388     }
1389
1390     MUTEX_ENTER(&rx_serverPool_lock);
1391     if (cur_service != NULL) {
1392         ReturnToServerPool(cur_service);
1393     }
1394     while (1) {
1395         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1396             register struct rx_call *tcall, *ncall;
1397             choice2 = (struct rx_call *) 0;
1398             /* Scan for eligible incoming calls.  A call is not eligible
1399              * if the maximum number of calls for its service type are
1400              * already executing */
1401             /* One thread will process calls FCFS (to prevent starvation),
1402              * while the other threads may run ahead looking for calls which
1403              * have all their input data available immediately.  This helps 
1404              * keep threads from blocking, waiting for data from the client. */
1405             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1406               service = tcall->conn->service;
1407               if (!QuotaOK(service)) {
1408                 continue;
1409               }
1410               if (!tno || !tcall->queue_item_header.next  ) {
1411                 /* If we're thread 0, then  we'll just use 
1412                  * this call. If we haven't been able to find an optimal 
1413                  * choice, and we're at the end of the list, then use a 
1414                  * 2d choice if one has been identified.  Otherwise... */
1415                 call = (choice2 ? choice2 : tcall);
1416                 service = call->conn->service;
1417               } else if (!queue_IsEmpty(&tcall->rq)) {
1418                 struct rx_packet *rp;
1419                 rp = queue_First(&tcall->rq, rx_packet);
1420                 if (rp->header.seq == 1) {
1421                   if (!meltdown_1pkt ||             
1422                       (rp->header.flags & RX_LAST_PACKET)) {
1423                     call = tcall;
1424                   } else if (rxi_2dchoice && !choice2 &&
1425                              !(tcall->flags & RX_CALL_CLEARED) &&
1426                              (tcall->rprev > rxi_HardAckRate)) {
1427                     choice2 = tcall;
1428                   } else rxi_md2cnt++;
1429                 }
1430               }
1431               if (call)  {
1432                 break;
1433               } else {
1434                   ReturnToServerPool(service);
1435               }
1436             }
1437           }
1438
1439         if (call) {
1440             queue_Remove(call);
1441             rxi_ServerThreadSelectingCall = 1;
1442             MUTEX_EXIT(&rx_serverPool_lock);
1443             MUTEX_ENTER(&call->lock);
1444             MUTEX_ENTER(&rx_serverPool_lock);
1445
1446             if (queue_IsEmpty(&call->rq) ||
1447                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1448               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1449
1450             CLEAR_CALL_QUEUE_LOCK(call);
1451             if (call->error) {
1452                 MUTEX_EXIT(&call->lock);
1453                 ReturnToServerPool(service);
1454                 rxi_ServerThreadSelectingCall = 0;
1455                 CV_SIGNAL(&rx_serverPool_cv);
1456                 call = (struct rx_call*)0;
1457                 continue;
1458             }
1459             call->flags &= (~RX_CALL_WAIT_PROC);
1460             MUTEX_ENTER(&rx_stats_mutex);
1461             rx_nWaiting--;
1462             MUTEX_EXIT(&rx_stats_mutex);
1463             rxi_ServerThreadSelectingCall = 0;
1464             CV_SIGNAL(&rx_serverPool_cv);
1465             MUTEX_EXIT(&rx_serverPool_lock);
1466             break;
1467         }
1468         else {
1469             /* If there are no eligible incoming calls, add this process
1470              * to the idle server queue, to wait for one */
1471             sq->newcall = 0;
1472             sq->tno = tno;
1473             if (socketp) {
1474                 *socketp = OSI_NULLSOCKET;
1475             }
1476             sq->socketp = socketp;
1477             queue_Append(&rx_idleServerQueue, sq);
1478 #ifndef AFS_AIX41_ENV
1479             rx_waitForPacket = sq;
1480 #endif /* AFS_AIX41_ENV */
1481             do {
1482                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1483 #ifdef  KERNEL
1484                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1485                     MUTEX_EXIT(&rx_serverPool_lock);
1486                     return (struct rx_call *)0;
1487                 }
1488 #endif
1489             } while (!(call = sq->newcall) &&
1490                      !(socketp && *socketp != OSI_NULLSOCKET));
1491             MUTEX_EXIT(&rx_serverPool_lock);
1492             if (call) {
1493                 MUTEX_ENTER(&call->lock);
1494             }
1495             break;
1496         }
1497     }
1498
1499     MUTEX_ENTER(&freeSQEList_lock);
1500     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1501     rx_FreeSQEList = sq;
1502     MUTEX_EXIT(&freeSQEList_lock);
1503
1504     if (call) {
1505         clock_GetTime(&call->startTime);
1506         call->state = RX_STATE_ACTIVE;
1507         call->mode = RX_MODE_RECEIVING;
1508
1509         rxi_calltrace(RX_CALL_START, call);
1510         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1511              call->conn->service->servicePort, 
1512              call->conn->service->serviceId, call));
1513
1514         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1515         MUTEX_EXIT(&call->lock);
1516     } else {
1517         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1518     }
1519
1520     return call;
1521 }
1522 #else /* RX_ENABLE_LOCKS */
1523 struct rx_call *
1524 rx_GetCall(tno, cur_service, socketp)
1525   int tno;
1526   struct rx_service *cur_service;
1527   osi_socket *socketp;
1528 {
1529     struct rx_serverQueueEntry *sq;
1530     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1531     struct rx_service *service;
1532     SPLVAR;
1533
1534     NETPRI;
1535     AFS_RXGLOCK();
1536     MUTEX_ENTER(&freeSQEList_lock);
1537
1538     if (sq = rx_FreeSQEList) {
1539         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1540         MUTEX_EXIT(&freeSQEList_lock);
1541     } else {    /* otherwise allocate a new one and return that */
1542         MUTEX_EXIT(&freeSQEList_lock);
1543         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1544         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1545         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1546     }
1547     MUTEX_ENTER(&sq->lock);
1548
1549     if (cur_service != NULL) {
1550         cur_service->nRequestsRunning--;
1551         if (cur_service->nRequestsRunning < cur_service->minProcs)
1552             rxi_minDeficit++;
1553         rxi_availProcs++;
1554     }
1555     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1556         register struct rx_call *tcall, *ncall;
1557         /* Scan for eligible incoming calls.  A call is not eligible
1558          * if the maximum number of calls for its service type are
1559          * already executing */
1560         /* One thread will process calls FCFS (to prevent starvation),
1561          * while the other threads may run ahead looking for calls which
1562          * have all their input data available immediately.  This helps 
1563          * keep threads from blocking, waiting for data from the client. */
1564         choice2 = (struct rx_call *) 0;
1565         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1566           service = tcall->conn->service;
1567           if (QuotaOK(service)) {
1568              if (!tno || !tcall->queue_item_header.next  ) {
1569                  /* If we're thread 0, then  we'll just use 
1570                   * this call. If we haven't been able to find an optimal 
1571                   * choice, and we're at the end of the list, then use a 
1572                   * 2d choice if one has been identified.  Otherwise... */
1573                  call = (choice2 ? choice2 : tcall);
1574                  service = call->conn->service;
1575              } else if (!queue_IsEmpty(&tcall->rq)) {
1576                  struct rx_packet *rp;
1577                  rp = queue_First(&tcall->rq, rx_packet);
1578                  if (rp->header.seq == 1
1579                      && (!meltdown_1pkt ||
1580                          (rp->header.flags & RX_LAST_PACKET))) {
1581                      call = tcall;
1582                  } else if (rxi_2dchoice && !choice2 &&
1583                             !(tcall->flags & RX_CALL_CLEARED) &&
1584                             (tcall->rprev > rxi_HardAckRate)) {
1585                      choice2 = tcall;
1586                  } else rxi_md2cnt++;
1587              }
1588           }
1589           if (call) 
1590              break;
1591         }
1592       }
1593
1594     if (call) {
1595         queue_Remove(call);
1596         /* we can't schedule a call if there's no data!!! */
1597         /* send an ack if there's no data, if we're missing the
1598          * first packet, or we're missing something between first 
1599          * and last -- there's a "hole" in the incoming data. */
1600         if (queue_IsEmpty(&call->rq) ||
1601             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1602             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1603           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1604
1605         call->flags &= (~RX_CALL_WAIT_PROC);
1606         service->nRequestsRunning++;
1607         /* just started call in minProcs pool, need fewer to maintain
1608          * guarantee */
1609         if (service->nRequestsRunning <= service->minProcs)
1610             rxi_minDeficit--;
1611         rxi_availProcs--;
1612         rx_nWaiting--;
1613         /* MUTEX_EXIT(&call->lock); */
1614     }
1615     else {
1616         /* If there are no eligible incoming calls, add this process
1617          * to the idle server queue, to wait for one */
1618         sq->newcall = 0;
1619         if (socketp) {
1620             *socketp = OSI_NULLSOCKET;
1621         }
1622         sq->socketp = socketp;
1623         queue_Append(&rx_idleServerQueue, sq);
1624         do {
1625             osi_rxSleep(sq);
1626 #ifdef  KERNEL
1627                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1628                     AFS_RXGUNLOCK();
1629                     USERPRI;
1630                     return (struct rx_call *)0;
1631                 }
1632 #endif
1633         } while (!(call = sq->newcall) &&
1634                  !(socketp && *socketp != OSI_NULLSOCKET));
1635     }
1636     MUTEX_EXIT(&sq->lock);
1637
1638     MUTEX_ENTER(&freeSQEList_lock);
1639     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1640     rx_FreeSQEList = sq;
1641     MUTEX_EXIT(&freeSQEList_lock);
1642
1643     if (call) {
1644         clock_GetTime(&call->startTime);
1645         call->state = RX_STATE_ACTIVE;
1646         call->mode = RX_MODE_RECEIVING;
1647
1648         rxi_calltrace(RX_CALL_START, call);
1649         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1650          call->conn->service->servicePort, 
1651          call->conn->service->serviceId, call));
1652     } else {
1653         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1654     }
1655
1656     AFS_RXGUNLOCK();
1657     USERPRI;
1658
1659     return call;
1660 }
1661 #endif /* RX_ENABLE_LOCKS */
1662
1663
1664
1665 /* Establish a procedure to be called when a packet arrives for a
1666  * call.  This routine will be called at most once after each call,
1667  * and will also be called if there is an error condition on the or
1668  * the call is complete.  Used by multi rx to build a selection
1669  * function which determines which of several calls is likely to be a
1670  * good one to read from.  
1671  * NOTE: the way this is currently implemented it is probably only a
1672  * good idea to (1) use it immediately after a newcall (clients only)
1673  * and (2) only use it once.  Other uses currently void your warranty
1674  */
1675 void rx_SetArrivalProc(call, proc, handle, arg)
1676     register struct rx_call *call;
1677     register VOID (*proc)();
1678     register VOID *handle;
1679     register VOID *arg;
1680 {
1681     call->arrivalProc = proc;
1682     call->arrivalProcHandle = handle;
1683     call->arrivalProcArg = arg;
1684 }
1685
1686 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1687  * appropriate, and return the final error code from the conversation
1688  * to the caller */
1689
1690 afs_int32 rx_EndCall(call, rc)
1691     register struct rx_call *call;
1692     afs_int32 rc;
1693 {
1694     register struct rx_connection *conn = call->conn;
1695     register struct rx_service *service;
1696     register struct rx_packet *tp; /* Temporary packet pointer */
1697     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1698     afs_int32 error;
1699     SPLVAR;
1700
1701     dpf(("rx_EndCall(call %x)\n", call));
1702
1703     NETPRI;
1704     AFS_RXGLOCK();
1705     MUTEX_ENTER(&call->lock);
1706
1707     if (rc == 0 && call->error == 0) {
1708         call->abortCode = 0;
1709         call->abortCount = 0;
1710     }
1711
1712     call->arrivalProc = (VOID (*)()) 0;
1713     if (rc && call->error == 0) {
1714         rxi_CallError(call, rc);
1715         /* Send an abort message to the peer if this error code has
1716          * only just been set.  If it was set previously, assume the
1717          * peer has already been sent the error code or will request it 
1718          */
1719         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1720     }
1721     if (conn->type == RX_SERVER_CONNECTION) {
1722         /* Make sure reply or at least dummy reply is sent */
1723         if (call->mode == RX_MODE_RECEIVING) {
1724             rxi_WriteProc(call, 0, 0);
1725         }
1726         if (call->mode == RX_MODE_SENDING) {
1727             rxi_FlushWrite(call);
1728         }
1729         service = conn->service;
1730         rxi_calltrace(RX_CALL_END, call);
1731         /* Call goes to hold state until reply packets are acknowledged */
1732         if (call->tfirst + call->nSoftAcked < call->tnext) {
1733             call->state = RX_STATE_HOLD;
1734         } else {
1735             call->state = RX_STATE_DALLY;
1736             rxi_ClearTransmitQueue(call, 0);
1737             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1738             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1739         }
1740     }
1741     else { /* Client connection */
1742         char dummy;
1743         /* Make sure server receives input packets, in the case where
1744          * no reply arguments are expected */
1745         if ((call->mode == RX_MODE_SENDING)
1746          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1747             (void) rxi_ReadProc(call, &dummy, 1);
1748         }
1749         /* We need to release the call lock since it's lower than the
1750          * conn_call_lock and we don't want to hold the conn_call_lock
1751          * over the rx_ReadProc call. The conn_call_lock needs to be held
1752          * here for the case where rx_NewCall is perusing the calls on
1753          * the connection structure. We don't want to signal until
1754          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1755          * have checked this call, found it active and by the time it
1756          * goes to sleep, will have missed the signal.
1757          */
1758         MUTEX_EXIT(&call->lock);
1759         MUTEX_ENTER(&conn->conn_call_lock);
1760         MUTEX_ENTER(&call->lock);
1761         MUTEX_ENTER(&conn->conn_data_lock);
1762         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1763             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1764             MUTEX_EXIT(&conn->conn_data_lock);
1765 #ifdef  RX_ENABLE_LOCKS
1766             CV_BROADCAST(&conn->conn_call_cv);
1767 #else
1768             osi_rxWakeup(conn);
1769 #endif
1770         }
1771 #ifdef RX_ENABLE_LOCKS
1772         else {
1773             MUTEX_EXIT(&conn->conn_data_lock);
1774         }
1775 #endif /* RX_ENABLE_LOCKS */
1776         call->state = RX_STATE_DALLY;
1777     }
1778     error = call->error;
1779
1780     /* currentPacket, nLeft, and NFree must be zeroed here, because
1781      * ResetCall cannot: ResetCall may be called at splnet(), in the
1782      * kernel version, and may interrupt the macros rx_Read or
1783      * rx_Write, which run at normal priority for efficiency. */
1784     if (call->currentPacket) {
1785         rxi_FreePacket(call->currentPacket);
1786         call->currentPacket = (struct rx_packet *) 0;
1787         call->nLeft = call->nFree = call->curlen = 0;
1788     }
1789     else
1790         call->nLeft = call->nFree = call->curlen = 0;
1791
1792     /* Free any packets from the last call to ReadvProc/WritevProc */
1793     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1794         queue_Remove(tp);
1795         rxi_FreePacket(tp);
1796     }
1797
1798     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1799     MUTEX_EXIT(&call->lock);
1800     if (conn->type == RX_CLIENT_CONNECTION)
1801         MUTEX_EXIT(&conn->conn_call_lock);
1802     AFS_RXGUNLOCK();
1803     USERPRI;
1804     /*
1805      * Map errors to the local host's errno.h format.
1806      */
1807     error = ntoh_syserr_conv(error);
1808     return error;
1809 }
1810
1811 #if !defined(KERNEL)
1812
1813 /* Call this routine when shutting down a server or client (especially
1814  * clients).  This will allow Rx to gracefully garbage collect server
1815  * connections, and reduce the number of retries that a server might
1816  * make to a dead client.
1817  * This is not quite right, since some calls may still be ongoing and
1818  * we can't lock them to destroy them. */
1819 void rx_Finalize() {
1820     register struct rx_connection **conn_ptr, **conn_end;
1821
1822     INIT_PTHREAD_LOCKS
1823     LOCK_RX_INIT
1824     if (rxinit_status == 1) {
1825         UNLOCK_RX_INIT
1826         return; /* Already shutdown. */
1827     }
1828     rxi_DeleteCachedConnections();
1829     if (rx_connHashTable) {
1830         MUTEX_ENTER(&rx_connHashTable_lock);
1831         for (conn_ptr = &rx_connHashTable[0], 
1832              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1833              conn_ptr < conn_end; conn_ptr++) {
1834             struct rx_connection *conn, *next;
1835             for (conn = *conn_ptr; conn; conn = next) {
1836                 next = conn->next;
1837                 if (conn->type == RX_CLIENT_CONNECTION) {
1838                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1839                     conn->refCount++;
1840                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1841 #ifdef RX_ENABLE_LOCKS
1842                     rxi_DestroyConnectionNoLock(conn);
1843 #else /* RX_ENABLE_LOCKS */
1844                     rxi_DestroyConnection(conn);
1845 #endif /* RX_ENABLE_LOCKS */
1846                 }
1847             }
1848         }
1849 #ifdef RX_ENABLE_LOCKS
1850         while (rx_connCleanup_list) {
1851             struct rx_connection *conn;
1852             conn = rx_connCleanup_list;
1853             rx_connCleanup_list = rx_connCleanup_list->next;
1854             MUTEX_EXIT(&rx_connHashTable_lock);
1855             rxi_CleanupConnection(conn);
1856             MUTEX_ENTER(&rx_connHashTable_lock);
1857         }
1858         MUTEX_EXIT(&rx_connHashTable_lock);
1859 #endif /* RX_ENABLE_LOCKS */
1860     }
1861     rxi_flushtrace();
1862
1863     rxinit_status = 1;
1864     UNLOCK_RX_INIT
1865 }
1866 #endif
1867
1868 /* if we wakeup packet waiter too often, can get in loop with two
1869     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1870 void
1871 rxi_PacketsUnWait() {
1872
1873     if (!rx_waitingForPackets) {
1874         return;
1875     }
1876 #ifdef KERNEL
1877     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1878         return;                                     /* still over quota */
1879     }
1880 #endif /* KERNEL */
1881     rx_waitingForPackets = 0;
1882 #ifdef  RX_ENABLE_LOCKS
1883     CV_BROADCAST(&rx_waitingForPackets_cv);
1884 #else
1885     osi_rxWakeup(&rx_waitingForPackets);
1886 #endif
1887     return;
1888 }
1889
1890
1891 /* ------------------Internal interfaces------------------------- */
1892
1893 /* Return this process's service structure for the
1894  * specified socket and service */
1895 struct rx_service *rxi_FindService(socket, serviceId)
1896     register osi_socket socket;
1897     register u_short serviceId;
1898 {
1899     register struct rx_service **sp;    
1900     for (sp = &rx_services[0]; *sp; sp++) {
1901         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1902           return *sp;
1903     }
1904     return 0;
1905 }
1906
1907 /* Allocate a call structure, for the indicated channel of the
1908  * supplied connection.  The mode and state of the call must be set by
1909  * the caller. */
1910 struct rx_call *rxi_NewCall(conn, channel)
1911     register struct rx_connection *conn;
1912     register int channel;
1913 {
1914     register struct rx_call *call;
1915 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1916     register struct rx_call *cp;        /* Call pointer temp */
1917     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1918 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1919
1920     /* Grab an existing call structure, or allocate a new one.
1921      * Existing call structures are assumed to have been left reset by
1922      * rxi_FreeCall */
1923     MUTEX_ENTER(&rx_freeCallQueue_lock);
1924
1925 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1926     /*
1927      * EXCEPT that the TQ might not yet be cleared out.
1928      * Skip over those with in-use TQs.
1929      */
1930     call = NULL;
1931     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1932         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1933             call = cp;
1934             break;
1935         }
1936     }
1937     if (call) {
1938 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1939     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1940         call = queue_First(&rx_freeCallQueue, rx_call);
1941 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1942         queue_Remove(call);
1943         MUTEX_ENTER(&rx_stats_mutex);
1944         rx_stats.nFreeCallStructs--;
1945         MUTEX_EXIT(&rx_stats_mutex);
1946         MUTEX_EXIT(&rx_freeCallQueue_lock);
1947         MUTEX_ENTER(&call->lock);
1948         CLEAR_CALL_QUEUE_LOCK(call);
1949 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1950         /* Now, if TQ wasn't cleared earlier, do it now. */
1951         if (call->flags & RX_CALL_TQ_CLEARME) {
1952             rxi_ClearTransmitQueue(call, 0);
1953             queue_Init(&call->tq);
1954         }
1955 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1956         /* Bind the call to its connection structure */
1957         call->conn = conn;
1958         rxi_ResetCall(call, 1);
1959     }
1960     else {
1961         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1962
1963         MUTEX_EXIT(&rx_freeCallQueue_lock);
1964         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1965         MUTEX_ENTER(&call->lock);
1966         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1967         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1968         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1969
1970         MUTEX_ENTER(&rx_stats_mutex);
1971         rx_stats.nCallStructs++;
1972         MUTEX_EXIT(&rx_stats_mutex);
1973         /* Initialize once-only items */
1974         queue_Init(&call->tq);
1975         queue_Init(&call->rq);
1976         queue_Init(&call->iovq);
1977         /* Bind the call to its connection structure (prereq for reset) */
1978         call->conn = conn;
1979         rxi_ResetCall(call, 1);
1980     }
1981     call->channel = channel;
1982     call->callNumber = &conn->callNumber[channel];
1983     /* Note that the next expected call number is retained (in
1984      * conn->callNumber[i]), even if we reallocate the call structure
1985      */
1986     conn->call[channel] = call;
1987     /* if the channel's never been used (== 0), we should start at 1, otherwise
1988         the call number is valid from the last time this channel was used */
1989     if (*call->callNumber == 0) *call->callNumber = 1;
1990
1991     MUTEX_EXIT(&call->lock);
1992     return call;
1993 }
1994
1995 /* A call has been inactive long enough that so we can throw away
1996  * state, including the call structure, which is placed on the call
1997  * free list.
1998  * Call is locked upon entry.
1999  */
2000 #ifdef RX_ENABLE_LOCKS
2001 void rxi_FreeCall(call, haveCTLock)
2002     int haveCTLock; /* Set if called from rxi_ReapConnections */
2003 #else /* RX_ENABLE_LOCKS */
2004 void rxi_FreeCall(call)
2005 #endif /* RX_ENABLE_LOCKS */
2006     register struct rx_call *call;
2007 {
2008     register int channel = call->channel;
2009     register struct rx_connection *conn = call->conn;
2010
2011
2012     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2013       (*call->callNumber)++;
2014     rxi_ResetCall(call, 0);
2015     call->conn->call[channel] = (struct rx_call *) 0;
2016
2017     MUTEX_ENTER(&rx_freeCallQueue_lock);
2018     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2019 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2020     /* A call may be free even though its transmit queue is still in use.
2021      * Since we search the call list from head to tail, put busy calls at
2022      * the head of the list, and idle calls at the tail.
2023      */
2024     if (call->flags & RX_CALL_TQ_BUSY)
2025         queue_Prepend(&rx_freeCallQueue, call);
2026     else
2027         queue_Append(&rx_freeCallQueue, call);
2028 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2029     queue_Append(&rx_freeCallQueue, call);
2030 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2031     MUTEX_ENTER(&rx_stats_mutex);
2032     rx_stats.nFreeCallStructs++;
2033     MUTEX_EXIT(&rx_stats_mutex);
2034
2035     MUTEX_EXIT(&rx_freeCallQueue_lock);
2036  
2037     /* Destroy the connection if it was previously slated for
2038      * destruction, i.e. the Rx client code previously called
2039      * rx_DestroyConnection (client connections), or
2040      * rxi_ReapConnections called the same routine (server
2041      * connections).  Only do this, however, if there are no
2042      * outstanding calls. Note that for fine grain locking, there appears
2043      * to be a deadlock in that rxi_FreeCall has a call locked and
2044      * DestroyConnectionNoLock locks each call in the conn. But note a
2045      * few lines up where we have removed this call from the conn.
2046      * If someone else destroys a connection, they either have no
2047      * call lock held or are going through this section of code.
2048      */
2049     if (conn->flags & RX_CONN_DESTROY_ME) {
2050         MUTEX_ENTER(&conn->conn_data_lock);
2051         conn->refCount++;
2052         MUTEX_EXIT(&conn->conn_data_lock);
2053 #ifdef RX_ENABLE_LOCKS
2054         if (haveCTLock)
2055             rxi_DestroyConnectionNoLock(conn);
2056         else
2057             rxi_DestroyConnection(conn);
2058 #else /* RX_ENABLE_LOCKS */
2059         rxi_DestroyConnection(conn);
2060 #endif /* RX_ENABLE_LOCKS */
2061     }
2062 }
2063
2064 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2065 char *rxi_Alloc(size)
2066 register size_t size;
2067 {
2068     register char *p;
2069
2070 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2071     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2072      * implementation.
2073      */
2074     int glockOwner = ISAFS_GLOCK();
2075     if (!glockOwner)
2076         AFS_GLOCK();
2077 #endif
2078     MUTEX_ENTER(&rx_stats_mutex);
2079     rxi_Alloccnt++; rxi_Allocsize += size;
2080     MUTEX_EXIT(&rx_stats_mutex);
2081 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2082     if (size > AFS_SMALLOCSIZ) {
2083         p = (char *) osi_AllocMediumSpace(size);
2084     } else
2085         p = (char *) osi_AllocSmall(size, 1);
2086 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2087     if (!glockOwner)
2088         AFS_GUNLOCK();
2089 #endif
2090 #else
2091     p = (char *) osi_Alloc(size);
2092 #endif
2093     if (!p) osi_Panic("rxi_Alloc error");
2094     bzero(p, size);
2095     return p;
2096 }
2097
2098 void rxi_Free(addr, size)
2099 void *addr;
2100 register size_t size;
2101 {
2102 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2103     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2104      * implementation.
2105      */
2106     int glockOwner = ISAFS_GLOCK();
2107     if (!glockOwner)
2108         AFS_GLOCK();
2109 #endif
2110     MUTEX_ENTER(&rx_stats_mutex);
2111     rxi_Alloccnt--; rxi_Allocsize -= size;
2112     MUTEX_EXIT(&rx_stats_mutex);
2113 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2114     if (size > AFS_SMALLOCSIZ)
2115         osi_FreeMediumSpace(addr);
2116     else
2117         osi_FreeSmall(addr);
2118 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2119     if (!glockOwner)
2120         AFS_GUNLOCK();
2121 #endif
2122 #else
2123     osi_Free(addr, size);
2124 #endif    
2125 }
2126
2127 /* Find the peer process represented by the supplied (host,port)
2128  * combination.  If there is no appropriate active peer structure, a
2129  * new one will be allocated and initialized 
2130  * The origPeer, if set, is a pointer to a peer structure on which the
2131  * refcount will be be decremented. This is used to replace the peer
2132  * structure hanging off a connection structure */
2133 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2134     register afs_uint32 host;
2135     register u_short port;
2136     struct rx_peer *origPeer;
2137     int create;
2138 {
2139     register struct rx_peer *pp;
2140     int hashIndex;
2141     hashIndex = PEER_HASH(host, port);
2142     MUTEX_ENTER(&rx_peerHashTable_lock);
2143     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2144         if ((pp->host == host) && (pp->port == port)) break;
2145     }
2146     if (!pp) {
2147         if (create) {
2148             pp = rxi_AllocPeer(); /* This bzero's *pp */
2149             pp->host = host;      /* set here or in InitPeerParams is zero */
2150             pp->port = port;
2151             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2152             queue_Init(&pp->congestionQueue);
2153             queue_Init(&pp->rpcStats);
2154             pp->next = rx_peerHashTable[hashIndex];
2155             rx_peerHashTable[hashIndex] = pp;
2156             rxi_InitPeerParams(pp);
2157             MUTEX_ENTER(&rx_stats_mutex);
2158             rx_stats.nPeerStructs++;
2159             MUTEX_EXIT(&rx_stats_mutex);
2160         }
2161     }
2162     if (pp && create) {
2163         pp->refCount++;
2164     }
2165     if ( origPeer)
2166         origPeer->refCount--;
2167     MUTEX_EXIT(&rx_peerHashTable_lock);
2168     return pp;
2169 }
2170
2171
2172 /* Find the connection at (host, port) started at epoch, and with the
2173  * given connection id.  Creates the server connection if necessary.
2174  * The type specifies whether a client connection or a server
2175  * connection is desired.  In both cases, (host, port) specify the
2176  * peer's (host, pair) pair.  Client connections are not made
2177  * automatically by this routine.  The parameter socket gives the
2178  * socket descriptor on which the packet was received.  This is used,
2179  * in the case of server connections, to check that *new* connections
2180  * come via a valid (port, serviceId).  Finally, the securityIndex
2181  * parameter must match the existing index for the connection.  If a
2182  * server connection is created, it will be created using the supplied
2183  * index, if the index is valid for this service */
2184 struct rx_connection *
2185 rxi_FindConnection(socket, host, port, serviceId, cid, 
2186                    epoch, type, securityIndex)
2187     osi_socket socket;
2188     register afs_int32 host;
2189     register u_short port;
2190     u_short serviceId;
2191     afs_uint32 cid;
2192     afs_uint32 epoch;
2193     int type;
2194     u_int securityIndex;
2195 {
2196     int hashindex, flag;
2197     register struct rx_connection *conn;
2198     struct rx_peer *peer;
2199     hashindex = CONN_HASH(host, port, cid, epoch, type);
2200     MUTEX_ENTER(&rx_connHashTable_lock);
2201     rxLastConn ? (conn = rxLastConn, flag = 0) :
2202                  (conn = rx_connHashTable[hashindex], flag = 1);
2203     for (; conn; ) {
2204       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2205           && (epoch == conn->epoch)) {
2206         register struct rx_peer *pp = conn->peer;
2207         if (securityIndex != conn->securityIndex) {
2208             /* this isn't supposed to happen, but someone could forge a packet
2209                like this, and there seems to be some CM bug that makes this
2210                happen from time to time -- in which case, the fileserver
2211                asserts. */  
2212             MUTEX_EXIT(&rx_connHashTable_lock);
2213             return (struct rx_connection *) 0;
2214         }
2215         /* epoch's high order bits mean route for security reasons only on
2216          * the cid, not the host and port fields.
2217          */
2218         if (conn->epoch & 0x80000000) break;
2219         if (((type == RX_CLIENT_CONNECTION) 
2220              || (pp->host == host)) && (pp->port == port))
2221           break;
2222       }
2223       if ( !flag )
2224       {
2225         /* the connection rxLastConn that was used the last time is not the
2226         ** one we are looking for now. Hence, start searching in the hash */
2227         flag = 1;
2228         conn = rx_connHashTable[hashindex];
2229       }
2230       else
2231         conn = conn->next;
2232     }
2233     if (!conn) {
2234         struct rx_service *service;
2235         if (type == RX_CLIENT_CONNECTION) {
2236             MUTEX_EXIT(&rx_connHashTable_lock);
2237             return (struct rx_connection *) 0;
2238         }
2239         service = rxi_FindService(socket, serviceId);
2240         if (!service || (securityIndex >= service->nSecurityObjects) 
2241             || (service->securityObjects[securityIndex] == 0)) {
2242             MUTEX_EXIT(&rx_connHashTable_lock);
2243             return (struct rx_connection *) 0;
2244         }
2245         conn = rxi_AllocConnection(); /* This bzero's the connection */
2246         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2247                    MUTEX_DEFAULT,0);
2248         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2249                    MUTEX_DEFAULT,0);
2250         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2251         conn->next = rx_connHashTable[hashindex];
2252         rx_connHashTable[hashindex] = conn;
2253         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2254         conn->type = RX_SERVER_CONNECTION;
2255         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2256         conn->epoch = epoch;
2257         conn->cid = cid & RX_CIDMASK;
2258         /* conn->serial = conn->lastSerial = 0; */
2259         /* conn->timeout = 0; */
2260         conn->ackRate = RX_FAST_ACK_RATE;
2261         conn->service = service;
2262         conn->serviceId = serviceId;
2263         conn->securityIndex = securityIndex;
2264         conn->securityObject = service->securityObjects[securityIndex];
2265         conn->nSpecific = 0;
2266         conn->specific = NULL;
2267         rx_SetConnDeadTime(conn, service->connDeadTime);
2268         /* Notify security object of the new connection */
2269         RXS_NewConnection(conn->securityObject, conn);
2270         /* XXXX Connection timeout? */
2271         if (service->newConnProc) (*service->newConnProc)(conn);
2272         MUTEX_ENTER(&rx_stats_mutex);
2273         rx_stats.nServerConns++;
2274         MUTEX_EXIT(&rx_stats_mutex);
2275     }
2276     else
2277     {
2278     /* Ensure that the peer structure is set up in such a way that
2279     ** replies in this connection go back to that remote interface
2280     ** from which the last packet was sent out. In case, this packet's
2281     ** source IP address does not match the peer struct for this conn,
2282     ** then drop the refCount on conn->peer and get a new peer structure.
2283     ** We can check the host,port field in the peer structure without the
2284     ** rx_peerHashTable_lock because the peer structure has its refCount
2285     ** incremented and the only time the host,port in the peer struct gets
2286     ** updated is when the peer structure is created.
2287     */
2288         if (conn->peer->host == host )
2289                 peer = conn->peer; /* no change to the peer structure */
2290         else
2291                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2292     }
2293
2294     MUTEX_ENTER(&conn->conn_data_lock);
2295     conn->refCount++;
2296     conn->peer = peer;
2297     MUTEX_EXIT(&conn->conn_data_lock);
2298
2299     rxLastConn = conn;  /* store this connection as the last conn used */
2300     MUTEX_EXIT(&rx_connHashTable_lock);
2301     return conn;
2302 }
2303
2304 /* There are two packet tracing routines available for testing and monitoring
2305  * Rx.  One is called just after every packet is received and the other is
2306  * called just before every packet is sent.  Received packets, have had their
2307  * headers decoded, and packets to be sent have not yet had their headers
2308  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2309  * containing the network address.  Both can be modified.  The return value, if
2310  * non-zero, indicates that the packet should be dropped.  */
2311
2312 int (*rx_justReceived)() = 0;
2313 int (*rx_almostSent)() = 0;
2314
2315 /* A packet has been received off the interface.  Np is the packet, socket is
2316  * the socket number it was received from (useful in determining which service
2317  * this packet corresponds to), and (host, port) reflect the host,port of the
2318  * sender.  This call returns the packet to the caller if it is finished with
2319  * it, rather than de-allocating it, just as a small performance hack */
2320
2321 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2322     register struct rx_packet *np;
2323     osi_socket socket;
2324     afs_uint32 host;
2325     u_short port;
2326     int *tnop;
2327     struct rx_call **newcallp;
2328 {
2329     register struct rx_call *call;
2330     register struct rx_connection *conn;
2331     int channel;
2332     afs_uint32 currentCallNumber;
2333     int type;
2334     int skew;
2335 #ifdef RXDEBUG
2336     char *packetType;
2337 #endif
2338     struct rx_packet *tnp;
2339
2340 #ifdef RXDEBUG
2341 /* We don't print out the packet until now because (1) the time may not be
2342  * accurate enough until now in the lwp implementation (rx_Listener only gets
2343  * the time after the packet is read) and (2) from a protocol point of view,
2344  * this is the first time the packet has been seen */
2345     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2346         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2347     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2348          np->header.serial, packetType, host, port, np->header.serviceId,
2349          np->header.epoch, np->header.cid, np->header.callNumber, 
2350          np->header.seq, np->header.flags, np));
2351 #endif
2352
2353     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2354       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2355     }
2356
2357     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2358         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2359     }
2360 #ifdef RXDEBUG
2361     /* If an input tracer function is defined, call it with the packet and
2362      * network address.  Note this function may modify its arguments. */
2363     if (rx_justReceived) {
2364         struct sockaddr_in addr;
2365         int drop;
2366         addr.sin_family = AF_INET;
2367         addr.sin_port = port;
2368         addr.sin_addr.s_addr = host;
2369 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2370         addr.sin_len = sizeof(addr);
2371 #endif  /* AFS_OSF_ENV */
2372         drop = (*rx_justReceived) (np, &addr);
2373         /* drop packet if return value is non-zero */
2374         if (drop) return np;
2375         port = addr.sin_port;           /* in case fcn changed addr */
2376         host = addr.sin_addr.s_addr;
2377     }
2378 #endif
2379
2380     /* If packet was not sent by the client, then *we* must be the client */
2381     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2382         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2383
2384     /* Find the connection (or fabricate one, if we're the server & if
2385      * necessary) associated with this packet */
2386     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2387                               np->header.cid, np->header.epoch, type, 
2388                               np->header.securityIndex);
2389
2390     if (!conn) {
2391       /* If no connection found or fabricated, just ignore the packet.
2392        * (An argument could be made for sending an abort packet for
2393        * the conn) */
2394       return np;
2395     }   
2396
2397     MUTEX_ENTER(&conn->conn_data_lock);
2398     if (conn->maxSerial < np->header.serial)
2399       conn->maxSerial = np->header.serial;
2400     MUTEX_EXIT(&conn->conn_data_lock);
2401
2402     /* If the connection is in an error state, send an abort packet and ignore
2403      * the incoming packet */
2404     if (conn->error) {
2405         /* Don't respond to an abort packet--we don't want loops! */
2406         MUTEX_ENTER(&conn->conn_data_lock);
2407         if (np->header.type != RX_PACKET_TYPE_ABORT)
2408             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2409         conn->refCount--;
2410         MUTEX_EXIT(&conn->conn_data_lock);
2411         return np;
2412     }
2413
2414     /* Check for connection-only requests (i.e. not call specific). */
2415     if (np->header.callNumber == 0) {
2416         switch (np->header.type) {
2417             case RX_PACKET_TYPE_ABORT:
2418                 /* What if the supplied error is zero? */
2419                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2420                 MUTEX_ENTER(&conn->conn_data_lock);
2421                 conn->refCount--;
2422                 MUTEX_EXIT(&conn->conn_data_lock);
2423                 return np;
2424             case RX_PACKET_TYPE_CHALLENGE:
2425                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2426                 MUTEX_ENTER(&conn->conn_data_lock);
2427                 conn->refCount--;
2428                 MUTEX_EXIT(&conn->conn_data_lock);
2429                 return tnp;
2430             case RX_PACKET_TYPE_RESPONSE:
2431                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2432                 MUTEX_ENTER(&conn->conn_data_lock);
2433                 conn->refCount--;
2434                 MUTEX_EXIT(&conn->conn_data_lock);
2435                 return tnp;
2436             case RX_PACKET_TYPE_PARAMS:
2437             case RX_PACKET_TYPE_PARAMS+1:
2438             case RX_PACKET_TYPE_PARAMS+2:
2439                 /* ignore these packet types for now */
2440                 MUTEX_ENTER(&conn->conn_data_lock);
2441                 conn->refCount--;
2442                 MUTEX_EXIT(&conn->conn_data_lock);
2443                 return np;
2444
2445
2446             default:
2447                 /* Should not reach here, unless the peer is broken: send an
2448                  * abort packet */
2449                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2450                 MUTEX_ENTER(&conn->conn_data_lock);
2451                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2452                 conn->refCount--;
2453                 MUTEX_EXIT(&conn->conn_data_lock);
2454                 return tnp;
2455         }
2456     }
2457
2458     channel = np->header.cid & RX_CHANNELMASK;
2459     call = conn->call[channel];
2460 #ifdef  RX_ENABLE_LOCKS
2461     if (call)
2462         MUTEX_ENTER(&call->lock);
2463     /* Test to see if call struct is still attached to conn. */
2464     if (call != conn->call[channel]) {
2465         if (call)
2466             MUTEX_EXIT(&call->lock);
2467         if (type == RX_SERVER_CONNECTION) {
2468             call = conn->call[channel];
2469             /* If we started with no call attached and there is one now,
2470              * another thread is also running this routine and has gotten
2471              * the connection channel. We should drop this packet in the tests
2472              * below. If there was a call on this connection and it's now
2473              * gone, then we'll be making a new call below.
2474              * If there was previously a call and it's now different then
2475              * the old call was freed and another thread running this routine
2476              * has created a call on this channel. One of these two threads
2477              * has a packet for the old call and the code below handles those
2478              * cases.
2479              */
2480             if (call)
2481                 MUTEX_ENTER(&call->lock);
2482         }
2483         else {
2484             /* This packet can't be for this call. If the new call address is
2485              * 0 then no call is running on this channel. If there is a call
2486              * then, since this is a client connection we're getting data for
2487              * it must be for the previous call.
2488              */
2489             MUTEX_ENTER(&rx_stats_mutex);
2490             rx_stats.spuriousPacketsRead++;
2491             MUTEX_EXIT(&rx_stats_mutex);
2492             MUTEX_ENTER(&conn->conn_data_lock);
2493             conn->refCount--;
2494             MUTEX_EXIT(&conn->conn_data_lock);
2495             return np;
2496         }
2497     }
2498 #endif
2499     currentCallNumber = conn->callNumber[channel];
2500
2501     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2502         if (np->header.callNumber < currentCallNumber) {
2503             MUTEX_ENTER(&rx_stats_mutex);
2504             rx_stats.spuriousPacketsRead++;
2505             MUTEX_EXIT(&rx_stats_mutex);
2506 #ifdef  RX_ENABLE_LOCKS
2507             if (call)
2508                 MUTEX_EXIT(&call->lock);
2509 #endif
2510             MUTEX_ENTER(&conn->conn_data_lock);
2511             conn->refCount--;
2512             MUTEX_EXIT(&conn->conn_data_lock);
2513             return np;
2514         }
2515         if (!call) {
2516             call = rxi_NewCall(conn, channel);
2517             MUTEX_ENTER(&call->lock);
2518             *call->callNumber = np->header.callNumber;
2519             call->state = RX_STATE_PRECALL;
2520             clock_GetTime(&call->queueTime);
2521             hzero(call->bytesSent);
2522             hzero(call->bytesRcvd);
2523             rxi_KeepAliveOn(call);
2524         }
2525         else if (np->header.callNumber != currentCallNumber) {
2526             /* Wait until the transmit queue is idle before deciding
2527              * whether to reset the current call. Chances are that the
2528              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2529              * flag is cleared.
2530              */
2531 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2532             while ((call->state == RX_STATE_ACTIVE) &&
2533                    (call->flags & RX_CALL_TQ_BUSY)) {
2534                 call->flags |= RX_CALL_TQ_WAIT;
2535 #ifdef RX_ENABLE_LOCKS
2536                 CV_WAIT(&call->cv_tq, &call->lock);
2537 #else /* RX_ENABLE_LOCKS */
2538                 osi_rxSleep(&call->tq);
2539 #endif /* RX_ENABLE_LOCKS */
2540             }
2541 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2542             /* If the new call cannot be taken right now send a busy and set
2543              * the error condition in this call, so that it terminates as
2544              * quickly as possible */
2545             if (call->state == RX_STATE_ACTIVE) {
2546                 struct rx_packet *tp;
2547
2548                 rxi_CallError(call, RX_CALL_DEAD);
2549                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2550                 MUTEX_EXIT(&call->lock);
2551                 MUTEX_ENTER(&conn->conn_data_lock);
2552                 conn->refCount--;
2553                 MUTEX_EXIT(&conn->conn_data_lock);
2554                 return tp;
2555             }
2556             rxi_ResetCall(call, 0);
2557             *call->callNumber = np->header.callNumber;
2558             call->state = RX_STATE_PRECALL;
2559             clock_GetTime(&call->queueTime);
2560             hzero(call->bytesSent);
2561             hzero(call->bytesRcvd);
2562             /*
2563              * If the number of queued calls exceeds the overload
2564              * threshold then abort this call.
2565              */
2566             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2567                 struct rx_packet *tp;
2568
2569                 rxi_CallError(call, rx_BusyError);
2570                 tp = rxi_SendCallAbort(call, np, 1, 0);
2571                 MUTEX_EXIT(&call->lock);
2572                 MUTEX_ENTER(&conn->conn_data_lock);
2573                 conn->refCount--;
2574                 MUTEX_EXIT(&conn->conn_data_lock);
2575                 return tp;
2576             }
2577             rxi_KeepAliveOn(call);
2578         }
2579         else {
2580             /* Continuing call; do nothing here. */
2581         }
2582     } else { /* we're the client */
2583         /* Ignore all incoming acknowledgements for calls in DALLY state */
2584         if ( call && (call->state == RX_STATE_DALLY) 
2585          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2586             MUTEX_ENTER(&rx_stats_mutex);
2587             rx_stats.ignorePacketDally++;
2588             MUTEX_EXIT(&rx_stats_mutex);
2589 #ifdef  RX_ENABLE_LOCKS
2590             if (call) {
2591                 MUTEX_EXIT(&call->lock);
2592             }
2593 #endif
2594             MUTEX_ENTER(&conn->conn_data_lock);
2595             conn->refCount--;
2596             MUTEX_EXIT(&conn->conn_data_lock);
2597             return np;
2598         }
2599         
2600         /* Ignore anything that's not relevant to the current call.  If there
2601          * isn't a current call, then no packet is relevant. */
2602         if (!call || (np->header.callNumber != currentCallNumber)) {
2603             MUTEX_ENTER(&rx_stats_mutex);
2604             rx_stats.spuriousPacketsRead++;
2605             MUTEX_EXIT(&rx_stats_mutex);
2606 #ifdef  RX_ENABLE_LOCKS
2607             if (call) {
2608                 MUTEX_EXIT(&call->lock);
2609             }
2610 #endif
2611             MUTEX_ENTER(&conn->conn_data_lock);
2612             conn->refCount--;
2613             MUTEX_EXIT(&conn->conn_data_lock);
2614             return np;  
2615         }
2616         /* If the service security object index stamped in the packet does not
2617          * match the connection's security index, ignore the packet */
2618         if (np->header.securityIndex != conn->securityIndex) {
2619 #ifdef  RX_ENABLE_LOCKS
2620             MUTEX_EXIT(&call->lock);
2621 #endif
2622             MUTEX_ENTER(&conn->conn_data_lock);
2623             conn->refCount--;       
2624             MUTEX_EXIT(&conn->conn_data_lock);
2625             return np;
2626         }
2627
2628         /* If we're receiving the response, then all transmit packets are
2629          * implicitly acknowledged.  Get rid of them. */
2630         if (np->header.type == RX_PACKET_TYPE_DATA) {
2631 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2632             /* XXX Hack. Because we must release the global rx lock when
2633              * sending packets (osi_NetSend) we drop all acks while we're
2634              * traversing the tq in rxi_Start sending packets out because
2635              * packets may move to the freePacketQueue as result of being here!
2636              * So we drop these packets until we're safely out of the
2637              * traversing. Really ugly! 
2638              * For fine grain RX locking, we set the acked field in the
2639              * packets and let rxi_Start remove them from the transmit queue.
2640              */
2641             if (call->flags & RX_CALL_TQ_BUSY) {
2642 #ifdef  RX_ENABLE_LOCKS
2643                 rxi_SetAcksInTransmitQueue(call);
2644 #else
2645                 conn->refCount--;
2646                 return np;              /* xmitting; drop packet */
2647 #endif
2648             }
2649             else {
2650                 rxi_ClearTransmitQueue(call, 0);
2651             }
2652 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2653             rxi_ClearTransmitQueue(call, 0);
2654 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2655         } else {
2656           if (np->header.type == RX_PACKET_TYPE_ACK) {
2657         /* now check to see if this is an ack packet acknowledging that the
2658          * server actually *lost* some hard-acked data.  If this happens we
2659          * ignore this packet, as it may indicate that the server restarted in
2660          * the middle of a call.  It is also possible that this is an old ack
2661          * packet.  We don't abort the connection in this case, because this
2662          * *might* just be an old ack packet.  The right way to detect a server
2663          * restart in the midst of a call is to notice that the server epoch
2664          * changed, btw.  */
2665         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2666          * XXX unacknowledged.  I think that this is off-by-one, but
2667          * XXX I don't dare change it just yet, since it will
2668          * XXX interact badly with the server-restart detection 
2669          * XXX code in receiveackpacket.  */
2670             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2671                 MUTEX_ENTER(&rx_stats_mutex);
2672                 rx_stats.spuriousPacketsRead++;
2673                 MUTEX_EXIT(&rx_stats_mutex);
2674                 MUTEX_EXIT(&call->lock);
2675                 MUTEX_ENTER(&conn->conn_data_lock);
2676                 conn->refCount--;
2677                 MUTEX_EXIT(&conn->conn_data_lock);
2678                 return np;
2679             }
2680           }
2681         } /* else not a data packet */
2682     }
2683
2684     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2685     /* Set remote user defined status from packet */
2686     call->remoteStatus = np->header.userStatus;
2687
2688     /* Note the gap between the expected next packet and the actual
2689      * packet that arrived, when the new packet has a smaller serial number
2690      * than expected.  Rioses frequently reorder packets all by themselves,
2691      * so this will be quite important with very large window sizes.
2692      * Skew is checked against 0 here to avoid any dependence on the type of
2693      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2694      * true! 
2695      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2696      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2697      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2698      */
2699     MUTEX_ENTER(&conn->conn_data_lock);
2700     skew = conn->lastSerial - np->header.serial;
2701     conn->lastSerial = np->header.serial;
2702     MUTEX_EXIT(&conn->conn_data_lock);
2703     if (skew > 0) {
2704       register struct rx_peer *peer;
2705       peer = conn->peer;
2706       if (skew > peer->inPacketSkew) {
2707         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2708         peer->inPacketSkew = skew;
2709       }
2710     }
2711
2712     /* Now do packet type-specific processing */
2713     switch (np->header.type) {
2714         case RX_PACKET_TYPE_DATA:
2715             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2716                                        tnop, newcallp);
2717             break;
2718         case RX_PACKET_TYPE_ACK:
2719             /* Respond immediately to ack packets requesting acknowledgement
2720              * (ping packets) */
2721             if (np->header.flags & RX_REQUEST_ACK) {
2722                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2723                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2724             }
2725             np = rxi_ReceiveAckPacket(call, np, 1);
2726             break;
2727         case RX_PACKET_TYPE_ABORT:
2728             /* An abort packet: reset the connection, passing the error up to
2729              * the user */
2730             /* What if error is zero? */
2731             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2732             break;
2733         case RX_PACKET_TYPE_BUSY:
2734             /* XXXX */
2735             break;
2736         case RX_PACKET_TYPE_ACKALL:
2737             /* All packets acknowledged, so we can drop all packets previously
2738              * readied for sending */
2739 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2740             /* XXX Hack. We because we can't release the global rx lock when
2741              * sending packets (osi_NetSend) we drop all ack pkts while we're
2742              * traversing the tq in rxi_Start sending packets out because
2743              * packets may move to the freePacketQueue as result of being
2744              * here! So we drop these packets until we're safely out of the
2745              * traversing. Really ugly! 
2746              * For fine grain RX locking, we set the acked field in the packets
2747              * and let rxi_Start remove the packets from the transmit queue.
2748              */
2749             if (call->flags & RX_CALL_TQ_BUSY) {
2750 #ifdef  RX_ENABLE_LOCKS
2751                 rxi_SetAcksInTransmitQueue(call);
2752                 break;
2753 #else /* RX_ENABLE_LOCKS */
2754                 conn->refCount--;
2755                 return np;              /* xmitting; drop packet */
2756 #endif /* RX_ENABLE_LOCKS */
2757             }
2758 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2759             rxi_ClearTransmitQueue(call, 0);
2760             break;
2761         default:
2762             /* Should not reach here, unless the peer is broken: send an abort
2763              * packet */
2764             rxi_CallError(call, RX_PROTOCOL_ERROR);
2765             np = rxi_SendCallAbort(call, np, 1, 0);
2766             break;
2767     };
2768     /* Note when this last legitimate packet was received, for keep-alive
2769      * processing.  Note, we delay getting the time until now in the hope that
2770      * the packet will be delivered to the user before any get time is required
2771      * (if not, then the time won't actually be re-evaluated here). */
2772     call->lastReceiveTime = clock_Sec();
2773     MUTEX_EXIT(&call->lock);
2774     MUTEX_ENTER(&conn->conn_data_lock);
2775     conn->refCount--;
2776     MUTEX_EXIT(&conn->conn_data_lock);
2777     return np;
2778 }
2779
2780 /* return true if this is an "interesting" connection from the point of view
2781     of someone trying to debug the system */
2782 int rxi_IsConnInteresting(struct rx_connection *aconn)
2783 {
2784     register int i;
2785     register struct rx_call *tcall;
2786
2787     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2788         return 1;
2789     for(i=0;i<RX_MAXCALLS;i++) {
2790         tcall = aconn->call[i];
2791         if (tcall) {
2792             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2793                 return 1;
2794             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2795                 return 1;
2796         }
2797     }
2798     return 0;
2799 }
2800
2801 #ifdef KERNEL
2802 /* if this is one of the last few packets AND it wouldn't be used by the
2803    receiving call to immediately satisfy a read request, then drop it on
2804    the floor, since accepting it might prevent a lock-holding thread from
2805    making progress in its reading. If a call has been cleared while in
2806    the precall state then ignore all subsequent packets until the call
2807    is assigned to a thread. */
2808
2809 static TooLow(ap, acall)
2810   struct rx_call *acall;
2811   struct rx_packet *ap; {
2812     int rc=0;
2813     MUTEX_ENTER(&rx_stats_mutex);
2814     if (((ap->header.seq != 1) &&
2815          (acall->flags & RX_CALL_CLEARED) &&
2816          (acall->state == RX_STATE_PRECALL)) ||
2817         ((rx_nFreePackets < rxi_dataQuota+2) &&
2818          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2819            && (acall->flags & RX_CALL_READER_WAIT)))) {
2820         rc = 1;
2821     }
2822     MUTEX_EXIT(&rx_stats_mutex);
2823     return rc;
2824 }
2825 #endif /* KERNEL */
2826
2827 /* try to attach call, if authentication is complete */
2828 static void TryAttach(acall, socket, tnop, newcallp)
2829 register struct rx_call *acall;
2830 register osi_socket socket;
2831 register int *tnop;
2832 register struct rx_call **newcallp; {
2833     register struct rx_connection *conn;
2834     conn = acall->conn;
2835     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2836         /* Don't attach until we have any req'd. authentication. */
2837         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2838             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2839             /* Note:  this does not necessarily succeed; there
2840                may not any proc available */
2841         }
2842         else {
2843             rxi_ChallengeOn(acall->conn);
2844         }
2845     }
2846 }
2847
2848 /* A data packet has been received off the interface.  This packet is
2849  * appropriate to the call (the call is in the right state, etc.).  This
2850  * routine can return a packet to the caller, for re-use */
2851
2852 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2853                                         port, tnop, newcallp)
2854     register struct rx_call *call;
2855     register struct rx_packet *np;
2856     int istack;
2857     osi_socket socket;
2858     afs_uint32 host;
2859     u_short port;
2860     int *tnop;
2861     struct rx_call **newcallp;
2862 {
2863     int ackNeeded = 0;
2864     int newPackets = 0;
2865     int didHardAck = 0;
2866     int haveLast = 0;
2867     afs_uint32 seq, serial, flags;
2868     int isFirst;
2869     struct rx_packet *tnp;
2870     struct clock when;
2871     MUTEX_ENTER(&rx_stats_mutex);
2872     rx_stats.dataPacketsRead++;
2873     MUTEX_EXIT(&rx_stats_mutex);
2874
2875 #ifdef KERNEL
2876     /* If there are no packet buffers, drop this new packet, unless we can find
2877      * packet buffers from inactive calls */
2878     if (!call->error &&
2879         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2880         MUTEX_ENTER(&rx_freePktQ_lock);
2881         rxi_NeedMorePackets = TRUE;
2882         MUTEX_EXIT(&rx_freePktQ_lock);
2883         MUTEX_ENTER(&rx_stats_mutex);
2884         rx_stats.noPacketBuffersOnRead++;
2885         MUTEX_EXIT(&rx_stats_mutex);
2886         call->rprev = np->header.serial;
2887         rxi_calltrace(RX_TRACE_DROP, call);
2888         dpf (("packet %x dropped on receipt - quota problems", np));
2889         if (rxi_doreclaim)
2890             rxi_ClearReceiveQueue(call);
2891         clock_GetTime(&when);
2892         clock_Add(&when, &rx_softAckDelay);
2893         if (!call->delayedAckEvent ||
2894             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2895             rxevent_Cancel(call->delayedAckEvent, call,
2896                            RX_CALL_REFCOUNT_DELAY);
2897             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2898             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2899                                                  call, 0);
2900         }
2901         /* we've damaged this call already, might as well do it in. */
2902         return np;
2903     }
2904 #endif /* KERNEL */
2905
2906     /*
2907      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2908      * packet is one of several packets transmitted as a single
2909      * datagram. Do not send any soft or hard acks until all packets
2910      * in a jumbogram have been processed. Send negative acks right away.
2911      */
2912     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2913         /* tnp is non-null when there are more packets in the
2914          * current jumbo gram */
2915         if (tnp) {
2916             if (np)
2917                 rxi_FreePacket(np);
2918             np = tnp;
2919         }
2920
2921         seq = np->header.seq;
2922         serial = np->header.serial;
2923         flags = np->header.flags;
2924
2925         /* If the call is in an error state, send an abort message */
2926         if (call->error)
2927             return rxi_SendCallAbort(call, np, istack, 0);
2928
2929         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2930          * AFS 3.5 jumbogram. */
2931         if (flags & RX_JUMBO_PACKET) {
2932             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2933         } else {
2934             tnp = NULL;
2935         }
2936
2937         if (np->header.spare != 0) {
2938             MUTEX_ENTER(&call->conn->conn_data_lock);
2939             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2940             MUTEX_EXIT(&call->conn->conn_data_lock);
2941         }
2942
2943         /* The usual case is that this is the expected next packet */
2944         if (seq == call->rnext) {
2945
2946             /* Check to make sure it is not a duplicate of one already queued */
2947             if (queue_IsNotEmpty(&call->rq) 
2948                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2949                 MUTEX_ENTER(&rx_stats_mutex);
2950                 rx_stats.dupPacketsRead++;
2951                 MUTEX_EXIT(&rx_stats_mutex);
2952                 dpf (("packet %x dropped on receipt - duplicate", np));
2953                 rxevent_Cancel(call->delayedAckEvent, call,
2954                                RX_CALL_REFCOUNT_DELAY);
2955                 np = rxi_SendAck(call, np, seq, serial,
2956                                  flags, RX_ACK_DUPLICATE, istack);
2957                 ackNeeded = 0;
2958                 call->rprev = seq;
2959                 continue;
2960             }
2961
2962             /* It's the next packet. Stick it on the receive queue
2963              * for this call. Set newPackets to make sure we wake
2964              * the reader once all packets have been processed */
2965             queue_Prepend(&call->rq, np);
2966             call->nSoftAcks++;
2967             np = NULL; /* We can't use this anymore */
2968             newPackets = 1;
2969
2970             /* If an ack is requested then set a flag to make sure we
2971              * send an acknowledgement for this packet */
2972             if (flags & RX_REQUEST_ACK) {
2973                 ackNeeded = 1;
2974             }
2975
2976             /* Keep track of whether we have received the last packet */
2977             if (flags & RX_LAST_PACKET) {
2978                 call->flags |= RX_CALL_HAVE_LAST;
2979                 haveLast = 1;
2980             }
2981
2982             /* Check whether we have all of the packets for this call */
2983             if (call->flags & RX_CALL_HAVE_LAST) {
2984                 afs_uint32 tseq;                /* temporary sequence number */
2985                 struct rx_packet *tp;   /* Temporary packet pointer */
2986                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2987
2988                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2989                     if (tseq != tp->header.seq)
2990                         break;
2991                     if (tp->header.flags & RX_LAST_PACKET) {
2992                         call->flags |= RX_CALL_RECEIVE_DONE;
2993                         break;
2994                     }
2995                     tseq++;
2996                 }
2997             }
2998
2999             /* Provide asynchronous notification for those who want it
3000              * (e.g. multi rx) */
3001             if (call->arrivalProc) {
3002                 (*call->arrivalProc)(call, call->arrivalProcHandle,
3003                                      call->arrivalProcArg);
3004                 call->arrivalProc = (VOID (*)()) 0;
3005             }
3006
3007             /* Update last packet received */
3008             call->rprev = seq;
3009
3010             /* If there is no server process serving this call, grab
3011              * one, if available. We only need to do this once. If a
3012              * server thread is available, this thread becomes a server
3013              * thread and the server thread becomes a listener thread. */
3014             if (isFirst) {
3015                 TryAttach(call, socket, tnop, newcallp);
3016             }
3017         }       
3018         /* This is not the expected next packet. */
3019         else {
3020             /* Determine whether this is a new or old packet, and if it's
3021              * a new one, whether it fits into the current receive window.
3022              * Also figure out whether the packet was delivered in sequence.
3023              * We use the prev variable to determine whether the new packet
3024              * is the successor of its immediate predecessor in the
3025              * receive queue, and the missing flag to determine whether
3026              * any of this packets predecessors are missing.  */
3027
3028             afs_uint32 prev;            /* "Previous packet" sequence number */
3029             struct rx_packet *tp;       /* Temporary packet pointer */
3030             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3031             int missing;                /* Are any predecessors missing? */
3032
3033             /* If the new packet's sequence number has been sent to the
3034              * application already, then this is a duplicate */
3035             if (seq < call->rnext) {
3036                 MUTEX_ENTER(&rx_stats_mutex);
3037                 rx_stats.dupPacketsRead++;
3038                 MUTEX_EXIT(&rx_stats_mutex);
3039                 rxevent_Cancel(call->delayedAckEvent, call,
3040                                RX_CALL_REFCOUNT_DELAY);
3041                 np = rxi_SendAck(call, np, seq, serial,
3042                                  flags, RX_ACK_DUPLICATE, istack);
3043                 ackNeeded = 0;
3044                 call->rprev = seq;
3045                 continue;
3046             }
3047
3048             /* If the sequence number is greater than what can be
3049              * accomodated by the current window, then send a negative
3050              * acknowledge and drop the packet */
3051             if ((call->rnext + call->rwind) <= seq) {
3052                 rxevent_Cancel(call->delayedAckEvent, call,
3053                                RX_CALL_REFCOUNT_DELAY);
3054                 np = rxi_SendAck(call, np, seq, serial,
3055                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3056                 ackNeeded = 0;
3057                 call->rprev = seq;
3058                 continue;
3059             }
3060
3061             /* Look for the packet in the queue of old received packets */
3062             for (prev = call->rnext - 1, missing = 0,
3063                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3064                 /*Check for duplicate packet */
3065                 if (seq == tp->header.seq) {
3066                     MUTEX_ENTER(&rx_stats_mutex);
3067                     rx_stats.dupPacketsRead++;
3068                     MUTEX_EXIT(&rx_stats_mutex);
3069                     rxevent_Cancel(call->delayedAckEvent, call,
3070                                    RX_CALL_REFCOUNT_DELAY);
3071                     np = rxi_SendAck(call, np, seq, serial, 
3072                                      flags, RX_ACK_DUPLICATE, istack);
3073                     ackNeeded = 0;
3074                     call->rprev = seq;
3075                     goto nextloop;
3076                 }
3077                 /* If we find a higher sequence packet, break out and
3078                  * insert the new packet here. */
3079                 if (seq < tp->header.seq) break;
3080                 /* Check for missing packet */
3081                 if (tp->header.seq != prev+1) {
3082                     missing = 1;
3083                 }
3084
3085                 prev = tp->header.seq;
3086             }
3087
3088             /* Keep track of whether we have received the last packet. */
3089             if (flags & RX_LAST_PACKET) {
3090                 call->flags |= RX_CALL_HAVE_LAST;
3091             }
3092
3093             /* It's within the window: add it to the the receive queue.
3094              * tp is left by the previous loop either pointing at the
3095              * packet before which to insert the new packet, or at the
3096              * queue head if the queue is empty or the packet should be
3097              * appended. */
3098             queue_InsertBefore(tp, np);
3099             call->nSoftAcks++;
3100             np = NULL;
3101
3102             /* Check whether we have all of the packets for this call */
3103             if ((call->flags & RX_CALL_HAVE_LAST)
3104              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3105                 afs_uint32 tseq;                /* temporary sequence number */
3106
3107                 for (tseq = call->rnext,
3108                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3109                     if (tseq != tp->header.seq)
3110                         break;
3111                     if (tp->header.flags & RX_LAST_PACKET) {
3112                         call->flags |= RX_CALL_RECEIVE_DONE;
3113                         break;
3114                     }
3115                     tseq++;
3116                 }
3117             }
3118
3119             /* We need to send an ack of the packet is out of sequence, 
3120              * or if an ack was requested by the peer. */
3121             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3122                 ackNeeded = 1;
3123             }
3124
3125             /* Acknowledge the last packet for each call */
3126             if (flags & RX_LAST_PACKET) {
3127                 haveLast = 1;
3128             }
3129
3130             call->rprev = seq;
3131         }
3132 nextloop:;
3133     }
3134
3135     if (newPackets) {
3136         /*
3137          * If the receiver is waiting for an iovec, fill the iovec
3138          * using the data from the receive queue */
3139         if (call->flags & RX_CALL_IOVEC_WAIT) {
3140             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3141             /* the call may have been aborted */
3142             if (call->error) {
3143                 return NULL;
3144             }
3145             if (didHardAck) {
3146                 ackNeeded = 0;
3147             }
3148         }
3149
3150         /* Wakeup the reader if any */
3151         if ((call->flags & RX_CALL_READER_WAIT) &&
3152             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3153              (call->iovNext >= call->iovMax) ||
3154              (call->flags & RX_CALL_RECEIVE_DONE))) {
3155             call->flags &= ~RX_CALL_READER_WAIT;
3156 #ifdef  RX_ENABLE_LOCKS
3157             CV_BROADCAST(&call->cv_rq);
3158 #else
3159             osi_rxWakeup(&call->rq);
3160 #endif
3161         }
3162     }
3163
3164     /*
3165      * Send an ack when requested by the peer, or once every
3166      * rxi_SoftAckRate packets until the last packet has been
3167      * received. Always send a soft ack for the last packet in
3168      * the server's reply. */
3169     if (ackNeeded) {
3170         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3171         np = rxi_SendAck(call, np, seq, serial, flags,
3172                          RX_ACK_REQUESTED, istack);
3173     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3174         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3175         np = rxi_SendAck(call, np, seq, serial, flags,
3176                          RX_ACK_IDLE, istack);
3177     } else if (call->nSoftAcks) {
3178         clock_GetTime(&when);
3179         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3180             clock_Add(&when, &rx_lastAckDelay);
3181         } else {
3182             clock_Add(&when, &rx_softAckDelay);
3183         }
3184         if (!call->delayedAckEvent ||
3185             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3186             rxevent_Cancel(call->delayedAckEvent, call,
3187                            RX_CALL_REFCOUNT_DELAY);
3188             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3189             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3190                                                  call, 0);
3191         }
3192     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3193         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3194     }
3195
3196     return np;
3197 }
3198
3199 #ifdef  ADAPT_WINDOW
3200 static void rxi_ComputeRate();
3201 #endif
3202
3203 /* The real smarts of the whole thing.  */
3204 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3205     register struct rx_call *call;
3206     struct rx_packet *np;
3207     int istack;
3208 {
3209     struct rx_ackPacket *ap;
3210     int nAcks;
3211     register struct rx_packet *tp;
3212     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3213     register struct rx_connection *conn = call->conn;
3214     struct rx_peer *peer = conn->peer;
3215     afs_uint32 first;
3216     afs_uint32 serial;
3217     /* because there are CM's that are bogus, sending weird values for this. */
3218     afs_uint32 skew = 0;
3219     int needRxStart = 0;
3220     int nbytes;
3221     int missing;
3222     int acked;
3223     int nNacked = 0;
3224     int newAckCount = 0;
3225     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3226     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3227
3228     MUTEX_ENTER(&rx_stats_mutex);
3229     rx_stats.ackPacketsRead++;
3230     MUTEX_EXIT(&rx_stats_mutex);
3231     ap = (struct rx_ackPacket *) rx_DataOf(np);
3232     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3233     if (nbytes < 0)
3234       return np;       /* truncated ack packet */
3235
3236     /* depends on ack packet struct */
3237     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3238     first = ntohl(ap->firstPacket);
3239     serial = ntohl(ap->serial);
3240     /* temporarily disabled -- needs to degrade over time 
3241        skew = ntohs(ap->maxSkew); */
3242
3243     /* Ignore ack packets received out of order */
3244     if (first < call->tfirst) {
3245         return np;
3246     }
3247
3248     if (np->header.flags & RX_SLOW_START_OK) {
3249         call->flags |= RX_CALL_SLOW_START_OK;
3250     }
3251     
3252 #ifdef RXDEBUG
3253     if (rx_Log) {
3254       fprintf( rx_Log, 
3255               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3256               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3257               skew, ntohl(ap->firstPacket));
3258         if (nAcks) {
3259             int offset;
3260             for (offset = 0; offset < nAcks; offset++) 
3261                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3262         }
3263         putc('\n', rx_Log);
3264     }
3265 #endif
3266
3267     /* if a server connection has been re-created, it doesn't remember what
3268         serial # it was up to.  An ack will tell us, since the serial field
3269         contains the largest serial received by the other side */
3270     MUTEX_ENTER(&conn->conn_data_lock);
3271     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3272         conn->serial = serial+1;
3273     }
3274     MUTEX_EXIT(&conn->conn_data_lock);
3275
3276     /* Update the outgoing packet skew value to the latest value of
3277      * the peer's incoming packet skew value.  The ack packet, of
3278      * course, could arrive out of order, but that won't affect things
3279      * much */
3280     MUTEX_ENTER(&peer->peer_lock);
3281     peer->outPacketSkew = skew;
3282
3283     /* Check for packets that no longer need to be transmitted, and
3284      * discard them.  This only applies to packets positively
3285      * acknowledged as having been sent to the peer's upper level.
3286      * All other packets must be retained.  So only packets with
3287      * sequence numbers < ap->firstPacket are candidates. */
3288     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3289         if (tp->header.seq >= first) break;
3290         call->tfirst = tp->header.seq + 1;
3291         if (tp->header.serial == serial) {
3292           /* Use RTT if not delayed by client. */
3293           if (ap->reason != RX_ACK_DELAY)
3294               rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3295 #ifdef ADAPT_WINDOW
3296           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3297 #endif
3298         }
3299         else if (tp->firstSerial == serial) {
3300             /* Use RTT if not delayed by client. */
3301             if (ap->reason != RX_ACK_DELAY)
3302                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3303 #ifdef ADAPT_WINDOW
3304           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3305 #endif
3306         }
3307 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3308     /* XXX Hack. Because we have to release the global rx lock when sending
3309      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3310      * in rxi_Start sending packets out because packets may move to the
3311      * freePacketQueue as result of being here! So we drop these packets until
3312      * we're safely out of the traversing. Really ugly! 
3313      * To make it even uglier, if we're using fine grain locking, we can
3314      * set the ack bits in the packets and have rxi_Start remove the packets
3315      * when it's done transmitting.
3316      */
3317         if (!tp->acked) {
3318             newAckCount++;
3319         }
3320         if (call->flags & RX_CALL_TQ_BUSY) {
3321 #ifdef RX_ENABLE_LOCKS
3322             tp->acked = 1;
3323             call->flags |= RX_CALL_TQ_SOME_ACKED;
3324 #else /* RX_ENABLE_LOCKS */
3325             break;
3326 #endif /* RX_ENABLE_LOCKS */
3327         } else
3328 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3329         {
3330         queue_Remove(tp);
3331         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3332         }
3333     }
3334
3335 #ifdef ADAPT_WINDOW
3336     /* Give rate detector a chance to respond to ping requests */
3337     if (ap->reason == RX_ACK_PING_RESPONSE) {
3338         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3339     }
3340 #endif
3341
3342     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3343    
3344    /* Now go through explicit acks/nacks and record the results in
3345     * the waiting packets.  These are packets that can't be released
3346     * yet, even with a positive acknowledge.  This positive
3347     * acknowledge only means the packet has been received by the
3348     * peer, not that it will be retained long enough to be sent to
3349     * the peer's upper level.  In addition, reset the transmit timers
3350     * of any missing packets (those packets that must be missing
3351     * because this packet was out of sequence) */
3352
3353     call->nSoftAcked = 0;
3354     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3355         /* Update round trip time if the ack was stimulated on receipt
3356          * of this packet */
3357 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3358 #ifdef RX_ENABLE_LOCKS
3359         if (tp->header.seq >= first) {
3360 #endif /* RX_ENABLE_LOCKS */
3361 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3362         if (tp->header.serial == serial) {
3363             /* Use RTT if not delayed by client. */
3364             if (ap->reason != RX_ACK_DELAY)
3365                 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3366 #ifdef ADAPT_WINDOW
3367           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3368 #endif
3369         }
3370         else if ((tp->firstSerial == serial)) {
3371             /* Use RTT if not delayed by client. */
3372             if (ap->reason != RX_ACK_DELAY)
3373                 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3374 #ifdef ADAPT_WINDOW
3375           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3376 #endif
3377         }
3378 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3379 #ifdef RX_ENABLE_LOCKS
3380         }
3381 #endif /* RX_ENABLE_LOCKS */
3382 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3383
3384         /* Set the acknowledge flag per packet based on the
3385          * information in the ack packet. An acknowlegded packet can
3386          * be downgraded when the server has discarded a packet it
3387          * soacked previously, or when an ack packet is received
3388          * out of sequence. */
3389         if (tp->header.seq < first) {
3390             /* Implicit ack information */
3391             if (!tp->acked) {
3392                 newAckCount++;
3393             }
3394             tp->acked = 1;
3395         }
3396         else if (tp->header.seq < first + nAcks) {
3397             /* Explicit ack information:  set it in the packet appropriately */
3398             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3399                 if (!tp->acked) {
3400                     newAckCount++;
3401                     tp->acked = 1;
3402                 }
3403                 if (missing) {
3404                     nNacked++;
3405                 } else {
3406                     call->nSoftAcked++;
3407                 }
3408             } else {
3409                 tp->acked = 0;
3410                 missing = 1;
3411             }
3412         }
3413         else {
3414             tp->acked = 0;
3415             missing = 1;
3416         }
3417
3418         /* If packet isn't yet acked, and it has been transmitted at least 
3419          * once, reset retransmit time using latest timeout 
3420          * ie, this should readjust the retransmit timer for all outstanding 
3421          * packets...  So we don't just retransmit when we should know better*/
3422
3423         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3424           tp->retryTime = tp->timeSent;
3425           clock_Add(&tp->retryTime, &peer->timeout);
3426           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3427           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3428         }
3429     }
3430
3431     /* If the window has been extended by this acknowledge packet,
3432      * then wakeup a sender waiting in alloc for window space, or try
3433      * sending packets now, if he's been sitting on packets due to
3434      * lack of window space */
3435     if (call->tnext < (call->tfirst + call->twind))  {
3436 #ifdef  RX_ENABLE_LOCKS
3437         CV_SIGNAL(&call->cv_twind);
3438 #else
3439         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3440             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3441             osi_rxWakeup(&call->twind);
3442         }
3443 #endif
3444         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3445             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3446         }
3447     }
3448
3449     /* if the ack packet has a receivelen field hanging off it,
3450      * update our state */
3451     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3452       afs_uint32 tSize;
3453
3454       /* If the ack packet has a "recommended" size that is less than 
3455        * what I am using now, reduce my size to match */
3456       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3457                     sizeof(afs_int32), &tSize);
3458       tSize = (afs_uint32) ntohl(tSize);
3459       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3460
3461       /* Get the maximum packet size to send to this peer */
3462       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3463                     &tSize);
3464       tSize = (afs_uint32)ntohl(tSize);
3465       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3466       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3467
3468       /* sanity check - peer might have restarted with different params.
3469        * If peer says "send less", dammit, send less...  Peer should never 
3470        * be unable to accept packets of the size that prior AFS versions would
3471        * send without asking.  */
3472       if (peer->maxMTU != tSize) {
3473           peer->maxMTU = tSize;
3474           peer->MTU = MIN(tSize, peer->MTU);
3475           call->MTU = MIN(call->MTU, tSize);
3476           peer->congestSeq++;
3477       }
3478
3479       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3480           /* AFS 3.4a */
3481           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3482                         sizeof(afs_int32), &tSize);
3483           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3484           if (tSize < call->twind) {       /* smaller than our send */
3485               call->twind = tSize;         /* window, we must send less... */
3486               call->ssthresh = MIN(call->twind, call->ssthresh);
3487           }
3488
3489           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3490            * network MTU confused with the loopback MTU. Calculate the
3491            * maximum MTU here for use in the slow start code below.
3492            */
3493           maxMTU = peer->maxMTU;
3494           /* Did peer restart with older RX version? */
3495           if (peer->maxDgramPackets > 1) {
3496               peer->maxDgramPackets = 1;
3497           }
3498       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3499           /* AFS 3.5 */
3500           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3501                         sizeof(afs_int32), &tSize);
3502           tSize = (afs_uint32) ntohl(tSize);
3503           /*
3504            * As of AFS 3.5 we set the send window to match the receive window. 
3505            */
3506           if (tSize < call->twind) {
3507               call->twind = tSize;
3508               call->ssthresh = MIN(call->twind, call->ssthresh);
3509           } else if (tSize > call->twind) {
3510               call->twind = tSize;
3511           }
3512
3513           /*
3514            * As of AFS 3.5, a jumbogram is more than one fixed size
3515            * packet transmitted in a single UDP datagram. If the remote
3516            * MTU is smaller than our local MTU then never send a datagram
3517            * larger than the natural MTU.
3518            */
3519           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3520                         sizeof(afs_int32), &tSize);
3521           maxDgramPackets = (afs_uint32) ntohl(tSize);
3522           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3523           maxDgramPackets = MIN(maxDgramPackets,
3524                                 (int)(peer->ifDgramPackets));
3525           maxDgramPackets = MIN(maxDgramPackets, tSize);
3526           if (maxDgramPackets > 1) {
3527             peer->maxDgramPackets = maxDgramPackets;
3528             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3529           } else {
3530             peer->maxDgramPackets = 1;
3531             call->MTU = peer->natMTU;
3532           }
3533        } else if (peer->maxDgramPackets > 1) {
3534           /* Restarted with lower version of RX */
3535           peer->maxDgramPackets = 1;
3536        }
3537     } else if (peer->maxDgramPackets > 1 ||
3538                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3539         /* Restarted with lower version of RX */
3540         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3541         peer->natMTU = OLD_MAX_PACKET_SIZE;
3542         peer->MTU = OLD_MAX_PACKET_SIZE;
3543         peer->maxDgramPackets = 1;
3544         peer->nDgramPackets = 1;
3545         peer->congestSeq++;
3546         call->MTU = OLD_MAX_PACKET_SIZE;
3547     }
3548
3549     if (nNacked) {
3550         /*
3551          * Calculate how many datagrams were successfully received after
3552          * the first missing packet and adjust the negative ack counter
3553          * accordingly.
3554          */
3555         call->nAcks = 0;
3556         call->nNacks++;
3557         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3558         if (call->nNacks < nNacked) {
3559             call->nNacks = nNacked;
3560         }
3561     } else {
3562         if (newAckCount) {
3563             call->nAcks++;
3564         }
3565         call->nNacks = 0;
3566     }
3567
3568     if (call->flags & RX_CALL_FAST_RECOVER) {
3569         if (nNacked) {
3570             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3571         } else {
3572             call->flags &= ~RX_CALL_FAST_RECOVER;
3573             call->cwind = call->nextCwind;
3574             call->nextCwind = 0;
3575             call->nAcks = 0;
3576         }
3577         call->nCwindAcks = 0;
3578     }
3579     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3580         /* Three negative acks in a row trigger congestion recovery */
3581 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3582         MUTEX_EXIT(&peer->peer_lock);
3583         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3584                 /* someone else is waiting to start recovery */
3585                 return np;
3586         }
3587         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3588         while (call->flags & RX_CALL_TQ_BUSY) {
3589             call->flags |= RX_CALL_TQ_WAIT;
3590 #ifdef RX_ENABLE_LOCKS
3591             CV_WAIT(&call->cv_tq, &call->lock);
3592 #else /* RX_ENABLE_LOCKS */
3593             osi_rxSleep(&call->tq);
3594 #endif /* RX_ENABLE_LOCKS */
3595         }
3596         MUTEX_ENTER(&peer->peer_lock);
3597 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3598         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3599         call->flags |= RX_CALL_FAST_RECOVER;
3600         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3601         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3602                           rx_maxSendWindow);
3603         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3604         call->nextCwind = call->ssthresh;
3605         call->nAcks = 0;
3606         call->nNacks = 0;
3607         peer->MTU = call->MTU;
3608         peer->cwind = call->nextCwind;
3609         peer->nDgramPackets = call->nDgramPackets;
3610         peer->congestSeq++;
3611         call->congestSeq = peer->congestSeq;
3612         /* Reset the resend times on the packets that were nacked
3613          * so we will retransmit as soon as the window permits*/
3614         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3615             if (acked) {
3616                 if (!tp->acked) {
3617                     clock_Zero(&tp->retryTime);
3618                 }
3619             } else if (tp->acked) {
3620                 acked = 1;
3621             }
3622         }
3623     } else {
3624         /* If cwind is smaller than ssthresh, then increase
3625          * the window one packet for each ack we receive (exponential
3626          * growth).
3627          * If cwind is greater than or equal to ssthresh then increase
3628          * the congestion window by one packet for each cwind acks we
3629          * receive (linear growth).  */
3630         if (call->cwind < call->ssthresh) {
3631             call->cwind = MIN((int)call->ssthresh,
3632                               (int)(call->cwind + newAckCount));
3633             call->nCwindAcks = 0;
3634         } else {
3635             call->nCwindAcks += newAckCount;
3636             if (call->nCwindAcks >= call->cwind) {
3637                 call->nCwindAcks = 0;
3638                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3639             }
3640         }
3641         /*
3642          * If we have received several acknowledgements in a row then
3643          * it is time to increase the size of our datagrams
3644          */
3645         if ((int)call->nAcks > rx_nDgramThreshold) {
3646             if (peer->maxDgramPackets > 1) {
3647                 if (call->nDgramPackets < peer->maxDgramPackets) {
3648                     call->nDgramPackets++;
3649                 }
3650                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3651             } else if (call->MTU < peer->maxMTU) {
3652                 call->MTU += peer->natMTU;
3653                 call->MTU = MIN(call->MTU, peer->maxMTU);
3654             }
3655             call->nAcks = 0;
3656         }
3657     }
3658
3659     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3660
3661     /* Servers need to hold the call until all response packets have
3662      * been acknowledged. Soft acks are good enough since clients
3663      * are not allowed to clear their receive queues. */
3664     if (call->state == RX_STATE_HOLD &&
3665         call->tfirst + call->nSoftAcked >= call->tnext) {
3666         call->state = RX_STATE_DALLY;
3667         rxi_ClearTransmitQueue(call, 0);
3668     } else if (!queue_IsEmpty(&call->tq)) {
3669         rxi_Start(0, call, istack);
3670     }
3671     return np;
3672 }
3673
3674 /* Received a response to a challenge packet */
3675 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3676     register struct rx_connection *conn;
3677     register struct rx_packet *np;
3678     int istack;
3679 {
3680     int error;
3681
3682     /* Ignore the packet if we're the client */
3683     if (conn->type == RX_CLIENT_CONNECTION) return np;
3684
3685     /* If already authenticated, ignore the packet (it's probably a retry) */
3686     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3687         return np;
3688
3689     /* Otherwise, have the security object evaluate the response packet */
3690     error = RXS_CheckResponse(conn->securityObject, conn, np);
3691     if (error) {
3692         /* If the response is invalid, reset the connection, sending
3693          * an abort to the peer */
3694 #ifndef KERNEL
3695         rxi_Delay(1);
3696 #endif
3697         rxi_ConnectionError(conn, error);
3698         MUTEX_ENTER(&conn->conn_data_lock);
3699         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3700         MUTEX_EXIT(&conn->conn_data_lock);
3701         return np;
3702     }
3703     else {
3704         /* If the response is valid, any calls waiting to attach
3705          * servers can now do so */
3706         int i;
3707         for (i=0; i<RX_MAXCALLS; i++) {
3708             struct rx_call *call = conn->call[i];
3709             if (call) {
3710                 MUTEX_ENTER(&call->lock);
3711                  if (call->state == RX_STATE_PRECALL)
3712                      rxi_AttachServerProc(call, -1, NULL, NULL);
3713                 MUTEX_EXIT(&call->lock);
3714             }
3715         }
3716     }
3717     return np;
3718 }
3719
3720 /* A client has received an authentication challenge: the security
3721  * object is asked to cough up a respectable response packet to send
3722  * back to the server.  The server is responsible for retrying the
3723  * challenge if it fails to get a response. */
3724
3725 struct rx_packet *
3726 rxi_ReceiveChallengePacket(conn, np, istack)
3727     register struct rx_connection *conn;
3728     register struct rx_packet *np;
3729     int istack;
3730 {
3731     int error;
3732
3733     /* Ignore the challenge if we're the server */
3734     if (conn->type == RX_SERVER_CONNECTION) return np;
3735
3736     /* Ignore the challenge if the connection is otherwise idle; someone's
3737      * trying to use us as an oracle. */
3738     if (!rxi_HasActiveCalls(conn)) return np;
3739
3740     /* Send the security object the challenge packet.  It is expected to fill
3741      * in the response. */
3742     error = RXS_GetResponse(conn->securityObject, conn, np);
3743
3744     /* If the security object is unable to return a valid response, reset the
3745      * connection and send an abort to the peer.  Otherwise send the response
3746      * packet to the peer connection. */
3747     if (error) {
3748         rxi_ConnectionError(conn, error);
3749         MUTEX_ENTER(&conn->conn_data_lock);
3750         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3751         MUTEX_EXIT(&conn->conn_data_lock);
3752     }
3753     else {
3754         np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3755                              RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3756     }
3757     return np;
3758 }
3759
3760
3761 /* Find an available server process to service the current request in
3762  * the given call structure.  If one isn't available, queue up this
3763  * call so it eventually gets one */
3764 void 
3765 rxi_AttachServerProc(call, socket, tnop, newcallp)
3766 register struct rx_call *call;
3767 register osi_socket socket;
3768 register int *tnop;
3769 register struct rx_call **newcallp;
3770 {
3771     register struct rx_serverQueueEntry *sq;
3772     register struct rx_service *service = call->conn->service;
3773 #ifdef RX_ENABLE_LOCKS
3774     register int haveQuota = 0;
3775 #endif /* RX_ENABLE_LOCKS */
3776     /* May already be attached */
3777     if (call->state == RX_STATE_ACTIVE) return;
3778
3779     MUTEX_ENTER(&rx_serverPool_lock);
3780 #ifdef RX_ENABLE_LOCKS
3781     while(rxi_ServerThreadSelectingCall) {
3782         MUTEX_EXIT(&call->lock);
3783         CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3784         MUTEX_EXIT(&rx_serverPool_lock);
3785         MUTEX_ENTER(&call->lock);
3786         MUTEX_ENTER(&rx_serverPool_lock);
3787         /* Call may have been attached */
3788         if (call->state == RX_STATE_ACTIVE) return;
3789     }
3790
3791     haveQuota = QuotaOK(service);
3792     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3793         /* If there are no processes available to service this call,
3794          * put the call on the incoming call queue (unless it's
3795          * already on the queue).
3796          */
3797         if (haveQuota)
3798             ReturnToServerPool(service);
3799         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3800             call->flags |= RX_CALL_WAIT_PROC;
3801             MUTEX_ENTER(&rx_stats_mutex);
3802             rx_nWaiting++;
3803             MUTEX_EXIT(&rx_stats_mutex);
3804             rxi_calltrace(RX_CALL_ARRIVAL, call);
3805             SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3806             queue_Append(&rx_incomingCallQueue, call);
3807         }
3808     }
3809 #else /* RX_ENABLE_LOCKS */
3810     if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3811         /* If there are no processes available to service this call,
3812          * put the call on the incoming call queue (unless it's
3813          * already on the queue).
3814          */
3815         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3816             call->flags |= RX_CALL_WAIT_PROC;
3817             rx_nWaiting++;
3818             rxi_calltrace(RX_CALL_ARRIVAL, call);
3819             queue_Append(&rx_incomingCallQueue, call);
3820         }
3821     }
3822 #endif /* RX_ENABLE_LOCKS */
3823     else {
3824         sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3825
3826         /* If hot threads are enabled, and both newcallp and sq->socketp
3827          * are non-null, then this thread will process the call, and the
3828          * idle server thread will start listening on this threads socket.
3829          */
3830         queue_Remove(sq);
3831         if (rx_enable_hot_thread && newcallp && sq->socketp) {
3832             *newcallp = call;
3833             *tnop = sq->tno;
3834             *sq->socketp = socket;
3835             clock_GetTime(&call->startTime);
3836             CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3837         } else {
3838             sq->newcall = call;
3839         }
3840         if (call->flags & RX_CALL_WAIT_PROC) {
3841             /* Conservative:  I don't think this should happen */
3842             call->flags &= ~RX_CALL_WAIT_PROC;
3843             MUTEX_ENTER(&rx_stats_mutex);
3844             rx_nWaiting--;
3845             MUTEX_EXIT(&rx_stats_mutex);
3846             queue_Remove(call);
3847         }
3848         call->state = RX_STATE_ACTIVE;
3849         call->mode = RX_MODE_RECEIVING;
3850         if (call->flags & RX_CALL_CLEARED) {
3851             /* send an ack now to start the packet flow up again */
3852             call->flags &= ~RX_CALL_CLEARED;
3853             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_IDLE, 0);
3854         }
3855 #ifdef  RX_ENABLE_LOCKS
3856         CV_SIGNAL(&sq->cv);
3857 #else
3858         service->nRequestsRunning++;
3859         if (service->nRequestsRunning <= service->minProcs)
3860           rxi_minDeficit--;
3861         rxi_availProcs--;
3862         osi_rxWakeup(sq);
3863 #endif
3864     }
3865     MUTEX_EXIT(&rx_serverPool_lock);
3866 }
3867
3868 /* Delay the sending of an acknowledge event for a short while, while
3869  * a new call is being prepared (in the case of a client) or a reply
3870  * is being prepared (in the case of a server).  Rather than sending
3871  * an ack packet, an ACKALL packet is sent. */
3872 void rxi_AckAll(event, call, dummy)
3873 struct rxevent *event;
3874 register struct rx_call *call;
3875 char *dummy;
3876 {
3877 #ifdef RX_ENABLE_LOCKS
3878     if (event) {
3879         MUTEX_ENTER(&call->lock);
3880         call->delayedAckEvent = (struct rxevent *) 0;
3881         CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3882     }
3883     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3884                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3885     if (event)
3886         MUTEX_EXIT(&call->lock);
3887 #else /* RX_ENABLE_LOCKS */
3888     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3889     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3890                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3891 #endif /* RX_ENABLE_LOCKS */
3892 }
3893
3894 void rxi_SendDelayedAck(event, call, dummy)     
3895 struct rxevent *event;
3896 register struct rx_call *call;
3897 char *dummy;
3898 {
3899 #ifdef RX_ENABLE_LOCKS
3900     if (event) {
3901         MUTEX_ENTER(&call->lock);
3902         if (event == call->delayedAckEvent)
3903             call->delayedAckEvent = (struct rxevent *) 0;
3904         CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3905     }
3906     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3907     if (event)
3908         MUTEX_EXIT(&call->lock);
3909 #else /* RX_ENABLE_LOCKS */
3910     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3911     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3912 #endif /* RX_ENABLE_LOCKS */
3913 }
3914
3915
3916 #ifdef RX_ENABLE_LOCKS
3917 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3918  * clearing them out.
3919  */
3920 static void rxi_SetAcksInTransmitQueue(call)
3921       register struct rx_call *call;
3922 {
3923     register struct rx_packet *p, *tp;
3924     int someAcked = 0;
3925
3926      for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3927          if (!p) 
3928              break;
3929          p->acked = 1;
3930          someAcked = 1;
3931      }
3932      if (someAcked) {
3933          call->flags |= RX_CALL_TQ_CLEARME;
3934          call->flags |= RX_CALL_TQ_SOME_ACKED;
3935      }
3936
3937      rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3938      rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3939      call->tfirst = call->tnext;
3940      call->nSoftAcked = 0;
3941
3942      if (call->flags & RX_CALL_FAST_RECOVER) {
3943         call->flags &= ~RX_CALL_FAST_RECOVER;
3944         call->cwind = call->nextCwind;
3945         call->nextCwind = 0;
3946      }
3947
3948      CV_SIGNAL(&call->cv_twind);
3949 }
3950 #endif /* RX_ENABLE_LOCKS */
3951
3952 /* Clear out the transmit queue for the current call (all packets have
3953  * been received by peer) */
3954 void rxi_ClearTransmitQueue(call, force)
3955     register struct rx_call *call;
3956     register int force;
3957 {
3958     register struct rx_packet *p, *tp;
3959
3960 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3961     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3962         int someAcked = 0;
3963         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3964           if (!p) 
3965              break;
3966           p->acked = 1;
3967           someAcked = 1;
3968         }
3969         if (someAcked) {
3970             call->flags |= RX_CALL_TQ_CLEARME;
3971             call->flags |= RX_CALL_TQ_SOME_ACKED;
3972         }
3973     } else {
3974 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3975         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3976             if (!p) 
3977                 break;
3978             queue_Remove(p);
3979             rxi_FreePacket(p);
3980         }
3981 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3982         call->flags &= ~RX_CALL_TQ_CLEARME;
3983     }
3984 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3985
3986     rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3987     rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3988     call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3989     call->nSoftAcked = 0;
3990
3991     if (call->flags & RX_CALL_FAST_RECOVER) {
3992         call->flags &= ~RX_CALL_FAST_RECOVER;
3993         call->cwind = call->nextCwind;
3994     }
3995
3996 #ifdef  RX_ENABLE_LOCKS
3997     CV_SIGNAL(&call->cv_twind);
3998 #else
3999     osi_rxWakeup(&call->twind);
4000 #endif
4001 }
4002
4003 void rxi_ClearReceiveQueue(call)
4004     register struct rx_call *call;
4005 {
4006     register struct rx_packet *p, *tp;
4007     if (queue_IsNotEmpty(&call->rq)) {
4008       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
4009         if (!p)
4010           break;
4011         queue_Remove(p);
4012         rxi_FreePacket(p);
4013         rx_packetReclaims++;
4014       }
4015       call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
4016     }
4017     if (call->state == RX_STATE_PRECALL) {
4018         call->flags |= RX_CALL_CLEARED;
4019     }
4020 }
4021
4022 /* Send an abort packet for the specified call */
4023 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
4024     register struct rx_call *call;
4025     struct rx_packet *packet;
4026     int istack;
4027     int force;
4028 {
4029   afs_int32 error;
4030   struct clock when;
4031
4032   if (!call->error)
4033     return packet;
4034
4035   /* Clients should never delay abort messages */
4036   if (rx_IsClientConn(call->conn))
4037     force = 1;
4038
4039   if (call->abortCode != call->error) {
4040     call->abortCode = call->error;
4041     call->abortCount = 0;
4042   }
4043
4044   if (force || rxi_callAbortThreshhold == 0 ||
4045       call->abortCount < rxi_callAbortThreshhold) {
4046     if (call->delayedAbortEvent) {
4047         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4048     }
4049     error = htonl(call->error);
4050     call->abortCount++;
4051     packet = rxi_SendSpecial(call, call->conn, packet,
4052                              RX_PACKET_TYPE_ABORT, (char *)&error,
4053                              sizeof(error), istack);
4054   } else if (!call->delayedAbortEvent) {
4055     clock_GetTime(&when);
4056     clock_Addmsec(&when, rxi_callAbortDelay);
4057     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4058     call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4059                                            call, 0);
4060   }
4061   return packet;
4062 }
4063
4064 /* Send an abort packet for the specified connection.  Packet is an
4065  * optional pointer to a packet that can be used to send the abort.
4066  * Once the number of abort messages reaches the threshhold, an
4067  * event is scheduled to send the abort. Setting the force flag
4068  * overrides sending delayed abort messages.
4069  *
4070  * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4071  *       to send the abort packet.
4072  */
4073 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4074     register struct rx_connection *conn;
4075     struct rx_packet *packet;
4076     int istack;
4077     int force;
4078 {
4079   afs_int32 error;
4080   struct clock when;
4081
4082   if (!conn->error)
4083     return packet;
4084
4085   /* Clients should never delay abort messages */
4086   if (rx_IsClientConn(conn))
4087     force = 1;
4088
4089   if (force || rxi_connAbortThreshhold == 0 ||
4090       conn->abortCount < rxi_connAbortThreshhold) {
4091     if (conn->delayedAbortEvent) {
4092         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4093     }
4094     error = htonl(conn->error);
4095     conn->abortCount++;
4096     MUTEX_EXIT(&conn->conn_data_lock);
4097     packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4098                              RX_PACKET_TYPE_ABORT, (char *)&error,
4099                              sizeof(error), istack);
4100     MUTEX_ENTER(&conn->conn_data_lock);
4101   } else if (!conn->delayedAbortEvent) {
4102     clock_GetTime(&when);
4103     clock_Addmsec(&when, rxi_connAbortDelay);
4104     conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4105                                            conn, 0);
4106   }
4107   return packet;
4108 }
4109
4110 /* Associate an error all of the calls owned by a connection.  Called
4111  * with error non-zero.  This is only for really fatal things, like
4112  * bad authentication responses.  The connection itself is set in
4113  * error at this point, so that future packets received will be
4114  * rejected. */
4115 void rxi_ConnectionError(conn, error)
4116     register struct rx_connection *conn;
4117     register afs_int32 error;
4118 {
4119     if (error) {
4120         register int i;
4121         if (conn->challengeEvent)
4122             rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4123         for (i=0; i<RX_MAXCALLS; i++) {
4124             struct rx_call *call = conn->call[i];
4125             if (call) {
4126                 MUTEX_ENTER(&call->lock);
4127                 rxi_CallError(call, error);
4128                 MUTEX_EXIT(&call->lock);
4129             }
4130         }
4131         conn->error = error;
4132         MUTEX_ENTER(&rx_stats_mutex);
4133         rx_stats.fatalErrors++;
4134         MUTEX_EXIT(&rx_stats_mutex);
4135     }
4136 }
4137
4138 void rxi_CallError(call, error)
4139     register struct rx_call *call;
4140     afs_int32 error;
4141 {
4142     if (call->error) error = call->error;
4143 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4144     if (!(call->flags & RX_CALL_TQ_BUSY)) {
4145         rxi_ResetCall(call, 0);
4146     }
4147 #else
4148         rxi_ResetCall(call, 0);
4149 #endif
4150     call->error = error;
4151     call->mode = RX_MODE_ERROR;
4152 }
4153
4154 /* Reset various fields in a call structure, and wakeup waiting
4155  * processes.  Some fields aren't changed: state & mode are not
4156  * touched (these must be set by the caller), and bufptr, nLeft, and
4157  * nFree are not reset, since these fields are manipulated by
4158  * unprotected macros, and may only be reset by non-interrupting code.
4159  */
4160 #ifdef ADAPT_WINDOW
4161 /* this code requires that call->conn be set properly as a pre-condition. */
4162 #endif /* ADAPT_WINDOW */
4163
4164 void rxi_ResetCall(call, newcall)
4165     register struct rx_call *call;
4166     register int newcall;
4167 {
4168     register int flags;
4169     register struct rx_peer *peer;
4170     struct rx_packet *packet;
4171
4172     /* Notify anyone who is waiting for asynchronous packet arrival */
4173     if (call->arrivalProc) {
4174         (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4175         call->arrivalProc = (VOID (*)()) 0;
4176     }
4177
4178     if (call->delayedAbortEvent) {
4179         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4180         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4181         if (packet) {
4182             rxi_SendCallAbort(call, packet, 0, 1);
4183             rxi_FreePacket(packet);
4184         }
4185     }
4186
4187     /*
4188      * Update the peer with the congestion information in this call
4189      * so other calls on this connection can pick up where this call
4190      * left off. If the congestion sequence numbers don't match then
4191      * another call experienced a retransmission.
4192      */
4193     peer = call->conn->peer;
4194     MUTEX_ENTER(&peer->peer_lock);
4195     if (!newcall) {
4196         if (call->congestSeq == peer->congestSeq) {
4197             peer->cwind = MAX(peer->cwind, call->cwind);
4198             peer->MTU = MAX(peer->MTU, call->MTU);
4199             peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4200         }
4201     } else {
4202         call->abortCode = 0;
4203         call->abortCount = 0;
4204     }
4205     if (peer->maxDgramPackets > 1) {
4206         call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4207     } else {
4208         call->MTU = peer->MTU;
4209     }
4210     call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4211     call->ssthresh = rx_maxSendWindow;
4212     call->nDgramPackets = peer->nDgramPackets;
4213     call->congestSeq = peer->congestSeq;
4214     MUTEX_EXIT(&peer->peer_lock);
4215
4216     flags = call->flags;
4217     rxi_ClearReceiveQueue(call);
4218 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4219     if (call->flags & RX_CALL_TQ_BUSY) {
4220         call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4221         call->flags |= (flags & RX_CALL_TQ_WAIT);
4222     } else
4223 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4224     {
4225         rxi_ClearTransmitQueue(call, 0);
4226         queue_Init(&call->tq);
4227         call->flags = 0;
4228     }
4229     queue_Init(&call->rq);
4230     call->error = 0;
4231     call->rwind = rx_initReceiveWindow; 
4232     call->twind = rx_initSendWindow; 
4233     call->nSoftAcked = 0;
4234     call->nextCwind = 0;
4235     call->nAcks = 0;
4236     call->nNacks = 0;
4237     call->nCwindAcks = 0;
4238     call->nSoftAcks = 0;
4239     call->nHardAcks = 0;
4240
4241     call->tfirst = call->rnext = call->tnext = 1;
4242     call->rprev = 0;
4243     call->lastAcked = 0;
4244     call->localStatus = call->remoteStatus = 0;
4245
4246     if (flags & RX_CALL_READER_WAIT)  {
4247 #ifdef  RX_ENABLE_LOCKS
4248         CV_BROADCAST(&call->cv_rq);
4249 #else
4250         osi_rxWakeup(&call->rq);
4251 #endif
4252     }
4253     if (flags & RX_CALL_WAIT_PACKETS) {
4254         MUTEX_ENTER(&rx_freePktQ_lock);
4255         rxi_PacketsUnWait();            /* XXX */
4256         MUTEX_EXIT(&rx_freePktQ_lock);
4257     }
4258
4259 #ifdef  RX_ENABLE_LOCKS
4260     CV_SIGNAL(&call->cv_twind);
4261 #else
4262     if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4263         osi_rxWakeup(&call->twind);
4264 #endif
4265
4266 #ifdef RX_ENABLE_LOCKS
4267     /* The following ensures that we don't mess with any queue while some
4268      * other thread might also be doing so. The call_queue_lock field is
4269      * is only modified under the call lock. If the call is in the process
4270      * of being removed from a queue, the call is not locked until the
4271      * the queue lock is dropped and only then is the call_queue_lock field
4272      * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4273      * Note that any other routine which removes a call from a queue has to
4274      * obtain the queue lock before examing the queue and removing the call.
4275      */
4276     if (call->call_queue_lock) {
4277         MUTEX_ENTER(call->call_queue_lock);
4278         if (queue_IsOnQueue(call)) {
4279             queue_Remove(call);
4280             if (flags & RX_CALL_WAIT_PROC) {
4281                 MUTEX_ENTER(&rx_stats_mutex);
4282                 rx_nWaiting--;
4283                 MUTEX_EXIT(&rx_stats_mutex);
4284             }
4285         }
4286         MUTEX_EXIT(call->call_queue_lock);
4287         CLEAR_CALL_QUEUE_LOCK(call);
4288     }
4289 #else /* RX_ENABLE_LOCKS */
4290     if (queue_IsOnQueue(call)) {
4291       queue_Remove(call);
4292       if (flags & RX_CALL_WAIT_PROC)
4293         rx_nWaiting--;
4294     }
4295 #endif /* RX_ENABLE_LOCKS */
4296
4297     rxi_KeepAliveOff(call);
4298     rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4299 }
4300
4301 /* Send an acknowledge for the indicated packet (seq,serial) of the
4302  * indicated call, for the indicated reason (reason).  This
4303  * acknowledge will specifically acknowledge receiving the packet, and
4304  * will also specify which other packets for this call have been
4305  * received.  This routine returns the packet that was used to the
4306  * caller.  The caller is responsible for freeing it or re-using it.
4307  * This acknowledgement also returns the highest sequence number
4308  * actually read out by the higher level to the sender; the sender
4309  * promises to keep around packets that have not been read by the
4310  * higher level yet (unless, of course, the sender decides to abort
4311  * the call altogether).  Any of p, seq, serial, pflags, or reason may
4312  * be set to zero without ill effect.  That is, if they are zero, they
4313  * will not convey any information.  
4314  * NOW there is a trailer field, after the ack where it will safely be
4315  * ignored by mundanes, which indicates the maximum size packet this 
4316  * host can swallow.  */  
4317 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4318     register struct rx_call *call;
4319     register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4320     int seq;                    /* Sequence number of the packet we are acking */
4321     int serial;                 /* Serial number of the packet */
4322     int pflags;                 /* Flags field from packet header */
4323     int reason;                 /* Reason an acknowledge was prompted */
4324     int istack;
4325 {
4326     struct rx_ackPacket *ap;
4327     register struct rx_packet *rqp;
4328     register struct rx_packet *nxp;  /* For queue_Scan */
4329     register struct rx_packet *p;
4330     u_char offset;
4331     afs_int32 templ;
4332
4333     /*
4334      * Open the receive window once a thread starts reading packets
4335      */
4336     if (call->rnext > 1) {
4337         call->rwind = rx_maxReceiveWindow;
4338     }
4339
4340     call->nHardAcks = 0;
4341     call->nSoftAcks = 0;
4342     if (call->rnext > call->lastAcked)
4343       call->lastAcked = call->rnext;
4344     p = optionalPacket;
4345
4346     if (p) {
4347       rx_computelen(p, p->length);  /* reset length, you never know */
4348     }                               /* where that's been...         */
4349     else
4350       if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4351           /* We won't send the ack, but don't panic. */
4352           return optionalPacket;
4353       }
4354
4355     templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4356     if (templ > 0) {
4357       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4358           if (!optionalPacket) rxi_FreePacket(p);
4359           return optionalPacket;
4360       }
4361       templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32); 
4362       if (rx_Contiguous(p)<templ) {
4363           if (!optionalPacket) rxi_FreePacket(p);
4364           return optionalPacket;
4365       }
4366     }    /* MTUXXX failing to send an ack is very serious.  We should */
4367          /* try as hard as possible to send even a partial ack; it's */
4368          /* better than nothing. */
4369
4370     ap = (struct rx_ackPacket *) rx_DataOf(p);
4371     ap->bufferSpace = htonl(0); /* Something should go here, sometime */