2db8ba8a1715601105fb5acc07c09fb454a51562
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #ifdef  KERNEL
13 #include "../afs/param.h"
14 #include "../afs/sysincludes.h"
15 #include "../afs/afsincludes.h"
16 #ifndef UKERNEL
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #include "../afs/afs_args.h"
28 #include "../afs/afs_osi.h"
29 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
30 #include "../h/systm.h"
31 #endif
32 #ifdef RXDEBUG
33 #undef RXDEBUG      /* turn off debugging */
34 #endif /* RXDEBUG */
35 #if defined(AFS_SGI_ENV)
36 #include "../sys/debug.h"
37 #endif
38 #include "../afsint/afsint.h"
39 #ifdef  AFS_ALPHA_ENV
40 #undef kmem_alloc
41 #undef kmem_free
42 #undef mem_alloc
43 #undef mem_free
44 #undef register
45 #endif  /* AFS_ALPHA_ENV */
46 #else /* !UKERNEL */
47 #include "../afs/sysincludes.h"
48 #include "../afs/afsincludes.h"
49 #endif /* !UKERNEL */
50 #include "../afs/lock.h"
51 #include "../rx/rx_kmutex.h"
52 #include "../rx/rx_kernel.h"
53 #include "../rx/rx_clock.h"
54 #include "../rx/rx_queue.h"
55 #include "../rx/rx.h"
56 #include "../rx/rx_globals.h"
57 #include "../rx/rx_trace.h"
58 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
59 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
60 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
61 #include "../afsint/afsint.h"
62 extern afs_int32 afs_termState;
63 #ifdef AFS_AIX41_ENV
64 #include "sys/lockl.h"
65 #include "sys/lock_def.h"
66 #endif /* AFS_AIX41_ENV */
67 # include "../afsint/rxgen_consts.h"
68 #else /* KERNEL */
69 # include <afs/param.h>
70 # include <sys/types.h>
71 # include <errno.h>
72 #ifdef AFS_NT40_ENV
73 # include <stdlib.h>
74 # include <fcntl.h>
75 # include <afsutil.h>
76 #else
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <sys/stat.h>
81 # include <netinet/in.h>
82 # include <sys/time.h>
83 #endif
84 # include "rx.h"
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx_globals.h"
89 # include "rx_trace.h"
90 # include "rx_internal.h"
91 # include <afs/rxgen_consts.h>
92 #endif /* KERNEL */
93
94 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
95 struct rx_tq_debug {
96     afs_int32 rxi_start_aborted; /* rxi_start awoke after rxi_Send in error. */
97     afs_int32 rxi_start_in_error;
98 } rx_tq_debug;
99 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
100
101 /*
102  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
103  * currently allocated within rx.  This number is used to allocate the
104  * memory required to return the statistics when queried.
105  */
106
107 static unsigned int rxi_rpc_peer_stat_cnt;
108
109 /*
110  * rxi_rpc_process_stat_cnt counts the total number of local process stat
111  * structures currently allocated within rx.  The number is used to allocate
112  * the memory required to return the statistics when queried.
113  */
114
115 static unsigned int rxi_rpc_process_stat_cnt;
116
117 #if !defined(offsetof)
118 #include <stddef.h>     /* for definition of offsetof() */
119 #endif
120
121 #ifdef AFS_PTHREAD_ENV
122 #include <assert.h>
123
124 /*
125  * Use procedural initialization of mutexes/condition variables
126  * to ease NT porting
127  */
128
129 extern pthread_mutex_t rxkad_stats_mutex;
130 extern pthread_mutex_t des_init_mutex;
131 extern pthread_mutex_t des_random_mutex;
132 extern pthread_mutex_t rx_clock_mutex;
133 extern pthread_mutex_t rxi_connCacheMutex;
134 extern pthread_mutex_t rx_event_mutex;
135 extern pthread_mutex_t osi_malloc_mutex;
136 extern pthread_mutex_t event_handler_mutex;
137 extern pthread_mutex_t listener_mutex;
138 extern pthread_mutex_t rx_if_init_mutex;
139 extern pthread_mutex_t rx_if_mutex;
140 extern pthread_mutex_t rxkad_client_uid_mutex;
141 extern pthread_mutex_t rxkad_random_mutex;
142
143 extern pthread_cond_t rx_event_handler_cond;
144 extern pthread_cond_t rx_listener_cond;
145
146 static pthread_mutex_t epoch_mutex;
147 static pthread_mutex_t rx_init_mutex;
148 static pthread_mutex_t rx_debug_mutex;
149
150 static void rxi_InitPthread(void) {
151     assert(pthread_mutex_init(&rx_clock_mutex,
152                               (const pthread_mutexattr_t*)0)==0);
153     assert(pthread_mutex_init(&rxi_connCacheMutex,
154                               (const pthread_mutexattr_t*)0)==0);
155     assert(pthread_mutex_init(&rx_init_mutex,
156                               (const pthread_mutexattr_t*)0)==0);
157     assert(pthread_mutex_init(&epoch_mutex,
158                               (const pthread_mutexattr_t*)0)==0);
159     assert(pthread_mutex_init(&rx_event_mutex,
160                               (const pthread_mutexattr_t*)0)==0);
161     assert(pthread_mutex_init(&des_init_mutex,
162                               (const pthread_mutexattr_t*)0)==0);
163     assert(pthread_mutex_init(&des_random_mutex,
164                               (const pthread_mutexattr_t*)0)==0);
165     assert(pthread_mutex_init(&osi_malloc_mutex,
166                               (const pthread_mutexattr_t*)0)==0);
167     assert(pthread_mutex_init(&event_handler_mutex,
168                               (const pthread_mutexattr_t*)0)==0);
169     assert(pthread_mutex_init(&listener_mutex,
170                               (const pthread_mutexattr_t*)0)==0);
171     assert(pthread_mutex_init(&rx_if_init_mutex,
172                               (const pthread_mutexattr_t*)0)==0);
173     assert(pthread_mutex_init(&rx_if_mutex,
174                               (const pthread_mutexattr_t*)0)==0);
175     assert(pthread_mutex_init(&rxkad_client_uid_mutex,
176                               (const pthread_mutexattr_t*)0)==0);
177     assert(pthread_mutex_init(&rxkad_random_mutex,
178                               (const pthread_mutexattr_t*)0)==0);
179     assert(pthread_mutex_init(&rxkad_stats_mutex,
180                               (const pthread_mutexattr_t*)0)==0);
181     assert(pthread_mutex_init(&rx_debug_mutex,
182                               (const pthread_mutexattr_t*)0)==0);
183
184     assert(pthread_cond_init(&rx_event_handler_cond,
185                               (const pthread_condattr_t*)0)==0);
186     assert(pthread_cond_init(&rx_listener_cond,
187                               (const pthread_condattr_t*)0)==0);
188     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
189 }
190
191 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
192 #define INIT_PTHREAD_LOCKS \
193 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
194 /*
195  * The rx_stats_mutex mutex protects the following global variables:
196  * rxi_dataQuota
197  * rxi_minDeficit
198  * rxi_availProcs
199  * rxi_totalMin
200  * rxi_lowConnRefCount
201  * rxi_lowPeerRefCount
202  * rxi_nCalls
203  * rxi_Alloccnt
204  * rxi_Allocsize
205  * rx_nFreePackets
206  * rx_tq_debug
207  * rx_stats
208  */
209 #else
210 #define INIT_PTHREAD_LOCKS
211 #endif
212
213 extern void rxi_DeleteCachedConnections(void);
214
215
216 /* Variables for handling the minProcs implementation.  availProcs gives the
217  * number of threads available in the pool at this moment (not counting dudes
218  * executing right now).  totalMin gives the total number of procs required
219  * for handling all minProcs requests.  minDeficit is a dynamic variable
220  * tracking the # of procs required to satisfy all of the remaining minProcs
221  * demands.
222  * For fine grain locking to work, the quota check and the reservation of
223  * a server thread has to come while rxi_availProcs and rxi_minDeficit
224  * are locked. To this end, the code has been modified under #ifdef
225  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
226  * same time. A new function, ReturnToServerPool() returns the allocation.
227  * 
228  * A call can be on several queue's (but only one at a time). When
229  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
230  * that no one else is touching the queue. To this end, we store the address
231  * of the queue lock in the call structure (under the call lock) when we
232  * put the call on a queue, and we clear the call_queue_lock when the
233  * call is removed from a queue (once the call lock has been obtained).
234  * This allows rxi_ResetCall to safely synchronize with others wishing
235  * to manipulate the queue.
236  */
237
238 extern void rxi_Delay(int);
239
240 static int rxi_ServerThreadSelectingCall;
241
242 #ifdef RX_ENABLE_LOCKS
243 static afs_kmutex_t rx_rpc_stats;
244 void rxi_StartUnlocked();
245 #endif
246
247 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
248 ** pretty good that the next packet coming in is from the same connection 
249 ** as the last packet, since we're send multiple packets in a transmit window.
250 */
251 struct rx_connection *rxLastConn; 
252
253 #ifdef RX_ENABLE_LOCKS
254 /* The locking hierarchy for rx fine grain locking is composed of five
255  * tiers:
256  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
257  * call->lock - locks call data fields.
258  * Most any other lock - these are all independent of each other.....
259  *      rx_freePktQ_lock
260  *      rx_freeCallQueue_lock
261  *      freeSQEList_lock
262  *      rx_connHashTable_lock
263  *      rx_serverPool_lock
264  *      rxi_keyCreate_lock
265  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
266
267  * lowest level:
268  *      peer_lock - locks peer data fields.
269  *      conn_data_lock - that more than one thread is not updating a conn data
270  *              field at the same time.
271  * Do we need a lock to protect the peer field in the conn structure?
272  *      conn->peer was previously a constant for all intents and so has no
273  *      lock protecting this field. The multihomed client delta introduced
274  *      a RX code change : change the peer field in the connection structure
275  *      to that remote inetrface from which the last packet for this
276  *      connection was sent out. This may become an issue if further changes
277  *      are made.
278  */
279 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
280 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
281 #ifdef RX_LOCKS_DB
282 /* rxdb_fileID is used to identify the lock location, along with line#. */
283 static int rxdb_fileID = RXDB_FILE_RX;
284 #endif /* RX_LOCKS_DB */
285 static void rxi_SetAcksInTransmitQueue();
286 void osirx_AssertMine(afs_kmutex_t *lockaddr, char *msg);
287 #else /* RX_ENABLE_LOCKS */
288 #define SET_CALL_QUEUE_LOCK(C, L)
289 #define CLEAR_CALL_QUEUE_LOCK(C)
290 #endif /* RX_ENABLE_LOCKS */
291 static void rxi_DestroyConnectionNoLock();
292 void rxi_DestroyConnection();
293 void rxi_CleanupConnection();
294 struct rx_serverQueueEntry *rx_waitForPacket = 0;
295
296 /* ------------Exported Interfaces------------- */
297
298 /* This function allows rxkad to set the epoch to a suitably random number
299  * which rx_NewConnection will use in the future.  The principle purpose is to
300  * get rxnull connections to use the same epoch as the rxkad connections do, at
301  * least once the first rxkad connection is established.  This is important now
302  * that the host/port addresses aren't used in FindConnection: the uniqueness
303  * of epoch/cid matters and the start time won't do. */
304
305 #ifdef AFS_PTHREAD_ENV
306 /*
307  * This mutex protects the following global variables:
308  * rx_epoch
309  */
310
311 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
312 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
313 #else
314 #define LOCK_EPOCH
315 #define UNLOCK_EPOCH
316 #endif /* AFS_PTHREAD_ENV */
317
318 void rx_SetEpoch (epoch)
319   afs_uint32 epoch;
320 {
321     LOCK_EPOCH
322     rx_epoch = epoch;
323     UNLOCK_EPOCH
324 }
325
326 /* Initialize rx.  A port number may be mentioned, in which case this
327  * becomes the default port number for any service installed later.
328  * If 0 is provided for the port number, a random port will be chosen
329  * by the kernel.  Whether this will ever overlap anything in
330  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
331  * error. */
332 static int rxinit_status = 1;
333 #ifdef AFS_PTHREAD_ENV
334 /*
335  * This mutex protects the following global variables:
336  * rxinit_status
337  */
338
339 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
340 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
341 #else
342 #define LOCK_RX_INIT
343 #define UNLOCK_RX_INIT
344 #endif
345
346 int rx_Init(u_int port)
347 {
348 #ifdef KERNEL
349     osi_timeval_t tv;
350 #else /* KERNEL */
351     struct timeval tv;
352 #endif /* KERNEL */
353     char *htable, *ptable;
354     int tmp_status;
355
356     SPLVAR;
357
358     INIT_PTHREAD_LOCKS
359     LOCK_RX_INIT
360     if (rxinit_status == 0) {
361         tmp_status = rxinit_status;
362         UNLOCK_RX_INIT
363         return tmp_status; /* Already started; return previous error code. */
364     }
365
366 #ifdef AFS_NT40_ENV
367     if (afs_winsockInit()<0)
368         return -1;
369 #endif
370
371 #ifndef KERNEL
372     /*
373      * Initialize anything necessary to provide a non-premptive threading
374      * environment.
375      */
376     rxi_InitializeThreadSupport();
377 #endif
378
379     /* Allocate and initialize a socket for client and perhaps server
380      * connections. */
381
382     rx_socket = rxi_GetUDPSocket((u_short)port); 
383     if (rx_socket == OSI_NULLSOCKET) {
384         UNLOCK_RX_INIT
385         return RX_ADDRINUSE;
386     }
387     
388
389 #ifdef  RX_ENABLE_LOCKS
390 #ifdef RX_LOCKS_DB
391     rxdb_init();
392 #endif /* RX_LOCKS_DB */
393     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex",MUTEX_DEFAULT,0);    
394     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats",MUTEX_DEFAULT,0);    
395     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock",MUTEX_DEFAULT,0);    
396     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock",MUTEX_DEFAULT,0);
397     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock",
398                MUTEX_DEFAULT,0);
399     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv",CV_DEFAULT, 0);
400     MUTEX_INIT(&rx_peerHashTable_lock,"rx_peerHashTable_lock",MUTEX_DEFAULT,0);
401     MUTEX_INIT(&rx_connHashTable_lock,"rx_connHashTable_lock",MUTEX_DEFAULT,0);
402     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
403 #ifndef KERNEL
404     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
405 #endif /* !KERNEL */
406     CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
407 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
408     if ( !uniprocessor )
409       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
410 #endif /* KERNEL && AFS_HPUX110_ENV */
411 #else /* RX_ENABLE_LOCKS */
412 #if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV)
413     mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
414 #endif /* AFS_GLOBAL_SUNLOCK */
415 #endif /* RX_ENABLE_LOCKS */
416
417     rxi_nCalls = 0;
418     rx_connDeadTime = 12;
419     rx_tranquil     = 0;        /* reset flag */
420     bzero((char *)&rx_stats, sizeof(struct rx_stats));
421     htable = (char *)
422         osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
423     PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *));  /* XXXXX */
424     bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
425     ptable =  (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));   
426     PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *));       /* XXXXX */
427     bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
428
429     /* Malloc up a bunch of packets & buffers */
430     rx_nFreePackets = 0;
431     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
432     queue_Init(&rx_freePacketQueue);
433     rxi_NeedMorePackets = FALSE;
434     rxi_MorePackets(rx_nPackets);
435     rx_CheckPackets();
436
437     NETPRI;
438     AFS_RXGLOCK();
439
440     clock_Init();
441
442 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
443     tv.tv_sec = clock_now.sec;
444     tv.tv_usec = clock_now.usec;
445     srand((unsigned int) tv.tv_usec);
446 #else
447     osi_GetTime(&tv);
448 #endif
449     if (port) {
450         rx_port = port;
451     } else {
452 #if defined(KERNEL) && !defined(UKERNEL)
453         /* Really, this should never happen in a real kernel */
454         rx_port = 0;
455 #else
456         struct sockaddr_in addr;
457         int addrlen = sizeof(addr);
458         if (getsockname((int)rx_socket, (struct sockaddr *) &addr, &addrlen)) {
459             rx_Finalize();
460             return -1;
461         }
462         rx_port = addr.sin_port;
463 #endif
464     }
465     rx_stats.minRtt.sec = 9999999;
466 #ifdef  KERNEL
467     rx_SetEpoch (tv.tv_sec | 0x80000000);
468 #else
469     rx_SetEpoch (tv.tv_sec);            /* Start time of this package, rxkad
470                                          * will provide a randomer value. */
471 #endif
472     MUTEX_ENTER(&rx_stats_mutex);
473     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
474     MUTEX_EXIT(&rx_stats_mutex);
475     /* *Slightly* random start time for the cid.  This is just to help
476      * out with the hashing function at the peer */
477     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
478     rx_connHashTable = (struct rx_connection **) htable;
479     rx_peerHashTable = (struct rx_peer **) ptable;
480
481     rx_lastAckDelay.sec = 0;
482     rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
483     rx_hardAckDelay.sec = 0;
484     rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
485     rx_softAckDelay.sec = 0;
486     rx_softAckDelay.usec = 100000; /* 100 milliseconds */
487
488     rxevent_Init(20, rxi_ReScheduleEvents);
489
490     /* Initialize various global queues */
491     queue_Init(&rx_idleServerQueue);
492     queue_Init(&rx_incomingCallQueue);
493     queue_Init(&rx_freeCallQueue);
494
495 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
496     /* Initialize our list of usable IP addresses. */
497     rx_GetIFInfo();
498 #endif
499
500     /* Start listener process (exact function is dependent on the
501      * implementation environment--kernel or user space) */
502     rxi_StartListener();
503
504     AFS_RXGUNLOCK();
505     USERPRI;
506     tmp_status = rxinit_status = 0;
507     UNLOCK_RX_INIT
508     return tmp_status;
509 }
510
511 /* called with unincremented nRequestsRunning to see if it is OK to start
512  * a new thread in this service.  Could be "no" for two reasons: over the
513  * max quota, or would prevent others from reaching their min quota.
514  */
515 #ifdef RX_ENABLE_LOCKS
516 /* This verion of QuotaOK reserves quota if it's ok while the
517  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
518  */
519 static int QuotaOK(aservice)
520 register struct rx_service *aservice;
521 {
522     /* check if over max quota */
523     if (aservice->nRequestsRunning >= aservice->maxProcs) {
524         return 0;
525     }
526
527     /* under min quota, we're OK */
528     /* otherwise, can use only if there are enough to allow everyone
529      * to go to their min quota after this guy starts.
530      */
531     MUTEX_ENTER(&rx_stats_mutex);
532     if ((aservice->nRequestsRunning < aservice->minProcs) ||
533          (rxi_availProcs > rxi_minDeficit)) {
534         aservice->nRequestsRunning++;
535         /* just started call in minProcs pool, need fewer to maintain
536          * guarantee */
537         if (aservice->nRequestsRunning <= aservice->minProcs)
538             rxi_minDeficit--;
539         rxi_availProcs--;
540         MUTEX_EXIT(&rx_stats_mutex);
541         return 1;
542     }
543     MUTEX_EXIT(&rx_stats_mutex);
544
545     return 0;
546 }
547 static void ReturnToServerPool(aservice)
548 register struct rx_service *aservice;
549 {
550     aservice->nRequestsRunning--;
551     MUTEX_ENTER(&rx_stats_mutex);
552     if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++;
553     rxi_availProcs++;
554     MUTEX_EXIT(&rx_stats_mutex);
555 }
556
557 #else /* RX_ENABLE_LOCKS */
558 static QuotaOK(aservice)
559 register struct rx_service *aservice; {
560     int rc=0;
561     /* under min quota, we're OK */
562     if (aservice->nRequestsRunning < aservice->minProcs) return 1;
563
564     /* check if over max quota */
565     if (aservice->nRequestsRunning >= aservice->maxProcs) return 0;
566
567     /* otherwise, can use only if there are enough to allow everyone
568      * to go to their min quota after this guy starts.
569      */
570     if (rxi_availProcs > rxi_minDeficit) rc = 1;
571     return rc;
572 }
573 #endif /* RX_ENABLE_LOCKS */
574
575 #ifndef KERNEL
576 /* Called by rx_StartServer to start up lwp's to service calls.
577    NExistingProcs gives the number of procs already existing, and which
578    therefore needn't be created. */
579 void rxi_StartServerProcs(nExistingProcs)
580     int nExistingProcs;
581 {
582     register struct rx_service *service;
583     register int i;
584     int maxdiff = 0;
585     int nProcs = 0;
586
587     /* For each service, reserve N processes, where N is the "minimum"
588        number of processes that MUST be able to execute a request in parallel,
589        at any time, for that process.  Also compute the maximum difference
590        between any service's maximum number of processes that can run
591        (i.e. the maximum number that ever will be run, and a guarantee
592        that this number will run if other services aren't running), and its
593        minimum number.  The result is the extra number of processes that
594        we need in order to provide the latter guarantee */
595     for (i=0; i<RX_MAX_SERVICES; i++) {
596         int diff;
597         service = rx_services[i];
598         if (service == (struct rx_service *) 0) break;
599         nProcs += service->minProcs;
600         diff = service->maxProcs - service->minProcs;
601         if (diff > maxdiff) maxdiff = diff;
602     }
603     nProcs += maxdiff; /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
604     nProcs -= nExistingProcs; /* Subtract the number of procs that were previously created for use as server procs */
605     for (i = 0; i<nProcs; i++) {
606         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
607     }
608 }
609 #endif /* KERNEL */
610
611 /* This routine must be called if any services are exported.  If the
612  * donateMe flag is set, the calling process is donated to the server
613  * process pool */
614 void rx_StartServer(donateMe)
615 {
616     register struct rx_service *service;
617     register int i;
618     SPLVAR;
619     clock_NewTime();
620
621     NETPRI;
622     AFS_RXGLOCK();
623     /* Start server processes, if necessary (exact function is dependent
624      * on the implementation environment--kernel or user space).  DonateMe
625      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
626      * case, one less new proc will be created rx_StartServerProcs.
627      */
628     rxi_StartServerProcs(donateMe);
629
630     /* count up the # of threads in minProcs, and add set the min deficit to
631      * be that value, too.
632      */
633     for (i=0; i<RX_MAX_SERVICES; i++) {
634         service = rx_services[i];
635         if (service == (struct rx_service *) 0) break;
636         MUTEX_ENTER(&rx_stats_mutex);
637         rxi_totalMin += service->minProcs;
638         /* below works even if a thread is running, since minDeficit would
639          * still have been decremented and later re-incremented.
640          */
641         rxi_minDeficit += service->minProcs;
642         MUTEX_EXIT(&rx_stats_mutex);
643     }
644
645     /* Turn on reaping of idle server connections */
646     rxi_ReapConnections();
647
648     AFS_RXGUNLOCK();
649     USERPRI;
650
651     if (donateMe) rx_ServerProc(); /* Never returns */
652     return;
653 }
654
655 /* Create a new client connection to the specified service, using the
656  * specified security object to implement the security model for this
657  * connection. */
658 struct rx_connection *
659 rx_NewConnection(shost, sport, sservice, securityObject, serviceSecurityIndex)
660     register afs_uint32 shost;      /* Server host */
661     u_short sport;                  /* Server port */
662     u_short sservice;               /* Server service id */
663     register struct rx_securityClass *securityObject;
664     int serviceSecurityIndex;
665 {
666     int hashindex;
667     afs_int32 cid;
668     register struct rx_connection *conn;
669
670     SPLVAR;
671
672     clock_NewTime();
673     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n",
674           shost, sport, sservice, securityObject, serviceSecurityIndex));
675
676     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
677      * the case of kmem_alloc? */
678     conn = rxi_AllocConnection();
679 #ifdef  RX_ENABLE_LOCKS
680     MUTEX_INIT(&conn->conn_call_lock, "conn call lock",MUTEX_DEFAULT,0);
681     MUTEX_INIT(&conn->conn_data_lock, "conn call lock",MUTEX_DEFAULT,0);
682     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
683 #endif
684     NETPRI;
685     AFS_RXGLOCK();
686     MUTEX_ENTER(&rx_connHashTable_lock);
687     cid = (rx_nextCid += RX_MAXCALLS);
688     conn->type = RX_CLIENT_CONNECTION;
689     conn->cid = cid;
690     conn->epoch = rx_epoch;
691     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
692     conn->serviceId = sservice;
693     conn->securityObject = securityObject;
694     /* This doesn't work in all compilers with void (they're buggy), so fake it
695      * with VOID */
696     conn->securityData = (VOID *) 0;
697     conn->securityIndex = serviceSecurityIndex;
698     rx_SetConnDeadTime(conn, rx_connDeadTime);
699     conn->ackRate = RX_FAST_ACK_RATE;
700     conn->nSpecific = 0;
701     conn->specific = NULL;
702     conn->challengeEvent = (struct rxevent *)0;
703     conn->delayedAbortEvent = (struct rxevent *)0;
704     conn->abortCount = 0;
705     conn->error = 0;
706
707     RXS_NewConnection(securityObject, conn);
708     hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
709     
710     conn->refCount++; /* no lock required since only this thread knows... */
711     conn->next = rx_connHashTable[hashindex];
712     rx_connHashTable[hashindex] = conn;
713     MUTEX_ENTER(&rx_stats_mutex);
714     rx_stats.nClientConns++;
715     MUTEX_EXIT(&rx_stats_mutex);
716
717     MUTEX_EXIT(&rx_connHashTable_lock);
718     AFS_RXGUNLOCK();
719     USERPRI;
720     return conn;
721 }
722
723 void rx_SetConnDeadTime(conn, seconds)
724     register struct rx_connection *conn;
725     register int seconds;
726 {
727     /* The idea is to set the dead time to a value that allows several
728      * keepalives to be dropped without timing out the connection. */
729     conn->secondsUntilDead = MAX(seconds, 6);
730     conn->secondsUntilPing = conn->secondsUntilDead/6;
731 }
732
733 int rxi_lowPeerRefCount = 0;
734 int rxi_lowConnRefCount = 0;
735
736 /*
737  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
738  * NOTE: must not be called with rx_connHashTable_lock held.
739  */
740 void rxi_CleanupConnection(conn)
741     struct rx_connection *conn;
742 {
743     int i;
744
745     /* Notify the service exporter, if requested, that this connection
746      * is being destroyed */
747     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
748       (*conn->service->destroyConnProc)(conn);
749
750     /* Notify the security module that this connection is being destroyed */
751     RXS_DestroyConnection(conn->securityObject, conn);
752
753     /* If this is the last connection using the rx_peer struct, set its
754      * idle time to now. rxi_ReapConnections will reap it if it's still
755      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
756      */
757     MUTEX_ENTER(&rx_peerHashTable_lock);
758     if (--conn->peer->refCount <= 0) {
759         conn->peer->idleWhen = clock_Sec();
760         if (conn->peer->refCount < 0) {
761             conn->peer->refCount = 0; 
762             MUTEX_ENTER(&rx_stats_mutex);
763             rxi_lowPeerRefCount ++;
764             MUTEX_EXIT(&rx_stats_mutex);
765         }
766     }
767     MUTEX_EXIT(&rx_peerHashTable_lock);
768
769     MUTEX_ENTER(&rx_stats_mutex);
770     if (conn->type == RX_SERVER_CONNECTION)
771       rx_stats.nServerConns--;
772     else
773       rx_stats.nClientConns--;
774     MUTEX_EXIT(&rx_stats_mutex);
775
776 #ifndef KERNEL
777     if (conn->specific) {
778         for (i = 0 ; i < conn->nSpecific ; i++) {
779             if (conn->specific[i] && rxi_keyCreate_destructor[i])
780                 (*rxi_keyCreate_destructor[i])(conn->specific[i]);
781             conn->specific[i] = NULL;
782         }
783         free(conn->specific);
784     }
785     conn->specific = NULL;
786     conn->nSpecific = 0;
787 #endif /* !KERNEL */
788
789     MUTEX_DESTROY(&conn->conn_call_lock);
790     MUTEX_DESTROY(&conn->conn_data_lock);
791     CV_DESTROY(&conn->conn_call_cv);
792         
793     rxi_FreeConnection(conn);
794 }
795
796 /* Destroy the specified connection */
797 void rxi_DestroyConnection(conn)
798     register struct rx_connection *conn;
799 {
800     MUTEX_ENTER(&rx_connHashTable_lock);
801     rxi_DestroyConnectionNoLock(conn);
802     /* conn should be at the head of the cleanup list */
803     if (conn == rx_connCleanup_list) {
804         rx_connCleanup_list = rx_connCleanup_list->next;
805         MUTEX_EXIT(&rx_connHashTable_lock);
806         rxi_CleanupConnection(conn);
807     }
808 #ifdef RX_ENABLE_LOCKS
809     else {
810         MUTEX_EXIT(&rx_connHashTable_lock);
811     }
812 #endif /* RX_ENABLE_LOCKS */
813 }
814     
815 static void rxi_DestroyConnectionNoLock(conn)
816     register struct rx_connection *conn;
817 {
818     register struct rx_connection **conn_ptr;
819     register int havecalls = 0;
820     struct rx_packet *packet;
821     int i;
822     SPLVAR;
823
824     clock_NewTime();
825
826     NETPRI;
827     MUTEX_ENTER(&conn->conn_data_lock);
828     if (conn->refCount > 0)
829         conn->refCount--;
830     else {
831         MUTEX_ENTER(&rx_stats_mutex);
832         rxi_lowConnRefCount++;
833         MUTEX_EXIT(&rx_stats_mutex);
834     }
835
836     if (conn->refCount > 0) {
837         /* Busy; wait till the last guy before proceeding */
838         MUTEX_EXIT(&conn->conn_data_lock);
839         USERPRI;
840         return;
841     }
842
843     /* If the client previously called rx_NewCall, but it is still
844      * waiting, treat this as a running call, and wait to destroy the
845      * connection later when the call completes. */
846     if ((conn->type == RX_CLIENT_CONNECTION) &&
847         (conn->flags & RX_CONN_MAKECALL_WAITING)) {
848         conn->flags |= RX_CONN_DESTROY_ME;
849         MUTEX_EXIT(&conn->conn_data_lock);
850         USERPRI;
851         return;
852     }
853     MUTEX_EXIT(&conn->conn_data_lock);
854
855     /* Check for extant references to this connection */
856     for (i = 0; i<RX_MAXCALLS; i++) {
857         register struct rx_call *call = conn->call[i];
858         if (call) {
859             havecalls = 1;
860             if (conn->type == RX_CLIENT_CONNECTION) {
861                 MUTEX_ENTER(&call->lock);
862                 if (call->delayedAckEvent) {
863                     /* Push the final acknowledgment out now--there
864                      * won't be a subsequent call to acknowledge the
865                      * last reply packets */
866                     rxevent_Cancel(call->delayedAckEvent, call,
867                                    RX_CALL_REFCOUNT_DELAY);
868                     rxi_AckAll((struct rxevent *)0, call, 0);
869                 }
870                 MUTEX_EXIT(&call->lock);
871             }
872         }
873     }
874 #ifdef RX_ENABLE_LOCKS
875     if (!havecalls) {
876         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
877             MUTEX_EXIT(&conn->conn_data_lock);
878         }
879         else {
880             /* Someone is accessing a packet right now. */
881             havecalls = 1;
882         }
883     }
884 #endif /* RX_ENABLE_LOCKS */
885
886     if (havecalls) {
887         /* Don't destroy the connection if there are any call
888          * structures still in use */
889         MUTEX_ENTER(&conn->conn_data_lock);
890         conn->flags |= RX_CONN_DESTROY_ME;
891         MUTEX_EXIT(&conn->conn_data_lock);
892         USERPRI;
893         return;
894     }
895
896     if (conn->delayedAbortEvent) {
897         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
898         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
899         if (packet) {
900             MUTEX_ENTER(&conn->conn_data_lock);
901             rxi_SendConnectionAbort(conn, packet, 0, 1);
902             MUTEX_EXIT(&conn->conn_data_lock);
903             rxi_FreePacket(packet);
904         }
905     }
906
907     /* Remove from connection hash table before proceeding */
908     conn_ptr = & rx_connHashTable[ CONN_HASH(peer->host, peer->port, conn->cid,
909                                              conn->epoch, conn->type) ];
910     for ( ; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
911         if (*conn_ptr == conn) {
912             *conn_ptr = conn->next;
913             break;
914         }
915     }
916     /* if the conn that we are destroying was the last connection, then we
917     * clear rxLastConn as well */
918     if ( rxLastConn == conn )
919         rxLastConn = 0;
920
921     /* Make sure the connection is completely reset before deleting it. */
922     /* get rid of pending events that could zap us later */
923     if (conn->challengeEvent) {
924         rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
925     }
926  
927     /* Add the connection to the list of destroyed connections that
928      * need to be cleaned up. This is necessary to avoid deadlocks
929      * in the routines we call to inform others that this connection is
930      * being destroyed. */
931     conn->next = rx_connCleanup_list;
932     rx_connCleanup_list = conn;
933 }
934
935 /* Externally available version */
936 void rx_DestroyConnection(conn) 
937     register struct rx_connection *conn;
938 {
939     SPLVAR;
940
941     NETPRI;
942     AFS_RXGLOCK();
943     rxi_DestroyConnection (conn);
944     AFS_RXGUNLOCK();
945     USERPRI;
946 }
947
948 /* Start a new rx remote procedure call, on the specified connection.
949  * If wait is set to 1, wait for a free call channel; otherwise return
950  * 0.  Maxtime gives the maximum number of seconds this call may take,
951  * after rx_MakeCall returns.  After this time interval, a call to any
952  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
953  * For fine grain locking, we hold the conn_call_lock in order to 
954  * to ensure that we don't get signalle after we found a call in an active
955  * state and before we go to sleep.
956  */
957 struct rx_call *rx_NewCall(conn)
958     register struct rx_connection *conn;
959 {
960     register int i;
961     register struct rx_call *call;
962     struct clock queueTime;
963     SPLVAR;
964
965     clock_NewTime();
966     dpf (("rx_MakeCall(conn %x)\n", conn));
967
968     NETPRI;
969     clock_GetTime(&queueTime);
970     AFS_RXGLOCK();
971     MUTEX_ENTER(&conn->conn_call_lock);
972     for (;;) {
973         for (i=0; i<RX_MAXCALLS; i++) {
974             call = conn->call[i];
975             if (call) {
976                 MUTEX_ENTER(&call->lock);
977                 if (call->state == RX_STATE_DALLY) {
978                     rxi_ResetCall(call, 0);
979                     (*call->callNumber)++;
980                     break;
981                 }
982                 MUTEX_EXIT(&call->lock);
983             }
984             else {
985                 call = rxi_NewCall(conn, i);
986                 MUTEX_ENTER(&call->lock);
987                 break;
988             }
989         }
990         if (i < RX_MAXCALLS) {
991             break;
992         }
993         MUTEX_ENTER(&conn->conn_data_lock);
994         conn->flags |= RX_CONN_MAKECALL_WAITING;
995         MUTEX_EXIT(&conn->conn_data_lock);
996 #ifdef  RX_ENABLE_LOCKS
997         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
998 #else
999         osi_rxSleep(conn);
1000 #endif
1001     }
1002
1003     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1004
1005     /* Client is initially in send mode */
1006     call->state = RX_STATE_ACTIVE;
1007     call->mode = RX_MODE_SENDING;
1008
1009     /* remember start time for call in case we have hard dead time limit */
1010     call->queueTime = queueTime;
1011     clock_GetTime(&call->startTime);
1012     hzero(call->bytesSent);
1013     hzero(call->bytesRcvd);
1014
1015     /* Turn on busy protocol. */
1016     rxi_KeepAliveOn(call);
1017
1018     MUTEX_EXIT(&call->lock);
1019     MUTEX_EXIT(&conn->conn_call_lock);
1020     AFS_RXGUNLOCK();
1021     USERPRI;
1022
1023 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1024     /* Now, if TQ wasn't cleared earlier, do it now. */
1025     AFS_RXGLOCK();
1026     MUTEX_ENTER(&call->lock);
1027     while (call->flags & RX_CALL_TQ_BUSY) {
1028         call->flags |= RX_CALL_TQ_WAIT;
1029 #ifdef RX_ENABLE_LOCKS
1030         CV_WAIT(&call->cv_tq, &call->lock);
1031 #else /* RX_ENABLE_LOCKS */
1032         osi_rxSleep(&call->tq);
1033 #endif /* RX_ENABLE_LOCKS */
1034     }
1035     if (call->flags & RX_CALL_TQ_CLEARME) {
1036         rxi_ClearTransmitQueue(call, 0);
1037         queue_Init(&call->tq);
1038     }
1039     MUTEX_EXIT(&call->lock);
1040     AFS_RXGUNLOCK();
1041 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1042
1043     return call;
1044 }
1045
1046 rxi_HasActiveCalls(aconn)
1047 register struct rx_connection *aconn; {
1048     register int i;
1049     register struct rx_call *tcall;
1050     SPLVAR;
1051
1052     NETPRI;
1053     for(i=0; i<RX_MAXCALLS; i++) {
1054       if (tcall = aconn->call[i]) {
1055         if ((tcall->state == RX_STATE_ACTIVE) 
1056             || (tcall->state == RX_STATE_PRECALL)) {
1057           USERPRI;
1058           return 1;
1059         }
1060       }
1061     }
1062     USERPRI;
1063     return 0;
1064 }
1065
1066 rxi_GetCallNumberVector(aconn, aint32s)
1067 register struct rx_connection *aconn;
1068 register afs_int32 *aint32s; {
1069     register int i;
1070     register struct rx_call *tcall;
1071     SPLVAR;
1072
1073     NETPRI;
1074     for(i=0; i<RX_MAXCALLS; i++) {
1075         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1076             aint32s[i] = aconn->callNumber[i]+1;
1077         else
1078             aint32s[i] = aconn->callNumber[i];
1079     }
1080     USERPRI;
1081     return 0;
1082 }
1083
1084 rxi_SetCallNumberVector(aconn, aint32s)
1085 register struct rx_connection *aconn;
1086 register afs_int32 *aint32s; {
1087     register int i;
1088     register struct rx_call *tcall;
1089     SPLVAR;
1090
1091     NETPRI;
1092     for(i=0; i<RX_MAXCALLS; i++) {
1093         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1094             aconn->callNumber[i] = aint32s[i] - 1;
1095         else
1096             aconn->callNumber[i] = aint32s[i];
1097     }
1098     USERPRI;
1099     return 0;
1100 }
1101
1102 /* Advertise a new service.  A service is named locally by a UDP port
1103  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1104  * on a failure. */
1105 struct rx_service *
1106 rx_NewService(port, serviceId, serviceName, securityObjects,
1107               nSecurityObjects, serviceProc)
1108     u_short port;
1109     u_short serviceId;
1110     char *serviceName;  /* Name for identification purposes (e.g. the
1111                          * service name might be used for probing for
1112                          * statistics) */
1113     struct rx_securityClass **securityObjects;
1114     int nSecurityObjects;
1115     afs_int32 (*serviceProc)();
1116 {    
1117     osi_socket socket = OSI_NULLSOCKET;
1118     register struct rx_service *tservice;    
1119     register int i;
1120     SPLVAR;
1121
1122     clock_NewTime();
1123
1124     if (serviceId == 0) {
1125         (osi_Msg "rx_NewService:  service id for service %s is not non-zero.\n",
1126          serviceName);
1127         return 0;
1128     }
1129     if (port == 0) {
1130         if (rx_port == 0) {
1131             (osi_Msg "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n", serviceName);
1132             return 0;
1133         }
1134         port = rx_port;
1135         socket = rx_socket;
1136     }
1137
1138     tservice = rxi_AllocService();
1139     NETPRI;
1140     AFS_RXGLOCK();
1141     for (i = 0; i<RX_MAX_SERVICES; i++) {
1142         register struct rx_service *service = rx_services[i];
1143         if (service) {
1144             if (port == service->servicePort) {
1145                 if (service->serviceId == serviceId) {
1146                     /* The identical service has already been
1147                      * installed; if the caller was intending to
1148                      * change the security classes used by this
1149                      * service, he/she loses. */
1150                     (osi_Msg "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n", serviceName, serviceId, service->serviceName);
1151                     AFS_RXGUNLOCK();
1152                     USERPRI;
1153                     rxi_FreeService(tservice);
1154                     return service;
1155                 }
1156                 /* Different service, same port: re-use the socket
1157                  * which is bound to the same port */
1158                 socket = service->socket;
1159             }
1160         } else {
1161             if (socket == OSI_NULLSOCKET) {
1162                 /* If we don't already have a socket (from another
1163                  * service on same port) get a new one */
1164                 socket = rxi_GetUDPSocket(port);
1165                 if (socket == OSI_NULLSOCKET) {
1166                     AFS_RXGUNLOCK();
1167                     USERPRI;
1168                     rxi_FreeService(tservice);
1169                     return 0;
1170                 }
1171             }
1172             service = tservice;
1173             service->socket = socket;
1174             service->servicePort = port;
1175             service->serviceId = serviceId;
1176             service->serviceName = serviceName;
1177             service->nSecurityObjects = nSecurityObjects;
1178             service->securityObjects = securityObjects;
1179             service->minProcs = 0;
1180             service->maxProcs = 1;
1181             service->idleDeadTime = 60;
1182             service->connDeadTime = rx_connDeadTime;
1183             service->executeRequestProc = serviceProc;
1184             rx_services[i] = service;   /* not visible until now */
1185             AFS_RXGUNLOCK();
1186             USERPRI;
1187             return service;
1188         }
1189     }
1190     AFS_RXGUNLOCK();
1191     USERPRI;
1192     rxi_FreeService(tservice);
1193     (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
1194     return 0;
1195 }
1196
1197 /* Generic request processing loop. This routine should be called
1198  * by the implementation dependent rx_ServerProc. If socketp is
1199  * non-null, it will be set to the file descriptor that this thread
1200  * is now listening on. If socketp is null, this routine will never
1201  * returns. */
1202 void rxi_ServerProc(threadID, newcall, socketp)
1203 int threadID;
1204 struct rx_call *newcall;
1205 osi_socket *socketp;
1206 {
1207     register struct rx_call *call;
1208     register afs_int32 code;
1209     register struct rx_service *tservice = NULL;
1210
1211     for (;;) {
1212         if (newcall) {
1213             call = newcall;
1214             newcall = NULL;
1215         } else {
1216             call = rx_GetCall(threadID, tservice, socketp);
1217             if (socketp && *socketp != OSI_NULLSOCKET) {
1218                 /* We are now a listener thread */
1219                 return;
1220             }
1221         }
1222
1223         /* if server is restarting( typically smooth shutdown) then do not
1224          * allow any new calls.
1225          */
1226
1227         if ( rx_tranquil && (call != NULL) ) {
1228             SPLVAR;
1229
1230             NETPRI;
1231             AFS_RXGLOCK();
1232             MUTEX_ENTER(&call->lock);
1233
1234             rxi_CallError(call, RX_RESTARTING);
1235             rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1236
1237             MUTEX_EXIT(&call->lock);
1238             AFS_RXGUNLOCK();
1239             USERPRI;
1240         }
1241
1242 #ifdef  KERNEL
1243         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1244 #ifdef RX_ENABLE_LOCKS
1245             AFS_GLOCK();
1246 #endif /* RX_ENABLE_LOCKS */
1247             afs_termState = AFSOP_STOP_AFS;
1248             afs_osi_Wakeup(&afs_termState);
1249 #ifdef RX_ENABLE_LOCKS
1250             AFS_GUNLOCK();
1251 #endif /* RX_ENABLE_LOCKS */
1252             return;
1253         }
1254 #endif
1255
1256         tservice = call->conn->service;
1257
1258         if (tservice->beforeProc) (*tservice->beforeProc)(call);
1259
1260         code = call->conn->service->executeRequestProc(call);
1261
1262         if (tservice->afterProc) (*tservice->afterProc)(call, code);
1263
1264         rx_EndCall(call, code);
1265         MUTEX_ENTER(&rx_stats_mutex);
1266         rxi_nCalls++;
1267         MUTEX_EXIT(&rx_stats_mutex);
1268     }
1269 }
1270
1271
1272 void rx_WakeupServerProcs()
1273 {
1274     struct rx_serverQueueEntry *np, *tqp;
1275     SPLVAR;
1276
1277     NETPRI;
1278     AFS_RXGLOCK();
1279     MUTEX_ENTER(&rx_serverPool_lock);
1280
1281 #ifdef RX_ENABLE_LOCKS
1282     if (rx_waitForPacket)
1283         CV_BROADCAST(&rx_waitForPacket->cv);
1284 #else /* RX_ENABLE_LOCKS */
1285     if (rx_waitForPacket)
1286         osi_rxWakeup(rx_waitForPacket);
1287 #endif /* RX_ENABLE_LOCKS */
1288     MUTEX_ENTER(&freeSQEList_lock);
1289     for (np = rx_FreeSQEList; np; np = tqp) {
1290       tqp = *(struct rx_serverQueueEntry **)np;
1291 #ifdef RX_ENABLE_LOCKS
1292       CV_BROADCAST(&np->cv);
1293 #else /* RX_ENABLE_LOCKS */
1294       osi_rxWakeup(np);
1295 #endif /* RX_ENABLE_LOCKS */
1296     }
1297     MUTEX_EXIT(&freeSQEList_lock);
1298     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1299 #ifdef RX_ENABLE_LOCKS
1300       CV_BROADCAST(&np->cv);
1301 #else /* RX_ENABLE_LOCKS */
1302       osi_rxWakeup(np);
1303 #endif /* RX_ENABLE_LOCKS */
1304     }
1305     MUTEX_EXIT(&rx_serverPool_lock);
1306     AFS_RXGUNLOCK();
1307     USERPRI;
1308 }
1309
1310 /* meltdown:
1311  * One thing that seems to happen is that all the server threads get
1312  * tied up on some empty or slow call, and then a whole bunch of calls
1313  * arrive at once, using up the packet pool, so now there are more 
1314  * empty calls.  The most critical resources here are server threads
1315  * and the free packet pool.  The "doreclaim" code seems to help in
1316  * general.  I think that eventually we arrive in this state: there
1317  * are lots of pending calls which do have all their packets present,
1318  * so they won't be reclaimed, are multi-packet calls, so they won't
1319  * be scheduled until later, and thus are tying up most of the free 
1320  * packet pool for a very long time.
1321  * future options:
1322  * 1.  schedule multi-packet calls if all the packets are present.  
1323  * Probably CPU-bound operation, useful to return packets to pool. 
1324  * Do what if there is a full window, but the last packet isn't here?
1325  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1326  * it sleeps and waits for that type of call.
1327  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1328  * the current dataquota business is badly broken.  The quota isn't adjusted
1329  * to reflect how many packets are presently queued for a running call.
1330  * So, when we schedule a queued call with a full window of packets queued
1331  * up for it, that *should* free up a window full of packets for other 2d-class
1332  * calls to be able to use from the packet pool.  But it doesn't.
1333  *
1334  * NB.  Most of the time, this code doesn't run -- since idle server threads
1335  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1336  * as a new call arrives.
1337  */
1338 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1339  * for an rx_Read. */
1340 #ifdef RX_ENABLE_LOCKS
1341 struct rx_call *
1342 rx_GetCall(tno, cur_service, socketp)
1343 int tno;
1344 struct rx_service *cur_service;
1345 osi_socket *socketp;
1346 {
1347     struct rx_serverQueueEntry *sq;
1348     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1349     struct rx_service *service;
1350     SPLVAR;
1351
1352     MUTEX_ENTER(&freeSQEList_lock);
1353
1354     if (sq = rx_FreeSQEList) {
1355         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1356         MUTEX_EXIT(&freeSQEList_lock);
1357     } else {    /* otherwise allocate a new one and return that */
1358         MUTEX_EXIT(&freeSQEList_lock);
1359         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1360         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1361         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1362     }
1363
1364     MUTEX_ENTER(&rx_serverPool_lock);
1365     if (cur_service != NULL) {
1366         ReturnToServerPool(cur_service);
1367     }
1368     while (1) {
1369         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1370             register struct rx_call *tcall, *ncall;
1371             choice2 = (struct rx_call *) 0;
1372             /* Scan for eligible incoming calls.  A call is not eligible
1373              * if the maximum number of calls for its service type are
1374              * already executing */
1375             /* One thread will process calls FCFS (to prevent starvation),
1376              * while the other threads may run ahead looking for calls which
1377              * have all their input data available immediately.  This helps 
1378              * keep threads from blocking, waiting for data from the client. */
1379             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1380               service = tcall->conn->service;
1381               if (!QuotaOK(service)) {
1382                 continue;
1383               }
1384               if (!tno || !tcall->queue_item_header.next  ) {
1385                 /* If we're thread 0, then  we'll just use 
1386                  * this call. If we haven't been able to find an optimal 
1387                  * choice, and we're at the end of the list, then use a 
1388                  * 2d choice if one has been identified.  Otherwise... */
1389                 call = (choice2 ? choice2 : tcall);
1390                 service = call->conn->service;
1391               } else if (!queue_IsEmpty(&tcall->rq)) {
1392                 struct rx_packet *rp;
1393                 rp = queue_First(&tcall->rq, rx_packet);
1394                 if (rp->header.seq == 1) {
1395                   if (!meltdown_1pkt ||             
1396                       (rp->header.flags & RX_LAST_PACKET)) {
1397                     call = tcall;
1398                   } else if (rxi_2dchoice && !choice2 &&
1399                              !(tcall->flags & RX_CALL_CLEARED) &&
1400                              (tcall->rprev > rxi_HardAckRate)) {
1401                     choice2 = tcall;
1402                   } else rxi_md2cnt++;
1403                 }
1404               }
1405               if (call)  {
1406                 break;
1407               } else {
1408                   ReturnToServerPool(service);
1409               }
1410             }
1411           }
1412
1413         if (call) {
1414             queue_Remove(call);
1415             rxi_ServerThreadSelectingCall = 1;
1416             MUTEX_EXIT(&rx_serverPool_lock);
1417             MUTEX_ENTER(&call->lock);
1418             MUTEX_ENTER(&rx_serverPool_lock);
1419
1420             if (queue_IsEmpty(&call->rq) ||
1421                 queue_First(&call->rq, rx_packet)->header.seq != 1)
1422               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1423
1424             CLEAR_CALL_QUEUE_LOCK(call);
1425             if (call->error) {
1426                 MUTEX_EXIT(&call->lock);
1427                 ReturnToServerPool(service);
1428                 rxi_ServerThreadSelectingCall = 0;
1429                 CV_SIGNAL(&rx_serverPool_cv);
1430                 call = (struct rx_call*)0;
1431                 continue;
1432             }
1433             call->flags &= (~RX_CALL_WAIT_PROC);
1434             MUTEX_ENTER(&rx_stats_mutex);
1435             rx_nWaiting--;
1436             MUTEX_EXIT(&rx_stats_mutex);
1437             rxi_ServerThreadSelectingCall = 0;
1438             CV_SIGNAL(&rx_serverPool_cv);
1439             MUTEX_EXIT(&rx_serverPool_lock);
1440             break;
1441         }
1442         else {
1443             /* If there are no eligible incoming calls, add this process
1444              * to the idle server queue, to wait for one */
1445             sq->newcall = 0;
1446             sq->tno = tno;
1447             if (socketp) {
1448                 *socketp = OSI_NULLSOCKET;
1449             }
1450             sq->socketp = socketp;
1451             queue_Append(&rx_idleServerQueue, sq);
1452 #ifndef AFS_AIX41_ENV
1453             rx_waitForPacket = sq;
1454 #endif /* AFS_AIX41_ENV */
1455             do {
1456                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1457 #ifdef  KERNEL
1458                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1459                     MUTEX_EXIT(&rx_serverPool_lock);
1460                     return (struct rx_call *)0;
1461                 }
1462 #endif
1463             } while (!(call = sq->newcall) &&
1464                      !(socketp && *socketp != OSI_NULLSOCKET));
1465             MUTEX_EXIT(&rx_serverPool_lock);
1466             if (call) {
1467                 MUTEX_ENTER(&call->lock);
1468             }
1469             break;
1470         }
1471     }
1472
1473     MUTEX_ENTER(&freeSQEList_lock);
1474     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1475     rx_FreeSQEList = sq;
1476     MUTEX_EXIT(&freeSQEList_lock);
1477
1478     if (call) {
1479         clock_GetTime(&call->startTime);
1480         call->state = RX_STATE_ACTIVE;
1481         call->mode = RX_MODE_RECEIVING;
1482
1483         rxi_calltrace(RX_CALL_START, call);
1484         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1485              call->conn->service->servicePort, 
1486              call->conn->service->serviceId, call));
1487
1488         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1489         MUTEX_EXIT(&call->lock);
1490     } else {
1491         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1492     }
1493
1494     return call;
1495 }
1496 #else /* RX_ENABLE_LOCKS */
1497 struct rx_call *
1498 rx_GetCall(tno, cur_service, socketp)
1499   int tno;
1500   struct rx_service *cur_service;
1501   osi_socket *socketp;
1502 {
1503     struct rx_serverQueueEntry *sq;
1504     register struct rx_call *call = (struct rx_call *) 0, *choice2;
1505     struct rx_service *service;
1506     SPLVAR;
1507
1508     NETPRI;
1509     AFS_RXGLOCK();
1510     MUTEX_ENTER(&freeSQEList_lock);
1511
1512     if (sq = rx_FreeSQEList) {
1513         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1514         MUTEX_EXIT(&freeSQEList_lock);
1515     } else {    /* otherwise allocate a new one and return that */
1516         MUTEX_EXIT(&freeSQEList_lock);
1517         sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1518         MUTEX_INIT(&sq->lock, "server Queue lock",MUTEX_DEFAULT,0);     
1519         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1520     }
1521     MUTEX_ENTER(&sq->lock);
1522
1523     if (cur_service != NULL) {
1524         cur_service->nRequestsRunning--;
1525         if (cur_service->nRequestsRunning < cur_service->minProcs)
1526             rxi_minDeficit++;
1527         rxi_availProcs++;
1528     }
1529     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1530         register struct rx_call *tcall, *ncall;
1531         /* Scan for eligible incoming calls.  A call is not eligible
1532          * if the maximum number of calls for its service type are
1533          * already executing */
1534         /* One thread will process calls FCFS (to prevent starvation),
1535          * while the other threads may run ahead looking for calls which
1536          * have all their input data available immediately.  This helps 
1537          * keep threads from blocking, waiting for data from the client. */
1538         choice2 = (struct rx_call *) 0;
1539         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1540           service = tcall->conn->service;
1541           if (QuotaOK(service)) {
1542              if (!tno || !tcall->queue_item_header.next  ) {
1543                  /* If we're thread 0, then  we'll just use 
1544                   * this call. If we haven't been able to find an optimal 
1545                   * choice, and we're at the end of the list, then use a 
1546                   * 2d choice if one has been identified.  Otherwise... */
1547                  call = (choice2 ? choice2 : tcall);
1548                  service = call->conn->service;
1549              } else if (!queue_IsEmpty(&tcall->rq)) {
1550                  struct rx_packet *rp;
1551                  rp = queue_First(&tcall->rq, rx_packet);
1552                  if (rp->header.seq == 1
1553                      && (!meltdown_1pkt ||
1554                          (rp->header.flags & RX_LAST_PACKET))) {
1555                      call = tcall;
1556                  } else if (rxi_2dchoice && !choice2 &&
1557                             !(tcall->flags & RX_CALL_CLEARED) &&
1558                             (tcall->rprev > rxi_HardAckRate)) {
1559                      choice2 = tcall;
1560                  } else rxi_md2cnt++;
1561              }
1562           }
1563           if (call) 
1564              break;
1565         }
1566       }
1567
1568     if (call) {
1569         queue_Remove(call);
1570         /* we can't schedule a call if there's no data!!! */
1571         /* send an ack if there's no data, if we're missing the
1572          * first packet, or we're missing something between first 
1573          * and last -- there's a "hole" in the incoming data. */
1574         if (queue_IsEmpty(&call->rq) ||
1575             queue_First(&call->rq, rx_packet)->header.seq != 1 ||
1576             call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1577           rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
1578
1579         call->flags &= (~RX_CALL_WAIT_PROC);
1580         service->nRequestsRunning++;
1581         /* just started call in minProcs pool, need fewer to maintain
1582          * guarantee */
1583         if (service->nRequestsRunning <= service->minProcs)
1584             rxi_minDeficit--;
1585         rxi_availProcs--;
1586         rx_nWaiting--;
1587         /* MUTEX_EXIT(&call->lock); */
1588     }
1589     else {
1590         /* If there are no eligible incoming calls, add this process
1591          * to the idle server queue, to wait for one */
1592         sq->newcall = 0;
1593         if (socketp) {
1594             *socketp = OSI_NULLSOCKET;
1595         }
1596         sq->socketp = socketp;
1597         queue_Append(&rx_idleServerQueue, sq);
1598         do {
1599             osi_rxSleep(sq);
1600 #ifdef  KERNEL
1601                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1602                     AFS_RXGUNLOCK();
1603                     USERPRI;
1604                     return (struct rx_call *)0;
1605                 }
1606 #endif
1607         } while (!(call = sq->newcall) &&
1608                  !(socketp && *socketp != OSI_NULLSOCKET));
1609     }
1610     MUTEX_EXIT(&sq->lock);
1611
1612     MUTEX_ENTER(&freeSQEList_lock);
1613     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1614     rx_FreeSQEList = sq;
1615     MUTEX_EXIT(&freeSQEList_lock);
1616
1617     if (call) {
1618         clock_GetTime(&call->startTime);
1619         call->state = RX_STATE_ACTIVE;
1620         call->mode = RX_MODE_RECEIVING;
1621
1622         rxi_calltrace(RX_CALL_START, call);
1623         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n", 
1624          call->conn->service->servicePort, 
1625          call->conn->service->serviceId, call));
1626     } else {
1627         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1628     }
1629
1630     AFS_RXGUNLOCK();
1631     USERPRI;
1632
1633     return call;
1634 }
1635 #endif /* RX_ENABLE_LOCKS */
1636
1637
1638
1639 /* Establish a procedure to be called when a packet arrives for a
1640  * call.  This routine will be called at most once after each call,
1641  * and will also be called if there is an error condition on the or
1642  * the call is complete.  Used by multi rx to build a selection
1643  * function which determines which of several calls is likely to be a
1644  * good one to read from.  
1645  * NOTE: the way this is currently implemented it is probably only a
1646  * good idea to (1) use it immediately after a newcall (clients only)
1647  * and (2) only use it once.  Other uses currently void your warranty
1648  */
1649 void rx_SetArrivalProc(call, proc, handle, arg)
1650     register struct rx_call *call;
1651     register VOID (*proc)();
1652     register VOID *handle;
1653     register VOID *arg;
1654 {
1655     call->arrivalProc = proc;
1656     call->arrivalProcHandle = handle;
1657     call->arrivalProcArg = arg;
1658 }
1659
1660 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1661  * appropriate, and return the final error code from the conversation
1662  * to the caller */
1663
1664 afs_int32 rx_EndCall(call, rc)
1665     register struct rx_call *call;
1666     afs_int32 rc;
1667 {
1668     register struct rx_connection *conn = call->conn;
1669     register struct rx_service *service;
1670     register struct rx_packet *tp; /* Temporary packet pointer */
1671     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1672     afs_int32 error;
1673     SPLVAR;
1674
1675     dpf(("rx_EndCall(call %x)\n", call));
1676
1677     NETPRI;
1678     AFS_RXGLOCK();
1679     MUTEX_ENTER(&call->lock);
1680
1681     if (rc == 0 && call->error == 0) {
1682         call->abortCode = 0;
1683         call->abortCount = 0;
1684     }
1685
1686     call->arrivalProc = (VOID (*)()) 0;
1687     if (rc && call->error == 0) {
1688         rxi_CallError(call, rc);
1689         /* Send an abort message to the peer if this error code has
1690          * only just been set.  If it was set previously, assume the
1691          * peer has already been sent the error code or will request it 
1692          */
1693         rxi_SendCallAbort(call, (struct rx_packet *) 0, 0, 0);
1694     }
1695     if (conn->type == RX_SERVER_CONNECTION) {
1696         /* Make sure reply or at least dummy reply is sent */
1697         if (call->mode == RX_MODE_RECEIVING) {
1698             rxi_WriteProc(call, 0, 0);
1699         }
1700         if (call->mode == RX_MODE_SENDING) {
1701             rxi_FlushWrite(call);
1702         }
1703         service = conn->service;
1704         rxi_calltrace(RX_CALL_END, call);
1705         /* Call goes to hold state until reply packets are acknowledged */
1706         if (call->tfirst + call->nSoftAcked < call->tnext) {
1707             call->state = RX_STATE_HOLD;
1708         } else {
1709             call->state = RX_STATE_DALLY;
1710             rxi_ClearTransmitQueue(call, 0);
1711             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1712             rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
1713         }
1714     }
1715     else { /* Client connection */
1716         char dummy;
1717         /* Make sure server receives input packets, in the case where
1718          * no reply arguments are expected */
1719         if ((call->mode == RX_MODE_SENDING)
1720          || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1721             (void) rxi_ReadProc(call, &dummy, 1);
1722         }
1723         /* We need to release the call lock since it's lower than the
1724          * conn_call_lock and we don't want to hold the conn_call_lock
1725          * over the rx_ReadProc call. The conn_call_lock needs to be held
1726          * here for the case where rx_NewCall is perusing the calls on
1727          * the connection structure. We don't want to signal until
1728          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1729          * have checked this call, found it active and by the time it
1730          * goes to sleep, will have missed the signal.
1731          */
1732         MUTEX_EXIT(&call->lock);
1733         MUTEX_ENTER(&conn->conn_call_lock);
1734         MUTEX_ENTER(&call->lock);
1735         MUTEX_ENTER(&conn->conn_data_lock);
1736         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1737             conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1738             MUTEX_EXIT(&conn->conn_data_lock);
1739 #ifdef  RX_ENABLE_LOCKS
1740             CV_BROADCAST(&conn->conn_call_cv);
1741 #else
1742             osi_rxWakeup(conn);
1743 #endif
1744         }
1745 #ifdef RX_ENABLE_LOCKS
1746         else {
1747             MUTEX_EXIT(&conn->conn_data_lock);
1748         }
1749 #endif /* RX_ENABLE_LOCKS */
1750         call->state = RX_STATE_DALLY;
1751     }
1752     error = call->error;
1753
1754     /* currentPacket, nLeft, and NFree must be zeroed here, because
1755      * ResetCall cannot: ResetCall may be called at splnet(), in the
1756      * kernel version, and may interrupt the macros rx_Read or
1757      * rx_Write, which run at normal priority for efficiency. */
1758     if (call->currentPacket) {
1759         rxi_FreePacket(call->currentPacket);
1760         call->currentPacket = (struct rx_packet *) 0;
1761         call->nLeft = call->nFree = call->curlen = 0;
1762     }
1763     else
1764         call->nLeft = call->nFree = call->curlen = 0;
1765
1766     /* Free any packets from the last call to ReadvProc/WritevProc */
1767     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1768         queue_Remove(tp);
1769         rxi_FreePacket(tp);
1770     }
1771
1772     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1773     MUTEX_EXIT(&call->lock);
1774     if (conn->type == RX_CLIENT_CONNECTION)
1775         MUTEX_EXIT(&conn->conn_call_lock);
1776     AFS_RXGUNLOCK();
1777     USERPRI;
1778     /*
1779      * Map errors to the local host's errno.h format.
1780      */
1781     error = ntoh_syserr_conv(error);
1782     return error;
1783 }
1784
1785 #if !defined(KERNEL)
1786
1787 /* Call this routine when shutting down a server or client (especially
1788  * clients).  This will allow Rx to gracefully garbage collect server
1789  * connections, and reduce the number of retries that a server might
1790  * make to a dead client.
1791  * This is not quite right, since some calls may still be ongoing and
1792  * we can't lock them to destroy them. */
1793 void rx_Finalize() {
1794     register struct rx_connection **conn_ptr, **conn_end;
1795
1796     INIT_PTHREAD_LOCKS
1797     LOCK_RX_INIT
1798     if (rxinit_status == 1) {
1799         UNLOCK_RX_INIT
1800         return; /* Already shutdown. */
1801     }
1802     rxi_DeleteCachedConnections();
1803     if (rx_connHashTable) {
1804         MUTEX_ENTER(&rx_connHashTable_lock);
1805         for (conn_ptr = &rx_connHashTable[0], 
1806              conn_end = &rx_connHashTable[rx_hashTableSize]; 
1807              conn_ptr < conn_end; conn_ptr++) {
1808             struct rx_connection *conn, *next;
1809             for (conn = *conn_ptr; conn; conn = next) {
1810                 next = conn->next;
1811                 if (conn->type == RX_CLIENT_CONNECTION) {
1812                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
1813                     conn->refCount++;
1814                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
1815 #ifdef RX_ENABLE_LOCKS
1816                     rxi_DestroyConnectionNoLock(conn);
1817 #else /* RX_ENABLE_LOCKS */
1818                     rxi_DestroyConnection(conn);
1819 #endif /* RX_ENABLE_LOCKS */
1820                 }
1821             }
1822         }
1823 #ifdef RX_ENABLE_LOCKS
1824         while (rx_connCleanup_list) {
1825             struct rx_connection *conn;
1826             conn = rx_connCleanup_list;
1827             rx_connCleanup_list = rx_connCleanup_list->next;
1828             MUTEX_EXIT(&rx_connHashTable_lock);
1829             rxi_CleanupConnection(conn);
1830             MUTEX_ENTER(&rx_connHashTable_lock);
1831         }
1832         MUTEX_EXIT(&rx_connHashTable_lock);
1833 #endif /* RX_ENABLE_LOCKS */
1834     }
1835     rxi_flushtrace();
1836
1837     rxinit_status = 1;
1838     UNLOCK_RX_INIT
1839 }
1840 #endif
1841
1842 /* if we wakeup packet waiter too often, can get in loop with two
1843     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
1844 void
1845 rxi_PacketsUnWait() {
1846
1847     if (!rx_waitingForPackets) {
1848         return;
1849     }
1850 #ifdef KERNEL
1851     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1852         return;                                     /* still over quota */
1853     }
1854 #endif /* KERNEL */
1855     rx_waitingForPackets = 0;
1856 #ifdef  RX_ENABLE_LOCKS
1857     CV_BROADCAST(&rx_waitingForPackets_cv);
1858 #else
1859     osi_rxWakeup(&rx_waitingForPackets);
1860 #endif
1861     return;
1862 }
1863
1864
1865 /* ------------------Internal interfaces------------------------- */
1866
1867 /* Return this process's service structure for the
1868  * specified socket and service */
1869 struct rx_service *rxi_FindService(socket, serviceId)
1870     register osi_socket socket;
1871     register u_short serviceId;
1872 {
1873     register struct rx_service **sp;    
1874     for (sp = &rx_services[0]; *sp; sp++) {
1875         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket) 
1876           return *sp;
1877     }
1878     return 0;
1879 }
1880
1881 /* Allocate a call structure, for the indicated channel of the
1882  * supplied connection.  The mode and state of the call must be set by
1883  * the caller. */
1884 struct rx_call *rxi_NewCall(conn, channel)
1885     register struct rx_connection *conn;
1886     register int channel;
1887 {
1888     register struct rx_call *call;
1889 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1890     register struct rx_call *cp;        /* Call pointer temp */
1891     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
1892 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1893
1894     /* Grab an existing call structure, or allocate a new one.
1895      * Existing call structures are assumed to have been left reset by
1896      * rxi_FreeCall */
1897     MUTEX_ENTER(&rx_freeCallQueue_lock);
1898
1899 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1900     /*
1901      * EXCEPT that the TQ might not yet be cleared out.
1902      * Skip over those with in-use TQs.
1903      */
1904     call = NULL;
1905     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
1906         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
1907             call = cp;
1908             break;
1909         }
1910     }
1911     if (call) {
1912 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
1913     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1914         call = queue_First(&rx_freeCallQueue, rx_call);
1915 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1916         queue_Remove(call);
1917         MUTEX_ENTER(&rx_stats_mutex);
1918         rx_stats.nFreeCallStructs--;
1919         MUTEX_EXIT(&rx_stats_mutex);
1920         MUTEX_EXIT(&rx_freeCallQueue_lock);
1921         MUTEX_ENTER(&call->lock);
1922         CLEAR_CALL_QUEUE_LOCK(call);
1923 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1924         /* Now, if TQ wasn't cleared earlier, do it now. */
1925         if (call->flags & RX_CALL_TQ_CLEARME) {
1926             rxi_ClearTransmitQueue(call, 0);
1927             queue_Init(&call->tq);
1928         }
1929 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1930         /* Bind the call to its connection structure */
1931         call->conn = conn;
1932         rxi_ResetCall(call, 1);
1933     }
1934     else {
1935         call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1936
1937         MUTEX_EXIT(&rx_freeCallQueue_lock);
1938         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
1939         MUTEX_ENTER(&call->lock);
1940         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
1941         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
1942         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
1943
1944         MUTEX_ENTER(&rx_stats_mutex);
1945         rx_stats.nCallStructs++;
1946         MUTEX_EXIT(&rx_stats_mutex);
1947         /* Initialize once-only items */
1948         queue_Init(&call->tq);
1949         queue_Init(&call->rq);
1950         queue_Init(&call->iovq);
1951         /* Bind the call to its connection structure (prereq for reset) */
1952         call->conn = conn;
1953         rxi_ResetCall(call, 1);
1954     }
1955     call->channel = channel;
1956     call->callNumber = &conn->callNumber[channel];
1957     /* Note that the next expected call number is retained (in
1958      * conn->callNumber[i]), even if we reallocate the call structure
1959      */
1960     conn->call[channel] = call;
1961     /* if the channel's never been used (== 0), we should start at 1, otherwise
1962         the call number is valid from the last time this channel was used */
1963     if (*call->callNumber == 0) *call->callNumber = 1;
1964
1965     MUTEX_EXIT(&call->lock);
1966     return call;
1967 }
1968
1969 /* A call has been inactive long enough that so we can throw away
1970  * state, including the call structure, which is placed on the call
1971  * free list.
1972  * Call is locked upon entry.
1973  */
1974 #ifdef RX_ENABLE_LOCKS
1975 void rxi_FreeCall(call, haveCTLock)
1976     int haveCTLock; /* Set if called from rxi_ReapConnections */
1977 #else /* RX_ENABLE_LOCKS */
1978 void rxi_FreeCall(call)
1979 #endif /* RX_ENABLE_LOCKS */
1980     register struct rx_call *call;
1981 {
1982     register int channel = call->channel;
1983     register struct rx_connection *conn = call->conn;
1984
1985
1986     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
1987       (*call->callNumber)++;
1988     rxi_ResetCall(call, 0);
1989     call->conn->call[channel] = (struct rx_call *) 0;
1990
1991     MUTEX_ENTER(&rx_freeCallQueue_lock);
1992     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
1993 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1994     /* A call may be free even though its transmit queue is still in use.
1995      * Since we search the call list from head to tail, put busy calls at
1996      * the head of the list, and idle calls at the tail.
1997      */
1998     if (call->flags & RX_CALL_TQ_BUSY)
1999         queue_Prepend(&rx_freeCallQueue, call);
2000     else
2001         queue_Append(&rx_freeCallQueue, call);
2002 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2003     queue_Append(&rx_freeCallQueue, call);
2004 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2005     MUTEX_ENTER(&rx_stats_mutex);
2006     rx_stats.nFreeCallStructs++;
2007     MUTEX_EXIT(&rx_stats_mutex);
2008
2009     MUTEX_EXIT(&rx_freeCallQueue_lock);
2010  
2011     /* Destroy the connection if it was previously slated for
2012      * destruction, i.e. the Rx client code previously called
2013      * rx_DestroyConnection (client connections), or
2014      * rxi_ReapConnections called the same routine (server
2015      * connections).  Only do this, however, if there are no
2016      * outstanding calls. Note that for fine grain locking, there appears
2017      * to be a deadlock in that rxi_FreeCall has a call locked and
2018      * DestroyConnectionNoLock locks each call in the conn. But note a
2019      * few lines up where we have removed this call from the conn.
2020      * If someone else destroys a connection, they either have no
2021      * call lock held or are going through this section of code.
2022      */
2023     if (conn->flags & RX_CONN_DESTROY_ME) {
2024         MUTEX_ENTER(&conn->conn_data_lock);
2025         conn->refCount++;
2026         MUTEX_EXIT(&conn->conn_data_lock);
2027 #ifdef RX_ENABLE_LOCKS
2028         if (haveCTLock)
2029             rxi_DestroyConnectionNoLock(conn);
2030         else
2031             rxi_DestroyConnection(conn);
2032 #else /* RX_ENABLE_LOCKS */
2033         rxi_DestroyConnection(conn);
2034 #endif /* RX_ENABLE_LOCKS */
2035     }
2036 }
2037
2038 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2039 char *rxi_Alloc(size)
2040 register size_t size;
2041 {
2042     register char *p;
2043
2044 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2045     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2046      * implementation.
2047      */
2048     int glockOwner = ISAFS_GLOCK();
2049     if (!glockOwner)
2050         AFS_GLOCK();
2051 #endif
2052     MUTEX_ENTER(&rx_stats_mutex);
2053     rxi_Alloccnt++; rxi_Allocsize += size;
2054     MUTEX_EXIT(&rx_stats_mutex);
2055 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2056     if (size > AFS_SMALLOCSIZ) {
2057         p = (char *) osi_AllocMediumSpace(size);
2058     } else
2059         p = (char *) osi_AllocSmall(size, 1);
2060 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2061     if (!glockOwner)
2062         AFS_GUNLOCK();
2063 #endif
2064 #else
2065     p = (char *) osi_Alloc(size);
2066 #endif
2067     if (!p) osi_Panic("rxi_Alloc error");
2068     bzero(p, size);
2069     return p;
2070 }
2071
2072 void rxi_Free(addr, size)
2073 void *addr;
2074 register size_t size;
2075 {
2076 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2077     /* Grab the AFS filesystem lock. See afs/osi.h for the lock
2078      * implementation.
2079      */
2080     int glockOwner = ISAFS_GLOCK();
2081     if (!glockOwner)
2082         AFS_GLOCK();
2083 #endif
2084     MUTEX_ENTER(&rx_stats_mutex);
2085     rxi_Alloccnt--; rxi_Allocsize -= size;
2086     MUTEX_EXIT(&rx_stats_mutex);
2087 #if     (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
2088     if (size > AFS_SMALLOCSIZ)
2089         osi_FreeMediumSpace(addr);
2090     else
2091         osi_FreeSmall(addr);
2092 #if defined(AFS_AIX41_ENV) && defined(KERNEL)
2093     if (!glockOwner)
2094         AFS_GUNLOCK();
2095 #endif
2096 #else
2097     osi_Free(addr, size);
2098 #endif    
2099 }
2100
2101 /* Find the peer process represented by the supplied (host,port)
2102  * combination.  If there is no appropriate active peer structure, a
2103  * new one will be allocated and initialized 
2104  * The origPeer, if set, is a pointer to a peer structure on which the
2105  * refcount will be be decremented. This is used to replace the peer
2106  * structure hanging off a connection structure */
2107 struct rx_peer *rxi_FindPeer(host, port, origPeer, create)
2108     register afs_uint32 host;
2109     register u_short port;
2110     struct rx_peer *origPeer;
2111     int create;
2112 {
2113     register struct rx_peer *pp;
2114     int hashIndex;
2115     hashIndex = PEER_HASH(host, port);
2116     MUTEX_ENTER(&rx_peerHashTable_lock);
2117     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2118         if ((pp->host == host) && (pp->port == port)) break;
2119     }
2120     if (!pp) {
2121         if (create) {
2122             pp = rxi_AllocPeer(); /* This bzero's *pp */
2123             pp->host = host;      /* set here or in InitPeerParams is zero */
2124             pp->port = port;
2125             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2126             queue_Init(&pp->congestionQueue);
2127             queue_Init(&pp->rpcStats);
2128             pp->next = rx_peerHashTable[hashIndex];
2129             rx_peerHashTable[hashIndex] = pp;
2130             rxi_InitPeerParams(pp);
2131             MUTEX_ENTER(&rx_stats_mutex);
2132             rx_stats.nPeerStructs++;
2133             MUTEX_EXIT(&rx_stats_mutex);
2134         }
2135     }
2136     if (pp && create) {
2137         pp->refCount++;
2138     }
2139     if ( origPeer)
2140         origPeer->refCount--;
2141     MUTEX_EXIT(&rx_peerHashTable_lock);
2142     return pp;
2143 }
2144
2145
2146 /* Find the connection at (host, port) started at epoch, and with the
2147  * given connection id.  Creates the server connection if necessary.
2148  * The type specifies whether a client connection or a server
2149  * connection is desired.  In both cases, (host, port) specify the
2150  * peer's (host, pair) pair.  Client connections are not made
2151  * automatically by this routine.  The parameter socket gives the
2152  * socket descriptor on which the packet was received.  This is used,
2153  * in the case of server connections, to check that *new* connections
2154  * come via a valid (port, serviceId).  Finally, the securityIndex
2155  * parameter must match the existing index for the connection.  If a
2156  * server connection is created, it will be created using the supplied
2157  * index, if the index is valid for this service */
2158 struct rx_connection *
2159 rxi_FindConnection(socket, host, port, serviceId, cid, 
2160                    epoch, type, securityIndex)
2161     osi_socket socket;
2162     register afs_int32 host;
2163     register u_short port;
2164     u_short serviceId;
2165     afs_uint32 cid;
2166     afs_uint32 epoch;
2167     int type;
2168     u_int securityIndex;
2169 {
2170     int hashindex, flag;
2171     register struct rx_connection *conn;
2172     struct rx_peer *peer;
2173     hashindex = CONN_HASH(host, port, cid, epoch, type);
2174     MUTEX_ENTER(&rx_connHashTable_lock);
2175     rxLastConn ? (conn = rxLastConn, flag = 0) :
2176                  (conn = rx_connHashTable[hashindex], flag = 1);
2177     for (; conn; ) {
2178       if ((conn->type == type) && ((cid&RX_CIDMASK) == conn->cid) 
2179           && (epoch == conn->epoch)) {
2180         register struct rx_peer *pp = conn->peer;
2181         if (securityIndex != conn->securityIndex) {
2182             /* this isn't supposed to happen, but someone could forge a packet
2183                like this, and there seems to be some CM bug that makes this
2184                happen from time to time -- in which case, the fileserver
2185                asserts. */  
2186             MUTEX_EXIT(&rx_connHashTable_lock);
2187             return (struct rx_connection *) 0;
2188         }
2189         /* epoch's high order bits mean route for security reasons only on
2190          * the cid, not the host and port fields.
2191          */
2192         if (conn->epoch & 0x80000000) break;
2193         if (((type == RX_CLIENT_CONNECTION) 
2194              || (pp->host == host)) && (pp->port == port))
2195           break;
2196       }
2197       if ( !flag )
2198       {
2199         /* the connection rxLastConn that was used the last time is not the
2200         ** one we are looking for now. Hence, start searching in the hash */
2201         flag = 1;
2202         conn = rx_connHashTable[hashindex];
2203       }
2204       else
2205         conn = conn->next;
2206     }
2207     if (!conn) {
2208         struct rx_service *service;
2209         if (type == RX_CLIENT_CONNECTION) {
2210             MUTEX_EXIT(&rx_connHashTable_lock);
2211             return (struct rx_connection *) 0;
2212         }
2213         service = rxi_FindService(socket, serviceId);
2214         if (!service || (securityIndex >= service->nSecurityObjects) 
2215             || (service->securityObjects[securityIndex] == 0)) {
2216             MUTEX_EXIT(&rx_connHashTable_lock);
2217             return (struct rx_connection *) 0;
2218         }
2219         conn = rxi_AllocConnection(); /* This bzero's the connection */
2220         MUTEX_INIT(&conn->conn_call_lock, "conn call lock",
2221                    MUTEX_DEFAULT,0);
2222         MUTEX_INIT(&conn->conn_data_lock, "conn data lock",
2223                    MUTEX_DEFAULT,0);
2224         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2225         conn->next = rx_connHashTable[hashindex];
2226         rx_connHashTable[hashindex] = conn;
2227         peer = conn->peer = rxi_FindPeer(host, port, 0, 1);
2228         conn->type = RX_SERVER_CONNECTION;
2229         conn->lastSendTime = clock_Sec();   /* don't GC immediately */
2230         conn->epoch = epoch;
2231         conn->cid = cid & RX_CIDMASK;
2232         /* conn->serial = conn->lastSerial = 0; */
2233         /* conn->timeout = 0; */
2234         conn->ackRate = RX_FAST_ACK_RATE;
2235         conn->service = service;
2236         conn->serviceId = serviceId;
2237         conn->securityIndex = securityIndex;
2238         conn->securityObject = service->securityObjects[securityIndex];
2239         conn->nSpecific = 0;
2240         conn->specific = NULL;
2241         rx_SetConnDeadTime(conn, service->connDeadTime);
2242         /* Notify security object of the new connection */
2243         RXS_NewConnection(conn->securityObject, conn);
2244         /* XXXX Connection timeout? */
2245         if (service->newConnProc) (*service->newConnProc)(conn);
2246         MUTEX_ENTER(&rx_stats_mutex);
2247         rx_stats.nServerConns++;
2248         MUTEX_EXIT(&rx_stats_mutex);
2249     }
2250     else
2251     {
2252     /* Ensure that the peer structure is set up in such a way that
2253     ** replies in this connection go back to that remote interface
2254     ** from which the last packet was sent out. In case, this packet's
2255     ** source IP address does not match the peer struct for this conn,
2256     ** then drop the refCount on conn->peer and get a new peer structure.
2257     ** We can check the host,port field in the peer structure without the
2258     ** rx_peerHashTable_lock because the peer structure has its refCount
2259     ** incremented and the only time the host,port in the peer struct gets
2260     ** updated is when the peer structure is created.
2261     */
2262         if (conn->peer->host == host )
2263                 peer = conn->peer; /* no change to the peer structure */
2264         else
2265                 peer = rxi_FindPeer(host, port, conn->peer, 1);
2266     }
2267
2268     MUTEX_ENTER(&conn->conn_data_lock);
2269     conn->refCount++;
2270     conn->peer = peer;
2271     MUTEX_EXIT(&conn->conn_data_lock);
2272
2273     rxLastConn = conn;  /* store this connection as the last conn used */
2274     MUTEX_EXIT(&rx_connHashTable_lock);
2275     return conn;
2276 }
2277
2278 /* There are two packet tracing routines available for testing and monitoring
2279  * Rx.  One is called just after every packet is received and the other is
2280  * called just before every packet is sent.  Received packets, have had their
2281  * headers decoded, and packets to be sent have not yet had their headers
2282  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2283  * containing the network address.  Both can be modified.  The return value, if
2284  * non-zero, indicates that the packet should be dropped.  */
2285
2286 int (*rx_justReceived)() = 0;
2287 int (*rx_almostSent)() = 0;
2288
2289 /* A packet has been received off the interface.  Np is the packet, socket is
2290  * the socket number it was received from (useful in determining which service
2291  * this packet corresponds to), and (host, port) reflect the host,port of the
2292  * sender.  This call returns the packet to the caller if it is finished with
2293  * it, rather than de-allocating it, just as a small performance hack */
2294
2295 struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
2296     register struct rx_packet *np;
2297     osi_socket socket;
2298     afs_uint32 host;
2299     u_short port;
2300     int *tnop;
2301     struct rx_call **newcallp;
2302 {
2303     register struct rx_call *call;
2304     register struct rx_connection *conn;
2305     int channel;
2306     afs_uint32 currentCallNumber;
2307     int type;
2308     int skew;
2309 #ifdef RXDEBUG
2310     char *packetType;
2311 #endif
2312     struct rx_packet *tnp;
2313
2314 #ifdef RXDEBUG
2315 /* We don't print out the packet until now because (1) the time may not be
2316  * accurate enough until now in the lwp implementation (rx_Listener only gets
2317  * the time after the packet is read) and (2) from a protocol point of view,
2318  * this is the first time the packet has been seen */
2319     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2320         ? rx_packetTypes[np->header.type-1]: "*UNKNOWN*";
2321     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2322          np->header.serial, packetType, host, port, np->header.serviceId,
2323          np->header.epoch, np->header.cid, np->header.callNumber, 
2324          np->header.seq, np->header.flags, np));
2325 #endif
2326
2327     if(np->header.type == RX_PACKET_TYPE_VERSION) {
2328       return rxi_ReceiveVersionPacket(np,socket,host,port, 1);
2329     }
2330
2331     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2332         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2333     }
2334 #ifdef RXDEBUG
2335     /* If an input tracer function is defined, call it with the packet and
2336      * network address.  Note this function may modify its arguments. */
2337     if (rx_justReceived) {
2338         struct sockaddr_in addr;
2339         int drop;
2340         addr.sin_family = AF_INET;
2341         addr.sin_port = port;
2342         addr.sin_addr.s_addr = host;
2343 #if  defined(AFS_OSF_ENV) && defined(_KERNEL)
2344         addr.sin_len = sizeof(addr);
2345 #endif  /* AFS_OSF_ENV */
2346         drop = (*rx_justReceived) (np, &addr);
2347         /* drop packet if return value is non-zero */
2348         if (drop) return np;
2349         port = addr.sin_port;           /* in case fcn changed addr */
2350         host = addr.sin_addr.s_addr;
2351     }
2352 #endif
2353
2354     /* If packet was not sent by the client, then *we* must be the client */
2355     type = ((np->header.flags&RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2356         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2357
2358     /* Find the connection (or fabricate one, if we're the server & if
2359      * necessary) associated with this packet */
2360     conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
2361                               np->header.cid, np->header.epoch, type, 
2362                               np->header.securityIndex);
2363
2364     if (!conn) {
2365       /* If no connection found or fabricated, just ignore the packet.
2366        * (An argument could be made for sending an abort packet for
2367        * the conn) */
2368       return np;
2369     }   
2370
2371     MUTEX_ENTER(&conn->conn_data_lock);
2372     if (conn->maxSerial < np->header.serial)
2373       conn->maxSerial = np->header.serial;
2374     MUTEX_EXIT(&conn->conn_data_lock);
2375
2376     /* If the connection is in an error state, send an abort packet and ignore
2377      * the incoming packet */
2378     if (conn->error) {
2379         /* Don't respond to an abort packet--we don't want loops! */
2380         MUTEX_ENTER(&conn->conn_data_lock);
2381         if (np->header.type != RX_PACKET_TYPE_ABORT)
2382             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2383         conn->refCount--;
2384         MUTEX_EXIT(&conn->conn_data_lock);
2385         return np;
2386     }
2387
2388     /* Check for connection-only requests (i.e. not call specific). */
2389     if (np->header.callNumber == 0) {
2390         switch (np->header.type) {
2391             case RX_PACKET_TYPE_ABORT:
2392                 /* What if the supplied error is zero? */
2393                 rxi_ConnectionError(conn, ntohl(rx_GetInt32(np,0)));
2394                 MUTEX_ENTER(&conn->conn_data_lock);
2395                 conn->refCount--;
2396                 MUTEX_EXIT(&conn->conn_data_lock);
2397                 return np;
2398             case RX_PACKET_TYPE_CHALLENGE:
2399                 tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2400                 MUTEX_ENTER(&conn->conn_data_lock);
2401                 conn->refCount--;
2402                 MUTEX_EXIT(&conn->conn_data_lock);
2403                 return tnp;
2404             case RX_PACKET_TYPE_RESPONSE:
2405                 tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2406                 MUTEX_ENTER(&conn->conn_data_lock);
2407                 conn->refCount--;
2408                 MUTEX_EXIT(&conn->conn_data_lock);
2409                 return tnp;
2410             case RX_PACKET_TYPE_PARAMS:
2411             case RX_PACKET_TYPE_PARAMS+1:
2412             case RX_PACKET_TYPE_PARAMS+2:
2413                 /* ignore these packet types for now */
2414                 MUTEX_ENTER(&conn->conn_data_lock);
2415                 conn->refCount--;
2416                 MUTEX_EXIT(&conn->conn_data_lock);
2417                 return np;
2418
2419
2420             default:
2421                 /* Should not reach here, unless the peer is broken: send an
2422                  * abort packet */
2423                 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2424                 MUTEX_ENTER(&conn->conn_data_lock);
2425                 tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2426                 conn->refCount--;
2427                 MUTEX_EXIT(&conn->conn_data_lock);
2428                 return tnp;
2429         }
2430     }
2431
2432     channel = np->header.cid & RX_CHANNELMASK;
2433     call = conn->call[channel];
2434 #ifdef  RX_ENABLE_LOCKS
2435     if (call)
2436         MUTEX_ENTER(&call->lock);
2437     /* Test to see if call struct is still attached to conn. */
2438     if (call != conn->call[channel]) {
2439         if (call)
2440             MUTEX_EXIT(&call->lock);
2441         if (type == RX_SERVER_CONNECTION) {
2442             call = conn->call[channel];
2443             /* If we started with no call attached and there is one now,
2444              * another thread is also running this routine and has gotten
2445              * the connection channel. We should drop this packet in the tests
2446              * below. If there was a call on this connection and it's now
2447              * gone, then we'll be making a new call below.
2448              * If there was previously a call and it's now different then
2449              * the old call was freed and another thread running this routine
2450              * has created a call on this channel. One of these two threads
2451              * has a packet for the old call and the code below handles those
2452              * cases.
2453              */
2454             if (call)
2455                 MUTEX_ENTER(&call->lock);
2456         }
2457         else {
2458             /* This packet can't be for this call. If the new call address is
2459              * 0 then no call is running on this channel. If there is a call
2460              * then, since this is a client connection we're getting data for
2461              * it must be for the previous call.
2462              */
2463             MUTEX_ENTER(&rx_stats_mutex);
2464             rx_stats.spuriousPacketsRead++;
2465             MUTEX_EXIT(&rx_stats_mutex);
2466             MUTEX_ENTER(&conn->conn_data_lock);
2467             conn->refCount--;
2468             MUTEX_EXIT(&conn->conn_data_lock);
2469             return np;
2470         }
2471     }
2472 #endif
2473     currentCallNumber = conn->callNumber[channel];
2474
2475     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2476         if (np->header.callNumber < currentCallNumber) {
2477             MUTEX_ENTER(&rx_stats_mutex);
2478             rx_stats.spuriousPacketsRead++;
2479             MUTEX_EXIT(&rx_stats_mutex);
2480 #ifdef  RX_ENABLE_LOCKS
2481             if (call)
2482                 MUTEX_EXIT(&call->lock);
2483 #endif
2484             MUTEX_ENTER(&conn->conn_data_lock);
2485             conn->refCount--;
2486             MUTEX_EXIT(&conn->conn_data_lock);
2487             return np;
2488         }
2489         if (!call) {
2490             call = rxi_NewCall(conn, channel);
2491             MUTEX_ENTER(&call->lock);
2492             *call->callNumber = np->header.callNumber;
2493             call->state = RX_STATE_PRECALL;
2494             clock_GetTime(&call->queueTime);
2495             hzero(call->bytesSent);
2496             hzero(call->bytesRcvd);
2497             rxi_KeepAliveOn(call);
2498         }
2499         else if (np->header.callNumber != currentCallNumber) {
2500             /* Wait until the transmit queue is idle before deciding
2501              * whether to reset the current call. Chances are that the
2502              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2503              * flag is cleared.
2504              */
2505 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2506             while ((call->state == RX_STATE_ACTIVE) &&
2507                    (call->flags & RX_CALL_TQ_BUSY)) {
2508                 call->flags |= RX_CALL_TQ_WAIT;
2509 #ifdef RX_ENABLE_LOCKS
2510                 CV_WAIT(&call->cv_tq, &call->lock);
2511 #else /* RX_ENABLE_LOCKS */
2512                 osi_rxSleep(&call->tq);
2513 #endif /* RX_ENABLE_LOCKS */
2514             }
2515 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2516             /* If the new call cannot be taken right now send a busy and set
2517              * the error condition in this call, so that it terminates as
2518              * quickly as possible */
2519             if (call->state == RX_STATE_ACTIVE) {
2520                 struct rx_packet *tp;
2521
2522                 rxi_CallError(call, RX_CALL_DEAD);
2523                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0, 1);
2524                 MUTEX_EXIT(&call->lock);
2525                 MUTEX_ENTER(&conn->conn_data_lock);
2526                 conn->refCount--;
2527                 MUTEX_EXIT(&conn->conn_data_lock);
2528                 return tp;
2529             }
2530             rxi_ResetCall(call, 0);
2531             *call->callNumber = np->header.callNumber;
2532             call->state = RX_STATE_PRECALL;
2533             clock_GetTime(&call->queueTime);
2534             hzero(call->bytesSent);
2535             hzero(call->bytesRcvd);
2536             /*
2537              * If the number of queued calls exceeds the overload
2538              * threshold then abort this call.
2539              */
2540             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2541                 struct rx_packet *tp;
2542
2543                 rxi_CallError(call, rx_BusyError);
2544                 tp = rxi_SendCallAbort(call, np, 1, 0);
2545                 MUTEX_EXIT(&call->lock);
2546                 MUTEX_ENTER(&conn->conn_data_lock);
2547                 conn->refCount--;
2548                 MUTEX_EXIT(&conn->conn_data_lock);
2549                 return tp;
2550             }
2551             rxi_KeepAliveOn(call);
2552         }
2553         else {
2554             /* Continuing call; do nothing here. */
2555         }
2556     } else { /* we're the client */
2557         /* Ignore all incoming acknowledgements for calls in DALLY state */
2558         if ( call && (call->state == RX_STATE_DALLY) 
2559          && (np->header.type == RX_PACKET_TYPE_ACK)) {
2560             MUTEX_ENTER(&rx_stats_mutex);
2561             rx_stats.ignorePacketDally++;
2562             MUTEX_EXIT(&rx_stats_mutex);
2563 #ifdef  RX_ENABLE_LOCKS
2564             if (call) {
2565                 MUTEX_EXIT(&call->lock);
2566             }
2567 #endif
2568             MUTEX_ENTER(&conn->conn_data_lock);
2569             conn->refCount--;
2570             MUTEX_EXIT(&conn->conn_data_lock);
2571             return np;
2572         }
2573         
2574         /* Ignore anything that's not relevant to the current call.  If there
2575          * isn't a current call, then no packet is relevant. */
2576         if (!call || (np->header.callNumber != currentCallNumber)) {
2577             MUTEX_ENTER(&rx_stats_mutex);
2578             rx_stats.spuriousPacketsRead++;
2579             MUTEX_EXIT(&rx_stats_mutex);
2580 #ifdef  RX_ENABLE_LOCKS
2581             if (call) {
2582                 MUTEX_EXIT(&call->lock);
2583             }
2584 #endif
2585             MUTEX_ENTER(&conn->conn_data_lock);
2586             conn->refCount--;
2587             MUTEX_EXIT(&conn->conn_data_lock);
2588             return np;  
2589         }
2590         /* If the service security object index stamped in the packet does not
2591          * match the connection's security index, ignore the packet */
2592         if (np->header.securityIndex != conn->securityIndex) {
2593 #ifdef  RX_ENABLE_LOCKS
2594             MUTEX_EXIT(&call->lock);
2595 #endif
2596             MUTEX_ENTER(&conn->conn_data_lock);
2597             conn->refCount--;       
2598             MUTEX_EXIT(&conn->conn_data_lock);
2599             return np;
2600         }
2601
2602         /* If we're receiving the response, then all transmit packets are
2603          * implicitly acknowledged.  Get rid of them. */
2604         if (np->header.type == RX_PACKET_TYPE_DATA) {
2605 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2606             /* XXX Hack. Because we must release the global rx lock when
2607              * sending packets (osi_NetSend) we drop all acks while we're
2608              * traversing the tq in rxi_Start sending packets out because
2609              * packets may move to the freePacketQueue as result of being here!
2610              * So we drop these packets until we're safely out of the
2611              * traversing. Really ugly! 
2612              * For fine grain RX locking, we set the acked field in the
2613              * packets and let rxi_Start remove them from the transmit queue.
2614              */
2615             if (call->flags & RX_CALL_TQ_BUSY) {
2616 #ifdef  RX_ENABLE_LOCKS
2617                 rxi_SetAcksInTransmitQueue(call);
2618 #else
2619                 conn->refCount--;
2620                 return np;              /* xmitting; drop packet */
2621 #endif
2622             }
2623             else {
2624                 rxi_ClearTransmitQueue(call, 0);
2625             }
2626 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2627             rxi_ClearTransmitQueue(call, 0);
2628 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2629         } else {
2630           if (np->header.type == RX_PACKET_TYPE_ACK) {
2631         /* now check to see if this is an ack packet acknowledging that the
2632          * server actually *lost* some hard-acked data.  If this happens we
2633          * ignore this packet, as it may indicate that the server restarted in
2634          * the middle of a call.  It is also possible that this is an old ack
2635          * packet.  We don't abort the connection in this case, because this
2636          * *might* just be an old ack packet.  The right way to detect a server
2637          * restart in the midst of a call is to notice that the server epoch
2638          * changed, btw.  */
2639         /* XXX I'm not sure this is exactly right, since tfirst **IS**
2640          * XXX unacknowledged.  I think that this is off-by-one, but
2641          * XXX I don't dare change it just yet, since it will
2642          * XXX interact badly with the server-restart detection 
2643          * XXX code in receiveackpacket.  */
2644             if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2645                 MUTEX_ENTER(&rx_stats_mutex);
2646                 rx_stats.spuriousPacketsRead++;
2647                 MUTEX_EXIT(&rx_stats_mutex);
2648                 MUTEX_EXIT(&call->lock);
2649                 MUTEX_ENTER(&conn->conn_data_lock);
2650                 conn->refCount--;
2651                 MUTEX_EXIT(&conn->conn_data_lock);
2652                 return np;
2653             }
2654           }
2655         } /* else not a data packet */
2656     }
2657
2658     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2659     /* Set remote user defined status from packet */
2660     call->remoteStatus = np->header.userStatus;
2661
2662     /* Note the gap between the expected next packet and the actual
2663      * packet that arrived, when the new packet has a smaller serial number
2664      * than expected.  Rioses frequently reorder packets all by themselves,
2665      * so this will be quite important with very large window sizes.
2666      * Skew is checked against 0 here to avoid any dependence on the type of
2667      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2668      * true! 
2669      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2670      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2671      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2672      */
2673     MUTEX_ENTER(&conn->conn_data_lock);
2674     skew = conn->lastSerial - np->header.serial;
2675     conn->lastSerial = np->header.serial;
2676     MUTEX_EXIT(&conn->conn_data_lock);
2677     if (skew > 0) {
2678       register struct rx_peer *peer;
2679       peer = conn->peer;
2680       if (skew > peer->inPacketSkew) {
2681         dpf (("*** In skew changed from %d to %d\n", peer->inPacketSkew, skew));
2682         peer->inPacketSkew = skew;
2683       }
2684     }
2685
2686     /* Now do packet type-specific processing */
2687     switch (np->header.type) {
2688         case RX_PACKET_TYPE_DATA:
2689             np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port,
2690                                        tnop, newcallp);
2691             break;
2692         case RX_PACKET_TYPE_ACK:
2693             /* Respond immediately to ack packets requesting acknowledgement
2694              * (ping packets) */
2695             if (np->header.flags & RX_REQUEST_ACK) {
2696                 if (call->error) (void) rxi_SendCallAbort(call, 0, 1, 0);
2697                 else (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE, 1);
2698             }
2699             np = rxi_ReceiveAckPacket(call, np, 1);
2700             break;
2701         case RX_PACKET_TYPE_ABORT:
2702             /* An abort packet: reset the connection, passing the error up to
2703              * the user */
2704             /* What if error is zero? */
2705             rxi_CallError(call, ntohl(*(afs_int32 *)rx_DataOf(np)));
2706             break;
2707         case RX_PACKET_TYPE_BUSY:
2708             /* XXXX */
2709             break;
2710         case RX_PACKET_TYPE_ACKALL:
2711             /* All packets acknowledged, so we can drop all packets previously
2712              * readied for sending */
2713 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2714             /* XXX Hack. We because we can't release the global rx lock when
2715              * sending packets (osi_NetSend) we drop all ack pkts while we're
2716              * traversing the tq in rxi_Start sending packets out because
2717              * packets may move to the freePacketQueue as result of being
2718              * here! So we drop these packets until we're safely out of the
2719              * traversing. Really ugly! 
2720              * For fine grain RX locking, we set the acked field in the packets
2721              * and let rxi_Start remove the packets from the transmit queue.
2722              */
2723             if (call->flags & RX_CALL_TQ_BUSY) {
2724 #ifdef  RX_ENABLE_LOCKS
2725                 rxi_SetAcksInTransmitQueue(call);
2726                 break;
2727 #else /* RX_ENABLE_LOCKS */
2728                 conn->refCount--;
2729                 return np;              /* xmitting; drop packet */
2730 #endif /* RX_ENABLE_LOCKS */
2731             }
2732 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2733             rxi_ClearTransmitQueue(call, 0);
2734             break;
2735         default:
2736             /* Should not reach here, unless the peer is broken: send an abort
2737              * packet */
2738             rxi_CallError(call, RX_PROTOCOL_ERROR);
2739             np = rxi_SendCallAbort(call, np, 1, 0);
2740             break;
2741     };
2742     /* Note when this last legitimate packet was received, for keep-alive
2743      * processing.  Note, we delay getting the time until now in the hope that
2744      * the packet will be delivered to the user before any get time is required
2745      * (if not, then the time won't actually be re-evaluated here). */
2746     call->lastReceiveTime = clock_Sec();
2747     MUTEX_EXIT(&call->lock);
2748     MUTEX_ENTER(&conn->conn_data_lock);
2749     conn->refCount--;
2750     MUTEX_EXIT(&conn->conn_data_lock);
2751     return np;
2752 }
2753
2754 /* return true if this is an "interesting" connection from the point of view
2755     of someone trying to debug the system */
2756 int rxi_IsConnInteresting(struct rx_connection *aconn)
2757 {
2758     register int i;
2759     register struct rx_call *tcall;
2760
2761     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2762         return 1;
2763     for(i=0;i<RX_MAXCALLS;i++) {
2764         tcall = aconn->call[i];
2765         if (tcall) {
2766             if ((tcall->state == RX_STATE_PRECALL) || (tcall->state == RX_STATE_ACTIVE))
2767                 return 1;
2768             if ((tcall->mode == RX_MODE_SENDING) || (tcall->mode == RX_MODE_RECEIVING))
2769                 return 1;
2770         }
2771     }
2772     return 0;
2773 }
2774
2775 #ifdef KERNEL
2776 /* if this is one of the last few packets AND it wouldn't be used by the
2777    receiving call to immediately satisfy a read request, then drop it on
2778    the floor, since accepting it might prevent a lock-holding thread from
2779    making progress in its reading. If a call has been cleared while in
2780    the precall state then ignore all subsequent packets until the call
2781    is assigned to a thread. */
2782
2783 static TooLow(ap, acall)
2784   struct rx_call *acall;
2785   struct rx_packet *ap; {
2786     int rc=0;
2787     MUTEX_ENTER(&rx_stats_mutex);
2788     if (((ap->header.seq != 1) &&
2789          (acall->flags & RX_CALL_CLEARED) &&
2790          (acall->state == RX_STATE_PRECALL)) ||
2791         ((rx_nFreePackets < rxi_dataQuota+2) &&
2792          !( (ap->header.seq < acall->rnext+rx_initSendWindow) 
2793            && (acall->flags & RX_CALL_READER_WAIT)))) {
2794         rc = 1;
2795     }
2796     MUTEX_EXIT(&rx_stats_mutex);
2797     return rc;
2798 }
2799 #endif /* KERNEL */
2800
2801 /* try to attach call, if authentication is complete */
2802 static void TryAttach(acall, socket, tnop, newcallp)
2803 register struct rx_call *acall;
2804 register osi_socket socket;
2805 register int *tnop;
2806 register struct rx_call **newcallp; {
2807     register struct rx_connection *conn;
2808     conn = acall->conn;
2809     if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
2810         /* Don't attach until we have any req'd. authentication. */
2811         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
2812             rxi_AttachServerProc(acall, socket, tnop, newcallp);
2813             /* Note:  this does not necessarily succeed; there
2814                may not any proc available */
2815         }
2816         else {
2817             rxi_ChallengeOn(acall->conn);
2818         }
2819     }
2820 }
2821
2822 /* A data packet has been received off the interface.  This packet is
2823  * appropriate to the call (the call is in the right state, etc.).  This
2824  * routine can return a packet to the caller, for re-use */
2825
2826 struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
2827                                         port, tnop, newcallp)
2828     register struct rx_call *call;
2829     register struct rx_packet *np;
2830     int istack;
2831     osi_socket socket;
2832     afs_uint32 host;
2833     u_short port;
2834     int *tnop;
2835     struct rx_call **newcallp;
2836 {
2837     int ackNeeded = 0;
2838     int newPackets = 0;
2839     int didHardAck = 0;
2840     int haveLast = 0;
2841     afs_uint32 seq, serial, flags;
2842     int isFirst;
2843     struct rx_packet *tnp;
2844     struct clock when;
2845     MUTEX_ENTER(&rx_stats_mutex);
2846     rx_stats.dataPacketsRead++;
2847     MUTEX_EXIT(&rx_stats_mutex);
2848
2849 #ifdef KERNEL
2850     /* If there are no packet buffers, drop this new packet, unless we can find
2851      * packet buffers from inactive calls */
2852     if (!call->error &&
2853         (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
2854         MUTEX_ENTER(&rx_freePktQ_lock);
2855         rxi_NeedMorePackets = TRUE;
2856         MUTEX_EXIT(&rx_freePktQ_lock);
2857         MUTEX_ENTER(&rx_stats_mutex);
2858         rx_stats.noPacketBuffersOnRead++;
2859         MUTEX_EXIT(&rx_stats_mutex);
2860         call->rprev = np->header.serial;
2861         rxi_calltrace(RX_TRACE_DROP, call);
2862         dpf (("packet %x dropped on receipt - quota problems", np));
2863         if (rxi_doreclaim)
2864             rxi_ClearReceiveQueue(call);
2865         clock_GetTime(&when);
2866         clock_Add(&when, &rx_softAckDelay);
2867         if (!call->delayedAckEvent ||
2868             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
2869             rxevent_Cancel(call->delayedAckEvent, call,
2870                            RX_CALL_REFCOUNT_DELAY);
2871             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
2872             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2873                                                  call, 0);
2874         }
2875         /* we've damaged this call already, might as well do it in. */
2876         return np;
2877     }
2878 #endif /* KERNEL */
2879
2880     /*
2881      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
2882      * packet is one of several packets transmitted as a single
2883      * datagram. Do not send any soft or hard acks until all packets
2884      * in a jumbogram have been processed. Send negative acks right away.
2885      */
2886     for (isFirst = 1 , tnp = NULL ; isFirst || tnp ; isFirst = 0 ) {
2887         /* tnp is non-null when there are more packets in the
2888          * current jumbo gram */
2889         if (tnp) {
2890             if (np)
2891                 rxi_FreePacket(np);
2892             np = tnp;
2893         }
2894
2895         seq = np->header.seq;
2896         serial = np->header.serial;
2897         flags = np->header.flags;
2898
2899         /* If the call is in an error state, send an abort message */
2900         if (call->error)
2901             return rxi_SendCallAbort(call, np, istack, 0);
2902
2903         /* The RX_JUMBO_PACKET is set in all but the last packet in each
2904          * AFS 3.5 jumbogram. */
2905         if (flags & RX_JUMBO_PACKET) {
2906             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
2907         } else {
2908             tnp = NULL;
2909         }
2910
2911         if (np->header.spare != 0) {
2912             MUTEX_ENTER(&call->conn->conn_data_lock);
2913             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
2914             MUTEX_EXIT(&call->conn->conn_data_lock);
2915         }
2916
2917         /* The usual case is that this is the expected next packet */
2918         if (seq == call->rnext) {
2919
2920             /* Check to make sure it is not a duplicate of one already queued */
2921             if (queue_IsNotEmpty(&call->rq) 
2922                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
2923                 MUTEX_ENTER(&rx_stats_mutex);
2924                 rx_stats.dupPacketsRead++;
2925                 MUTEX_EXIT(&rx_stats_mutex);
2926                 dpf (("packet %x dropped on receipt - duplicate", np));
2927                 rxevent_Cancel(call->delayedAckEvent, call,
2928                                RX_CALL_REFCOUNT_DELAY);
2929                 np = rxi_SendAck(call, np, seq, serial,
2930                                  flags, RX_ACK_DUPLICATE, istack);
2931                 ackNeeded = 0;
2932                 call->rprev = seq;
2933                 continue;
2934             }
2935
2936             /* It's the next packet. Stick it on the receive queue
2937              * for this call. Set newPackets to make sure we wake
2938              * the reader once all packets have been processed */
2939             queue_Prepend(&call->rq, np);
2940             call->nSoftAcks++;
2941             np = NULL; /* We can't use this anymore */
2942             newPackets = 1;
2943
2944             /* If an ack is requested then set a flag to make sure we
2945              * send an acknowledgement for this packet */
2946             if (flags & RX_REQUEST_ACK) {
2947                 ackNeeded = 1;
2948             }
2949
2950             /* Keep track of whether we have received the last packet */
2951             if (flags & RX_LAST_PACKET) {
2952                 call->flags |= RX_CALL_HAVE_LAST;
2953                 haveLast = 1;
2954             }
2955
2956             /* Check whether we have all of the packets for this call */
2957             if (call->flags & RX_CALL_HAVE_LAST) {
2958                 afs_uint32 tseq;                /* temporary sequence number */
2959                 struct rx_packet *tp;   /* Temporary packet pointer */
2960                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
2961
2962                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2963                     if (tseq != tp->header.seq)
2964                         break;
2965                     if (tp->header.flags & RX_LAST_PACKET) {
2966                         call->flags |= RX_CALL_RECEIVE_DONE;
2967                         break;
2968                     }
2969                     tseq++;
2970                 }
2971             }
2972
2973             /* Provide asynchronous notification for those who want it
2974              * (e.g. multi rx) */
2975             if (call->arrivalProc) {
2976                 (*call->arrivalProc)(call, call->arrivalProcHandle,
2977                                      call->arrivalProcArg);
2978                 call->arrivalProc = (VOID (*)()) 0;
2979             }
2980
2981             /* Update last packet received */
2982             call->rprev = seq;
2983
2984             /* If there is no server process serving this call, grab
2985              * one, if available. We only need to do this once. If a
2986              * server thread is available, this thread becomes a server
2987              * thread and the server thread becomes a listener thread. */
2988             if (isFirst) {
2989                 TryAttach(call, socket, tnop, newcallp);
2990             }
2991         }       
2992         /* This is not the expected next packet. */
2993         else {
2994             /* Determine whether this is a new or old packet, and if it's
2995              * a new one, whether it fits into the current receive window.
2996              * Also figure out whether the packet was delivered in sequence.
2997              * We use the prev variable to determine whether the new packet
2998              * is the successor of its immediate predecessor in the
2999              * receive queue, and the missing flag to determine whether
3000              * any of this packets predecessors are missing.  */
3001
3002             afs_uint32 prev;            /* "Previous packet" sequence number */
3003             struct rx_packet *tp;       /* Temporary packet pointer */
3004             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3005             int missing;                /* Are any predecessors missing? */
3006
3007             /* If the new packet's sequence number has been sent to the
3008              * application already, then this is a duplicate */
3009             if (seq < call->rnext) {
3010                 MUTEX_ENTER(&rx_stats_mutex);
3011                 rx_stats.dupPacketsRead++;
3012                 MUTEX_EXIT(&rx_stats_mutex);
3013                 rxevent_Cancel(call->delayedAckEvent, call,
3014                                RX_CALL_REFCOUNT_DELAY);
3015                 np = rxi_SendAck(call, np, seq, serial,
3016                                  flags, RX_ACK_DUPLICATE, istack);
3017                 ackNeeded = 0;
3018                 call->rprev = seq;
3019                 continue;
3020             }
3021
3022             /* If the sequence number is greater than what can be
3023              * accomodated by the current window, then send a negative
3024              * acknowledge and drop the packet */
3025             if ((call->rnext + call->rwind) <= seq) {
3026                 rxevent_Cancel(call->delayedAckEvent, call,
3027                                RX_CALL_REFCOUNT_DELAY);
3028                 np = rxi_SendAck(call, np, seq, serial,
3029                                  flags, RX_ACK_EXCEEDS_WINDOW, istack);
3030                 ackNeeded = 0;
3031                 call->rprev = seq;
3032                 continue;
3033             }
3034
3035             /* Look for the packet in the queue of old received packets */
3036             for (prev = call->rnext - 1, missing = 0,
3037                  queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3038                 /*Check for duplicate packet */
3039                 if (seq == tp->header.seq) {
3040                     MUTEX_ENTER(&rx_stats_mutex);
3041                     rx_stats.dupPacketsRead++;
3042                     MUTEX_EXIT(&rx_stats_mutex);
3043                     rxevent_Cancel(call->delayedAckEvent, call,
3044                                    RX_CALL_REFCOUNT_DELAY);
3045                     np = rxi_SendAck(call, np, seq, serial, 
3046                                      flags, RX_ACK_DUPLICATE, istack);
3047                     ackNeeded = 0;
3048                     call->rprev = seq;
3049                     goto nextloop;
3050                 }
3051                 /* If we find a higher sequence packet, break out and
3052                  * insert the new packet here. */
3053                 if (seq < tp->header.seq) break;
3054                 /* Check for missing packet */
3055                 if (tp->header.seq != prev+1) {
3056                     missing = 1;
3057                 }
3058
3059                 prev = tp->header.seq;
3060             }
3061
3062             /* Keep track of whether we have received the last packet. */
3063             if (flags & RX_LAST_PACKET) {
3064                 call->flags |= RX_CALL_HAVE_LAST;
3065             }
3066
3067             /* It's within the window: add it to the the receive queue.
3068              * tp is left by the previous loop either pointing at the
3069              * packet before which to insert the new packet, or at the
3070              * queue head if the queue is empty or the packet should be
3071              * appended. */
3072             queue_InsertBefore(tp, np);
3073             call->nSoftAcks++;
3074             np = NULL;
3075
3076             /* Check whether we have all of the packets for this call */
3077             if ((call->flags & RX_CALL_HAVE_LAST)
3078              && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3079                 afs_uint32 tseq;                /* temporary sequence number */
3080
3081                 for (tseq = call->rnext,
3082                      queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3083                     if (tseq != tp->header.seq)
3084                         break;
3085                     if (tp->header.flags & RX_LAST_PACKET) {
3086                         call->flags |= RX_CALL_RECEIVE_DONE;
3087                         break;
3088                     }
3089                     tseq++;
3090                 }
3091             }
3092
3093             /* We need to send an ack of the packet is out of sequence, 
3094              * or if an ack was requested by the peer. */
3095             if (seq != prev+1 || missing || (flags & RX_REQUEST_ACK)) {
3096                 ackNeeded = 1;
3097             }
3098
3099             /* Acknowledge the last packet for each call */
3100             if (flags & RX_LAST_PACKET) {
3101                 haveLast = 1;
3102             }
3103
3104             call->rprev = seq;
3105         }
3106 nextloop:;
3107     }
3108
3109     if (newPackets) {
3110         /*
3111          * If the receiver is waiting for an iovec, fill the iovec
3112          * using the data from the receive queue */
3113         if (call->flags & RX_CALL_IOVEC_WAIT) {
3114             didHardAck = rxi_FillReadVec(call, seq, serial, flags); 
3115             /* the call may have been aborted */
3116             if (call->error) {
3117                 return NULL;
3118             }
3119             if (didHardAck) {
3120                 ackNeeded = 0;
3121             }
3122         }
3123
3124         /* Wakeup the reader if any */
3125         if ((call->flags & RX_CALL_READER_WAIT) &&
3126             (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes) ||
3127              (call->iovNext >= call->iovMax) ||
3128              (call->flags & RX_CALL_RECEIVE_DONE))) {
3129             call->flags &= ~RX_CALL_READER_WAIT;
3130 #ifdef  RX_ENABLE_LOCKS
3131             CV_BROADCAST(&call->cv_rq);
3132 #else
3133             osi_rxWakeup(&call->rq);
3134 #endif
3135         }
3136     }
3137
3138     /*
3139      * Send an ack when requested by the peer, or once every
3140      * rxi_SoftAckRate packets until the last packet has been
3141      * received. Always send a soft ack for the last packet in
3142      * the server's reply. */
3143     if (ackNeeded) {
3144         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3145         np = rxi_SendAck(call, np, seq, serial, flags,
3146                          RX_ACK_REQUESTED, istack);
3147     } else if (call->nSoftAcks > (u_short)rxi_SoftAckRate) {
3148         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3149         np = rxi_SendAck(call, np, seq, serial, flags,
3150                          RX_ACK_DELAY, istack);
3151     } else if (call->nSoftAcks) {
3152         clock_GetTime(&when);
3153         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3154             clock_Add(&when, &rx_lastAckDelay);
3155         } else {
3156             clock_Add(&when, &rx_softAckDelay);
3157         }
3158         if (!call->delayedAckEvent ||
3159             clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3160             rxevent_Cancel(call->delayedAckEvent, call,
3161                            RX_CALL_REFCOUNT_DELAY);
3162             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3163             call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
3164                                                  call, 0);
3165         }
3166     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3167         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3168     }
3169
3170     return np;
3171 }
3172
3173 #ifdef  ADAPT_WINDOW
3174 static void rxi_ComputeRate();
3175 #endif
3176
3177 /* The real smarts of the whole thing.  */
3178 struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
3179     register struct rx_call *call;
3180     struct rx_packet *np;
3181     int istack;
3182 {
3183     struct rx_ackPacket *ap;
3184     int nAcks;
3185     register struct rx_packet *tp;
3186     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3187     register struct rx_connection *conn = call->conn;
3188     struct rx_peer *peer = conn->peer;
3189     afs_uint32 first;
3190     afs_uint32 serial;
3191     /* because there are CM's that are bogus, sending weird values for this. */
3192     afs_uint32 skew = 0;
3193     int needRxStart = 0;
3194     int nbytes;
3195     int missing;
3196     int acked;
3197     int nNacked = 0;
3198     int newAckCount = 0;
3199     u_short maxMTU = 0;  /* Set if peer supports AFS 3.4a jumbo datagrams */
3200     int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
3201
3202     MUTEX_ENTER(&rx_stats_mutex);
3203     rx_stats.ackPacketsRead++;
3204     MUTEX_EXIT(&rx_stats_mutex);
3205     ap = (struct rx_ackPacket *) rx_DataOf(np);
3206     nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *)ap);
3207     if (nbytes < 0)
3208       return np;       /* truncated ack packet */
3209
3210     /* depends on ack packet struct */
3211     nAcks = MIN((unsigned)nbytes, (unsigned) ap->nAcks);
3212     first = ntohl(ap->firstPacket);
3213     serial = ntohl(ap->serial);
3214     /* temporarily disabled -- needs to degrade over time 
3215        skew = ntohs(ap->maxSkew); */
3216
3217     /* Ignore ack packets received out of order */
3218     if (first < call->tfirst) {
3219         return np;
3220     }
3221
3222     if (np->header.flags & RX_SLOW_START_OK) {
3223         call->flags |= RX_CALL_SLOW_START_OK;
3224     }
3225     
3226 #ifdef RXDEBUG
3227     if (rx_Log) {
3228       fprintf( rx_Log, 
3229               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3230               ap->reason, ntohl(ap->previousPacket), np->header.seq, serial, 
3231               skew, ntohl(ap->firstPacket));
3232         if (nAcks) {
3233             int offset;
3234             for (offset = 0; offset < nAcks; offset++) 
3235                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK? '-' : '*', rx_Log);
3236         }
3237         putc('\n', rx_Log);
3238     }
3239 #endif
3240
3241     /* if a server connection has been re-created, it doesn't remember what
3242         serial # it was up to.  An ack will tell us, since the serial field
3243         contains the largest serial received by the other side */
3244     MUTEX_ENTER(&conn->conn_data_lock);
3245     if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
3246         conn->serial = serial+1;
3247     }
3248     MUTEX_EXIT(&conn->conn_data_lock);
3249
3250     /* Update the outgoing packet skew value to the latest value of
3251      * the peer's incoming packet skew value.  The ack packet, of
3252      * course, could arrive out of order, but that won't affect things
3253      * much */
3254     MUTEX_ENTER(&peer->peer_lock);
3255     peer->outPacketSkew = skew;
3256
3257     /* Check for packets that no longer need to be transmitted, and
3258      * discard them.  This only applies to packets positively
3259      * acknowledged as having been sent to the peer's upper level.
3260      * All other packets must be retained.  So only packets with
3261      * sequence numbers < ap->firstPacket are candidates. */
3262     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3263         if (tp->header.seq >= first) break;
3264         call->tfirst = tp->header.seq + 1;
3265         if (tp->header.serial == serial) {
3266           rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3267 #ifdef ADAPT_WINDOW
3268           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3269 #endif
3270         }
3271         else if ((tp->firstSerial == serial)) {
3272           rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3273 #ifdef ADAPT_WINDOW
3274           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3275 #endif
3276         }
3277 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3278     /* XXX Hack. Because we have to release the global rx lock when sending
3279      * packets (osi_NetSend) we drop all acks while we're traversing the tq
3280      * in rxi_Start sending packets out because packets may move to the
3281      * freePacketQueue as result of being here! So we drop these packets until
3282      * we're safely out of the traversing. Really ugly! 
3283      * To make it even uglier, if we're using fine grain locking, we can
3284      * set the ack bits in the packets and have rxi_Start remove the packets
3285      * when it's done transmitting.
3286      */
3287         if (!tp->acked) {
3288             newAckCount++;
3289         }
3290         if (call->flags & RX_CALL_TQ_BUSY) {
3291 #ifdef RX_ENABLE_LOCKS
3292             tp->acked = 1;
3293             call->flags |= RX_CALL_TQ_SOME_ACKED;
3294 #else /* RX_ENABLE_LOCKS */
3295             break;
3296 #endif /* RX_ENABLE_LOCKS */
3297         } else
3298 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3299         {
3300         queue_Remove(tp);
3301         rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3302         }
3303     }
3304
3305 #ifdef ADAPT_WINDOW
3306     /* Give rate detector a chance to respond to ping requests */
3307     if (ap->reason == RX_ACK_PING_RESPONSE) {
3308         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3309     }
3310 #endif
3311
3312     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3313    
3314    /* Now go through explicit acks/nacks and record the results in
3315     * the waiting packets.  These are packets that can't be released
3316     * yet, even with a positive acknowledge.  This positive
3317     * acknowledge only means the packet has been received by the
3318     * peer, not that it will be retained long enough to be sent to
3319     * the peer's upper level.  In addition, reset the transmit timers
3320     * of any missing packets (those packets that must be missing
3321     * because this packet was out of sequence) */
3322
3323     call->nSoftAcked = 0;
3324     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3325         /* Update round trip time if the ack was stimulated on receipt
3326          * of this packet */
3327 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3328 #ifdef RX_ENABLE_LOCKS
3329         if (tp->header.seq >= first) {
3330 #endif /* RX_ENABLE_LOCKS */
3331 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3332         if (tp->header.serial == serial) {
3333           rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
3334 #ifdef ADAPT_WINDOW
3335           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3336 #endif
3337         }
3338         else if ((tp->firstSerial == serial)) {
3339           rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
3340 #ifdef ADAPT_WINDOW
3341           rxi_ComputeRate(peer, call, tp, np, ap->reason);
3342 #endif
3343         }
3344 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3345 #ifdef RX_ENABLE_LOCKS
3346         }
3347 #endif /* RX_ENABLE_LOCKS */
3348 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3349
3350         /* Set the acknowledge flag per packet based on the
3351          * information in the ack packet. An acknowlegded packet can
3352          * be downgraded when the server has discarded a packet it
3353          * soacked previously, or when an ack packet is received
3354          * out of sequence. */
3355         if (tp->header.seq < first) {
3356             /* Implicit ack information */
3357             if (!tp->acked) {
3358                 newAckCount++;
3359             }
3360             tp->acked = 1;
3361         }
3362         else if (tp->header.seq < first + nAcks) {
3363             /* Explicit ack information:  set it in the packet appropriately */
3364             if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
3365                 if (!tp->acked) {
3366                     newAckCount++;
3367                     tp->acked = 1;
3368                 }
3369                 if (missing) {
3370                     nNacked++;
3371                 } else {
3372                     call->nSoftAcked++;
3373                 }
3374             } else {
3375                 tp->acked = 0;
3376                 missing = 1;
3377             }
3378         }
3379         else {
3380             tp->acked = 0;
3381             missing = 1;
3382         }
3383
3384         /* If packet isn't yet acked, and it has been transmitted at least 
3385          * once, reset retransmit time using latest timeout 
3386          * ie, this should readjust the retransmit timer for all outstanding 
3387          * packets...  So we don't just retransmit when we should know better*/
3388
3389         if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
3390           tp->retryTime = tp->timeSent;
3391           clock_Add(&tp->retryTime, &peer->timeout);
3392           /* shift by eight because one quarter-sec ~ 256 milliseconds */
3393           clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
3394         }
3395     }
3396
3397     /* If the window has been extended by this acknowledge packet,
3398      * then wakeup a sender waiting in alloc for window space, or try
3399      * sending packets now, if he's been sitting on packets due to
3400      * lack of window space */
3401     if (call->tnext < (call->tfirst + call->twind))  {
3402 #ifdef  RX_ENABLE_LOCKS
3403         CV_SIGNAL(&call->cv_twind);
3404 #else
3405         if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
3406             call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
3407             osi_rxWakeup(&call->twind);
3408         }
3409 #endif
3410         if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
3411             call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
3412         }
3413     }
3414
3415     /* if the ack packet has a receivelen field hanging off it,
3416      * update our state */
3417     if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
3418       afs_uint32 tSize;
3419
3420       /* If the ack packet has a "recommended" size that is less than 
3421        * what I am using now, reduce my size to match */
3422       rx_packetread(np, rx_AckDataSize(ap->nAcks)+sizeof(afs_int32),
3423                     sizeof(afs_int32), &tSize);
3424       tSize = (afs_uint32) ntohl(tSize);
3425       peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
3426
3427       /* Get the maximum packet size to send to this peer */
3428       rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
3429                     &tSize);
3430       tSize = (afs_uint32)ntohl(tSize);
3431       tSize = (afs_uint32)MIN(tSize, rx_MyMaxSendSize);
3432       tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
3433
3434       /* sanity check - peer might have restarted with different params.
3435        * If peer says "send less", dammit, send less...  Peer should never 
3436        * be unable to accept packets of the size that prior AFS versions would
3437        * send without asking.  */
3438       if (peer->maxMTU != tSize) {
3439           peer->maxMTU = tSize;
3440           peer->MTU = MIN(tSize, peer->MTU);
3441           call->MTU = MIN(call->MTU, tSize);
3442           peer->congestSeq++;
3443       }
3444
3445       if ( np->length == rx_AckDataSize(ap->nAcks) +3*sizeof(afs_int32)) {
3446           /* AFS 3.4a */
3447           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3448                         sizeof(afs_int32), &tSize);
3449           tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
3450           if (tSize < call->twind) {       /* smaller than our send */
3451               call->twind = tSize;         /* window, we must send less... */
3452               call->ssthresh = MIN(call->twind, call->ssthresh);
3453           }
3454
3455           /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
3456            * network MTU confused with the loopback MTU. Calculate the
3457            * maximum MTU here for use in the slow start code below.
3458            */
3459           maxMTU = peer->maxMTU;
3460           /* Did peer restart with older RX version? */
3461           if (peer->maxDgramPackets > 1) {
3462               peer->maxDgramPackets = 1;
3463           }
3464       } else if ( np->length >= rx_AckDataSize(ap->nAcks) +4*sizeof(afs_int32)) {
3465           /* AFS 3.5 */
3466           rx_packetread(np, rx_AckDataSize(ap->nAcks)+2*sizeof(afs_int32),
3467                         sizeof(afs_int32), &tSize);
3468           tSize = (afs_uint32) ntohl(tSize);
3469           /*
3470            * As of AFS 3.5 we set the send window to match the receive window. 
3471            */
3472           if (tSize < call->twind) {
3473               call->twind = tSize;
3474               call->ssthresh = MIN(call->twind, call->ssthresh);
3475           } else if (tSize > call->twind) {
3476               call->twind = tSize;
3477           }
3478
3479           /*
3480            * As of AFS 3.5, a jumbogram is more than one fixed size
3481            * packet transmitted in a single UDP datagram. If the remote
3482            * MTU is smaller than our local MTU then never send a datagram
3483            * larger than the natural MTU.
3484            */
3485           rx_packetread(np, rx_AckDataSize(ap->nAcks)+3*sizeof(afs_int32),
3486                         sizeof(afs_int32), &tSize);
3487           maxDgramPackets = (afs_uint32) ntohl(tSize);
3488           maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
3489           maxDgramPackets = MIN(maxDgramPackets,
3490                                 (int)(peer->ifDgramPackets));
3491           maxDgramPackets = MIN(maxDgramPackets, tSize);
3492           if (maxDgramPackets > 1) {
3493             peer->maxDgramPackets = maxDgramPackets;
3494             call->MTU = RX_JUMBOBUFFERSIZE+RX_HEADER_SIZE;
3495           } else {
3496             peer->maxDgramPackets = 1;
3497             call->MTU = peer->natMTU;
3498           }
3499        } else if (peer->maxDgramPackets > 1) {
3500           /* Restarted with lower version of RX */
3501           peer->maxDgramPackets = 1;
3502        }
3503     } else if (peer->maxDgramPackets > 1 ||
3504                peer->maxMTU != OLD_MAX_PACKET_SIZE) {
3505         /* Restarted with lower version of RX */
3506         peer->maxMTU = OLD_MAX_PACKET_SIZE;
3507         peer->natMTU = OLD_MAX_PACKET_SIZE;
3508         peer->MTU = OLD_MAX_PACKET_SIZE;
3509         peer->maxDgramPackets = 1;
3510         peer->nDgramPackets = 1;
3511         peer->congestSeq++;
3512         call->MTU = OLD_MAX_PACKET_SIZE;
3513     }
3514
3515     if (nNacked) {
3516         /*
3517          * Calculate how many datagrams were successfully received after
3518          * the first missing packet and adjust the negative ack counter
3519          * accordingly.
3520          */
3521         call->nAcks = 0;
3522         call->nNacks++;
3523         nNacked = (nNacked + call->nDgramPackets - 1) / call->nDgramPackets;
3524         if (call->nNacks < nNacked) {
3525             call->nNacks = nNacked;
3526         }
3527     } else {
3528         if (newAckCount) {
3529             call->nAcks++;
3530         }
3531         call->nNacks = 0;
3532     }
3533
3534     if (call->flags & RX_CALL_FAST_RECOVER) {
3535         if (nNacked) {
3536             call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3537         } else {
3538             call->flags &= ~RX_CALL_FAST_RECOVER;
3539             call->cwind = call->nextCwind;
3540             call->nextCwind = 0;
3541             call->nAcks = 0;
3542         }
3543         call->nCwindAcks = 0;
3544     }
3545     else if (nNacked && call->nNacks >= (u_short)rx_nackThreshold) {
3546         /* Three negative acks in a row trigger congestion recovery */
3547 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3548         MUTEX_EXIT(&peer->peer_lock);
3549         if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
3550                 /* someone else is waiting to start recovery */
3551                 return np;
3552         }
3553         call->flags |= RX_CALL_FAST_RECOVER_WAIT;
3554         while (call->flags & RX_CALL_TQ_BUSY) {
3555             call->flags |= RX_CALL_TQ_WAIT;
3556 #ifdef RX_ENABLE_LOCKS
3557             CV_WAIT(&call->cv_tq, &call->lock);
3558 #else /* RX_ENABLE_LOCKS */
3559             osi_rxSleep(&call->tq);
3560 #endif /* RX_ENABLE_LOCKS */
3561         }
3562         MUTEX_ENTER(&peer->peer_lock);
3563 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3564         call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
3565         call->flags |= RX_CALL_FAST_RECOVER;
3566         call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind))>>1;
3567         call->cwind = MIN((int)(call->ssthresh + rx_nackThreshold),
3568                           rx_maxSendWindow);
3569         call->nDgramPackets = MAX(2, (int)call->nDgramPackets)>>1;
3570         call->nextCwind = call->ssthresh;
3571         call->nAcks = 0;
3572         call->nNacks = 0;
3573         peer->MTU = call->MTU;
3574         peer->cwind = call->nextCwind;
3575         peer->nDgramPackets = call->nDgramPackets;
3576         peer->congestSeq++;
3577         call->congestSeq = peer->congestSeq;
3578         /* Reset the resend times on the packets that were nacked
3579          * so we will retransmit as soon as the window permits*/
3580         for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
3581             if (acked) {
3582                 if (!tp->acked) {
3583                     clock_Zero(&tp->retryTime);
3584                 }
3585             } else if (tp->acked) {
3586                 acked = 1;
3587             }
3588         }
3589     } else {
3590         /* If cwind is smaller than ssthresh, then increase
3591          * the window one packet for each ack we receive (exponential
3592          * growth).
3593          * If cwind is greater than or equal to ssthresh then increase
3594          * the congestion window by one packet for each cwind acks we
3595          * receive (linear growth).  */
3596         if (call->cwind < call->ssthresh) {
3597             call->cwind = MIN((int)call->ssthresh,
3598                               (int)(call->cwind + newAckCount));
3599             call->nCwindAcks = 0;
3600         } else {
3601             call->nCwindAcks += newAckCount;
3602             if (call->nCwindAcks >= call->cwind) {
3603                 call->nCwindAcks = 0;
3604                 call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
3605             }
3606         }
3607         /*
3608          * If we have received several acknowledgements in a row then
3609          * it is time to increase the size of our datagrams
3610          */
3611         if ((int)call->nAcks > rx_nDgramThreshold) {
3612             if (peer->maxDgramPackets > 1) {
3613                 if (call->nDgramPackets < peer->maxDgramPackets) {
3614                     call->nDgramPackets++;
3615                 }
3616                 call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
3617             } else if (call->MTU < peer->maxMTU) {
3618                 call->MTU += peer->natMTU;
3619                 call->MTU = MIN(call->MTU, peer->maxMTU);
3620             }
3621             call->nAcks = 0;
3622         }
3623     }
3624
3625     MUTEX_EXIT(&peer->peer_lock); /* rxi_Start will lock peer. */
3626
3627     /* Servers need to hold the call until all response packets have
3628      * been acknowledged. Soft acks are good enough since clients
3629      * are not allowed to clear their receive queues. */
3630     if (call->state == RX_STATE_HOLD &&
3631         call->tfirst + call->nSoftAcked >= call->tnext) {
3632         call->state = RX_STATE_DALLY;
3633         rxi_ClearTransmitQueue(call, 0);
3634     } else if (!queue_IsEmpty(&call->tq)) {
3635         rxi_Start(0, call, istack);
3636     }
3637     return np;
3638 }
3639
3640 /* Received a response to a challenge packet */
3641 struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
3642     register struct rx_connection *conn;
3643     register struct rx_packet *np;
3644     int istack;
3645 {
3646     int error;
3647
3648     /* Ignore the packet if we're the client */
3649     if (conn->type == RX_CLIENT_CONNECTION) return np;
3650
3651     /* If already authenticated, ignore the packet (it's probably a retry) */
3652     if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
3653         return np;
3654
3655     /* Otherwise, have the security object evaluate the response packet */
3656     error = RXS_CheckResponse(conn->securityObject, conn, np);
3657     if (error) {
3658         /* If the response is invalid, reset the connection, sending
3659          * an abort to the peer */
3660 #ifndef KERNEL
3661         rxi_Delay(1);
3662 #endif
3663         rxi_ConnectionError(conn, error);
3664         MUTEX_ENTER(&conn->conn_data_lock);
3665         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3666         MUTEX_EXIT(&conn->conn_data_lock);
3667         return np;
3668     }
3669     else {
3670         /* If the response is valid, any calls waiting to attach
3671          * servers can now do so */
3672         int i;
3673         for (i=0; i<RX_MAXCALLS; i++) {
3674             struct rx_call *call = conn->call[i];
3675             if (call) {
3676                 MUTEX_ENTER(&call->lock);
3677                  if (call->state == RX_STATE_PRECALL)
3678                      rxi_AttachServerProc(call, -1, NULL, NULL);
3679                 MUTEX_EXIT(&call->lock);
3680             }
3681         }
3682     }
3683     return np;
3684 }
3685
3686 /* A client has received an authentication challenge: the security
3687  * object is asked to cough up a respectable response packet to send
3688  * back to the server.  The server is responsible for retrying the
3689  * challenge if it fails to get a response. */
3690
3691 struct rx_packet *
3692 rxi_ReceiveChallengePacket(conn, np, istack)
3693     register struct rx_connection *conn;
3694     register struct rx_packet *np;
3695     int istack;
3696 {
3697     int error;
3698
3699     /* Ignore the challenge if we're the server */
3700     if (conn->type == RX_SERVER_CONNECTION) return np;
3701
3702     /* Ignore the challenge if the connection is otherwise idle; someone's
3703      * trying to use us as an oracle. */
3704     if (!rxi_HasActiveCalls(conn)) return np;
3705
3706     /* Send the security object the challenge packet.  It is expected to fill
3707      * in the response. */
3708     error = RXS_GetResponse(conn->securityObject, conn, np);
3709
3710     /* If the security object is unable to return a valid response, reset the
3711      * connection and send an abort to the peer.  Otherwise send the response
3712      * packet to the peer connection. */
3713     if (error) {
3714         rxi_ConnectionError(conn, error);
3715         MUTEX_ENTER(&conn->conn_data_lock);
3716         np = rxi_SendConnectionAbort(conn, np, istack, 0);
3717         MUTEX_EXIT(&conn->conn_data_lock);
3718     }
3719     else {
3720         np = rxi_SendSpecial((struct rx_call *)0, conn, np,
3721                              RX_PACKET_TYPE_RESPONSE, (char *) 0, -1, istack);
3722     }
3723     return np;
3724 }
3725
3726
3727 /* Find an available server process to service the current request in
3728  * the given call structure.  If one isn't available, queue up this
3729  * call so it eventually gets one */
3730 void 
3731 rxi_AttachServerProc(call, socket, tnop, newcallp)
3732 register struct rx_call *call;
3733 register osi_socket socket;
3734 register int *tnop;
3735 register struct rx_call **newcallp;
3736 {
3737     register struct rx_serverQueueEntry *sq;
3738     register struct rx_service *service = call->conn->service;
3739 #ifdef RX_ENABLE_LOCKS
3740     register int haveQuota = 0;
3741 #endif /* RX_ENABLE_LOCKS */
3742     /* May already be attached */
3743     if (call->state == RX_STATE_ACTIVE) return;
3744
3745     MUTEX_ENTER(&rx_serverPool_lock);
3746 #ifdef RX_ENABLE_LOCKS
3747     while(rxi_ServerThreadSelectingCall) {
3748         MUTEX_EXIT(&call->lock);
3749         CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
3750         MUTEX_EXIT(&rx_serverPool_lock);
3751         MUTEX_ENTER(&call->lock);
3752         MUTEX_ENTER(&rx_serverPool_lock);
3753         /* Call may have been attached */
3754         if (call->state == RX_STATE_ACTIVE) return;
3755     }
3756
3757     haveQuota = QuotaOK(service);
3758     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
3759         /* If there are no processes available to service this call,
3760          * put the call on the incoming call queue (unless it's
3761          * already on the queue).
3762          */
3763         if (haveQuota)
3764             ReturnToServerPool(service);
3765         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3766             call->flags |= RX_CALL_WAIT_PROC;
3767             MUTEX_ENTER(&rx_stats_mutex);
3768             rx_nWaiting++;
3769             MUTEX_EXIT(&rx_stats_mutex);
3770             rxi_calltrace(RX_CALL_ARRIVAL, call);
3771             SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
3772             queue_Append(&rx_incomingCallQueue, call);
3773         }
3774     }
3775 #else /* RX_ENABLE_LOCKS */
3776     if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
3777         /* If there are no processes available to service this call,
3778          * put the call on the incoming call queue (unless it's
3779          * already on the queue).
3780          */
3781         if (!(call->flags & RX_CALL_WAIT_PROC)) {
3782             call->flags |= RX_CALL_WAIT_PROC;
3783             rx_nWaiting++;
3784             rxi_calltrace(RX_CALL_ARRIVAL, call);
3785             queue_Append(&rx_incomingCallQueue, call);
3786         }
3787     }
3788 #endif /* RX_ENABLE_LOCKS */
3789     else {
3790         sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
3791
3792         /* If hot threads are enabled, and both newcallp and sq->socketp
3793          * are non-null, then this thread will process the call, and the
3794          * idle server thread will start listening on this threads socket.
3795          */
3796         queue_Remove(sq);
3797         if (rx_enable_hot_thread && newcallp && sq->socketp) {
3798             *newcallp = call;
3799             *tnop = sq->tno;
3800             *sq->socketp = socket;
3801             clock_GetTime(&call->startTime);
3802             CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
3803         } else {
3804             sq->newcall = call;
3805         }
3806         if (call->flags & RX_CALL_WAIT_PROC) {
3807             /* Conservative:  I don't think this should happen */
3808             call->flags &= ~RX_CALL_WAIT_PROC;
3809             MUTEX_ENTER(&rx_stats_mutex);
3810             rx_nWaiting--;
3811             MUTEX_EXIT(&rx_stats_mutex);
3812             queue_Remove(call);
3813         }
3814         call->state = RX_STATE_ACTIVE;
3815         call->mode = RX_MODE_RECEIVING;
3816         if (call->flags & RX_CALL_CLEARED) {
3817             /* send an ack now to start the packet flow up again */
3818             call->flags &= ~RX_CALL_CLEARED;
3819             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3820         }
3821 #ifdef  RX_ENABLE_LOCKS
3822         CV_SIGNAL(&sq->cv);
3823 #else
3824         service->nRequestsRunning++;
3825         if (service->nRequestsRunning <= service->minProcs)
3826           rxi_minDeficit--;
3827         rxi_availProcs--;
3828         osi_rxWakeup(sq);
3829 #endif
3830     }
3831     MUTEX_EXIT(&rx_serverPool_lock);
3832 }
3833
3834 /* Delay the sending of an acknowledge event for a short while, while
3835  * a new call is being prepared (in the case of a client) or a reply
3836  * is being prepared (in the case of a server).  Rather than sending
3837  * an ack packet, an ACKALL packet is sent. */
3838 void rxi_AckAll(event, call, dummy)
3839 struct rxevent *event;
3840 register struct rx_call *call;
3841 char *dummy;
3842 {
3843 #ifdef RX_ENABLE_LOCKS
3844     if (event) {
3845         MUTEX_ENTER(&call->lock);
3846         call->delayedAckEvent = (struct rxevent *) 0;
3847         CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
3848     }
3849     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3850                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3851     if (event)
3852         MUTEX_EXIT(&call->lock);
3853 #else /* RX_ENABLE_LOCKS */
3854     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3855     rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
3856                     RX_PACKET_TYPE_ACKALL, (char *) 0, 0, 0);
3857 #endif /* RX_ENABLE_LOCKS */
3858 }
3859
3860 void rxi_SendDelayedAck(event, call, dummy)     
3861 struct rxevent *event;
3862 register struct rx_call *call;
3863 char *dummy;
3864 {
3865 #ifdef RX_ENABLE_LOCKS
3866     if (event) {
3867         MUTEX_ENTER(&call->lock);
3868         if (event == call->delayedAckEvent)
3869             call->delayedAckEvent = (struct rxevent *) 0;
3870         CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
3871     }
3872     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3873     if (event)
3874         MUTEX_EXIT(&call->lock);
3875 #else /* RX_ENABLE_LOCKS */
3876     if (event) call->delayedAckEvent = (struct rxevent *) 0;
3877     (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
3878 #endif /* RX_ENABLE_LOCKS */
3879 }
3880
3881
3882 #ifdef RX_ENABLE_LOCKS
3883 /* Set ack in all packets in transmit queue. rxi_Start will deal with
3884  * clearing them out.
3885  */
3886 static void rxi_SetAcksInTransmitQueue(call)
3887       register struct rx_call *call;
3888 {
3889     register struct rx_packet *p, *tp;
3890     int someAcked = 0;
3891
3892      for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3893          if (!p) 
3894              break;
3895          p->acked = 1;
3896          someAcked = 1;
3897      }
3898      if (someAcked) {
3899          call->flags |= RX_CALL_TQ_CLEARME;
3900          call->flags |= RX_CALL_TQ_SOME_ACKED;
3901      }
3902
3903      rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3904      rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3905      call->tfirst = call->tnext;
3906      call->nSoftAcked = 0;
3907
3908      if (call->flags & RX_CALL_FAST_RECOVER) {
3909         call->flags &= ~RX_CALL_FAST_RECOVER;
3910         call->cwind = call->nextCwind;
3911         call->nextCwind = 0;
3912      }
3913
3914      CV_SIGNAL(&call->cv_twind);
3915 }
3916 #endif /* RX_ENABLE_LOCKS */
3917
3918 /* Clear out the transmit queue for the current call (all packets have
3919  * been received by peer) */
3920 void rxi_ClearTransmitQueue(call, force)
3921     register struct rx_call *call;
3922     register int force;
3923 {
3924     register struct rx_packet *p, *tp;
3925
3926 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3927     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
3928         int someAcked = 0;
3929         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3930           if (!p) 
3931              break;
3932           p->acked = 1;
3933           someAcked = 1;
3934         }
3935         if (someAcked) {
3936             call->flags |= RX_CALL_TQ_CLEARME;
3937             call->flags |= RX_CALL_TQ_SOME_ACKED;
3938         }
3939     } else {
3940 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3941         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
3942             if (!p) 
3943                 break;
3944             queue_Remove(p);
3945             rxi_FreePacket(p);
3946         }
3947 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3948         call->flags &= ~RX_CALL_TQ_CLEARME;
3949     }
3950 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3951
3952     rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
3953     rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
3954     call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
3955     call->nSoftAcked = 0;
3956
3957     if (call->flags & RX_CALL_FAST_RECOVER) {
3958         call->flags &= ~RX_CALL_FAST_RECOVER;
3959         call->cwind = call->nextCwind;
3960     }
3961
3962 #ifdef  RX_ENABLE_LOCKS
3963     CV_SIGNAL(&call->cv_twind);
3964 #else
3965     osi_rxWakeup(&call->twind);
3966 #endif
3967 }
3968
3969 void rxi_ClearReceiveQueue(call)
3970     register struct rx_call *call;
3971 {
3972     register struct rx_packet *p, *tp;
3973     if (queue_IsNotEmpty(&call->rq)) {
3974       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
3975         if (!p)
3976           break;
3977         queue_Remove(p);
3978         rxi_FreePacket(p);
3979         rx_packetReclaims++;
3980       }
3981       call->flags &= ~(RX_CALL_RECEIVE_DONE|RX_CALL_HAVE_LAST);
3982     }
3983     if (call->state == RX_STATE_PRECALL) {
3984         call->flags |= RX_CALL_CLEARED;
3985     }
3986 }
3987
3988 /* Send an abort packet for the specified call */
3989 struct rx_packet *rxi_SendCallAbort(call, packet, istack, force)
3990     register struct rx_call *call;
3991     struct rx_packet *packet;
3992     int istack;
3993     int force;
3994 {
3995   afs_int32 error;
3996   struct clock when;
3997
3998   if (!call->error)
3999     return packet;
4000
4001   /* Clients should never delay abort messages */
4002   if (rx_IsClientConn(call->conn))
4003     force = 1;
4004
4005   if (call->abortCode != call->error) {
4006     call->abortCode = call->error;
4007     call->abortCount = 0;
4008   }
4009
4010   if (force || rxi_callAbortThreshhold == 0 ||
4011       call->abortCount < rxi_callAbortThreshhold) {
4012     if (call->delayedAbortEvent) {
4013         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4014     }
4015     error = htonl(call->error);
4016     call->abortCount++;
4017     packet = rxi_SendSpecial(call, call->conn, packet,
4018                              RX_PACKET_TYPE_ABORT, (char *)&error,
4019                              sizeof(error), istack);
4020   } else if (!call->delayedAbortEvent) {
4021     clock_GetTime(&when);
4022     clock_Addmsec(&when, rxi_callAbortDelay);
4023     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
4024     call->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedCallAbort,
4025                                            call, 0);
4026   }
4027   return packet;
4028 }
4029
4030 /* Send an abort packet for the specified connection.  Packet is an
4031  * optional pointer to a packet that can be used to send the abort.
4032  * Once the number of abort messages reaches the threshhold, an
4033  * event is scheduled to send the abort. Setting the force flag
4034  * overrides sending delayed abort messages.
4035  *
4036  * NOTE: Called with conn_data_lock held. conn_data_lock is dropped
4037  *       to send the abort packet.
4038  */
4039 struct rx_packet *rxi_SendConnectionAbort(conn, packet, istack, force)
4040     register struct rx_connection *conn;
4041     struct rx_packet *packet;
4042     int istack;
4043     int force;
4044 {
4045   afs_int32 error;
4046   struct clock when;
4047
4048   if (!conn->error)
4049     return packet;
4050
4051   /* Clients should never delay abort messages */
4052   if (rx_IsClientConn(conn))
4053     force = 1;
4054
4055   if (force || rxi_connAbortThreshhold == 0 ||
4056       conn->abortCount < rxi_connAbortThreshhold) {
4057     if (conn->delayedAbortEvent) {
4058         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call*)0, 0);
4059     }
4060     error = htonl(conn->error);
4061     conn->abortCount++;
4062     MUTEX_EXIT(&conn->conn_data_lock);
4063     packet = rxi_SendSpecial((struct rx_call *)0, conn, packet,
4064                              RX_PACKET_TYPE_ABORT, (char *)&error,
4065                              sizeof(error), istack);
4066     MUTEX_ENTER(&conn->conn_data_lock);
4067   } else if (!conn->delayedAbortEvent) {
4068     clock_GetTime(&when);
4069     clock_Addmsec(&when, rxi_connAbortDelay);
4070     conn->delayedAbortEvent = rxevent_Post(&when, rxi_SendDelayedConnAbort,
4071                                            conn, 0);
4072   }
4073   return packet;
4074 }
4075
4076 /* Associate an error all of the calls owned by a connection.  Called
4077  * with error non-zero.  This is only for really fatal things, like
4078  * bad authentication responses.  The connection itself is set in
4079  * error at this point, so that future packets received will be
4080  * rejected. */
4081 void rxi_ConnectionError(conn, error)
4082     register struct rx_connection *conn;
4083     register afs_int32 error;
4084 {
4085     if (error) {
4086         register int i;
4087         if (conn->challengeEvent)
4088             rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
4089         for (i=0; i<RX_MAXCALLS; i++) {
4090             struct rx_call *call = conn->call[i];
4091             if (call) {
4092                 MUTEX_ENTER(&call->lock);
4093                 rxi_CallError(call, error);
4094                 MUTEX_EXIT(&call->lock);
4095             }
4096         }
4097         conn->error = error;
4098         MUTEX_ENTER(&rx_stats_mutex);
4099         rx_stats.fatalErrors++;
4100         MUTEX_EXIT(&rx_stats_mutex);
4101     }
4102 }
4103
4104 void rxi_CallError(call, error)
4105     register struct rx_call *call;
4106     afs_int32 error;
4107 {
4108     if (call->error) error = call->error;
4109 #ifdef RX_GLOBAL_RXLOCK_KERNEL
4110     if (!(call->flags & RX_CALL_TQ_BUSY)) {
4111         rxi_ResetCall(call, 0);
4112     }
4113 #else
4114         rxi_ResetCall(call, 0);
4115 #endif
4116     call->error = error;
4117     call->mode = RX_MODE_ERROR;
4118 }
4119
4120 /* Reset various fields in a call structure, and wakeup waiting
4121  * processes.  Some fields aren't changed: state & mode are not
4122  * touched (these must be set by the caller), and bufptr, nLeft, and
4123  * nFree are not reset, since these fields are manipulated by
4124  * unprotected macros, and may only be reset by non-interrupting code.
4125  */
4126 #ifdef ADAPT_WINDOW
4127 /* this code requires that call->conn be set properly as a pre-condition. */
4128 #endif /* ADAPT_WINDOW */
4129
4130 void rxi_ResetCall(call, newcall)
4131     register struct rx_call *call;
4132     register int newcall;
4133 {
4134     register int flags;
4135     register struct rx_peer *peer;
4136     struct rx_packet *packet;
4137
4138     /* Notify anyone who is waiting for asynchronous packet arrival */
4139     if (call->arrivalProc) {
4140         (*call->arrivalProc)(call, call->arrivalProcHandle, call->arrivalProcArg);
4141         call->arrivalProc = (VOID (*)()) 0;
4142     }
4143
4144     if (call->delayedAbortEvent) {
4145         rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
4146         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
4147         if (packet) {
4148             rxi_SendCallAbort(call, packet, 0, 1);
4149             rxi_FreePacket(packet);
4150         }
4151     }
4152
4153     /*
4154      * Update the peer with the congestion information in this call
4155      * so other calls on this connection can pick up where this call
4156      * left off. If the congestion sequence numbers don't match then
4157      * another call experienced a retransmission.
4158      */
4159     peer = call->conn->peer;
4160     MUTEX_ENTER(&peer->peer_lock);
4161     if (!newcall) {
4162         if (call->congestSeq == peer->congestSeq) {
4163             peer->cwind = MAX(peer->cwind, call->cwind);
4164             peer->MTU = MAX(peer->MTU, call->MTU);
4165             peer->nDgramPackets = MAX(peer->nDgramPackets, call->nDgramPackets);
4166         }
4167     } else {
4168         call->abortCode = 0;
4169         call->abortCount = 0;
4170     }
4171     if (peer->maxDgramPackets > 1) {
4172         call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
4173     } else {
4174         call->MTU = peer->MTU;
4175     }
4176     call->cwind = MIN((int)peer->cwind, (int)peer->nDgramPackets);
4177     call->ssthresh = rx_maxSendWindow;
4178     call->nDgramPackets = peer->nDgramPackets;
4179     call->congestSeq = peer->congestSeq;
4180     MUTEX_EXIT(&peer->peer_lock);
4181
4182     flags = call->flags;
4183     rxi_ClearReceiveQueue(call);
4184 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
4185     if (call->flags & RX_CALL_TQ_BUSY) {
4186         call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
4187         call->flags |= (flags & RX_CALL_TQ_WAIT);
4188     } else
4189 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
4190     {
4191         rxi_ClearTransmitQueue(call, 0);
4192         queue_Init(&call->tq);
4193         call->flags = 0;
4194     }
4195     queue_Init(&call->rq);
4196     call->error = 0;
4197     call->rwind = rx_initReceiveWindow; 
4198     call->twind = rx_initSendWindow; 
4199     call->nSoftAcked = 0;
4200     call->nextCwind = 0;
4201     call->nAcks = 0;
4202     call->nNacks = 0;
4203     call->nCwindAcks = 0;
4204     call->nSoftAcks = 0;
4205     call->nHardAcks = 0;
4206
4207     call->tfirst = call->rnext = call->tnext = 1;
4208     call->rprev = 0;
4209     call->lastAcked = 0;
4210     call->localStatus = call->remoteStatus = 0;
4211
4212     if (flags & RX_CALL_READER_WAIT)  {
4213 #ifdef  RX_ENABLE_LOCKS
4214         CV_BROADCAST(&call->cv_rq);
4215 #else
4216         osi_rxWakeup(&call->rq);
4217 #endif
4218     }
4219     if (flags & RX_CALL_WAIT_PACKETS) {
4220         MUTEX_ENTER(&rx_freePktQ_lock);
4221         rxi_PacketsUnWait();            /* XXX */
4222         MUTEX_EXIT(&rx_freePktQ_lock);
4223     }
4224
4225 #ifdef  RX_ENABLE_LOCKS
4226     CV_SIGNAL(&call->cv_twind);
4227 #else
4228     if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
4229         osi_rxWakeup(&call->twind);
4230 #endif
4231
4232 #ifdef RX_ENABLE_LOCKS
4233     /* The following ensures that we don't mess with any queue while some
4234      * other thread might also be doing so. The call_queue_lock field is
4235      * is only modified under the call lock. If the call is in the process
4236      * of being removed from a queue, the call is not locked until the
4237      * the queue lock is dropped and only then is the call_queue_lock field
4238      * zero'd out. So it's safe to lock the queue if call_queue_lock is set.
4239      * Note that any other routine which removes a call from a queue has to
4240      * obtain the queue lock before examing the queue and removing the call.
4241      */
4242     if (call->call_queue_lock) {
4243         MUTEX_ENTER(call->call_queue_lock);
4244         if (queue_IsOnQueue(call)) {
4245             queue_Remove(call);
4246             if (flags & RX_CALL_WAIT_PROC) {
4247                 MUTEX_ENTER(&rx_stats_mutex);
4248                 rx_nWaiting--;
4249                 MUTEX_EXIT(&rx_stats_mutex);
4250             }
4251         }
4252         MUTEX_EXIT(call->call_queue_lock);
4253         CLEAR_CALL_QUEUE_LOCK(call);
4254     }
4255 #else /* RX_ENABLE_LOCKS */
4256     if (queue_IsOnQueue(call)) {
4257       queue_Remove(call);
4258       if (flags & RX_CALL_WAIT_PROC)
4259         rx_nWaiting--;
4260     }
4261 #endif /* RX_ENABLE_LOCKS */
4262
4263     rxi_KeepAliveOff(call);
4264     rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
4265 }
4266
4267 /* Send an acknowledge for the indicated packet (seq,serial) of the
4268  * indicated call, for the indicated reason (reason).  This
4269  * acknowledge will specifically acknowledge receiving the packet, and
4270  * will also specify which other packets for this call have been
4271  * received.  This routine returns the packet that was used to the
4272  * caller.  The caller is responsible for freeing it or re-using it.
4273  * This acknowledgement also returns the highest sequence number
4274  * actually read out by the higher level to the sender; the sender
4275  * promises to keep around packets that have not been read by the
4276  * higher level yet (unless, of course, the sender decides to abort
4277  * the call altogether).  Any of p, seq, serial, pflags, or reason may
4278  * be set to zero without ill effect.  That is, if they are zero, they
4279  * will not convey any information.  
4280  * NOW there is a trailer field, after the ack where it will safely be
4281  * ignored by mundanes, which indicates the maximum size packet this 
4282  * host can swallow.  */  
4283 struct rx_packet *rxi_SendAck(call, optionalPacket, seq, serial, pflags, reason, istack)
4284     register struct rx_call *call;
4285     register struct rx_packet *optionalPacket; /* use to send ack (or null) */
4286     int seq;                    /* Sequence number of the packet we are acking */
4287     int serial;                 /* Serial number of the packet */
4288     int pflags;                 /* Flags field from packet header */
4289     int reason;                 /* Reason an acknowledge was prompted */
4290     int istack;
4291 {
4292     struct rx_ackPacket *ap;
4293     register struct rx_packet *rqp;
4294     register struct rx_packet *nxp;  /* For queue_Scan */
4295     register struct rx_packet *p;
4296     u_char offset;
4297     afs_int32 templ;
4298
4299     /*
4300      * Open the receive window once a thread starts reading packets
4301      */
4302     if (call->rnext > 1) {
4303         call->rwind = rx_maxReceiveWindow;
4304     }
4305
4306     call->nHardAcks = 0;
4307     call->nSoftAcks = 0;
4308     if (call->rnext > call->lastAcked)
4309       call->lastAcked = call->rnext;
4310     p = optionalPacket;
4311
4312     if (p) {
4313       rx_computelen(p, p->length);  /* reset length, you never know */
4314     }                               /* where that's been...         */
4315     else
4316       if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
4317           /* We won't send the ack, but don't panic. */
4318           return optionalPacket;
4319       }
4320
4321     templ = rx_AckDataSize(call->rwind)+4*sizeof(afs_int32) - rx_GetDataSize(p);
4322     if (templ > 0) {
4323       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
4324           if (!optionalPacket) rxi_FreePacket(p);
4325           return optionalPacket;
4326       }
4327       templ = rx_AckDataSize(call->rwind)+2*sizeof(afs_int32); 
4328       if (rx_Contiguous(p)<templ) {
4329           if (!optionalPacket) rxi_FreePacket(p);
4330           return optionalPacket;
4331       }
4332     }    /* MTUXXX failing to send an ack is very serious.  We should */
4333          /* try as hard as possible to send even a partial ack; it's */
4334          /* better than nothing. */
4335
4336     ap = (struct rx_ackPacket *) rx_DataOf(p);
4337     ap->bufferSpace = htonl(0); /* Something should go here, sometime */
4338     ap->reason = reason;
4339
4340     /* The skew computation used to be bogus, I think it's better now. */
4341     /* We should start paying attention to skew.    XXX  */
4342     ap->serial = htonl(call->conn->maxSerial);
4343     ap->maxSkew = 0;    /* used to be peer->inPacketSkew */
4344
4345     ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
4346     ap->previousPacket = htonl(call->rprev); /* Previous packet received */
4347
4348     /* No fear of running out of ack packet here because there can only be at most
4349      * one window full of unacknowledged packets.  The window size must be constrained 
4350      * to be less than the maximum ack size, of course.  Also, an ack should always
4351      * fit into a single packet -- it should not ever be fragmented.  */
4352     for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
4353         if (!rqp || !call->rq.next 
4354             || (rqp->header.seq > (call->rnext + call->rwind))) {
4355           if (!optionalPacket) rxi_FreePacket(p);
4356           rxi_CallError(call, RX_CALL_DEAD);
4357           return optionalPacket;   
4358         }
4359
4360         while (rqp->header.seq > call->rnext + offset) 
4361           ap->acks[offset++] = RX_ACK_TYPE_NACK;
4362         ap->acks[offset++] = RX_ACK_TYPE_ACK;
4363
4364         if ((offset > (u_char)rx_maxReceiveWindow) || (offset > call->rwind)) {
4365           if (!optionalPacket) rxi_FreePacket(p);
4366           rxi_CallError(call, RX_CALL_DEAD);
4367           return optionalPacket;   
4368         }
4369     }
4370
4371     ap->nAcks = offset;
4372     p->length = rx_AckDataSize(offset)+4*sizeof(afs_int32);
4373
4374     /* these are new for AFS 3.3 */
4375     templ = rxi_AdjustMaxMTU(call->conn->peer->ifMTU, rx_maxReceiveSize);
4376     templ = htonl(templ);
4377     rx_packetwrite(p, rx_AckDataSize(offset), sizeof(afs_int32), &templ);
4378     templ = htonl(call->conn->peer->ifMTU);
4379     rx_packetwrite(p, rx_AckDataSize(offset)+sizeof(afs_int32), sizeof(afs_int32), &templ);
4380
4381     /* new for AFS 3.4 */
4382     templ = htonl(call->rwind);
4383     rx_packetwrite(p, rx_AckDataSize(offset)+2*sizeof(afs_int32), sizeof(afs_int32), &templ);
4384
4385     /* new for AFS 3.5 */
4386     templ = htonl(call->conn->peer->ifDgramPackets);
4387     rx_packetwrite(p, rx_AckDataSize(offset)+3*sizeof(afs_int32), sizeof(afs_int32), &templ);
4388
4389     p->header.serviceId = call->conn->serviceId;
4390     p->header.cid = (call->conn->cid | call->channel);