rx-tq-busy-20070516
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #include <afsconfig.h>
13 #ifdef  KERNEL
14 #include "afs/param.h"
15 #else
16 #include <afs/param.h>
17 #endif
18
19 RCSID
20     ("$Header$");
21
22 #ifdef KERNEL
23 #include "afs/sysincludes.h"
24 #include "afsincludes.h"
25 #ifndef UKERNEL
26 #include "h/types.h"
27 #include "h/time.h"
28 #include "h/stat.h"
29 #ifdef  AFS_OSF_ENV
30 #include <net/net_globals.h>
31 #endif /* AFS_OSF_ENV */
32 #ifdef AFS_LINUX20_ENV
33 #include "h/socket.h"
34 #endif
35 #include "netinet/in.h"
36 #include "afs/afs_args.h"
37 #include "afs/afs_osi.h"
38 #ifdef RX_KERNEL_TRACE
39 #include "rx_kcommon.h"
40 #endif
41 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
42 #include "h/systm.h"
43 #endif
44 #ifdef RXDEBUG
45 #undef RXDEBUG                  /* turn off debugging */
46 #endif /* RXDEBUG */
47 #if defined(AFS_SGI_ENV)
48 #include "sys/debug.h"
49 #endif
50 #include "afsint.h"
51 #ifdef  AFS_OSF_ENV
52 #undef kmem_alloc
53 #undef kmem_free
54 #undef mem_alloc
55 #undef mem_free
56 #undef register
57 #endif /* AFS_OSF_ENV */
58 #else /* !UKERNEL */
59 #include "afs/sysincludes.h"
60 #include "afsincludes.h"
61 #endif /* !UKERNEL */
62 #include "afs/lock.h"
63 #include "rx_kmutex.h"
64 #include "rx_kernel.h"
65 #include "rx_clock.h"
66 #include "rx_queue.h"
67 #include "rx.h"
68 #include "rx_globals.h"
69 #include "rx_trace.h"
70 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
71 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
72 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
73 #include "afsint.h"
74 extern afs_int32 afs_termState;
75 #ifdef AFS_AIX41_ENV
76 #include "sys/lockl.h"
77 #include "sys/lock_def.h"
78 #endif /* AFS_AIX41_ENV */
79 # include "rxgen_consts.h"
80 #else /* KERNEL */
81 # include <sys/types.h>
82 # include <errno.h>
83 #ifdef AFS_NT40_ENV
84 # include <stdlib.h>
85 # include <fcntl.h>
86 # include <afs/afsutil.h>
87 # include <WINNT\afsreg.h>
88 #else
89 # include <sys/socket.h>
90 # include <sys/file.h>
91 # include <netdb.h>
92 # include <sys/stat.h>
93 # include <netinet/in.h>
94 # include <sys/time.h>
95 #endif
96 #ifdef HAVE_STRING_H
97 #include <string.h>
98 #else
99 #ifdef HAVE_STRINGS_H
100 #include <strings.h>
101 #endif
102 #endif
103 # include "rx.h"
104 # include "rx_user.h"
105 # include "rx_clock.h"
106 # include "rx_queue.h"
107 # include "rx_globals.h"
108 # include "rx_trace.h"
109 # include <afs/rxgen_consts.h>
110 #endif /* KERNEL */
111
112 int (*registerProgram) () = 0;
113 int (*swapNameProgram) () = 0;
114
115 /* Local static routines */
116 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
117 #ifdef RX_ENABLE_LOCKS
118 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
119 #endif
120
121 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
122 struct rx_tq_debug {
123     afs_int32 rxi_start_aborted;        /* rxi_start awoke after rxi_Send in error. */
124     afs_int32 rxi_start_in_error;
125 } rx_tq_debug;
126 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
127
128 /*
129  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
130  * currently allocated within rx.  This number is used to allocate the
131  * memory required to return the statistics when queried.
132  */
133
134 static unsigned int rxi_rpc_peer_stat_cnt;
135
136 /*
137  * rxi_rpc_process_stat_cnt counts the total number of local process stat
138  * structures currently allocated within rx.  The number is used to allocate
139  * the memory required to return the statistics when queried.
140  */
141
142 static unsigned int rxi_rpc_process_stat_cnt;
143
144 #if !defined(offsetof)
145 #include <stddef.h>             /* for definition of offsetof() */
146 #endif
147
148 #ifdef AFS_PTHREAD_ENV
149 #include <assert.h>
150
151 /*
152  * Use procedural initialization of mutexes/condition variables
153  * to ease NT porting
154  */
155
156 extern pthread_mutex_t rx_stats_mutex;
157 extern pthread_mutex_t des_init_mutex;
158 extern pthread_mutex_t des_random_mutex;
159 extern pthread_mutex_t rx_clock_mutex;
160 extern pthread_mutex_t rxi_connCacheMutex;
161 extern pthread_mutex_t rx_event_mutex;
162 extern pthread_mutex_t osi_malloc_mutex;
163 extern pthread_mutex_t event_handler_mutex;
164 extern pthread_mutex_t listener_mutex;
165 extern pthread_mutex_t rx_if_init_mutex;
166 extern pthread_mutex_t rx_if_mutex;
167 extern pthread_mutex_t rxkad_client_uid_mutex;
168 extern pthread_mutex_t rxkad_random_mutex;
169
170 extern pthread_cond_t rx_event_handler_cond;
171 extern pthread_cond_t rx_listener_cond;
172
173 static pthread_mutex_t epoch_mutex;
174 static pthread_mutex_t rx_init_mutex;
175 static pthread_mutex_t rx_debug_mutex;
176
177 static void
178 rxi_InitPthread(void)
179 {
180     assert(pthread_mutex_init(&rx_clock_mutex, (const pthread_mutexattr_t *)0)
181            == 0);
182     assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0)
183            == 0);
184     assert(pthread_mutex_init
185            (&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0);
186     assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0)
187            == 0);
188     assert(pthread_mutex_init(&epoch_mutex, (const pthread_mutexattr_t *)0) ==
189            0);
190     assert(pthread_mutex_init(&rx_event_mutex, (const pthread_mutexattr_t *)0)
191            == 0);
192     assert(pthread_mutex_init(&des_init_mutex, (const pthread_mutexattr_t *)0)
193            == 0);
194     assert(pthread_mutex_init
195            (&des_random_mutex, (const pthread_mutexattr_t *)0) == 0);
196     assert(pthread_mutex_init
197            (&osi_malloc_mutex, (const pthread_mutexattr_t *)0) == 0);
198     assert(pthread_mutex_init
199            (&event_handler_mutex, (const pthread_mutexattr_t *)0) == 0);
200     assert(pthread_mutex_init(&listener_mutex, (const pthread_mutexattr_t *)0)
201            == 0);
202     assert(pthread_mutex_init
203            (&rx_if_init_mutex, (const pthread_mutexattr_t *)0) == 0);
204     assert(pthread_mutex_init(&rx_if_mutex, (const pthread_mutexattr_t *)0) ==
205            0);
206     assert(pthread_mutex_init
207            (&rxkad_client_uid_mutex, (const pthread_mutexattr_t *)0) == 0);
208     assert(pthread_mutex_init
209            (&rxkad_random_mutex, (const pthread_mutexattr_t *)0) == 0);
210     assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t *)0)
211            == 0);
212
213     assert(pthread_cond_init
214            (&rx_event_handler_cond, (const pthread_condattr_t *)0) == 0);
215     assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
216            == 0);
217     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
218     assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
219  
220     rxkad_global_stats_init();
221 }
222
223 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
224 #define INIT_PTHREAD_LOCKS \
225 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
226 /*
227  * The rx_stats_mutex mutex protects the following global variables:
228  * rxi_dataQuota
229  * rxi_minDeficit
230  * rxi_availProcs
231  * rxi_totalMin
232  * rxi_lowConnRefCount
233  * rxi_lowPeerRefCount
234  * rxi_nCalls
235  * rxi_Alloccnt
236  * rxi_Allocsize
237  * rx_nFreePackets
238  * rx_tq_debug
239  * rx_stats
240  */
241 #else
242 #define INIT_PTHREAD_LOCKS
243 #endif
244
245
246 /* Variables for handling the minProcs implementation.  availProcs gives the
247  * number of threads available in the pool at this moment (not counting dudes
248  * executing right now).  totalMin gives the total number of procs required
249  * for handling all minProcs requests.  minDeficit is a dynamic variable
250  * tracking the # of procs required to satisfy all of the remaining minProcs
251  * demands.
252  * For fine grain locking to work, the quota check and the reservation of
253  * a server thread has to come while rxi_availProcs and rxi_minDeficit
254  * are locked. To this end, the code has been modified under #ifdef
255  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
256  * same time. A new function, ReturnToServerPool() returns the allocation.
257  * 
258  * A call can be on several queue's (but only one at a time). When
259  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
260  * that no one else is touching the queue. To this end, we store the address
261  * of the queue lock in the call structure (under the call lock) when we
262  * put the call on a queue, and we clear the call_queue_lock when the
263  * call is removed from a queue (once the call lock has been obtained).
264  * This allows rxi_ResetCall to safely synchronize with others wishing
265  * to manipulate the queue.
266  */
267
268 #ifdef RX_ENABLE_LOCKS
269 static afs_kmutex_t rx_rpc_stats;
270 void rxi_StartUnlocked();
271 #endif
272
273 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
274 ** pretty good that the next packet coming in is from the same connection 
275 ** as the last packet, since we're send multiple packets in a transmit window.
276 */
277 struct rx_connection *rxLastConn = 0;
278
279 #ifdef RX_ENABLE_LOCKS
280 /* The locking hierarchy for rx fine grain locking is composed of these
281  * tiers:
282  *
283  * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
284  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
285  * call->lock - locks call data fields.
286  * These are independent of each other:
287  *      rx_freeCallQueue_lock
288  *      rxi_keyCreate_lock
289  * rx_serverPool_lock
290  * freeSQEList_lock
291  *
292  * serverQueueEntry->lock
293  * rx_rpc_stats
294  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
295  * peer->lock - locks peer data fields.
296  * conn_data_lock - that more than one thread is not updating a conn data
297  *                  field at the same time.
298  * rx_freePktQ_lock
299  *
300  * lowest level:
301  *      multi_handle->lock
302  *      rxevent_lock
303  *      rx_stats_mutex
304  *
305  * Do we need a lock to protect the peer field in the conn structure?
306  *      conn->peer was previously a constant for all intents and so has no
307  *      lock protecting this field. The multihomed client delta introduced
308  *      a RX code change : change the peer field in the connection structure
309  *      to that remote inetrface from which the last packet for this
310  *      connection was sent out. This may become an issue if further changes
311  *      are made.
312  */
313 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
314 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
315 #ifdef RX_LOCKS_DB
316 /* rxdb_fileID is used to identify the lock location, along with line#. */
317 static int rxdb_fileID = RXDB_FILE_RX;
318 #endif /* RX_LOCKS_DB */
319 #else /* RX_ENABLE_LOCKS */
320 #define SET_CALL_QUEUE_LOCK(C, L)
321 #define CLEAR_CALL_QUEUE_LOCK(C)
322 #endif /* RX_ENABLE_LOCKS */
323 struct rx_serverQueueEntry *rx_waitForPacket = 0;
324 struct rx_serverQueueEntry *rx_waitingForPacket = 0;
325
326 /* ------------Exported Interfaces------------- */
327
328 /* This function allows rxkad to set the epoch to a suitably random number
329  * which rx_NewConnection will use in the future.  The principle purpose is to
330  * get rxnull connections to use the same epoch as the rxkad connections do, at
331  * least once the first rxkad connection is established.  This is important now
332  * that the host/port addresses aren't used in FindConnection: the uniqueness
333  * of epoch/cid matters and the start time won't do. */
334
335 #ifdef AFS_PTHREAD_ENV
336 /*
337  * This mutex protects the following global variables:
338  * rx_epoch
339  */
340
341 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0)
342 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0)
343 #else
344 #define LOCK_EPOCH
345 #define UNLOCK_EPOCH
346 #endif /* AFS_PTHREAD_ENV */
347
348 void
349 rx_SetEpoch(afs_uint32 epoch)
350 {
351     LOCK_EPOCH;
352     rx_epoch = epoch;
353     UNLOCK_EPOCH;
354 }
355
356 /* Initialize rx.  A port number may be mentioned, in which case this
357  * becomes the default port number for any service installed later.
358  * If 0 is provided for the port number, a random port will be chosen
359  * by the kernel.  Whether this will ever overlap anything in
360  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
361  * error. */
362 static int rxinit_status = 1;
363 #ifdef AFS_PTHREAD_ENV
364 /*
365  * This mutex protects the following global variables:
366  * rxinit_status
367  */
368
369 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0)
370 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0)
371 #else
372 #define LOCK_RX_INIT
373 #define UNLOCK_RX_INIT
374 #endif
375
376 int
377 rx_InitHost(u_int host, u_int port)
378 {
379 #ifdef KERNEL
380     osi_timeval_t tv;
381 #else /* KERNEL */
382     struct timeval tv;
383 #endif /* KERNEL */
384     char *htable, *ptable;
385     int tmp_status;
386     
387     SPLVAR;
388     
389     INIT_PTHREAD_LOCKS;
390     LOCK_RX_INIT;
391     if (rxinit_status == 0) {
392         tmp_status = rxinit_status;
393         UNLOCK_RX_INIT;
394         return tmp_status;      /* Already started; return previous error code. */
395     }
396 #ifdef RXDEBUG
397     rxi_DebugInit();
398 #endif
399 #ifdef AFS_NT40_ENV
400     if (afs_winsockInit() < 0)
401         return -1;
402 #endif
403     
404 #ifndef KERNEL
405     /*
406      * Initialize anything necessary to provide a non-premptive threading
407      * environment.
408      */
409     rxi_InitializeThreadSupport();
410 #endif
411     
412     /* Allocate and initialize a socket for client and perhaps server
413      * connections. */
414     
415     rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
416     if (rx_socket == OSI_NULLSOCKET) {
417         UNLOCK_RX_INIT;
418         return RX_ADDRINUSE;
419     }
420 #ifdef  RX_ENABLE_LOCKS
421 #ifdef RX_LOCKS_DB
422     rxdb_init();
423 #endif /* RX_LOCKS_DB */
424     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
425     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
426     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
427     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
428     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock", MUTEX_DEFAULT,
429                0);
430     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT,
431             0);
432     MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT,
433                0);
434     MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT,
435                0);
436     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
437 #ifndef KERNEL
438     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
439 #endif /* !KERNEL */
440 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
441     if (!uniprocessor)
442         rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER - 10, "rx_sleepLock");
443 #endif /* KERNEL && AFS_HPUX110_ENV */
444 #endif /* RX_ENABLE_LOCKS */
445
446     rxi_nCalls = 0;
447     rx_connDeadTime = 12;
448     rx_tranquil = 0;            /* reset flag */
449     memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
450     htable = (char *)
451         osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
452     PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *));     /* XXXXX */
453     memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
454     ptable = (char *)osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
455     PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *));   /* XXXXX */
456     memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
457
458     /* Malloc up a bunch of packets & buffers */
459     rx_nFreePackets = 0;
460     queue_Init(&rx_freePacketQueue);
461     rxi_NeedMorePackets = FALSE;
462 #ifdef RX_ENABLE_TSFPQ
463     rx_nPackets = 0;    /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
464     rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
465 #else /* RX_ENABLE_TSFPQ */
466     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
467     rxi_MorePackets(rx_nPackets);
468 #endif /* RX_ENABLE_TSFPQ */
469     rx_CheckPackets();
470
471     NETPRI;
472
473     clock_Init();
474
475 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
476     tv.tv_sec = clock_now.sec;
477     tv.tv_usec = clock_now.usec;
478     srand((unsigned int)tv.tv_usec);
479 #else
480     osi_GetTime(&tv);
481 #endif
482     if (port) {
483         rx_port = port;
484     } else {
485 #if defined(KERNEL) && !defined(UKERNEL)
486         /* Really, this should never happen in a real kernel */
487         rx_port = 0;
488 #else
489         struct sockaddr_in addr;
490         int addrlen = sizeof(addr);
491         if (getsockname((int)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
492             rx_Finalize();
493             return -1;
494         }
495         rx_port = addr.sin_port;
496 #endif
497     }
498     rx_stats.minRtt.sec = 9999999;
499 #ifdef  KERNEL
500     rx_SetEpoch(tv.tv_sec | 0x80000000);
501 #else
502     rx_SetEpoch(tv.tv_sec);     /* Start time of this package, rxkad
503                                  * will provide a randomer value. */
504 #endif
505     MUTEX_ENTER(&rx_stats_mutex);
506     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
507     MUTEX_EXIT(&rx_stats_mutex);
508     /* *Slightly* random start time for the cid.  This is just to help
509      * out with the hashing function at the peer */
510     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
511     rx_connHashTable = (struct rx_connection **)htable;
512     rx_peerHashTable = (struct rx_peer **)ptable;
513
514     rx_lastAckDelay.sec = 0;
515     rx_lastAckDelay.usec = 400000;      /* 400 milliseconds */
516     rx_hardAckDelay.sec = 0;
517     rx_hardAckDelay.usec = 100000;      /* 100 milliseconds */
518     rx_softAckDelay.sec = 0;
519     rx_softAckDelay.usec = 100000;      /* 100 milliseconds */
520
521     rxevent_Init(20, rxi_ReScheduleEvents);
522
523     /* Initialize various global queues */
524     queue_Init(&rx_idleServerQueue);
525     queue_Init(&rx_incomingCallQueue);
526     queue_Init(&rx_freeCallQueue);
527
528 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
529     /* Initialize our list of usable IP addresses. */
530     rx_GetIFInfo();
531 #endif
532
533     /* Start listener process (exact function is dependent on the
534      * implementation environment--kernel or user space) */
535     rxi_StartListener();
536
537     USERPRI;
538     tmp_status = rxinit_status = 0;
539     UNLOCK_RX_INIT;
540     return tmp_status;
541 }
542
543 int
544 rx_Init(u_int port)
545 {
546     return rx_InitHost(htonl(INADDR_ANY), port);
547 }
548
549 /* called with unincremented nRequestsRunning to see if it is OK to start
550  * a new thread in this service.  Could be "no" for two reasons: over the
551  * max quota, or would prevent others from reaching their min quota.
552  */
553 #ifdef RX_ENABLE_LOCKS
554 /* This verion of QuotaOK reserves quota if it's ok while the
555  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
556  */
557 static int
558 QuotaOK(register struct rx_service *aservice)
559 {
560     /* check if over max quota */
561     if (aservice->nRequestsRunning >= aservice->maxProcs) {
562         return 0;
563     }
564
565     /* under min quota, we're OK */
566     /* otherwise, can use only if there are enough to allow everyone
567      * to go to their min quota after this guy starts.
568      */
569     MUTEX_ENTER(&rx_stats_mutex);
570     if ((aservice->nRequestsRunning < aservice->minProcs)
571         || (rxi_availProcs > rxi_minDeficit)) {
572         aservice->nRequestsRunning++;
573         /* just started call in minProcs pool, need fewer to maintain
574          * guarantee */
575         if (aservice->nRequestsRunning <= aservice->minProcs)
576             rxi_minDeficit--;
577         rxi_availProcs--;
578         MUTEX_EXIT(&rx_stats_mutex);
579         return 1;
580     }
581     MUTEX_EXIT(&rx_stats_mutex);
582
583     return 0;
584 }
585
586 static void
587 ReturnToServerPool(register struct rx_service *aservice)
588 {
589     aservice->nRequestsRunning--;
590     MUTEX_ENTER(&rx_stats_mutex);
591     if (aservice->nRequestsRunning < aservice->minProcs)
592         rxi_minDeficit++;
593     rxi_availProcs++;
594     MUTEX_EXIT(&rx_stats_mutex);
595 }
596
597 #else /* RX_ENABLE_LOCKS */
598 static int
599 QuotaOK(register struct rx_service *aservice)
600 {
601     int rc = 0;
602     /* under min quota, we're OK */
603     if (aservice->nRequestsRunning < aservice->minProcs)
604         return 1;
605
606     /* check if over max quota */
607     if (aservice->nRequestsRunning >= aservice->maxProcs)
608         return 0;
609
610     /* otherwise, can use only if there are enough to allow everyone
611      * to go to their min quota after this guy starts.
612      */
613     if (rxi_availProcs > rxi_minDeficit)
614         rc = 1;
615     return rc;
616 }
617 #endif /* RX_ENABLE_LOCKS */
618
619 #ifndef KERNEL
620 /* Called by rx_StartServer to start up lwp's to service calls.
621    NExistingProcs gives the number of procs already existing, and which
622    therefore needn't be created. */
623 void
624 rxi_StartServerProcs(int nExistingProcs)
625 {
626     register struct rx_service *service;
627     register int i;
628     int maxdiff = 0;
629     int nProcs = 0;
630
631     /* For each service, reserve N processes, where N is the "minimum"
632      * number of processes that MUST be able to execute a request in parallel,
633      * at any time, for that process.  Also compute the maximum difference
634      * between any service's maximum number of processes that can run
635      * (i.e. the maximum number that ever will be run, and a guarantee
636      * that this number will run if other services aren't running), and its
637      * minimum number.  The result is the extra number of processes that
638      * we need in order to provide the latter guarantee */
639     for (i = 0; i < RX_MAX_SERVICES; i++) {
640         int diff;
641         service = rx_services[i];
642         if (service == (struct rx_service *)0)
643             break;
644         nProcs += service->minProcs;
645         diff = service->maxProcs - service->minProcs;
646         if (diff > maxdiff)
647             maxdiff = diff;
648     }
649     nProcs += maxdiff;          /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
650     nProcs -= nExistingProcs;   /* Subtract the number of procs that were previously created for use as server procs */
651     for (i = 0; i < nProcs; i++) {
652         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
653     }
654 }
655 #endif /* KERNEL */
656
657 #ifdef AFS_NT40_ENV
658 /* This routine is only required on Windows */
659 void
660 rx_StartClientThread(void)
661 {
662 #ifdef AFS_PTHREAD_ENV
663     int pid;
664     pid = (int) pthread_self();
665 #endif /* AFS_PTHREAD_ENV */
666 }
667 #endif /* AFS_NT40_ENV */
668
669 /* This routine must be called if any services are exported.  If the
670  * donateMe flag is set, the calling process is donated to the server
671  * process pool */
672 void
673 rx_StartServer(int donateMe)
674 {
675     register struct rx_service *service;
676     register int i;
677     SPLVAR;
678     clock_NewTime();
679
680     NETPRI;
681     /* Start server processes, if necessary (exact function is dependent
682      * on the implementation environment--kernel or user space).  DonateMe
683      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
684      * case, one less new proc will be created rx_StartServerProcs.
685      */
686     rxi_StartServerProcs(donateMe);
687
688     /* count up the # of threads in minProcs, and add set the min deficit to
689      * be that value, too.
690      */
691     for (i = 0; i < RX_MAX_SERVICES; i++) {
692         service = rx_services[i];
693         if (service == (struct rx_service *)0)
694             break;
695         MUTEX_ENTER(&rx_stats_mutex);
696         rxi_totalMin += service->minProcs;
697         /* below works even if a thread is running, since minDeficit would
698          * still have been decremented and later re-incremented.
699          */
700         rxi_minDeficit += service->minProcs;
701         MUTEX_EXIT(&rx_stats_mutex);
702     }
703
704     /* Turn on reaping of idle server connections */
705     rxi_ReapConnections();
706
707     USERPRI;
708
709     if (donateMe) {
710 #ifndef AFS_NT40_ENV
711 #ifndef KERNEL
712         char name[32];
713         static int nProcs;
714 #ifdef AFS_PTHREAD_ENV
715         pid_t pid;
716         pid = (pid_t) pthread_self();
717 #else /* AFS_PTHREAD_ENV */
718         PROCESS pid;
719         LWP_CurrentProcess(&pid);
720 #endif /* AFS_PTHREAD_ENV */
721
722         sprintf(name, "srv_%d", ++nProcs);
723         if (registerProgram)
724             (*registerProgram) (pid, name);
725 #endif /* KERNEL */
726 #endif /* AFS_NT40_ENV */
727         rx_ServerProc();        /* Never returns */
728     }
729 #ifdef RX_ENABLE_TSFPQ
730     /* no use leaving packets around in this thread's local queue if
731      * it isn't getting donated to the server thread pool. 
732      */
733     rxi_FlushLocalPacketsTSFPQ();
734 #endif /* RX_ENABLE_TSFPQ */
735     return;
736 }
737
738 /* Create a new client connection to the specified service, using the
739  * specified security object to implement the security model for this
740  * connection. */
741 struct rx_connection *
742 rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
743                  register struct rx_securityClass *securityObject,
744                  int serviceSecurityIndex)
745 {
746     int hashindex;
747     afs_int32 cid;
748     register struct rx_connection *conn;
749
750     SPLVAR;
751
752     clock_NewTime();
753     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
754
755     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
756      * the case of kmem_alloc? */
757     conn = rxi_AllocConnection();
758 #ifdef  RX_ENABLE_LOCKS
759     MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
760     MUTEX_INIT(&conn->conn_data_lock, "conn call lock", MUTEX_DEFAULT, 0);
761     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
762 #endif
763     NETPRI;
764     MUTEX_ENTER(&rx_connHashTable_lock);
765     cid = (rx_nextCid += RX_MAXCALLS);
766     conn->type = RX_CLIENT_CONNECTION;
767     conn->cid = cid;
768     conn->epoch = rx_epoch;
769     conn->peer = rxi_FindPeer(shost, sport, 0, 1);
770     conn->serviceId = sservice;
771     conn->securityObject = securityObject;
772     /* This doesn't work in all compilers with void (they're buggy), so fake it
773      * with VOID */
774     conn->securityData = (VOID *) 0;
775     conn->securityIndex = serviceSecurityIndex;
776     rx_SetConnDeadTime(conn, rx_connDeadTime);
777     conn->ackRate = RX_FAST_ACK_RATE;
778     conn->nSpecific = 0;
779     conn->specific = NULL;
780     conn->challengeEvent = NULL;
781     conn->delayedAbortEvent = NULL;
782     conn->abortCount = 0;
783     conn->error = 0;
784
785     RXS_NewConnection(securityObject, conn);
786     hashindex =
787         CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
788
789     conn->refCount++;           /* no lock required since only this thread knows... */
790     conn->next = rx_connHashTable[hashindex];
791     rx_connHashTable[hashindex] = conn;
792     MUTEX_ENTER(&rx_stats_mutex);
793     rx_stats.nClientConns++;
794     MUTEX_EXIT(&rx_stats_mutex);
795
796     MUTEX_EXIT(&rx_connHashTable_lock);
797     USERPRI;
798     return conn;
799 }
800
801 void
802 rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
803 {
804     /* The idea is to set the dead time to a value that allows several
805      * keepalives to be dropped without timing out the connection. */
806     conn->secondsUntilDead = MAX(seconds, 6);
807     conn->secondsUntilPing = conn->secondsUntilDead / 6;
808 }
809
810 int rxi_lowPeerRefCount = 0;
811 int rxi_lowConnRefCount = 0;
812
813 /*
814  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
815  * NOTE: must not be called with rx_connHashTable_lock held.
816  */
817 void
818 rxi_CleanupConnection(struct rx_connection *conn)
819 {
820     /* Notify the service exporter, if requested, that this connection
821      * is being destroyed */
822     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
823         (*conn->service->destroyConnProc) (conn);
824
825     /* Notify the security module that this connection is being destroyed */
826     RXS_DestroyConnection(conn->securityObject, conn);
827
828     /* If this is the last connection using the rx_peer struct, set its
829      * idle time to now. rxi_ReapConnections will reap it if it's still
830      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
831      */
832     MUTEX_ENTER(&rx_peerHashTable_lock);
833     if (conn->peer->refCount < 2) {
834         conn->peer->idleWhen = clock_Sec();
835         if (conn->peer->refCount < 1) {
836             conn->peer->refCount = 1;
837             MUTEX_ENTER(&rx_stats_mutex);
838             rxi_lowPeerRefCount++;
839             MUTEX_EXIT(&rx_stats_mutex);
840         }
841     }
842     conn->peer->refCount--;
843     MUTEX_EXIT(&rx_peerHashTable_lock);
844
845     MUTEX_ENTER(&rx_stats_mutex);
846     if (conn->type == RX_SERVER_CONNECTION)
847         rx_stats.nServerConns--;
848     else
849         rx_stats.nClientConns--;
850     MUTEX_EXIT(&rx_stats_mutex);
851
852 #ifndef KERNEL
853     if (conn->specific) {
854         int i;
855         for (i = 0; i < conn->nSpecific; i++) {
856             if (conn->specific[i] && rxi_keyCreate_destructor[i])
857                 (*rxi_keyCreate_destructor[i]) (conn->specific[i]);
858             conn->specific[i] = NULL;
859         }
860         free(conn->specific);
861     }
862     conn->specific = NULL;
863     conn->nSpecific = 0;
864 #endif /* !KERNEL */
865
866     MUTEX_DESTROY(&conn->conn_call_lock);
867     MUTEX_DESTROY(&conn->conn_data_lock);
868     CV_DESTROY(&conn->conn_call_cv);
869
870     rxi_FreeConnection(conn);
871 }
872
873 /* Destroy the specified connection */
874 void
875 rxi_DestroyConnection(register struct rx_connection *conn)
876 {
877     MUTEX_ENTER(&rx_connHashTable_lock);
878     rxi_DestroyConnectionNoLock(conn);
879     /* conn should be at the head of the cleanup list */
880     if (conn == rx_connCleanup_list) {
881         rx_connCleanup_list = rx_connCleanup_list->next;
882         MUTEX_EXIT(&rx_connHashTable_lock);
883         rxi_CleanupConnection(conn);
884     }
885 #ifdef RX_ENABLE_LOCKS
886     else {
887         MUTEX_EXIT(&rx_connHashTable_lock);
888     }
889 #endif /* RX_ENABLE_LOCKS */
890 }
891
892 static void
893 rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
894 {
895     register struct rx_connection **conn_ptr;
896     register int havecalls = 0;
897     struct rx_packet *packet;
898     int i;
899     SPLVAR;
900
901     clock_NewTime();
902
903     NETPRI;
904     MUTEX_ENTER(&conn->conn_data_lock);
905     if (conn->refCount > 0)
906         conn->refCount--;
907     else {
908         MUTEX_ENTER(&rx_stats_mutex);
909         rxi_lowConnRefCount++;
910         MUTEX_EXIT(&rx_stats_mutex);
911     }
912
913     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
914         /* Busy; wait till the last guy before proceeding */
915         MUTEX_EXIT(&conn->conn_data_lock);
916         USERPRI;
917         return;
918     }
919
920     /* If the client previously called rx_NewCall, but it is still
921      * waiting, treat this as a running call, and wait to destroy the
922      * connection later when the call completes. */
923     if ((conn->type == RX_CLIENT_CONNECTION)
924         && (conn->flags & RX_CONN_MAKECALL_WAITING)) {
925         conn->flags |= RX_CONN_DESTROY_ME;
926         MUTEX_EXIT(&conn->conn_data_lock);
927         USERPRI;
928         return;
929     }
930     MUTEX_EXIT(&conn->conn_data_lock);
931
932     /* Check for extant references to this connection */
933     for (i = 0; i < RX_MAXCALLS; i++) {
934         register struct rx_call *call = conn->call[i];
935         if (call) {
936             havecalls = 1;
937             if (conn->type == RX_CLIENT_CONNECTION) {
938                 MUTEX_ENTER(&call->lock);
939                 if (call->delayedAckEvent) {
940                     /* Push the final acknowledgment out now--there
941                      * won't be a subsequent call to acknowledge the
942                      * last reply packets */
943                     rxevent_Cancel(call->delayedAckEvent, call,
944                                    RX_CALL_REFCOUNT_DELAY);
945                     if (call->state == RX_STATE_PRECALL
946                         || call->state == RX_STATE_ACTIVE) {
947                         rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
948                     } else {
949                         rxi_AckAll(NULL, call, 0);
950                     }
951                 }
952                 MUTEX_EXIT(&call->lock);
953             }
954         }
955     }
956 #ifdef RX_ENABLE_LOCKS
957     if (!havecalls) {
958         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
959             MUTEX_EXIT(&conn->conn_data_lock);
960         } else {
961             /* Someone is accessing a packet right now. */
962             havecalls = 1;
963         }
964     }
965 #endif /* RX_ENABLE_LOCKS */
966
967     if (havecalls) {
968         /* Don't destroy the connection if there are any call
969          * structures still in use */
970         MUTEX_ENTER(&conn->conn_data_lock);
971         conn->flags |= RX_CONN_DESTROY_ME;
972         MUTEX_EXIT(&conn->conn_data_lock);
973         USERPRI;
974         return;
975     }
976
977     if (conn->delayedAbortEvent) {
978         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
979         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
980         if (packet) {
981             MUTEX_ENTER(&conn->conn_data_lock);
982             rxi_SendConnectionAbort(conn, packet, 0, 1);
983             MUTEX_EXIT(&conn->conn_data_lock);
984             rxi_FreePacket(packet);
985         }
986     }
987
988     /* Remove from connection hash table before proceeding */
989     conn_ptr =
990         &rx_connHashTable[CONN_HASH
991                           (peer->host, peer->port, conn->cid, conn->epoch,
992                            conn->type)];
993     for (; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
994         if (*conn_ptr == conn) {
995             *conn_ptr = conn->next;
996             break;
997         }
998     }
999     /* if the conn that we are destroying was the last connection, then we
1000      * clear rxLastConn as well */
1001     if (rxLastConn == conn)
1002         rxLastConn = 0;
1003
1004     /* Make sure the connection is completely reset before deleting it. */
1005     /* get rid of pending events that could zap us later */
1006     if (conn->challengeEvent)
1007         rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
1008     if (conn->checkReachEvent)
1009         rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
1010
1011     /* Add the connection to the list of destroyed connections that
1012      * need to be cleaned up. This is necessary to avoid deadlocks
1013      * in the routines we call to inform others that this connection is
1014      * being destroyed. */
1015     conn->next = rx_connCleanup_list;
1016     rx_connCleanup_list = conn;
1017 }
1018
1019 /* Externally available version */
1020 void
1021 rx_DestroyConnection(register struct rx_connection *conn)
1022 {
1023     SPLVAR;
1024
1025     NETPRI;
1026     rxi_DestroyConnection(conn);
1027     USERPRI;
1028 }
1029
1030 void
1031 rx_GetConnection(register struct rx_connection *conn)
1032 {
1033     SPLVAR;
1034
1035     NETPRI;
1036     MUTEX_ENTER(&conn->conn_data_lock);
1037     conn->refCount++;
1038     MUTEX_EXIT(&conn->conn_data_lock);
1039     USERPRI;
1040 }
1041
1042 /* Wait for the transmit queue to no longer be busy. 
1043  * requires the call->lock to be held */
1044 static void rxi_WaitforTQBusy(struct rx_call *call) {
1045     while (call->flags & RX_CALL_TQ_BUSY) {
1046         call->flags |= RX_CALL_TQ_WAIT;
1047         call->tqWaiters++;
1048 #ifdef RX_ENABLE_LOCKS
1049         osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
1050         CV_WAIT(&call->cv_tq, &call->lock);
1051 #else /* RX_ENABLE_LOCKS */
1052         osi_rxSleep(&call->tq);
1053 #endif /* RX_ENABLE_LOCKS */
1054         call->tqWaiters--;
1055         if (call->tqWaiters == 0) {
1056             call->flags &= ~RX_CALL_TQ_WAIT;
1057         }
1058     }
1059 }
1060 /* Start a new rx remote procedure call, on the specified connection.
1061  * If wait is set to 1, wait for a free call channel; otherwise return
1062  * 0.  Maxtime gives the maximum number of seconds this call may take,
1063  * after rx_NewCall returns.  After this time interval, a call to any
1064  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
1065  * For fine grain locking, we hold the conn_call_lock in order to 
1066  * to ensure that we don't get signalle after we found a call in an active
1067  * state and before we go to sleep.
1068  */
1069 struct rx_call *
1070 rx_NewCall(register struct rx_connection *conn)
1071 {
1072     register int i;
1073     register struct rx_call *call;
1074     struct clock queueTime;
1075     SPLVAR;
1076
1077     clock_NewTime();
1078     dpf(("rx_NewCall(conn %x)\n", conn));
1079
1080     NETPRI;
1081     clock_GetTime(&queueTime);
1082     MUTEX_ENTER(&conn->conn_call_lock);
1083
1084     /*
1085      * Check if there are others waiting for a new call.
1086      * If so, let them go first to avoid starving them.
1087      * This is a fairly simple scheme, and might not be
1088      * a complete solution for large numbers of waiters.
1089      * 
1090      * makeCallWaiters keeps track of the number of 
1091      * threads waiting to make calls and the 
1092      * RX_CONN_MAKECALL_WAITING flag bit is used to 
1093      * indicate that there are indeed calls waiting.
1094      * The flag is set when the waiter is incremented.
1095      * It is only cleared in rx_EndCall when 
1096      * makeCallWaiters is 0.  This prevents us from 
1097      * accidently destroying the connection while it
1098      * is potentially about to be used.
1099      */
1100     MUTEX_ENTER(&conn->conn_data_lock);
1101     if (conn->makeCallWaiters) {
1102         conn->flags |= RX_CONN_MAKECALL_WAITING;
1103         conn->makeCallWaiters++;
1104         MUTEX_EXIT(&conn->conn_data_lock);
1105
1106 #ifdef  RX_ENABLE_LOCKS
1107         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1108 #else
1109         osi_rxSleep(conn);
1110 #endif
1111         MUTEX_ENTER(&conn->conn_data_lock);
1112         conn->makeCallWaiters--;
1113     } 
1114     MUTEX_EXIT(&conn->conn_data_lock);
1115
1116     for (;;) {
1117         for (i = 0; i < RX_MAXCALLS; i++) {
1118             call = conn->call[i];
1119             if (call) {
1120                 MUTEX_ENTER(&call->lock);
1121                 if (call->state == RX_STATE_DALLY) {
1122                     rxi_ResetCall(call, 0);
1123                     (*call->callNumber)++;
1124                     break;
1125                 }
1126                 MUTEX_EXIT(&call->lock);
1127             } else {
1128                 call = rxi_NewCall(conn, i);
1129                 break;
1130             }
1131         }
1132         if (i < RX_MAXCALLS) {
1133             break;
1134         }
1135         MUTEX_ENTER(&conn->conn_data_lock);
1136         conn->flags |= RX_CONN_MAKECALL_WAITING;
1137         conn->makeCallWaiters++;
1138         MUTEX_EXIT(&conn->conn_data_lock);
1139
1140 #ifdef  RX_ENABLE_LOCKS
1141         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1142 #else
1143         osi_rxSleep(conn);
1144 #endif
1145         MUTEX_ENTER(&conn->conn_data_lock);
1146         conn->makeCallWaiters--;
1147         MUTEX_EXIT(&conn->conn_data_lock);
1148     }
1149     /*
1150      * Wake up anyone else who might be giving us a chance to
1151      * run (see code above that avoids resource starvation).
1152      */
1153 #ifdef  RX_ENABLE_LOCKS
1154     CV_BROADCAST(&conn->conn_call_cv);
1155 #else
1156     osi_rxWakeup(conn);
1157 #endif
1158
1159     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1160
1161     /* Client is initially in send mode */
1162     call->state = RX_STATE_ACTIVE;
1163     call->error = conn->error;
1164     if (call->error)
1165         call->mode = RX_MODE_ERROR;
1166     else
1167         call->mode = RX_MODE_SENDING;
1168     
1169     /* remember start time for call in case we have hard dead time limit */
1170     call->queueTime = queueTime;
1171     clock_GetTime(&call->startTime);
1172     hzero(call->bytesSent);
1173     hzero(call->bytesRcvd);
1174
1175     /* Turn on busy protocol. */
1176     rxi_KeepAliveOn(call);
1177
1178     MUTEX_EXIT(&call->lock);
1179     MUTEX_EXIT(&conn->conn_call_lock);
1180     USERPRI;
1181
1182 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1183     /* Now, if TQ wasn't cleared earlier, do it now. */
1184     MUTEX_ENTER(&call->lock);
1185     rxi_WaitforTQBusy(call);
1186     if (call->flags & RX_CALL_TQ_CLEARME) {
1187         rxi_ClearTransmitQueue(call, 0);
1188         queue_Init(&call->tq);
1189     }
1190     MUTEX_EXIT(&call->lock);
1191 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1192
1193     dpf(("rx_NewCall(call %x)\n", call));
1194     return call;
1195 }
1196
1197 int
1198 rxi_HasActiveCalls(register struct rx_connection *aconn)
1199 {
1200     register int i;
1201     register struct rx_call *tcall;
1202     SPLVAR;
1203
1204     NETPRI;
1205     for (i = 0; i < RX_MAXCALLS; i++) {
1206         if ((tcall = aconn->call[i])) {
1207             if ((tcall->state == RX_STATE_ACTIVE)
1208                 || (tcall->state == RX_STATE_PRECALL)) {
1209                 USERPRI;
1210                 return 1;
1211             }
1212         }
1213     }
1214     USERPRI;
1215     return 0;
1216 }
1217
1218 int
1219 rxi_GetCallNumberVector(register struct rx_connection *aconn,
1220                         register afs_int32 * aint32s)
1221 {
1222     register int i;
1223     register struct rx_call *tcall;
1224     SPLVAR;
1225
1226     NETPRI;
1227     for (i = 0; i < RX_MAXCALLS; i++) {
1228         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1229             aint32s[i] = aconn->callNumber[i] + 1;
1230         else
1231             aint32s[i] = aconn->callNumber[i];
1232     }
1233     USERPRI;
1234     return 0;
1235 }
1236
1237 int
1238 rxi_SetCallNumberVector(register struct rx_connection *aconn,
1239                         register afs_int32 * aint32s)
1240 {
1241     register int i;
1242     register struct rx_call *tcall;
1243     SPLVAR;
1244
1245     NETPRI;
1246     for (i = 0; i < RX_MAXCALLS; i++) {
1247         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1248             aconn->callNumber[i] = aint32s[i] - 1;
1249         else
1250             aconn->callNumber[i] = aint32s[i];
1251     }
1252     USERPRI;
1253     return 0;
1254 }
1255
1256 /* Advertise a new service.  A service is named locally by a UDP port
1257  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1258  * on a failure. 
1259  *
1260      char *serviceName;  Name for identification purposes (e.g. the
1261                          service name might be used for probing for
1262                          statistics) */
1263 struct rx_service *
1264 rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId, 
1265                   char *serviceName, struct rx_securityClass **securityObjects,
1266                   int nSecurityObjects, 
1267                   afs_int32(*serviceProc) (struct rx_call * acall))
1268 {
1269     osi_socket socket = OSI_NULLSOCKET;
1270     register struct rx_service *tservice;
1271     register int i;
1272     SPLVAR;
1273
1274     clock_NewTime();
1275
1276     if (serviceId == 0) {
1277         (osi_Msg
1278          "rx_NewService:  service id for service %s is not non-zero.\n",
1279          serviceName);
1280         return 0;
1281     }
1282     if (port == 0) {
1283         if (rx_port == 0) {
1284             (osi_Msg
1285              "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n",
1286              serviceName);
1287             return 0;
1288         }
1289         port = rx_port;
1290         socket = rx_socket;
1291     }
1292
1293     tservice = rxi_AllocService();
1294     NETPRI;
1295     for (i = 0; i < RX_MAX_SERVICES; i++) {
1296         register struct rx_service *service = rx_services[i];
1297         if (service) {
1298             if (port == service->servicePort && host == service->serviceHost) {
1299                 if (service->serviceId == serviceId) {
1300                     /* The identical service has already been
1301                      * installed; if the caller was intending to
1302                      * change the security classes used by this
1303                      * service, he/she loses. */
1304                     (osi_Msg
1305                      "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
1306                      serviceName, serviceId, service->serviceName);
1307                     USERPRI;
1308                     rxi_FreeService(tservice);
1309                     return service;
1310                 }
1311                 /* Different service, same port: re-use the socket
1312                  * which is bound to the same port */
1313                 socket = service->socket;
1314             }
1315         } else {
1316             if (socket == OSI_NULLSOCKET) {
1317                 /* If we don't already have a socket (from another
1318                  * service on same port) get a new one */
1319                 socket = rxi_GetHostUDPSocket(htonl(INADDR_ANY), port);
1320                 if (socket == OSI_NULLSOCKET) {
1321                     USERPRI;
1322                     rxi_FreeService(tservice);
1323                     return 0;
1324                 }
1325             }
1326             service = tservice;
1327             service->socket = socket;
1328             service->serviceHost = host;
1329             service->servicePort = port;
1330             service->serviceId = serviceId;
1331             service->serviceName = serviceName;
1332             service->nSecurityObjects = nSecurityObjects;
1333             service->securityObjects = securityObjects;
1334             service->minProcs = 0;
1335             service->maxProcs = 1;
1336             service->idleDeadTime = 60;
1337             service->connDeadTime = rx_connDeadTime;
1338             service->executeRequestProc = serviceProc;
1339             service->checkReach = 0;
1340             rx_services[i] = service;   /* not visible until now */
1341             USERPRI;
1342             return service;
1343         }
1344     }
1345     USERPRI;
1346     rxi_FreeService(tservice);
1347     (osi_Msg "rx_NewService: cannot support > %d services\n",
1348      RX_MAX_SERVICES);
1349     return 0;
1350 }
1351
1352 struct rx_service *
1353 rx_NewService(u_short port, u_short serviceId, char *serviceName,
1354               struct rx_securityClass **securityObjects, int nSecurityObjects,
1355               afs_int32(*serviceProc) (struct rx_call * acall))
1356 {
1357     return rx_NewServiceHost(htonl(INADDR_ANY), port, serviceId, serviceName, securityObjects, nSecurityObjects, serviceProc);
1358 }
1359
1360 /* Generic request processing loop. This routine should be called
1361  * by the implementation dependent rx_ServerProc. If socketp is
1362  * non-null, it will be set to the file descriptor that this thread
1363  * is now listening on. If socketp is null, this routine will never
1364  * returns. */
1365 void
1366 rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket * socketp)
1367 {
1368     register struct rx_call *call;
1369     register afs_int32 code;
1370     register struct rx_service *tservice = NULL;
1371
1372     for (;;) {
1373         if (newcall) {
1374             call = newcall;
1375             newcall = NULL;
1376         } else {
1377             call = rx_GetCall(threadID, tservice, socketp);
1378             if (socketp && *socketp != OSI_NULLSOCKET) {
1379                 /* We are now a listener thread */
1380                 return;
1381             }
1382         }
1383
1384         /* if server is restarting( typically smooth shutdown) then do not
1385          * allow any new calls.
1386          */
1387
1388         if (rx_tranquil && (call != NULL)) {
1389             SPLVAR;
1390
1391             NETPRI;
1392             MUTEX_ENTER(&call->lock);
1393
1394             rxi_CallError(call, RX_RESTARTING);
1395             rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
1396
1397             MUTEX_EXIT(&call->lock);
1398             USERPRI;
1399         }
1400 #ifdef  KERNEL
1401         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1402 #ifdef RX_ENABLE_LOCKS
1403             AFS_GLOCK();
1404 #endif /* RX_ENABLE_LOCKS */
1405             afs_termState = AFSOP_STOP_AFS;
1406             afs_osi_Wakeup(&afs_termState);
1407 #ifdef RX_ENABLE_LOCKS
1408             AFS_GUNLOCK();
1409 #endif /* RX_ENABLE_LOCKS */
1410             return;
1411         }
1412 #endif
1413
1414         tservice = call->conn->service;
1415
1416         if (tservice->beforeProc)
1417             (*tservice->beforeProc) (call);
1418
1419         code = call->conn->service->executeRequestProc(call);
1420
1421         if (tservice->afterProc)
1422             (*tservice->afterProc) (call, code);
1423
1424         rx_EndCall(call, code);
1425         MUTEX_ENTER(&rx_stats_mutex);
1426         rxi_nCalls++;
1427         MUTEX_EXIT(&rx_stats_mutex);
1428     }
1429 }
1430
1431
1432 void
1433 rx_WakeupServerProcs(void)
1434 {
1435     struct rx_serverQueueEntry *np, *tqp;
1436     SPLVAR;
1437
1438     NETPRI;
1439     MUTEX_ENTER(&rx_serverPool_lock);
1440
1441 #ifdef RX_ENABLE_LOCKS
1442     if (rx_waitForPacket)
1443         CV_BROADCAST(&rx_waitForPacket->cv);
1444 #else /* RX_ENABLE_LOCKS */
1445     if (rx_waitForPacket)
1446         osi_rxWakeup(rx_waitForPacket);
1447 #endif /* RX_ENABLE_LOCKS */
1448     MUTEX_ENTER(&freeSQEList_lock);
1449     for (np = rx_FreeSQEList; np; np = tqp) {
1450         tqp = *(struct rx_serverQueueEntry **)np;
1451 #ifdef RX_ENABLE_LOCKS
1452         CV_BROADCAST(&np->cv);
1453 #else /* RX_ENABLE_LOCKS */
1454         osi_rxWakeup(np);
1455 #endif /* RX_ENABLE_LOCKS */
1456     }
1457     MUTEX_EXIT(&freeSQEList_lock);
1458     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1459 #ifdef RX_ENABLE_LOCKS
1460         CV_BROADCAST(&np->cv);
1461 #else /* RX_ENABLE_LOCKS */
1462         osi_rxWakeup(np);
1463 #endif /* RX_ENABLE_LOCKS */
1464     }
1465     MUTEX_EXIT(&rx_serverPool_lock);
1466     USERPRI;
1467 }
1468
1469 /* meltdown:
1470  * One thing that seems to happen is that all the server threads get
1471  * tied up on some empty or slow call, and then a whole bunch of calls
1472  * arrive at once, using up the packet pool, so now there are more 
1473  * empty calls.  The most critical resources here are server threads
1474  * and the free packet pool.  The "doreclaim" code seems to help in
1475  * general.  I think that eventually we arrive in this state: there
1476  * are lots of pending calls which do have all their packets present,
1477  * so they won't be reclaimed, are multi-packet calls, so they won't
1478  * be scheduled until later, and thus are tying up most of the free 
1479  * packet pool for a very long time.
1480  * future options:
1481  * 1.  schedule multi-packet calls if all the packets are present.  
1482  * Probably CPU-bound operation, useful to return packets to pool. 
1483  * Do what if there is a full window, but the last packet isn't here?
1484  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1485  * it sleeps and waits for that type of call.
1486  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1487  * the current dataquota business is badly broken.  The quota isn't adjusted
1488  * to reflect how many packets are presently queued for a running call.
1489  * So, when we schedule a queued call with a full window of packets queued
1490  * up for it, that *should* free up a window full of packets for other 2d-class
1491  * calls to be able to use from the packet pool.  But it doesn't.
1492  *
1493  * NB.  Most of the time, this code doesn't run -- since idle server threads
1494  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1495  * as a new call arrives.
1496  */
1497 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1498  * for an rx_Read. */
1499 #ifdef RX_ENABLE_LOCKS
1500 struct rx_call *
1501 rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
1502 {
1503     struct rx_serverQueueEntry *sq;
1504     register struct rx_call *call = (struct rx_call *)0;
1505     struct rx_service *service = NULL;
1506     SPLVAR;
1507
1508     MUTEX_ENTER(&freeSQEList_lock);
1509
1510     if ((sq = rx_FreeSQEList)) {
1511         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1512         MUTEX_EXIT(&freeSQEList_lock);
1513     } else {                    /* otherwise allocate a new one and return that */
1514         MUTEX_EXIT(&freeSQEList_lock);
1515         sq = (struct rx_serverQueueEntry *)
1516             rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1517         MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
1518         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1519     }
1520
1521     MUTEX_ENTER(&rx_serverPool_lock);
1522     if (cur_service != NULL) {
1523         ReturnToServerPool(cur_service);
1524     }
1525     while (1) {
1526         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1527             register struct rx_call *tcall, *ncall, *choice2 = NULL;
1528
1529             /* Scan for eligible incoming calls.  A call is not eligible
1530              * if the maximum number of calls for its service type are
1531              * already executing */
1532             /* One thread will process calls FCFS (to prevent starvation),
1533              * while the other threads may run ahead looking for calls which
1534              * have all their input data available immediately.  This helps 
1535              * keep threads from blocking, waiting for data from the client. */
1536             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1537                 service = tcall->conn->service;
1538                 if (!QuotaOK(service)) {
1539                     continue;
1540                 }
1541                 if (tno == rxi_fcfs_thread_num
1542                     || !tcall->queue_item_header.next) {
1543                     /* If we're the fcfs thread , then  we'll just use 
1544                      * this call. If we haven't been able to find an optimal 
1545                      * choice, and we're at the end of the list, then use a 
1546                      * 2d choice if one has been identified.  Otherwise... */
1547                     call = (choice2 ? choice2 : tcall);
1548                     service = call->conn->service;
1549                 } else if (!queue_IsEmpty(&tcall->rq)) {
1550                     struct rx_packet *rp;
1551                     rp = queue_First(&tcall->rq, rx_packet);
1552                     if (rp->header.seq == 1) {
1553                         if (!meltdown_1pkt
1554                             || (rp->header.flags & RX_LAST_PACKET)) {
1555                             call = tcall;
1556                         } else if (rxi_2dchoice && !choice2
1557                                    && !(tcall->flags & RX_CALL_CLEARED)
1558                                    && (tcall->rprev > rxi_HardAckRate)) {
1559                             choice2 = tcall;
1560                         } else
1561                             rxi_md2cnt++;
1562                     }
1563                 }
1564                 if (call) {
1565                     break;
1566                 } else {
1567                     ReturnToServerPool(service);
1568                 }
1569             }
1570         }
1571
1572         if (call) {
1573             queue_Remove(call);
1574             MUTEX_EXIT(&rx_serverPool_lock);
1575             MUTEX_ENTER(&call->lock);
1576
1577             if (call->flags & RX_CALL_WAIT_PROC) {
1578                 call->flags &= ~RX_CALL_WAIT_PROC;
1579                 MUTEX_ENTER(&rx_stats_mutex);
1580                 rx_nWaiting--;
1581                 MUTEX_EXIT(&rx_stats_mutex);
1582             }
1583
1584             if (call->state != RX_STATE_PRECALL || call->error) {
1585                 MUTEX_EXIT(&call->lock);
1586                 MUTEX_ENTER(&rx_serverPool_lock);
1587                 ReturnToServerPool(service);
1588                 call = NULL;
1589                 continue;
1590             }
1591
1592             if (queue_IsEmpty(&call->rq)
1593                 || queue_First(&call->rq, rx_packet)->header.seq != 1)
1594                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1595
1596             CLEAR_CALL_QUEUE_LOCK(call);
1597             break;
1598         } else {
1599             /* If there are no eligible incoming calls, add this process
1600              * to the idle server queue, to wait for one */
1601             sq->newcall = 0;
1602             sq->tno = tno;
1603             if (socketp) {
1604                 *socketp = OSI_NULLSOCKET;
1605             }
1606             sq->socketp = socketp;
1607             queue_Append(&rx_idleServerQueue, sq);
1608 #ifndef AFS_AIX41_ENV
1609             rx_waitForPacket = sq;
1610 #else
1611             rx_waitingForPacket = sq;
1612 #endif /* AFS_AIX41_ENV */
1613             do {
1614                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1615 #ifdef  KERNEL
1616                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1617                     MUTEX_EXIT(&rx_serverPool_lock);
1618                     return (struct rx_call *)0;
1619                 }
1620 #endif
1621             } while (!(call = sq->newcall)
1622                      && !(socketp && *socketp != OSI_NULLSOCKET));
1623             MUTEX_EXIT(&rx_serverPool_lock);
1624             if (call) {
1625                 MUTEX_ENTER(&call->lock);
1626             }
1627             break;
1628         }
1629     }
1630
1631     MUTEX_ENTER(&freeSQEList_lock);
1632     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1633     rx_FreeSQEList = sq;
1634     MUTEX_EXIT(&freeSQEList_lock);
1635
1636     if (call) {
1637         clock_GetTime(&call->startTime);
1638         call->state = RX_STATE_ACTIVE;
1639         call->mode = RX_MODE_RECEIVING;
1640 #ifdef RX_KERNEL_TRACE
1641         if (ICL_SETACTIVE(afs_iclSetp)) {
1642             int glockOwner = ISAFS_GLOCK();
1643             if (!glockOwner)
1644                 AFS_GLOCK();
1645             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1646                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1647                        call);
1648             if (!glockOwner)
1649                 AFS_GUNLOCK();
1650         }
1651 #endif
1652
1653         rxi_calltrace(RX_CALL_START, call);
1654         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1655              call->conn->service->servicePort, call->conn->service->serviceId,
1656              call));
1657
1658         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1659         MUTEX_EXIT(&call->lock);
1660     } else {
1661         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1662     }
1663
1664     return call;
1665 }
1666 #else /* RX_ENABLE_LOCKS */
1667 struct rx_call *
1668 rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
1669 {
1670     struct rx_serverQueueEntry *sq;
1671     register struct rx_call *call = (struct rx_call *)0, *choice2;
1672     struct rx_service *service = NULL;
1673     SPLVAR;
1674
1675     NETPRI;
1676     MUTEX_ENTER(&freeSQEList_lock);
1677
1678     if ((sq = rx_FreeSQEList)) {
1679         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1680         MUTEX_EXIT(&freeSQEList_lock);
1681     } else {                    /* otherwise allocate a new one and return that */
1682         MUTEX_EXIT(&freeSQEList_lock);
1683         sq = (struct rx_serverQueueEntry *)
1684             rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1685         MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
1686         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1687     }
1688     MUTEX_ENTER(&sq->lock);
1689
1690     if (cur_service != NULL) {
1691         cur_service->nRequestsRunning--;
1692         if (cur_service->nRequestsRunning < cur_service->minProcs)
1693             rxi_minDeficit++;
1694         rxi_availProcs++;
1695     }
1696     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1697         register struct rx_call *tcall, *ncall;
1698         /* Scan for eligible incoming calls.  A call is not eligible
1699          * if the maximum number of calls for its service type are
1700          * already executing */
1701         /* One thread will process calls FCFS (to prevent starvation),
1702          * while the other threads may run ahead looking for calls which
1703          * have all their input data available immediately.  This helps 
1704          * keep threads from blocking, waiting for data from the client. */
1705         choice2 = (struct rx_call *)0;
1706         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1707             service = tcall->conn->service;
1708             if (QuotaOK(service)) {
1709                 if (tno == rxi_fcfs_thread_num
1710                     || !tcall->queue_item_header.next) {
1711                     /* If we're the fcfs thread, then  we'll just use 
1712                      * this call. If we haven't been able to find an optimal 
1713                      * choice, and we're at the end of the list, then use a 
1714                      * 2d choice if one has been identified.  Otherwise... */
1715                     call = (choice2 ? choice2 : tcall);
1716                     service = call->conn->service;
1717                 } else if (!queue_IsEmpty(&tcall->rq)) {
1718                     struct rx_packet *rp;
1719                     rp = queue_First(&tcall->rq, rx_packet);
1720                     if (rp->header.seq == 1
1721                         && (!meltdown_1pkt
1722                             || (rp->header.flags & RX_LAST_PACKET))) {
1723                         call = tcall;
1724                     } else if (rxi_2dchoice && !choice2
1725                                && !(tcall->flags & RX_CALL_CLEARED)
1726                                && (tcall->rprev > rxi_HardAckRate)) {
1727                         choice2 = tcall;
1728                     } else
1729                         rxi_md2cnt++;
1730                 }
1731             }
1732             if (call)
1733                 break;
1734         }
1735     }
1736
1737     if (call) {
1738         queue_Remove(call);
1739         /* we can't schedule a call if there's no data!!! */
1740         /* send an ack if there's no data, if we're missing the
1741          * first packet, or we're missing something between first 
1742          * and last -- there's a "hole" in the incoming data. */
1743         if (queue_IsEmpty(&call->rq)
1744             || queue_First(&call->rq, rx_packet)->header.seq != 1
1745             || call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1746             rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1747
1748         call->flags &= (~RX_CALL_WAIT_PROC);
1749         service->nRequestsRunning++;
1750         /* just started call in minProcs pool, need fewer to maintain
1751          * guarantee */
1752         if (service->nRequestsRunning <= service->minProcs)
1753             rxi_minDeficit--;
1754         rxi_availProcs--;
1755         rx_nWaiting--;
1756         /* MUTEX_EXIT(&call->lock); */
1757     } else {
1758         /* If there are no eligible incoming calls, add this process
1759          * to the idle server queue, to wait for one */
1760         sq->newcall = 0;
1761         if (socketp) {
1762             *socketp = OSI_NULLSOCKET;
1763         }
1764         sq->socketp = socketp;
1765         queue_Append(&rx_idleServerQueue, sq);
1766         do {
1767             osi_rxSleep(sq);
1768 #ifdef  KERNEL
1769             if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1770                 USERPRI;
1771                 rxi_Free(sq, sizeof(struct rx_serverQueueEntry));
1772                 return (struct rx_call *)0;
1773             }
1774 #endif
1775         } while (!(call = sq->newcall)
1776                  && !(socketp && *socketp != OSI_NULLSOCKET));
1777     }
1778     MUTEX_EXIT(&sq->lock);
1779
1780     MUTEX_ENTER(&freeSQEList_lock);
1781     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1782     rx_FreeSQEList = sq;
1783     MUTEX_EXIT(&freeSQEList_lock);
1784
1785     if (call) {
1786         clock_GetTime(&call->startTime);
1787         call->state = RX_STATE_ACTIVE;
1788         call->mode = RX_MODE_RECEIVING;
1789 #ifdef RX_KERNEL_TRACE
1790         if (ICL_SETACTIVE(afs_iclSetp)) {
1791             int glockOwner = ISAFS_GLOCK();
1792             if (!glockOwner)
1793                 AFS_GLOCK();
1794             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1795                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1796                        call);
1797             if (!glockOwner)
1798                 AFS_GUNLOCK();
1799         }
1800 #endif
1801
1802         rxi_calltrace(RX_CALL_START, call);
1803         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1804              call->conn->service->servicePort, call->conn->service->serviceId,
1805              call));
1806     } else {
1807         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1808     }
1809
1810     USERPRI;
1811
1812     return call;
1813 }
1814 #endif /* RX_ENABLE_LOCKS */
1815
1816
1817
1818 /* Establish a procedure to be called when a packet arrives for a
1819  * call.  This routine will be called at most once after each call,
1820  * and will also be called if there is an error condition on the or
1821  * the call is complete.  Used by multi rx to build a selection
1822  * function which determines which of several calls is likely to be a
1823  * good one to read from.  
1824  * NOTE: the way this is currently implemented it is probably only a
1825  * good idea to (1) use it immediately after a newcall (clients only)
1826  * and (2) only use it once.  Other uses currently void your warranty
1827  */
1828 void
1829 rx_SetArrivalProc(register struct rx_call *call,
1830                   register void (*proc) (register struct rx_call * call,
1831                                         register VOID * mh,
1832                                         register int index),
1833                   register VOID * handle, register int arg)
1834 {
1835     call->arrivalProc = proc;
1836     call->arrivalProcHandle = handle;
1837     call->arrivalProcArg = arg;
1838 }
1839
1840 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1841  * appropriate, and return the final error code from the conversation
1842  * to the caller */
1843
1844 afs_int32
1845 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1846 {
1847     register struct rx_connection *conn = call->conn;
1848     register struct rx_service *service;
1849     afs_int32 error;
1850     SPLVAR;
1851
1852
1853
1854     dpf(("rx_EndCall(call %x rc %d error %d abortCode %d)\n", call, rc, call->error, call->abortCode));
1855
1856     NETPRI;
1857     MUTEX_ENTER(&call->lock);
1858
1859     if (rc == 0 && call->error == 0) {
1860         call->abortCode = 0;
1861         call->abortCount = 0;
1862     }
1863
1864     call->arrivalProc = (void (*)())0;
1865     if (rc && call->error == 0) {
1866         rxi_CallError(call, rc);
1867         /* Send an abort message to the peer if this error code has
1868          * only just been set.  If it was set previously, assume the
1869          * peer has already been sent the error code or will request it 
1870          */
1871         rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
1872     }
1873     if (conn->type == RX_SERVER_CONNECTION) {
1874         /* Make sure reply or at least dummy reply is sent */
1875         if (call->mode == RX_MODE_RECEIVING) {
1876             rxi_WriteProc(call, 0, 0);
1877         }
1878         if (call->mode == RX_MODE_SENDING) {
1879             rxi_FlushWrite(call);
1880         }
1881         service = conn->service;
1882         rxi_calltrace(RX_CALL_END, call);
1883         /* Call goes to hold state until reply packets are acknowledged */
1884         if (call->tfirst + call->nSoftAcked < call->tnext) {
1885             call->state = RX_STATE_HOLD;
1886         } else {
1887             call->state = RX_STATE_DALLY;
1888             rxi_ClearTransmitQueue(call, 0);
1889             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1890             rxevent_Cancel(call->keepAliveEvent, call,
1891                            RX_CALL_REFCOUNT_ALIVE);
1892         }
1893     } else {                    /* Client connection */
1894         char dummy;
1895         /* Make sure server receives input packets, in the case where
1896          * no reply arguments are expected */
1897         if ((call->mode == RX_MODE_SENDING)
1898             || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1899             (void)rxi_ReadProc(call, &dummy, 1);
1900         }
1901
1902         /* If we had an outstanding delayed ack, be nice to the server
1903          * and force-send it now.
1904          */
1905         if (call->delayedAckEvent) {
1906             rxevent_Cancel(call->delayedAckEvent, call,
1907                            RX_CALL_REFCOUNT_DELAY);
1908             call->delayedAckEvent = NULL;
1909             rxi_SendDelayedAck(NULL, call, NULL);
1910         }
1911
1912         /* We need to release the call lock since it's lower than the
1913          * conn_call_lock and we don't want to hold the conn_call_lock
1914          * over the rx_ReadProc call. The conn_call_lock needs to be held
1915          * here for the case where rx_NewCall is perusing the calls on
1916          * the connection structure. We don't want to signal until
1917          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
1918          * have checked this call, found it active and by the time it
1919          * goes to sleep, will have missed the signal.
1920          *
1921          * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
1922          * there are threads waiting to use the conn object.
1923          */
1924         MUTEX_EXIT(&call->lock);
1925         MUTEX_ENTER(&conn->conn_call_lock);
1926         MUTEX_ENTER(&call->lock);
1927         MUTEX_ENTER(&conn->conn_data_lock);
1928         conn->flags |= RX_CONN_BUSY;
1929         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
1930             if (conn->makeCallWaiters == 0)
1931                 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
1932             MUTEX_EXIT(&conn->conn_data_lock);
1933 #ifdef  RX_ENABLE_LOCKS
1934             CV_BROADCAST(&conn->conn_call_cv);
1935 #else
1936             osi_rxWakeup(conn);
1937 #endif
1938         }
1939 #ifdef RX_ENABLE_LOCKS
1940         else {
1941             MUTEX_EXIT(&conn->conn_data_lock);
1942         }
1943 #endif /* RX_ENABLE_LOCKS */
1944         call->state = RX_STATE_DALLY;
1945     }
1946     error = call->error;
1947
1948     /* currentPacket, nLeft, and NFree must be zeroed here, because
1949      * ResetCall cannot: ResetCall may be called at splnet(), in the
1950      * kernel version, and may interrupt the macros rx_Read or
1951      * rx_Write, which run at normal priority for efficiency. */
1952     if (call->currentPacket) {
1953         queue_Prepend(&call->iovq, call->currentPacket);
1954         call->currentPacket = (struct rx_packet *)0;
1955     }
1956         
1957     call->nLeft = call->nFree = call->curlen = 0;
1958
1959     /* Free any packets from the last call to ReadvProc/WritevProc */
1960     rxi_FreePackets(0, &call->iovq);
1961
1962     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
1963     MUTEX_EXIT(&call->lock);
1964     if (conn->type == RX_CLIENT_CONNECTION) {
1965         MUTEX_EXIT(&conn->conn_call_lock);
1966         conn->flags &= ~RX_CONN_BUSY;
1967     }
1968     USERPRI;
1969     /*
1970      * Map errors to the local host's errno.h format.
1971      */
1972     error = ntoh_syserr_conv(error);
1973     return error;
1974 }
1975
1976 #if !defined(KERNEL)
1977
1978 /* Call this routine when shutting down a server or client (especially
1979  * clients).  This will allow Rx to gracefully garbage collect server
1980  * connections, and reduce the number of retries that a server might
1981  * make to a dead client.
1982  * This is not quite right, since some calls may still be ongoing and
1983  * we can't lock them to destroy them. */
1984 void
1985 rx_Finalize(void)
1986 {
1987     register struct rx_connection **conn_ptr, **conn_end;
1988
1989     INIT_PTHREAD_LOCKS;
1990     LOCK_RX_INIT;
1991     if (rxinit_status == 1) {
1992         UNLOCK_RX_INIT;
1993         return;                 /* Already shutdown. */
1994     }
1995     rxi_DeleteCachedConnections();
1996     if (rx_connHashTable) {
1997         MUTEX_ENTER(&rx_connHashTable_lock);
1998         for (conn_ptr = &rx_connHashTable[0], conn_end =
1999              &rx_connHashTable[rx_hashTableSize]; conn_ptr < conn_end;
2000              conn_ptr++) {
2001             struct rx_connection *conn, *next;
2002             for (conn = *conn_ptr; conn; conn = next) {
2003                 next = conn->next;
2004                 if (conn->type == RX_CLIENT_CONNECTION) {
2005                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
2006                     conn->refCount++;
2007                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
2008 #ifdef RX_ENABLE_LOCKS
2009                     rxi_DestroyConnectionNoLock(conn);
2010 #else /* RX_ENABLE_LOCKS */
2011                     rxi_DestroyConnection(conn);
2012 #endif /* RX_ENABLE_LOCKS */
2013                 }
2014             }
2015         }
2016 #ifdef RX_ENABLE_LOCKS
2017         while (rx_connCleanup_list) {
2018             struct rx_connection *conn;
2019             conn = rx_connCleanup_list;
2020             rx_connCleanup_list = rx_connCleanup_list->next;
2021             MUTEX_EXIT(&rx_connHashTable_lock);
2022             rxi_CleanupConnection(conn);
2023             MUTEX_ENTER(&rx_connHashTable_lock);
2024         }
2025         MUTEX_EXIT(&rx_connHashTable_lock);
2026 #endif /* RX_ENABLE_LOCKS */
2027     }
2028     rxi_flushtrace();
2029
2030 #ifdef AFS_NT40_ENV
2031     afs_winsockCleanup();
2032 #endif
2033
2034     rxinit_status = 1;
2035     UNLOCK_RX_INIT;
2036 }
2037 #endif
2038
2039 /* if we wakeup packet waiter too often, can get in loop with two
2040     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
2041 void
2042 rxi_PacketsUnWait(void)
2043 {
2044     if (!rx_waitingForPackets) {
2045         return;
2046     }
2047 #ifdef KERNEL
2048     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
2049         return;                 /* still over quota */
2050     }
2051 #endif /* KERNEL */
2052     rx_waitingForPackets = 0;
2053 #ifdef  RX_ENABLE_LOCKS
2054     CV_BROADCAST(&rx_waitingForPackets_cv);
2055 #else
2056     osi_rxWakeup(&rx_waitingForPackets);
2057 #endif
2058     return;
2059 }
2060
2061
2062 /* ------------------Internal interfaces------------------------- */
2063
2064 /* Return this process's service structure for the
2065  * specified socket and service */
2066 struct rx_service *
2067 rxi_FindService(register osi_socket socket, register u_short serviceId)
2068 {
2069     register struct rx_service **sp;
2070     for (sp = &rx_services[0]; *sp; sp++) {
2071         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
2072             return *sp;
2073     }
2074     return 0;
2075 }
2076
2077 /* Allocate a call structure, for the indicated channel of the
2078  * supplied connection.  The mode and state of the call must be set by
2079  * the caller. Returns the call with mutex locked. */
2080 struct rx_call *
2081 rxi_NewCall(register struct rx_connection *conn, register int channel)
2082 {
2083     register struct rx_call *call;
2084 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2085     register struct rx_call *cp;        /* Call pointer temp */
2086     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
2087 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2088
2089     dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
2090
2091     /* Grab an existing call structure, or allocate a new one.
2092      * Existing call structures are assumed to have been left reset by
2093      * rxi_FreeCall */
2094     MUTEX_ENTER(&rx_freeCallQueue_lock);
2095
2096 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2097     /*
2098      * EXCEPT that the TQ might not yet be cleared out.
2099      * Skip over those with in-use TQs.
2100      */
2101     call = NULL;
2102     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
2103         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
2104             call = cp;
2105             break;
2106         }
2107     }
2108     if (call) {
2109 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2110     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
2111         call = queue_First(&rx_freeCallQueue, rx_call);
2112 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2113         queue_Remove(call);
2114         MUTEX_ENTER(&rx_stats_mutex);
2115         rx_stats.nFreeCallStructs--;
2116         MUTEX_EXIT(&rx_stats_mutex);
2117         MUTEX_EXIT(&rx_freeCallQueue_lock);
2118         MUTEX_ENTER(&call->lock);
2119         CLEAR_CALL_QUEUE_LOCK(call);
2120 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2121         /* Now, if TQ wasn't cleared earlier, do it now. */
2122         if (call->flags & RX_CALL_TQ_CLEARME) {
2123             rxi_ClearTransmitQueue(call, 0);
2124             queue_Init(&call->tq);
2125         }
2126 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2127         /* Bind the call to its connection structure */
2128         call->conn = conn;
2129         rxi_ResetCall(call, 1);
2130     } else {
2131         call = (struct rx_call *)rxi_Alloc(sizeof(struct rx_call));
2132
2133         MUTEX_EXIT(&rx_freeCallQueue_lock);
2134         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
2135         MUTEX_ENTER(&call->lock);
2136         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
2137         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
2138         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
2139
2140         MUTEX_ENTER(&rx_stats_mutex);
2141         rx_stats.nCallStructs++;
2142         MUTEX_EXIT(&rx_stats_mutex);
2143         /* Initialize once-only items */
2144         queue_Init(&call->tq);
2145         queue_Init(&call->rq);
2146         queue_Init(&call->iovq);
2147         /* Bind the call to its connection structure (prereq for reset) */
2148         call->conn = conn;
2149         rxi_ResetCall(call, 1);
2150     }
2151     call->channel = channel;
2152     call->callNumber = &conn->callNumber[channel];
2153     /* Note that the next expected call number is retained (in
2154      * conn->callNumber[i]), even if we reallocate the call structure
2155      */
2156     conn->call[channel] = call;
2157     /* if the channel's never been used (== 0), we should start at 1, otherwise
2158      * the call number is valid from the last time this channel was used */
2159     if (*call->callNumber == 0)
2160         *call->callNumber = 1;
2161
2162     return call;
2163 }
2164
2165 /* A call has been inactive long enough that so we can throw away
2166  * state, including the call structure, which is placed on the call
2167  * free list.
2168  * Call is locked upon entry.
2169  * haveCTLock set if called from rxi_ReapConnections
2170  */
2171 #ifdef RX_ENABLE_LOCKS
2172 void
2173 rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2174 #else /* RX_ENABLE_LOCKS */
2175 void
2176 rxi_FreeCall(register struct rx_call *call)
2177 #endif                          /* RX_ENABLE_LOCKS */
2178 {
2179     register int channel = call->channel;
2180     register struct rx_connection *conn = call->conn;
2181
2182
2183     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2184         (*call->callNumber)++;
2185     rxi_ResetCall(call, 0);
2186     call->conn->call[channel] = (struct rx_call *)0;
2187
2188     MUTEX_ENTER(&rx_freeCallQueue_lock);
2189     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2190 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2191     /* A call may be free even though its transmit queue is still in use.
2192      * Since we search the call list from head to tail, put busy calls at
2193      * the head of the list, and idle calls at the tail.
2194      */
2195     if (call->flags & RX_CALL_TQ_BUSY)
2196         queue_Prepend(&rx_freeCallQueue, call);
2197     else
2198         queue_Append(&rx_freeCallQueue, call);
2199 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2200     queue_Append(&rx_freeCallQueue, call);
2201 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2202     MUTEX_ENTER(&rx_stats_mutex);
2203     rx_stats.nFreeCallStructs++;
2204     MUTEX_EXIT(&rx_stats_mutex);
2205
2206     MUTEX_EXIT(&rx_freeCallQueue_lock);
2207
2208     /* Destroy the connection if it was previously slated for
2209      * destruction, i.e. the Rx client code previously called
2210      * rx_DestroyConnection (client connections), or
2211      * rxi_ReapConnections called the same routine (server
2212      * connections).  Only do this, however, if there are no
2213      * outstanding calls. Note that for fine grain locking, there appears
2214      * to be a deadlock in that rxi_FreeCall has a call locked and
2215      * DestroyConnectionNoLock locks each call in the conn. But note a
2216      * few lines up where we have removed this call from the conn.
2217      * If someone else destroys a connection, they either have no
2218      * call lock held or are going through this section of code.
2219      */
2220     if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
2221         MUTEX_ENTER(&conn->conn_data_lock);
2222         conn->refCount++;
2223         MUTEX_EXIT(&conn->conn_data_lock);
2224 #ifdef RX_ENABLE_LOCKS
2225         if (haveCTLock)
2226             rxi_DestroyConnectionNoLock(conn);
2227         else
2228             rxi_DestroyConnection(conn);
2229 #else /* RX_ENABLE_LOCKS */
2230         rxi_DestroyConnection(conn);
2231 #endif /* RX_ENABLE_LOCKS */
2232     }
2233 }
2234
2235 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2236 char *
2237 rxi_Alloc(register size_t size)
2238 {
2239     register char *p;
2240
2241     MUTEX_ENTER(&rx_stats_mutex);
2242     rxi_Alloccnt++;
2243     rxi_Allocsize += (afs_int32)size;
2244     MUTEX_EXIT(&rx_stats_mutex);
2245
2246     p = (char *)osi_Alloc(size);
2247
2248     if (!p)
2249         osi_Panic("rxi_Alloc error");
2250     memset(p, 0, size);
2251     return p;
2252 }
2253
2254 void
2255 rxi_Free(void *addr, register size_t size)
2256 {
2257     MUTEX_ENTER(&rx_stats_mutex);
2258     rxi_Alloccnt--;
2259     rxi_Allocsize -= (afs_int32)size;
2260     MUTEX_EXIT(&rx_stats_mutex);
2261
2262     osi_Free(addr, size);
2263 }
2264
2265 /* Find the peer process represented by the supplied (host,port)
2266  * combination.  If there is no appropriate active peer structure, a
2267  * new one will be allocated and initialized 
2268  * The origPeer, if set, is a pointer to a peer structure on which the
2269  * refcount will be be decremented. This is used to replace the peer
2270  * structure hanging off a connection structure */
2271 struct rx_peer *
2272 rxi_FindPeer(register afs_uint32 host, register u_short port,
2273              struct rx_peer *origPeer, int create)
2274 {
2275     register struct rx_peer *pp;
2276     int hashIndex;
2277     hashIndex = PEER_HASH(host, port);
2278     MUTEX_ENTER(&rx_peerHashTable_lock);
2279     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2280         if ((pp->host == host) && (pp->port == port))
2281             break;
2282     }
2283     if (!pp) {
2284         if (create) {
2285             pp = rxi_AllocPeer();       /* This bzero's *pp */
2286             pp->host = host;    /* set here or in InitPeerParams is zero */
2287             pp->port = port;
2288             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2289             queue_Init(&pp->congestionQueue);
2290             queue_Init(&pp->rpcStats);
2291             pp->next = rx_peerHashTable[hashIndex];
2292             rx_peerHashTable[hashIndex] = pp;
2293             rxi_InitPeerParams(pp);
2294             MUTEX_ENTER(&rx_stats_mutex);
2295             rx_stats.nPeerStructs++;
2296             MUTEX_EXIT(&rx_stats_mutex);
2297         }
2298     }
2299     if (pp && create) {
2300         pp->refCount++;
2301     }
2302     if (origPeer)
2303         origPeer->refCount--;
2304     MUTEX_EXIT(&rx_peerHashTable_lock);
2305     return pp;
2306 }
2307
2308
2309 /* Find the connection at (host, port) started at epoch, and with the
2310  * given connection id.  Creates the server connection if necessary.
2311  * The type specifies whether a client connection or a server
2312  * connection is desired.  In both cases, (host, port) specify the
2313  * peer's (host, pair) pair.  Client connections are not made
2314  * automatically by this routine.  The parameter socket gives the
2315  * socket descriptor on which the packet was received.  This is used,
2316  * in the case of server connections, to check that *new* connections
2317  * come via a valid (port, serviceId).  Finally, the securityIndex
2318  * parameter must match the existing index for the connection.  If a
2319  * server connection is created, it will be created using the supplied
2320  * index, if the index is valid for this service */
2321 struct rx_connection *
2322 rxi_FindConnection(osi_socket socket, register afs_int32 host,
2323                    register u_short port, u_short serviceId, afs_uint32 cid,
2324                    afs_uint32 epoch, int type, u_int securityIndex)
2325 {
2326     int hashindex, flag;
2327     register struct rx_connection *conn;
2328     hashindex = CONN_HASH(host, port, cid, epoch, type);
2329     MUTEX_ENTER(&rx_connHashTable_lock);
2330     rxLastConn ? (conn = rxLastConn, flag = 0) : (conn =
2331                                                   rx_connHashTable[hashindex],
2332                                                   flag = 1);
2333     for (; conn;) {
2334         if ((conn->type == type) && ((cid & RX_CIDMASK) == conn->cid)
2335             && (epoch == conn->epoch)) {
2336             register struct rx_peer *pp = conn->peer;
2337             if (securityIndex != conn->securityIndex) {
2338                 /* this isn't supposed to happen, but someone could forge a packet
2339                  * like this, and there seems to be some CM bug that makes this
2340                  * happen from time to time -- in which case, the fileserver
2341                  * asserts. */
2342                 MUTEX_EXIT(&rx_connHashTable_lock);
2343                 return (struct rx_connection *)0;
2344             }
2345             if (pp->host == host && pp->port == port)
2346                 break;
2347             if (type == RX_CLIENT_CONNECTION && pp->port == port)
2348                 break;
2349             /* So what happens when it's a callback connection? */
2350             if (                /*type == RX_CLIENT_CONNECTION && */
2351                    (conn->epoch & 0x80000000))
2352                 break;
2353         }
2354         if (!flag) {
2355             /* the connection rxLastConn that was used the last time is not the
2356              ** one we are looking for now. Hence, start searching in the hash */
2357             flag = 1;
2358             conn = rx_connHashTable[hashindex];
2359         } else
2360             conn = conn->next;
2361     }
2362     if (!conn) {
2363         struct rx_service *service;
2364         if (type == RX_CLIENT_CONNECTION) {
2365             MUTEX_EXIT(&rx_connHashTable_lock);
2366             return (struct rx_connection *)0;
2367         }
2368         service = rxi_FindService(socket, serviceId);
2369         if (!service || (securityIndex >= service->nSecurityObjects)
2370             || (service->securityObjects[securityIndex] == 0)) {
2371             MUTEX_EXIT(&rx_connHashTable_lock);
2372             return (struct rx_connection *)0;
2373         }
2374         conn = rxi_AllocConnection();   /* This bzero's the connection */
2375         MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
2376         MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
2377         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2378         conn->next = rx_connHashTable[hashindex];
2379         rx_connHashTable[hashindex] = conn;
2380         conn->peer = rxi_FindPeer(host, port, 0, 1);
2381         conn->type = RX_SERVER_CONNECTION;
2382         conn->lastSendTime = clock_Sec();       /* don't GC immediately */
2383         conn->epoch = epoch;
2384         conn->cid = cid & RX_CIDMASK;
2385         /* conn->serial = conn->lastSerial = 0; */
2386         /* conn->timeout = 0; */
2387         conn->ackRate = RX_FAST_ACK_RATE;
2388         conn->service = service;
2389         conn->serviceId = serviceId;
2390         conn->securityIndex = securityIndex;
2391         conn->securityObject = service->securityObjects[securityIndex];
2392         conn->nSpecific = 0;
2393         conn->specific = NULL;
2394         rx_SetConnDeadTime(conn, service->connDeadTime);
2395         rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2396         /* Notify security object of the new connection */
2397         RXS_NewConnection(conn->securityObject, conn);
2398         /* XXXX Connection timeout? */
2399         if (service->newConnProc)
2400             (*service->newConnProc) (conn);
2401         MUTEX_ENTER(&rx_stats_mutex);
2402         rx_stats.nServerConns++;
2403         MUTEX_EXIT(&rx_stats_mutex);
2404     }
2405
2406     MUTEX_ENTER(&conn->conn_data_lock);
2407     conn->refCount++;
2408     MUTEX_EXIT(&conn->conn_data_lock);
2409
2410     rxLastConn = conn;          /* store this connection as the last conn used */
2411     MUTEX_EXIT(&rx_connHashTable_lock);
2412     return conn;
2413 }
2414
2415 /* There are two packet tracing routines available for testing and monitoring
2416  * Rx.  One is called just after every packet is received and the other is
2417  * called just before every packet is sent.  Received packets, have had their
2418  * headers decoded, and packets to be sent have not yet had their headers
2419  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2420  * containing the network address.  Both can be modified.  The return value, if
2421  * non-zero, indicates that the packet should be dropped.  */
2422
2423 int (*rx_justReceived) () = 0;
2424 int (*rx_almostSent) () = 0;
2425
2426 /* A packet has been received off the interface.  Np is the packet, socket is
2427  * the socket number it was received from (useful in determining which service
2428  * this packet corresponds to), and (host, port) reflect the host,port of the
2429  * sender.  This call returns the packet to the caller if it is finished with
2430  * it, rather than de-allocating it, just as a small performance hack */
2431
2432 struct rx_packet *
2433 rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
2434                   afs_uint32 host, u_short port, int *tnop,
2435                   struct rx_call **newcallp)
2436 {
2437     register struct rx_call *call;
2438     register struct rx_connection *conn;
2439     int channel;
2440     afs_uint32 currentCallNumber;
2441     int type;
2442     int skew;
2443 #ifdef RXDEBUG
2444     char *packetType;
2445 #endif
2446     struct rx_packet *tnp;
2447
2448 #ifdef RXDEBUG
2449 /* We don't print out the packet until now because (1) the time may not be
2450  * accurate enough until now in the lwp implementation (rx_Listener only gets
2451  * the time after the packet is read) and (2) from a protocol point of view,
2452  * this is the first time the packet has been seen */
2453     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2454         ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
2455     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2456          np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
2457          np->header.epoch, np->header.cid, np->header.callNumber,
2458          np->header.seq, np->header.flags, np));
2459 #endif
2460
2461     if (np->header.type == RX_PACKET_TYPE_VERSION) {
2462         return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
2463     }
2464
2465     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2466         return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
2467     }
2468 #ifdef RXDEBUG
2469     /* If an input tracer function is defined, call it with the packet and
2470      * network address.  Note this function may modify its arguments. */
2471     if (rx_justReceived) {
2472         struct sockaddr_in addr;
2473         int drop;
2474         addr.sin_family = AF_INET;
2475         addr.sin_port = port;
2476         addr.sin_addr.s_addr = host;
2477 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
2478         addr.sin_len = sizeof(addr);
2479 #endif /* AFS_OSF_ENV */
2480         drop = (*rx_justReceived) (np, &addr);
2481         /* drop packet if return value is non-zero */
2482         if (drop)
2483             return np;
2484         port = addr.sin_port;   /* in case fcn changed addr */
2485         host = addr.sin_addr.s_addr;
2486     }
2487 #endif
2488
2489     /* If packet was not sent by the client, then *we* must be the client */
2490     type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2491         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2492
2493     /* Find the connection (or fabricate one, if we're the server & if
2494      * necessary) associated with this packet */
2495     conn =
2496         rxi_FindConnection(socket, host, port, np->header.serviceId,
2497                            np->header.cid, np->header.epoch, type,
2498                            np->header.securityIndex);
2499
2500     if (!conn) {
2501         /* If no connection found or fabricated, just ignore the packet.
2502          * (An argument could be made for sending an abort packet for
2503          * the conn) */
2504         return np;
2505     }
2506
2507     MUTEX_ENTER(&conn->conn_data_lock);
2508     if (conn->maxSerial < np->header.serial)
2509         conn->maxSerial = np->header.serial;
2510     MUTEX_EXIT(&conn->conn_data_lock);
2511
2512     /* If the connection is in an error state, send an abort packet and ignore
2513      * the incoming packet */
2514     if (conn->error) {
2515         /* Don't respond to an abort packet--we don't want loops! */
2516         MUTEX_ENTER(&conn->conn_data_lock);
2517         if (np->header.type != RX_PACKET_TYPE_ABORT)
2518             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2519         conn->refCount--;
2520         MUTEX_EXIT(&conn->conn_data_lock);
2521         return np;
2522     }
2523
2524     /* Check for connection-only requests (i.e. not call specific). */
2525     if (np->header.callNumber == 0) {
2526         switch (np->header.type) {
2527         case RX_PACKET_TYPE_ABORT: {
2528             /* What if the supplied error is zero? */
2529             afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
2530             dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
2531             rxi_ConnectionError(conn, errcode);
2532             MUTEX_ENTER(&conn->conn_data_lock);
2533             conn->refCount--;
2534             MUTEX_EXIT(&conn->conn_data_lock);
2535             return np;
2536         }
2537         case RX_PACKET_TYPE_CHALLENGE:
2538             tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2539             MUTEX_ENTER(&conn->conn_data_lock);
2540             conn->refCount--;
2541             MUTEX_EXIT(&conn->conn_data_lock);
2542             return tnp;
2543         case RX_PACKET_TYPE_RESPONSE:
2544             tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2545             MUTEX_ENTER(&conn->conn_data_lock);
2546             conn->refCount--;
2547             MUTEX_EXIT(&conn->conn_data_lock);
2548             return tnp;
2549         case RX_PACKET_TYPE_PARAMS:
2550         case RX_PACKET_TYPE_PARAMS + 1:
2551         case RX_PACKET_TYPE_PARAMS + 2:
2552             /* ignore these packet types for now */
2553             MUTEX_ENTER(&conn->conn_data_lock);
2554             conn->refCount--;
2555             MUTEX_EXIT(&conn->conn_data_lock);
2556             return np;
2557
2558
2559         default:
2560             /* Should not reach here, unless the peer is broken: send an
2561              * abort packet */
2562             rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2563             MUTEX_ENTER(&conn->conn_data_lock);
2564             tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2565             conn->refCount--;
2566             MUTEX_EXIT(&conn->conn_data_lock);
2567             return tnp;
2568         }
2569     }
2570
2571     channel = np->header.cid & RX_CHANNELMASK;
2572     call = conn->call[channel];
2573 #ifdef  RX_ENABLE_LOCKS
2574     if (call)
2575         MUTEX_ENTER(&call->lock);
2576     /* Test to see if call struct is still attached to conn. */
2577     if (call != conn->call[channel]) {
2578         if (call)
2579             MUTEX_EXIT(&call->lock);
2580         if (type == RX_SERVER_CONNECTION) {
2581             call = conn->call[channel];
2582             /* If we started with no call attached and there is one now,
2583              * another thread is also running this routine and has gotten
2584              * the connection channel. We should drop this packet in the tests
2585              * below. If there was a call on this connection and it's now
2586              * gone, then we'll be making a new call below.
2587              * If there was previously a call and it's now different then
2588              * the old call was freed and another thread running this routine
2589              * has created a call on this channel. One of these two threads
2590              * has a packet for the old call and the code below handles those
2591              * cases.
2592              */
2593             if (call)
2594                 MUTEX_ENTER(&call->lock);
2595         } else {
2596             /* This packet can't be for this call. If the new call address is
2597              * 0 then no call is running on this channel. If there is a call
2598              * then, since this is a client connection we're getting data for
2599              * it must be for the previous call.
2600              */
2601             MUTEX_ENTER(&rx_stats_mutex);
2602             rx_stats.spuriousPacketsRead++;
2603             MUTEX_EXIT(&rx_stats_mutex);
2604             MUTEX_ENTER(&conn->conn_data_lock);
2605             conn->refCount--;
2606             MUTEX_EXIT(&conn->conn_data_lock);
2607             return np;
2608         }
2609     }
2610 #endif
2611     currentCallNumber = conn->callNumber[channel];
2612
2613     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2614         if (np->header.callNumber < currentCallNumber) {
2615             MUTEX_ENTER(&rx_stats_mutex);
2616             rx_stats.spuriousPacketsRead++;
2617             MUTEX_EXIT(&rx_stats_mutex);
2618 #ifdef  RX_ENABLE_LOCKS
2619             if (call)
2620                 MUTEX_EXIT(&call->lock);
2621 #endif
2622             MUTEX_ENTER(&conn->conn_data_lock);
2623             conn->refCount--;
2624             MUTEX_EXIT(&conn->conn_data_lock);
2625             return np;
2626         }
2627         if (!call) {
2628             MUTEX_ENTER(&conn->conn_call_lock);
2629             call = rxi_NewCall(conn, channel);
2630             MUTEX_EXIT(&conn->conn_call_lock);
2631             *call->callNumber = np->header.callNumber;
2632             if (np->header.callNumber == 0) 
2633                 dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
2634
2635             call->state = RX_STATE_PRECALL;
2636             clock_GetTime(&call->queueTime);
2637             hzero(call->bytesSent);
2638             hzero(call->bytesRcvd);
2639             /*
2640              * If the number of queued calls exceeds the overload
2641              * threshold then abort this call.
2642              */
2643             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2644                 struct rx_packet *tp;
2645                 
2646                 rxi_CallError(call, rx_BusyError);
2647                 tp = rxi_SendCallAbort(call, np, 1, 0);
2648                 MUTEX_EXIT(&call->lock);
2649                 MUTEX_ENTER(&conn->conn_data_lock);
2650                 conn->refCount--;
2651                 MUTEX_EXIT(&conn->conn_data_lock);
2652                 MUTEX_ENTER(&rx_stats_mutex);
2653                 rx_stats.nBusies++;
2654                 MUTEX_EXIT(&rx_stats_mutex);
2655                 return tp;
2656             }
2657             rxi_KeepAliveOn(call);
2658         } else if (np->header.callNumber != currentCallNumber) {
2659             /* Wait until the transmit queue is idle before deciding
2660              * whether to reset the current call. Chances are that the
2661              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2662              * flag is cleared.
2663              */
2664 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2665             while ((call->state == RX_STATE_ACTIVE)
2666                    && (call->flags & RX_CALL_TQ_BUSY)) {
2667                 call->flags |= RX_CALL_TQ_WAIT;
2668                 call->tqWaiters++;
2669 #ifdef RX_ENABLE_LOCKS
2670                 osirx_AssertMine(&call->lock, "rxi_Start lock3");
2671                 CV_WAIT(&call->cv_tq, &call->lock);
2672 #else /* RX_ENABLE_LOCKS */
2673                 osi_rxSleep(&call->tq);
2674 #endif /* RX_ENABLE_LOCKS */
2675                 call->tqWaiters--;
2676                 if (call->tqWaiters == 0)
2677                     call->flags &= ~RX_CALL_TQ_WAIT;
2678             }
2679 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2680             /* If the new call cannot be taken right now send a busy and set
2681              * the error condition in this call, so that it terminates as
2682              * quickly as possible */
2683             if (call->state == RX_STATE_ACTIVE) {
2684                 struct rx_packet *tp;
2685
2686                 rxi_CallError(call, RX_CALL_DEAD);
2687                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
2688                                      NULL, 0, 1);
2689                 MUTEX_EXIT(&call->lock);
2690                 MUTEX_ENTER(&conn->conn_data_lock);
2691                 conn->refCount--;
2692                 MUTEX_EXIT(&conn->conn_data_lock);
2693                 return tp;
2694             }
2695             rxi_ResetCall(call, 0);
2696             *call->callNumber = np->header.callNumber;
2697             if (np->header.callNumber == 0) 
2698                 dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
2699
2700             call->state = RX_STATE_PRECALL;
2701             clock_GetTime(&call->queueTime);
2702             hzero(call->bytesSent);
2703             hzero(call->bytesRcvd);
2704             /*
2705              * If the number of queued calls exceeds the overload
2706              * threshold then abort this call.
2707              */
2708             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2709                 struct rx_packet *tp;
2710
2711                 rxi_CallError(call, rx_BusyError);
2712                 tp = rxi_SendCallAbort(call, np, 1, 0);
2713                 MUTEX_EXIT(&call->lock);
2714                 MUTEX_ENTER(&conn->conn_data_lock);
2715                 conn->refCount--;
2716                 MUTEX_EXIT(&conn->conn_data_lock);
2717                 MUTEX_ENTER(&rx_stats_mutex);
2718                 rx_stats.nBusies++;
2719                 MUTEX_EXIT(&rx_stats_mutex);
2720                 return tp;
2721             }
2722             rxi_KeepAliveOn(call);
2723         } else {
2724             /* Continuing call; do nothing here. */
2725         }
2726     } else {                    /* we're the client */
2727         /* Ignore all incoming acknowledgements for calls in DALLY state */
2728         if (call && (call->state == RX_STATE_DALLY)
2729             && (np->header.type == RX_PACKET_TYPE_ACK)) {
2730             MUTEX_ENTER(&rx_stats_mutex);
2731             rx_stats.ignorePacketDally++;
2732             MUTEX_EXIT(&rx_stats_mutex);
2733 #ifdef  RX_ENABLE_LOCKS
2734             if (call) {
2735                 MUTEX_EXIT(&call->lock);
2736             }
2737 #endif
2738             MUTEX_ENTER(&conn->conn_data_lock);
2739             conn->refCount--;
2740             MUTEX_EXIT(&conn->conn_data_lock);
2741             return np;
2742         }
2743
2744         /* Ignore anything that's not relevant to the current call.  If there
2745          * isn't a current call, then no packet is relevant. */
2746         if (!call || (np->header.callNumber != currentCallNumber)) {
2747             MUTEX_ENTER(&rx_stats_mutex);
2748             rx_stats.spuriousPacketsRead++;
2749             MUTEX_EXIT(&rx_stats_mutex);
2750 #ifdef  RX_ENABLE_LOCKS
2751             if (call) {
2752                 MUTEX_EXIT(&call->lock);
2753             }
2754 #endif
2755             MUTEX_ENTER(&conn->conn_data_lock);
2756             conn->refCount--;
2757             MUTEX_EXIT(&conn->conn_data_lock);
2758             return np;
2759         }
2760         /* If the service security object index stamped in the packet does not
2761          * match the connection's security index, ignore the packet */
2762         if (np->header.securityIndex != conn->securityIndex) {
2763 #ifdef  RX_ENABLE_LOCKS
2764             MUTEX_EXIT(&call->lock);
2765 #endif
2766             MUTEX_ENTER(&conn->conn_data_lock);
2767             conn->refCount--;
2768             MUTEX_EXIT(&conn->conn_data_lock);
2769             return np;
2770         }
2771
2772         /* If we're receiving the response, then all transmit packets are
2773          * implicitly acknowledged.  Get rid of them. */
2774         if (np->header.type == RX_PACKET_TYPE_DATA) {
2775 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2776             /* XXX Hack. Because we must release the global rx lock when
2777              * sending packets (osi_NetSend) we drop all acks while we're
2778              * traversing the tq in rxi_Start sending packets out because
2779              * packets may move to the freePacketQueue as result of being here!
2780              * So we drop these packets until we're safely out of the
2781              * traversing. Really ugly! 
2782              * For fine grain RX locking, we set the acked field in the
2783              * packets and let rxi_Start remove them from the transmit queue.
2784              */
2785             if (call->flags & RX_CALL_TQ_BUSY) {
2786 #ifdef  RX_ENABLE_LOCKS
2787                 rxi_SetAcksInTransmitQueue(call);
2788 #else
2789                 conn->refCount--;
2790                 return np;      /* xmitting; drop packet */
2791 #endif
2792             } else {
2793                 rxi_ClearTransmitQueue(call, 0);
2794             }
2795 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2796             rxi_ClearTransmitQueue(call, 0);
2797 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2798         } else {
2799             if (np->header.type == RX_PACKET_TYPE_ACK) {
2800                 /* now check to see if this is an ack packet acknowledging that the
2801                  * server actually *lost* some hard-acked data.  If this happens we
2802                  * ignore this packet, as it may indicate that the server restarted in
2803                  * the middle of a call.  It is also possible that this is an old ack
2804                  * packet.  We don't abort the connection in this case, because this
2805                  * *might* just be an old ack packet.  The right way to detect a server
2806                  * restart in the midst of a call is to notice that the server epoch
2807                  * changed, btw.  */
2808                 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2809                  * XXX unacknowledged.  I think that this is off-by-one, but
2810                  * XXX I don't dare change it just yet, since it will
2811                  * XXX interact badly with the server-restart detection 
2812                  * XXX code in receiveackpacket.  */
2813                 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2814                     MUTEX_ENTER(&rx_stats_mutex);
2815                     rx_stats.spuriousPacketsRead++;
2816                     MUTEX_EXIT(&rx_stats_mutex);
2817                     MUTEX_EXIT(&call->lock);
2818                     MUTEX_ENTER(&conn->conn_data_lock);
2819                     conn->refCount--;
2820                     MUTEX_EXIT(&conn->conn_data_lock);
2821                     return np;
2822                 }
2823             }
2824         }                       /* else not a data packet */
2825     }
2826
2827     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2828     /* Set remote user defined status from packet */
2829     call->remoteStatus = np->header.userStatus;
2830
2831     /* Note the gap between the expected next packet and the actual
2832      * packet that arrived, when the new packet has a smaller serial number
2833      * than expected.  Rioses frequently reorder packets all by themselves,
2834      * so this will be quite important with very large window sizes.
2835      * Skew is checked against 0 here to avoid any dependence on the type of
2836      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2837      * true! 
2838      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2839      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2840      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2841      */
2842     MUTEX_ENTER(&conn->conn_data_lock);
2843     skew = conn->lastSerial - np->header.serial;
2844     conn->lastSerial = np->header.serial;
2845     MUTEX_EXIT(&conn->conn_data_lock);
2846     if (skew > 0) {
2847         register struct rx_peer *peer;
2848         peer = conn->peer;
2849         if (skew > peer->inPacketSkew) {
2850             dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew,
2851                  skew));
2852             peer->inPacketSkew = skew;
2853         }
2854     }
2855
2856     /* Now do packet type-specific processing */
2857     switch (np->header.type) {
2858     case RX_PACKET_TYPE_DATA:
2859         np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
2860                                    newcallp);
2861         break;
2862     case RX_PACKET_TYPE_ACK:
2863         /* Respond immediately to ack packets requesting acknowledgement
2864          * (ping packets) */
2865         if (np->header.flags & RX_REQUEST_ACK) {
2866             if (call->error)
2867                 (void)rxi_SendCallAbort(call, 0, 1, 0);
2868             else
2869                 (void)rxi_SendAck(call, 0, np->header.serial,
2870                                   RX_ACK_PING_RESPONSE, 1);
2871         }
2872         np = rxi_ReceiveAckPacket(call, np, 1);
2873         break;
2874     case RX_PACKET_TYPE_ABORT: {
2875         /* An abort packet: reset the call, passing the error up to the user. */
2876         /* What if error is zero? */
2877         /* What if the error is -1? the application will treat it as a timeout. */
2878         afs_int32 errdata = ntohl(*(afs_int32 *) rx_DataOf(np));
2879         dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
2880         rxi_CallError(call, errdata);
2881         MUTEX_EXIT(&call->lock);
2882         MUTEX_ENTER(&conn->conn_data_lock);
2883         conn->refCount--;
2884         MUTEX_EXIT(&conn->conn_data_lock);
2885         return np;              /* xmitting; drop packet */
2886     }
2887     case RX_PACKET_TYPE_BUSY:
2888         /* XXXX */
2889         break;
2890     case RX_PACKET_TYPE_ACKALL:
2891         /* All packets acknowledged, so we can drop all packets previously
2892          * readied for sending */
2893 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2894         /* XXX Hack. We because we can't release the global rx lock when
2895          * sending packets (osi_NetSend) we drop all ack pkts while we're
2896          * traversing the tq in rxi_Start sending packets out because
2897          * packets may move to the freePacketQueue as result of being
2898          * here! So we drop these packets until we're safely out of the
2899          * traversing. Really ugly! 
2900          * For fine grain RX locking, we set the acked field in the packets
2901          * and let rxi_Start remove the packets from the transmit queue.
2902          */
2903         if (call->flags & RX_CALL_TQ_BUSY) {
2904 #ifdef  RX_ENABLE_LOCKS
2905             rxi_SetAcksInTransmitQueue(call);
2906             break;
2907 #else /* RX_ENABLE_LOCKS */
2908             MUTEX_EXIT(&call->lock);
2909             MUTEX_ENTER(&conn->conn_data_lock);
2910             conn->refCount--;
2911             MUTEX_EXIT(&conn->conn_data_lock);
2912             return np;          /* xmitting; drop packet */
2913 #endif /* RX_ENABLE_LOCKS */
2914         }
2915 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2916         rxi_ClearTransmitQueue(call, 0);
2917         break;
2918     default:
2919         /* Should not reach here, unless the peer is broken: send an abort
2920          * packet */
2921         rxi_CallError(call, RX_PROTOCOL_ERROR);
2922         np = rxi_SendCallAbort(call, np, 1, 0);
2923         break;
2924     };
2925     /* Note when this last legitimate packet was received, for keep-alive
2926      * processing.  Note, we delay getting the time until now in the hope that
2927      * the packet will be delivered to the user before any get time is required
2928      * (if not, then the time won't actually be re-evaluated here). */
2929     call->lastReceiveTime = clock_Sec();
2930     MUTEX_EXIT(&call->lock);
2931     MUTEX_ENTER(&conn->conn_data_lock);
2932     conn->refCount--;
2933     MUTEX_EXIT(&conn->conn_data_lock);
2934     return np;
2935 }
2936
2937 /* return true if this is an "interesting" connection from the point of view
2938     of someone trying to debug the system */
2939 int
2940 rxi_IsConnInteresting(struct rx_connection *aconn)
2941 {
2942     register int i;
2943     register struct rx_call *tcall;
2944
2945     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
2946         return 1;
2947     for (i = 0; i < RX_MAXCALLS; i++) {
2948         tcall = aconn->call[i];
2949         if (tcall) {
2950             if ((tcall->state == RX_STATE_PRECALL)
2951                 || (tcall->state == RX_STATE_ACTIVE))
2952                 return 1;
2953             if ((tcall->mode == RX_MODE_SENDING)
2954                 || (tcall->mode == RX_MODE_RECEIVING))
2955                 return 1;
2956         }
2957     }
2958     return 0;
2959 }
2960
2961 #ifdef KERNEL
2962 /* if this is one of the last few packets AND it wouldn't be used by the
2963    receiving call to immediately satisfy a read request, then drop it on
2964    the floor, since accepting it might prevent a lock-holding thread from
2965    making progress in its reading. If a call has been cleared while in
2966    the precall state then ignore all subsequent packets until the call
2967    is assigned to a thread. */
2968
2969 static int
2970 TooLow(struct rx_packet *ap, struct rx_call *acall)
2971 {
2972     int rc = 0;
2973     MUTEX_ENTER(&rx_stats_mutex);
2974     if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED)
2975          && (acall->state == RX_STATE_PRECALL))
2976         || ((rx_nFreePackets < rxi_dataQuota + 2)
2977             && !((ap->header.seq < acall->rnext + rx_initSendWindow)
2978                  && (acall->flags & RX_CALL_READER_WAIT)))) {
2979         rc = 1;
2980     }
2981     MUTEX_EXIT(&rx_stats_mutex);
2982     return rc;
2983 }
2984 #endif /* KERNEL */
2985
2986 static void
2987 rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn,
2988                     struct rx_call *acall)
2989 {
2990     struct rx_call *call = acall;
2991     struct clock when;
2992     int i, waiting;
2993
2994     MUTEX_ENTER(&conn->conn_data_lock);
2995     conn->checkReachEvent = NULL;
2996     waiting = conn->flags & RX_CONN_ATTACHWAIT;
2997     if (event)
2998         conn->refCount--;
2999     MUTEX_EXIT(&conn->conn_data_lock);
3000
3001     if (waiting) {
3002         if (!call) {
3003             MUTEX_ENTER(&conn->conn_call_lock);
3004             MUTEX_ENTER(&conn->conn_data_lock);
3005             for (i = 0; i < RX_MAXCALLS; i++) {
3006                 struct rx_call *tc = conn->call[i];
3007                 if (tc && tc->state == RX_STATE_PRECALL) {
3008                     call = tc;
3009                     break;
3010                 }
3011             }
3012             if (!call)
3013                 /* Indicate that rxi_CheckReachEvent is no longer running by
3014                  * clearing the flag.  Must be atomic under conn_data_lock to
3015                  * avoid a new call slipping by: rxi_CheckConnReach holds
3016                  * conn_data_lock while checking RX_CONN_ATTACHWAIT.
3017                  */
3018                 conn->flags &= ~RX_CONN_ATTACHWAIT;
3019             MUTEX_EXIT(&conn->conn_data_lock);
3020             MUTEX_EXIT(&conn->conn_call_lock);
3021         }
3022
3023         if (call) {
3024             if (call != acall)
3025                 MUTEX_ENTER(&call->lock);
3026             rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
3027             if (call != acall)
3028                 MUTEX_EXIT(&call->lock);
3029
3030             clock_GetTime(&when);
3031             when.sec += RX_CHECKREACH_TIMEOUT;
3032             MUTEX_ENTER(&conn->conn_data_lock);
3033             if (!conn->checkReachEvent) {
3034                 conn->refCount++;
3035                 conn->checkReachEvent =
3036                     rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
3037             }
3038             MUTEX_EXIT(&conn->conn_data_lock);
3039         }
3040     }
3041 }
3042
3043 static int
3044 rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
3045 {
3046     struct rx_service *service = conn->service;
3047     struct rx_peer *peer = conn->peer;
3048     afs_uint32 now, lastReach;
3049
3050     if (service->checkReach == 0)
3051         return 0;
3052
3053     now = clock_Sec();
3054     MUTEX_ENTER(&peer->peer_lock);
3055     lastReach = peer->lastReachTime;
3056     MUTEX_EXIT(&peer->peer_lock);
3057     if (now - lastReach < RX_CHECKREACH_TTL)
3058         return 0;
3059
3060     MUTEX_ENTER(&conn->conn_data_lock);
3061     if (conn->flags & RX_CONN_ATTACHWAIT) {
3062         MUTEX_EXIT(&conn->conn_data_lock);
3063         return 1;
3064     }
3065     conn->flags |= RX_CONN_ATTACHWAIT;
3066     MUTEX_EXIT(&conn->conn_data_lock);
3067     if (!conn->checkReachEvent)
3068         rxi_CheckReachEvent(NULL, conn, call);
3069
3070     return 1;
3071 }
3072
3073 /* try to attach call, if authentication is complete */
3074 static void
3075 TryAttach(register struct rx_call *acall, register osi_socket socket,
3076           register int *tnop, register struct rx_call **newcallp,
3077           int reachOverride)
3078 {
3079     struct rx_connection *conn = acall->conn;
3080
3081     if (conn->type == RX_SERVER_CONNECTION
3082         && acall->state == RX_STATE_PRECALL) {
3083         /* Don't attach until we have any req'd. authentication. */
3084         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
3085             if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
3086                 rxi_AttachServerProc(acall, socket, tnop, newcallp);
3087             /* Note:  this does not necessarily succeed; there
3088              * may not any proc available
3089              */
3090         } else {
3091             rxi_ChallengeOn(acall->conn);
3092         }
3093     }
3094 }
3095
3096 /* A data packet has been received off the interface.  This packet is
3097  * appropriate to the call (the call is in the right state, etc.).  This
3098  * routine can return a packet to the caller, for re-use */
3099
3100 struct rx_packet *
3101 rxi_ReceiveDataPacket(register struct rx_call *call,
3102                       register struct rx_packet *np, int istack,
3103                       osi_socket socket, afs_uint32 host, u_short port,
3104                       int *tnop, struct rx_call **newcallp)
3105 {
3106     int ackNeeded = 0;          /* 0 means no, otherwise ack_reason */
3107     int newPackets = 0;
3108     int didHardAck = 0;
3109     int haveLast = 0;
3110     afs_uint32 seq, serial, flags;
3111     int isFirst;
3112     struct rx_packet *tnp;
3113     struct clock when;
3114     MUTEX_ENTER(&rx_stats_mutex);
3115     rx_stats.dataPacketsRead++;
3116     MUTEX_EXIT(&rx_stats_mutex);
3117
3118 #ifdef KERNEL
3119     /* If there are no packet buffers, drop this new packet, unless we can find
3120      * packet buffers from inactive calls */
3121     if (!call->error
3122         && (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
3123         MUTEX_ENTER(&rx_freePktQ_lock);
3124         rxi_NeedMorePackets = TRUE;
3125         MUTEX_EXIT(&rx_freePktQ_lock);
3126         MUTEX_ENTER(&rx_stats_mutex);
3127         rx_stats.noPacketBuffersOnRead++;
3128         MUTEX_EXIT(&rx_stats_mutex);
3129         call->rprev = np->header.serial;
3130         rxi_calltrace(RX_TRACE_DROP, call);
3131         dpf(("packet %x dropped on receipt - quota problems", np));
3132         if (rxi_doreclaim)
3133             rxi_ClearReceiveQueue(call);
3134         clock_GetTime(&when);
3135         clock_Add(&when, &rx_softAckDelay);
3136         if (!call->delayedAckEvent
3137             || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3138             rxevent_Cancel(call->delayedAckEvent, call,
3139                            RX_CALL_REFCOUNT_DELAY);
3140             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3141             call->delayedAckEvent =
3142                 rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
3143         }
3144         /* we've damaged this call already, might as well do it in. */
3145         return np;
3146     }
3147 #endif /* KERNEL */
3148
3149     /*
3150      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
3151      * packet is one of several packets transmitted as a single
3152      * datagram. Do not send any soft or hard acks until all packets
3153      * in a jumbogram have been processed. Send negative acks right away.
3154      */
3155     for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
3156         /* tnp is non-null when there are more packets in the
3157          * current jumbo gram */
3158         if (tnp) {
3159             if (np)
3160                 rxi_FreePacket(np);
3161             np = tnp;
3162         }
3163
3164         seq = np->header.seq;
3165         serial = np->header.serial;
3166         flags = np->header.flags;
3167
3168         /* If the call is in an error state, send an abort message */
3169         if (call->error)
3170             return rxi_SendCallAbort(call, np, istack, 0);
3171
3172         /* The RX_JUMBO_PACKET is set in all but the last packet in each
3173          * AFS 3.5 jumbogram. */
3174         if (flags & RX_JUMBO_PACKET) {
3175             tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
3176         } else {
3177             tnp = NULL;
3178         }
3179
3180         if (np->header.spare != 0) {
3181             MUTEX_ENTER(&call->conn->conn_data_lock);
3182             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3183             MUTEX_EXIT(&call->conn->conn_data_lock);
3184         }
3185
3186         /* The usual case is that this is the expected next packet */
3187         if (seq == call->rnext) {
3188
3189             /* Check to make sure it is not a duplicate of one already queued */
3190             if (queue_IsNotEmpty(&call->rq)
3191                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3192                 MUTEX_ENTER(&rx_stats_mutex);
3193                 rx_stats.dupPacketsRead++;
3194                 MUTEX_EXIT(&rx_stats_mutex);
3195                 dpf(("packet %x dropped on receipt - duplicate", np));
3196                 rxevent_Cancel(call->delayedAckEvent, call,
3197                                RX_CALL_REFCOUNT_DELAY);
3198                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3199                 ackNeeded = 0;
3200                 call->rprev = seq;
3201                 continue;
3202             }
3203
3204             /* It's the next packet. Stick it on the receive queue
3205              * for this call. Set newPackets to make sure we wake
3206              * the reader once all packets have been processed */
3207             queue_Prepend(&call->rq, np);
3208             call->nSoftAcks++;
3209             np = NULL;          /* We can't use this anymore */
3210             newPackets = 1;
3211
3212             /* If an ack is requested then set a flag to make sure we
3213              * send an acknowledgement for this packet */
3214             if (flags & RX_REQUEST_ACK) {
3215                 ackNeeded = RX_ACK_REQUESTED;
3216             }
3217
3218             /* Keep track of whether we have received the last packet */
3219             if (flags & RX_LAST_PACKET) {
3220                 call->flags |= RX_CALL_HAVE_LAST;
3221                 haveLast = 1;
3222             }
3223
3224             /* Check whether we have all of the packets for this call */
3225             if (call->flags & RX_CALL_HAVE_LAST) {
3226                 afs_uint32 tseq;        /* temporary sequence number */
3227                 struct rx_packet *tp;   /* Temporary packet pointer */
3228                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
3229
3230                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3231                     if (tseq != tp->header.seq)
3232                         break;
3233                     if (tp->header.flags & RX_LAST_PACKET) {
3234                         call->flags |= RX_CALL_RECEIVE_DONE;
3235                         break;
3236                     }
3237                     tseq++;
3238                 }
3239             }
3240
3241             /* Provide asynchronous notification for those who want it
3242              * (e.g. multi rx) */
3243             if (call->arrivalProc) {
3244                 (*call->arrivalProc) (call, call->arrivalProcHandle,
3245                                       call->arrivalProcArg);
3246                 call->arrivalProc = (void (*)())0;
3247             }
3248
3249             /* Update last packet received */
3250             call->rprev = seq;
3251
3252             /* If there is no server process serving this call, grab
3253              * one, if available. We only need to do this once. If a
3254              * server thread is available, this thread becomes a server
3255              * thread and the server thread becomes a listener thread. */
3256             if (isFirst) {
3257                 TryAttach(call, socket, tnop, newcallp, 0);
3258             }
3259         }
3260         /* This is not the expected next packet. */
3261         else {
3262             /* Determine whether this is a new or old packet, and if it's
3263              * a new one, whether it fits into the current receive window.
3264              * Also figure out whether the packet was delivered in sequence.
3265              * We use the prev variable to determine whether the new packet
3266              * is the successor of its immediate predecessor in the
3267              * receive queue, and the missing flag to determine whether
3268              * any of this packets predecessors are missing.  */
3269
3270             afs_uint32 prev;    /* "Previous packet" sequence number */
3271             struct rx_packet *tp;       /* Temporary packet pointer */
3272             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3273             int missing;        /* Are any predecessors missing? */
3274
3275             /* If the new packet's sequence number has been sent to the
3276              * application already, then this is a duplicate */
3277             if (seq < call->rnext) {
3278                 MUTEX_ENTER(&rx_stats_mutex);
3279                 rx_stats.dupPacketsRead++;
3280                 MUTEX_EXIT(&rx_stats_mutex);
3281                 rxevent_Cancel(call->delayedAckEvent, call,
3282                                RX_CALL_REFCOUNT_DELAY);
3283                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3284                 ackNeeded = 0;
3285                 call->rprev = seq;
3286                 continue;
3287             }
3288
3289             /* If the sequence number is greater than what can be
3290              * accomodated by the current window, then send a negative
3291              * acknowledge and drop the packet */
3292             if ((call->rnext + call->rwind) <= seq) {
3293                 rxevent_Cancel(call->delayedAckEvent, call,
3294                                RX_CALL_REFCOUNT_DELAY);
3295                 np = rxi_SendAck(call, np, serial, RX_ACK_EXCEEDS_WINDOW,
3296                                  istack);
3297                 ackNeeded = 0;
3298                 call->rprev = seq;
3299                 continue;
3300             }
3301
3302             /* Look for the packet in the queue of old received packets */
3303             for (prev = call->rnext - 1, missing =
3304                  0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3305                 /*Check for duplicate packet */
3306                 if (seq == tp->header.seq) {
3307                     MUTEX_ENTER(&rx_stats_mutex);
3308                     rx_stats.dupPacketsRead++;
3309                     MUTEX_EXIT(&rx_stats_mutex);
3310                     rxevent_Cancel(call->delayedAckEvent, call,
3311                                    RX_CALL_REFCOUNT_DELAY);
3312                     np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
3313                                      istack);
3314                     ackNeeded = 0;
3315                     call->rprev = seq;
3316                     goto nextloop;
3317                 }
3318                 /* If we find a higher sequence packet, break out and
3319                  * insert the new packet here. */
3320                 if (seq < tp->header.seq)
3321                     break;
3322                 /* Check for missing packet */
3323                 if (tp->header.seq != prev + 1) {
3324                     missing = 1;
3325                 }
3326
3327                 prev = tp->header.seq;
3328             }
3329
3330             /* Keep track of whether we have received the last packet. */
3331             if (flags & RX_LAST_PACKET) {
3332                 call->flags |= RX_CALL_HAVE_LAST;
3333             }
3334
3335             /* It's within the window: add it to the the receive queue.
3336              * tp is left by the previous loop either pointing at the
3337              * packet before which to insert the new packet, or at the
3338              * queue head if the queue is empty or the packet should be
3339              * appended. */
3340             queue_InsertBefore(tp, np);
3341             call->nSoftAcks++;
3342             np = NULL;
3343
3344             /* Check whether we have all of the packets for this call */
3345             if ((call->flags & RX_CALL_HAVE_LAST)
3346                 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3347                 afs_uint32 tseq;        /* temporary sequence number */
3348
3349                 for (tseq =
3350                      call->rnext, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3351                     if (tseq != tp->header.seq)
3352                         break;
3353                     if (tp->header.flags & RX_LAST_PACKET) {
3354                         call->flags |= RX_CALL_RECEIVE_DONE;
3355                         break;
3356                     }
3357                     tseq++;
3358                 }
3359             }
3360
3361             /* We need to send an ack of the packet is out of sequence, 
3362              * or if an ack was requested by the peer. */
3363             if (seq != prev + 1 || missing) {
3364                 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3365             } else if (flags & RX_REQUEST_ACK) {
3366                 ackNeeded = RX_ACK_REQUESTED;
3367             }
3368
3369             /* Acknowledge the last packet for each call */
3370             if (flags & RX_LAST_PACKET) {
3371                 haveLast = 1;
3372             }
3373
3374             call->rprev = seq;
3375         }
3376       nextloop:;
3377     }
3378
3379     if (newPackets) {
3380         /*
3381          * If the receiver is waiting for an iovec, fill the iovec
3382          * using the data from the receive queue */
3383         if (call->flags & RX_CALL_IOVEC_WAIT) {
3384             didHardAck = rxi_FillReadVec(call, serial);
3385             /* the call may have been aborted */
3386             if (call->error) {
3387                 return NULL;
3388             }
3389             if (didHardAck) {
3390                 ackNeeded = 0;
3391             }
3392         }
3393
3394         /* Wakeup the reader if any */
3395         if ((call->flags & RX_CALL_READER_WAIT)
3396             && (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes)
3397                 || (call->iovNext >= call->iovMax)
3398                 || (call->flags & RX_CALL_RECEIVE_DONE))) {
3399             call->flags &= ~RX_CALL_READER_WAIT;
3400 #ifdef  RX_ENABLE_LOCKS
3401             CV_BROADCAST(&call->cv_rq);
3402 #else
3403             osi_rxWakeup(&call->rq);
3404 #endif
3405         }
3406     }
3407
3408     /*
3409      * Send an ack when requested by the peer, or once every
3410      * rxi_SoftAckRate packets until the last packet has been
3411      * received. Always send a soft ack for the last packet in
3412      * the server's reply. */
3413     if (ackNeeded) {
3414         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3415         np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3416     } else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
3417         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3418         np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3419     } else if (call->nSoftAcks) {
3420         clock_GetTime(&when);
3421         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3422             clock_Add(&when, &rx_lastAckDelay);
3423         } else {
3424             clock_Add(&when, &rx_softAckDelay);
3425         }
3426         if (!call->delayedAckEvent
3427             || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3428             rxevent_Cancel(call->delayedAckEvent, call,
3429                            RX_CALL_REFCOUNT_DELAY);
3430             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3431             call->delayedAckEvent =
3432                 rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
3433         }
3434     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3435         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3436     }
3437
3438     return np;
3439 }
3440
3441 #ifdef  ADAPT_WINDOW
3442 static void rxi_ComputeRate();
3443 #endif
3444
3445 static void
3446 rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3447 {
3448     struct rx_peer *peer = conn->peer;
3449
3450     MUTEX_ENTER(&peer->peer_lock);
3451     peer->lastReachTime = clock_Sec();
3452     MUTEX_EXIT(&peer->peer_lock);
3453
3454     MUTEX_ENTER(&conn->conn_data_lock);
3455     if (conn->flags & RX_CONN_ATTACHWAIT) {
3456         int i;
3457
3458         conn->flags &= ~RX_CONN_ATTACHWAIT;
3459         MUTEX_EXIT(&conn->conn_data_lock);
3460
3461         for (i = 0; i < RX_MAXCALLS; i++) {
3462             struct rx_call *call = conn->call[i];
3463             if (call) {
3464                 if (call != acall)
3465                     MUTEX_ENTER(&call->lock);
3466                 /* tnop can be null if newcallp is null */
3467                 TryAttach(call, (osi_socket) - 1, NULL, NULL, 1);
3468                 if (call != acall)
3469                     MUTEX_EXIT(&call->lock);
3470             }
3471         }
3472     } else
3473         MUTEX_EXIT(&conn->conn_data_lock);
3474 }
3475
3476 static const char *
3477 rx_ack_reason(int reason)
3478 {
3479     switch (reason) {
3480     case RX_ACK_REQUESTED:
3481         return "requested";
3482     case RX_ACK_DUPLICATE:
3483         return "duplicate";
3484     case RX_ACK_OUT_OF_SEQUENCE:
3485         return "sequence";
3486     case RX_ACK_EXCEEDS_WINDOW:
3487         return "window";
3488     case RX_ACK_NOSPACE:
3489         return "nospace";
3490     case RX_ACK_PING:
3491         return "ping";
3492     case RX_ACK_PING_RESPONSE:
3493         return "response";
3494     case RX_ACK_DELAY:
3495         return "delay";
3496     case RX_ACK_IDLE:
3497         return "idle";
3498     default:
3499         return "unknown!!";
3500     }
3501 }
3502
3503
3504 /* rxi_ComputePeerNetStats
3505  *
3506  * Called exclusively by rxi_ReceiveAckPacket to compute network link
3507  * estimates (like RTT and throughput) based on ack packets.  Caller
3508  * must ensure that the packet in question is the right one (i.e.
3509  * serial number matches).
3510  */
3511 static void
3512 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3513                         struct rx_ackPacket *ap, struct rx_packet *np)
3514 {
3515     struct rx_peer *peer = call->conn->peer;
3516
3517     /* Use RTT if not delayed by client. */
3518     if (ap->reason != RX_ACK_DELAY)
3519         rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3520 #ifdef ADAPT_WINDOW
3521     rxi_ComputeRate(peer, call, p, np, ap->reason);
3522 #endif
3523 }
3524
3525 /* The real smarts of the whole thing.  */
3526 struct rx_packet *
3527 rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
3528                      int istack)
3529 {
3530     struct rx_ackPacket *ap;
3531     int nAcks;
3532     register struct rx_packet *tp;
3533     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3534     register struct rx_connection *conn = call->conn;
3535     struct rx_peer *peer = conn->peer;
3536     afs_uint32 first;
3537     afs_uint32 serial;
3538     /* because there are CM's that are bogus, sending weird values for this. */
3539     afs_uint32 skew = 0;
3540     int nbytes;
3541     int missing;
3542     int acked;
3543     int nNacked = 0;
3544     int newAckCount = 0;
3545     u_short maxMTU = 0;         /* Set if peer supports AFS 3.4a jumbo datagrams */
3546     int maxDgramPackets = 0;    /* Set if peer supports AFS 3.5 jumbo datagrams */
3547
3548     MUTEX_ENTER(&rx_stats_mutex);
3549     rx_stats.ackPacketsRead++;
3550     MUTEX_EXIT(&rx_stats_mutex);
3551     ap = (struct rx_ackPacket *)rx_DataOf(np);
3552     nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap);
3553     if (nbytes < 0)
3554         return np;              /* truncated ack packet */
3555
3556     /* depends on ack packet struct */
3557     nAcks = MIN((unsigned)nbytes, (unsigned)ap->nAcks);
3558     first = ntohl(ap->firstPacket);
3559     serial = ntohl(ap->serial);
3560     /* temporarily disabled -- needs to degrade over time 
3561      * skew = ntohs(ap->maxSkew); */
3562
3563     /* Ignore ack packets received out of order */
3564     if (first < call->tfirst) {
3565         return np;
3566     }
3567
3568     if (np->header.flags & RX_SLOW_START_OK) {
3569         call->flags |= RX_CALL_SLOW_START_OK;
3570     }
3571
3572     if (ap->reason == RX_ACK_PING_RESPONSE)
3573         rxi_UpdatePeerReach(conn, call);
3574
3575 #ifdef RXDEBUG
3576 #ifdef AFS_NT40_ENV
3577     if (rxdebug_active) {
3578         char msg[512];
3579         size_t len;
3580
3581         len = _snprintf(msg, sizeof(msg),
3582                         "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
3583                          GetCurrentThreadId(), rx_ack_reason(ap->reason), 
3584                          ntohl(ap->serial), ntohl(ap->previousPacket),
3585                          (unsigned int)np->header.seq, (unsigned int)skew, 
3586                          ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
3587         if (nAcks) {
3588             int offset;
3589
3590             for (offset = 0; offset < nAcks && len < sizeof(msg); offset++) 
3591                 msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
3592         }
3593         msg[len++]='\n';
3594         msg[len] = '\0';
3595         OutputDebugString(msg);
3596     }
3597 #else /* AFS_NT40_ENV */
3598     if (rx_Log) {
3599         fprintf(rx_Log,
3600                 "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
3601                 ap->reason, ntohl(ap->previousPacket),
3602                 (unsigned int)np->header.seq, (unsigned int)serial,
3603                 (unsigned int)skew, ntohl(ap->firstPacket));
3604         if (nAcks) {
3605             int offset;
3606             for (offset = 0; offset < nAcks; offset++)
3607                 putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*',
3608                      rx_Log);
3609         }
3610         putc('\n', rx_Log);
3611     }
3612 #endif /* AFS_NT40_ENV */
3613 #endif
3614
3615     /* Update the outgoing packet skew value to the latest value of
3616      * the peer's incoming packet skew value.  The ack packet, of
3617      * course, could arrive out of order, but that won't affect things
3618      * much */
3619     MUTEX_ENTER(&peer->peer_lock);
3620     peer->outPacketSkew = skew;
3621
3622     /* Check for packets that no longer need to be transmitted, and
3623      * discard them.  This only applies to packets positively
3624      * acknowledged as having been sent to the peer's upper level.
3625      * All other packets must be retained.  So only packets with
3626      * sequence numbers < ap->firstPacket are candidates. */
3627     for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3628         if (tp->header.seq >= first)
3629             break;
3630         call->tfirst = tp->header.seq + 1;
3631         if (serial
3632             && (tp->header.serial == serial || tp->firstSerial == serial))
3633             rxi_ComputePeerNetStats(call, tp, ap, np);
3634         if (!(tp->flags & RX_PKTFLAG_ACKED)) {
3635             newAckCount++;
3636         }
3637 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3638         /* XXX Hack. Because we have to release the global rx lock when sending
3639          * packets (osi_NetSend) we drop all acks while we're traversing the tq
3640          * in rxi_Start sending packets out because packets may move to the
3641          * freePacketQueue as result of being here! So we drop these packets until
3642          * we're safely out of the traversing. Really ugly! 
3643          * To make it even uglier, if we're using fine grain locking, we can
3644          * set the ack bits in the packets and have rxi_Start remove the packets
3645          * when it's done transmitting.
3646          */
3647         if (call->flags & RX_CALL_TQ_BUSY) {
3648 #ifdef RX_ENABLE_LOCKS
3649             tp->flags |= RX_PKTFLAG_ACKED;
3650             call->flags |= RX_CALL_TQ_SOME_ACKED;
3651 #else /* RX_ENABLE_LOCKS */
3652             break;
3653 #endif /* RX_ENABLE_LOCKS */
3654         } else
3655 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3656         {
3657             queue_Remove(tp);
3658             rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
3659         }
3660     }
3661
3662 #ifdef ADAPT_WINDOW
3663     /* Give rate detector a chance to respond to ping requests */
3664     if (ap->reason == RX_ACK_PING_RESPONSE) {
3665         rxi_ComputeRate(peer, call, 0, np, ap->reason);
3666     }
3667 #endif
3668
3669     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
3670
3671     /* Now go through explicit acks/nacks and record the results in
3672      * the waiting packets.  These are packets that can't be released
3673      * yet, even with a positive acknowledge.  This positive
3674      * acknowledge only means the packet has been received by the
3675      * peer, not that it will be retained long enough to be sent to
3676      * the peer's upper level.  In addition, reset the transmit timers
3677      * of any missing packets (those packets that must be missing
3678      * because this packet was out of sequence) */
3679
3680     call->nSoftAcked = 0;
3681     for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
3682         /* Update round trip time if the ack was stimulated on receipt
3683          * of this packet */
3684 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
3685 #ifdef RX_ENABLE_LOCKS
3686         if (tp->header.seq >= first)
3687 #endif /* RX_ENABLE_LOCKS */
3688 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3689             if (serial
3690                 && (tp->header.serial == serial || tp->firstSerial == serial))
3691                 rxi_ComputePeerNetStats(call, tp, ap, np);
3692
3693         /* Set the acknowledge flag per packet based on the
3694          * information in the ack packet. An acknowlegded packet can
3695          * be downgraded when the server has discarded a pack