bringing-rx-into-21st-century-20060504
[openafs.git] / src / rx / rx.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX:  Extended Remote Procedure Call */
11
12 #include <afsconfig.h>
13 #ifdef  KERNEL
14 #include "afs/param.h"
15 #else
16 #include <afs/param.h>
17 #endif
18
19 RCSID
20     ("$Header$");
21
22 #ifdef KERNEL
23 #include "afs/sysincludes.h"
24 #include "afsincludes.h"
25 #ifndef UKERNEL
26 #include "h/types.h"
27 #include "h/time.h"
28 #include "h/stat.h"
29 #ifdef  AFS_OSF_ENV
30 #include <net/net_globals.h>
31 #endif /* AFS_OSF_ENV */
32 #ifdef AFS_LINUX20_ENV
33 #include "h/socket.h"
34 #endif
35 #include "netinet/in.h"
36 #include "afs/afs_args.h"
37 #include "afs/afs_osi.h"
38 #ifdef RX_KERNEL_TRACE
39 #include "rx_kcommon.h"
40 #endif
41 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
42 #include "h/systm.h"
43 #endif
44 #ifdef RXDEBUG
45 #undef RXDEBUG                  /* turn off debugging */
46 #endif /* RXDEBUG */
47 #if defined(AFS_SGI_ENV)
48 #include "sys/debug.h"
49 #endif
50 #include "afsint.h"
51 #ifdef  AFS_OSF_ENV
52 #undef kmem_alloc
53 #undef kmem_free
54 #undef mem_alloc
55 #undef mem_free
56 #undef register
57 #endif /* AFS_OSF_ENV */
58 #else /* !UKERNEL */
59 #include "afs/sysincludes.h"
60 #include "afsincludes.h"
61 #endif /* !UKERNEL */
62 #include "afs/lock.h"
63 #include "rx_kmutex.h"
64 #include "rx_kernel.h"
65 #include "rx_clock.h"
66 #include "rx_queue.h"
67 #include "rx.h"
68 #include "rx_globals.h"
69 #include "rx_trace.h"
70 #define AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
71 #define AFSOP_STOP_AFS          211     /* Stop AFS process */
72 #define AFSOP_STOP_BKG          212     /* Stop BKG process */
73 #include "afsint.h"
74 extern afs_int32 afs_termState;
75 #ifdef AFS_AIX41_ENV
76 #include "sys/lockl.h"
77 #include "sys/lock_def.h"
78 #endif /* AFS_AIX41_ENV */
79 # include "rxgen_consts.h"
80 #else /* KERNEL */
81 # include <sys/types.h>
82 # include <errno.h>
83 #ifdef AFS_NT40_ENV
84 # include <stdlib.h>
85 # include <fcntl.h>
86 # include <afs/afsutil.h>
87 # include <WINNT\afsreg.h>
88 #else
89 # include <sys/socket.h>
90 # include <sys/file.h>
91 # include <netdb.h>
92 # include <sys/stat.h>
93 # include <netinet/in.h>
94 # include <sys/time.h>
95 #endif
96 #ifdef HAVE_STRING_H
97 #include <string.h>
98 #else
99 #ifdef HAVE_STRINGS_H
100 #include <strings.h>
101 #endif
102 #endif
103 # include "rx.h"
104 # include "rx_user.h"
105 # include "rx_clock.h"
106 # include "rx_queue.h"
107 # include "rx_globals.h"
108 # include "rx_trace.h"
109 # include <afs/rxgen_consts.h>
110 #endif /* KERNEL */
111
112 int (*registerProgram) () = 0;
113 int (*swapNameProgram) () = 0;
114
115 /* Local static routines */
116 static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
117 #ifdef RX_ENABLE_LOCKS
118 static void rxi_SetAcksInTransmitQueue(register struct rx_call *call);
119 #endif
120
121 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
122 struct rx_tq_debug {
123     afs_int32 rxi_start_aborted;        /* rxi_start awoke after rxi_Send in error. */
124     afs_int32 rxi_start_in_error;
125 } rx_tq_debug;
126 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
127
128 /*
129  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
130  * currently allocated within rx.  This number is used to allocate the
131  * memory required to return the statistics when queried.
132  */
133
134 static unsigned int rxi_rpc_peer_stat_cnt;
135
136 /*
137  * rxi_rpc_process_stat_cnt counts the total number of local process stat
138  * structures currently allocated within rx.  The number is used to allocate
139  * the memory required to return the statistics when queried.
140  */
141
142 static unsigned int rxi_rpc_process_stat_cnt;
143
144 #if !defined(offsetof)
145 #include <stddef.h>             /* for definition of offsetof() */
146 #endif
147
148 #ifdef AFS_PTHREAD_ENV
149 #include <assert.h>
150
151 /*
152  * Use procedural initialization of mutexes/condition variables
153  * to ease NT porting
154  */
155
156 extern pthread_mutex_t rx_stats_mutex;
157 extern pthread_mutex_t des_init_mutex;
158 extern pthread_mutex_t des_random_mutex;
159 extern pthread_mutex_t rx_clock_mutex;
160 extern pthread_mutex_t rxi_connCacheMutex;
161 extern pthread_mutex_t rx_event_mutex;
162 extern pthread_mutex_t osi_malloc_mutex;
163 extern pthread_mutex_t event_handler_mutex;
164 extern pthread_mutex_t listener_mutex;
165 extern pthread_mutex_t rx_if_init_mutex;
166 extern pthread_mutex_t rx_if_mutex;
167 extern pthread_mutex_t rxkad_client_uid_mutex;
168 extern pthread_mutex_t rxkad_random_mutex;
169
170 extern pthread_cond_t rx_event_handler_cond;
171 extern pthread_cond_t rx_listener_cond;
172
173 static pthread_mutex_t epoch_mutex;
174 static pthread_mutex_t rx_init_mutex;
175 static pthread_mutex_t rx_debug_mutex;
176
177 static void
178 rxi_InitPthread(void)
179 {
180     assert(pthread_mutex_init(&rx_clock_mutex, (const pthread_mutexattr_t *)0)
181            == 0);
182     assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0)
183            == 0);
184     assert(pthread_mutex_init
185            (&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0);
186     assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0)
187            == 0);
188     assert(pthread_mutex_init(&epoch_mutex, (const pthread_mutexattr_t *)0) ==
189            0);
190     assert(pthread_mutex_init(&rx_event_mutex, (const pthread_mutexattr_t *)0)
191            == 0);
192     assert(pthread_mutex_init(&des_init_mutex, (const pthread_mutexattr_t *)0)
193            == 0);
194     assert(pthread_mutex_init
195            (&des_random_mutex, (const pthread_mutexattr_t *)0) == 0);
196     assert(pthread_mutex_init
197            (&osi_malloc_mutex, (const pthread_mutexattr_t *)0) == 0);
198     assert(pthread_mutex_init
199            (&event_handler_mutex, (const pthread_mutexattr_t *)0) == 0);
200     assert(pthread_mutex_init(&listener_mutex, (const pthread_mutexattr_t *)0)
201            == 0);
202     assert(pthread_mutex_init
203            (&rx_if_init_mutex, (const pthread_mutexattr_t *)0) == 0);
204     assert(pthread_mutex_init(&rx_if_mutex, (const pthread_mutexattr_t *)0) ==
205            0);
206     assert(pthread_mutex_init
207            (&rxkad_client_uid_mutex, (const pthread_mutexattr_t *)0) == 0);
208     assert(pthread_mutex_init
209            (&rxkad_random_mutex, (const pthread_mutexattr_t *)0) == 0);
210     assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t *)0)
211            == 0);
212
213     assert(pthread_cond_init
214            (&rx_event_handler_cond, (const pthread_condattr_t *)0) == 0);
215     assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
216            == 0);
217     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
218     assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
219  
220     rxkad_global_stats_init();
221 }
222
223 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
224 #define INIT_PTHREAD_LOCKS \
225 assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
226 /*
227  * The rx_stats_mutex mutex protects the following global variables:
228  * rxi_dataQuota
229  * rxi_minDeficit
230  * rxi_availProcs
231  * rxi_totalMin
232  * rxi_lowConnRefCount
233  * rxi_lowPeerRefCount
234  * rxi_nCalls
235  * rxi_Alloccnt
236  * rxi_Allocsize
237  * rx_nFreePackets
238  * rx_tq_debug
239  * rx_stats
240  */
241 #else
242 #define INIT_PTHREAD_LOCKS
243 #endif
244
245
246 /* Variables for handling the minProcs implementation.  availProcs gives the
247  * number of threads available in the pool at this moment (not counting dudes
248  * executing right now).  totalMin gives the total number of procs required
249  * for handling all minProcs requests.  minDeficit is a dynamic variable
250  * tracking the # of procs required to satisfy all of the remaining minProcs
251  * demands.
252  * For fine grain locking to work, the quota check and the reservation of
253  * a server thread has to come while rxi_availProcs and rxi_minDeficit
254  * are locked. To this end, the code has been modified under #ifdef
255  * RX_ENABLE_LOCKS so that quota checks and reservation occur at the
256  * same time. A new function, ReturnToServerPool() returns the allocation.
257  * 
258  * A call can be on several queue's (but only one at a time). When
259  * rxi_ResetCall wants to remove the call from a queue, it has to ensure
260  * that no one else is touching the queue. To this end, we store the address
261  * of the queue lock in the call structure (under the call lock) when we
262  * put the call on a queue, and we clear the call_queue_lock when the
263  * call is removed from a queue (once the call lock has been obtained).
264  * This allows rxi_ResetCall to safely synchronize with others wishing
265  * to manipulate the queue.
266  */
267
268 #ifdef RX_ENABLE_LOCKS
269 static afs_kmutex_t rx_rpc_stats;
270 void rxi_StartUnlocked();
271 #endif
272
273 /* We keep a "last conn pointer" in rxi_FindConnection. The odds are 
274 ** pretty good that the next packet coming in is from the same connection 
275 ** as the last packet, since we're send multiple packets in a transmit window.
276 */
277 struct rx_connection *rxLastConn = 0;
278
279 #ifdef RX_ENABLE_LOCKS
280 /* The locking hierarchy for rx fine grain locking is composed of these
281  * tiers:
282  *
283  * rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
284  * conn_call_lock - used to synchonize rx_EndCall and rx_NewCall
285  * call->lock - locks call data fields.
286  * These are independent of each other:
287  *      rx_freeCallQueue_lock
288  *      rxi_keyCreate_lock
289  * rx_serverPool_lock
290  * freeSQEList_lock
291  *
292  * serverQueueEntry->lock
293  * rx_rpc_stats
294  * rx_peerHashTable_lock - locked under rx_connHashTable_lock
295  * peer->lock - locks peer data fields.
296  * conn_data_lock - that more than one thread is not updating a conn data
297  *                  field at the same time.
298  * rx_freePktQ_lock
299  *
300  * lowest level:
301  *      multi_handle->lock
302  *      rxevent_lock
303  *      rx_stats_mutex
304  *
305  * Do we need a lock to protect the peer field in the conn structure?
306  *      conn->peer was previously a constant for all intents and so has no
307  *      lock protecting this field. The multihomed client delta introduced
308  *      a RX code change : change the peer field in the connection structure
309  *      to that remote inetrface from which the last packet for this
310  *      connection was sent out. This may become an issue if further changes
311  *      are made.
312  */
313 #define SET_CALL_QUEUE_LOCK(C, L) (C)->call_queue_lock = (L)
314 #define CLEAR_CALL_QUEUE_LOCK(C) (C)->call_queue_lock = NULL
315 #ifdef RX_LOCKS_DB
316 /* rxdb_fileID is used to identify the lock location, along with line#. */
317 static int rxdb_fileID = RXDB_FILE_RX;
318 #endif /* RX_LOCKS_DB */
319 #else /* RX_ENABLE_LOCKS */
320 #define SET_CALL_QUEUE_LOCK(C, L)
321 #define CLEAR_CALL_QUEUE_LOCK(C)
322 #endif /* RX_ENABLE_LOCKS */
323 struct rx_serverQueueEntry *rx_waitForPacket = 0;
324 struct rx_serverQueueEntry *rx_waitingForPacket = 0;
325
326 /* ------------Exported Interfaces------------- */
327
328 /* This function allows rxkad to set the epoch to a suitably random number
329  * which rx_NewConnection will use in the future.  The principle purpose is to
330  * get rxnull connections to use the same epoch as the rxkad connections do, at
331  * least once the first rxkad connection is established.  This is important now
332  * that the host/port addresses aren't used in FindConnection: the uniqueness
333  * of epoch/cid matters and the start time won't do. */
334
335 #ifdef AFS_PTHREAD_ENV
336 /*
337  * This mutex protects the following global variables:
338  * rx_epoch
339  */
340
341 #define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0)
342 #define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0)
343 #else
344 #define LOCK_EPOCH
345 #define UNLOCK_EPOCH
346 #endif /* AFS_PTHREAD_ENV */
347
348 void
349 rx_SetEpoch(afs_uint32 epoch)
350 {
351     LOCK_EPOCH;
352     rx_epoch = epoch;
353     UNLOCK_EPOCH;
354 }
355
356 /* Initialize rx.  A port number may be mentioned, in which case this
357  * becomes the default port number for any service installed later.
358  * If 0 is provided for the port number, a random port will be chosen
359  * by the kernel.  Whether this will ever overlap anything in
360  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
361  * error. */
362 static int rxinit_status = 1;
363 #ifdef AFS_PTHREAD_ENV
364 /*
365  * This mutex protects the following global variables:
366  * rxinit_status
367  */
368
369 #define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0)
370 #define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0)
371 #else
372 #define LOCK_RX_INIT
373 #define UNLOCK_RX_INIT
374 #endif
375
376 /*
377  * Now, rx_InitHost is just a stub for rx_InitAddrs
378  */
379
380 int
381 rx_InitHost(u_int host, u_int port)
382 {
383     struct sockaddr_storage saddr;
384     int type = SOCK_DGRAM, len = sizeof(struct sockaddr_in);
385
386     memset((void *) &saddr, 0, sizeof(saddr));
387     rx_ssfamily(&saddr) = AF_INET;
388     ((struct sockaddr_in *) &saddr)->sin_addr.s_addr = host;
389     ((struct sockaddr_in *) &saddr)->sin_port = htons(port);
390 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
391     ((struct sockaddr_in *) &saddr)->sin_len = sizeof(struct sockaddr_in);
392 #endif
393     return rx_InitAddrs(&saddr, &type, &len, 1);
394 }
395
396 /*
397  * New API: rx_InitAddrs(struct sockaddr_storage *, int *, int)
398  *
399  * Arguments:
400  *
401  * struct sockaddr_storage - array of struct sockaddr_storage elements,
402  *                           each one listing an interface/protocol to
403  *                           be listened on.
404  * int *                   - array of integers listing the socket type
405  *                           (SOCK_STREAM or SOCK_DGRAM) to be used
406  *                           by the corresponding struct sockaddr_storage
407  * int *                   - array of integers listing saddr sizes
408  * int                     - Number of elements in sockaddr_storage array.
409  *
410  * Note that in general only servers should call this function; clients
411  * should (for now) continue to call rx_Init().
412  */
413
414 int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
415                  int nelem)
416 {
417 #ifdef KERNEL
418     osi_timeval_t tv;
419 #else /* KERNEL */
420     struct timeval tv;
421 #endif /* KERNEL */
422     char *htable, *ptable;
423     int tmp_status, i;
424
425 #if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
426     __djgpp_set_quiet_socket(1);
427 #endif
428
429     SPLVAR;
430
431     INIT_PTHREAD_LOCKS;
432     LOCK_RX_INIT;
433     if (rxinit_status == 0) {
434         tmp_status = rxinit_status;
435         UNLOCK_RX_INIT;
436         return tmp_status;      /* Already started; return previous error code. */
437     }
438 #ifdef RXDEBUG
439     rxi_DebugInit();
440 #endif
441 #ifdef AFS_NT40_ENV
442     if (afs_winsockInit() < 0)
443         return -1;
444 #endif
445
446 #ifndef KERNEL
447     /*
448      * Initialize anything necessary to provide a non-premptive threading
449      * environment.
450      */
451     rxi_InitializeThreadSupport();
452 #endif
453
454     /* Allocate and initialize a socket for client and perhaps server
455      * connections. */
456     
457     rx_socket = -1;
458     rx_port = 0;
459
460     for (i = 0; i < nelem; i++) {
461         switch (types[i]) {
462         case SOCK_DGRAM:
463             rx_socket = rxi_GetHostUDPSocket(&saddrs[i], salens[i]);
464             if (rx_socket == OSI_NULLSOCKET) {
465                 UNLOCK_RX_INIT;
466                 return RX_ADDRINUSE;
467             }
468             rx_port = rx_ss2pn(&saddrs[i]);
469             break;
470         default:
471                 return RX_INVALID_OPERATION;
472         }
473
474     }
475
476 #ifdef  RX_ENABLE_LOCKS
477 #ifdef RX_LOCKS_DB
478     rxdb_init();
479 #endif /* RX_LOCKS_DB */
480     MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
481     MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
482     MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
483     MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
484     MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock", MUTEX_DEFAULT,
485                0);
486     CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT,
487             0);
488     MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT,
489                0);
490     MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT,
491                0);
492     MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
493 #ifndef KERNEL
494     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
495 #endif /* !KERNEL */
496 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
497     if (!uniprocessor)
498         rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER - 10, "rx_sleepLock");
499 #endif /* KERNEL && AFS_HPUX110_ENV */
500 #endif /* RX_ENABLE_LOCKS */
501
502     rxi_nCalls = 0;
503     rx_connDeadTime = 12;
504     rx_tranquil = 0;            /* reset flag */
505     memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
506     htable = (char *)
507         osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
508     PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *));     /* XXXXX */
509     memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
510     ptable = (char *)osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
511     PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *));   /* XXXXX */
512     memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
513
514     /* Malloc up a bunch of packets & buffers */
515     rx_nFreePackets = 0;
516     queue_Init(&rx_freePacketQueue);
517     rxi_NeedMorePackets = FALSE;
518 #ifdef RX_ENABLE_TSFPQ
519     rx_nPackets = 0;    /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
520     rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
521 #else /* RX_ENABLE_TSFPQ */
522     rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;   /* fudge */
523     rxi_MorePackets(rx_nPackets);
524 #endif /* RX_ENABLE_TSFPQ */
525     rx_CheckPackets();
526
527     NETPRI;
528
529     clock_Init();
530
531 #if defined(AFS_NT40_ENV) && !defined(AFS_PTHREAD_ENV)
532     tv.tv_sec = clock_now.sec;
533     tv.tv_usec = clock_now.usec;
534     srand((unsigned int)tv.tv_usec);
535 #else
536     osi_GetTime(&tv);
537 #endif
538
539     if (! rx_port) {
540 #if defined(KERNEL) && !defined(UKERNEL)
541         /* Really, this should never happen in a real kernel */
542         rx_port = 0;
543 #else
544         struct sockaddr_storage sn;
545         socklen_t addrlen = sizeof(sn);
546         if (getsockname((int)rx_socket, (struct sockaddr *)&sn, &addrlen)) {
547             rx_Finalize();
548             return -1;
549         }
550         rx_port = rx_ss2pn(&sn);
551 #endif
552     }
553     rx_stats.minRtt.sec = 9999999;
554 #ifdef  KERNEL
555     rx_SetEpoch(tv.tv_sec | 0x80000000);
556 #else
557     rx_SetEpoch(tv.tv_sec);     /* Start time of this package, rxkad
558                                  * will provide a randomer value. */
559 #endif
560     MUTEX_ENTER(&rx_stats_mutex);
561     rxi_dataQuota += rx_extraQuota;     /* + extra pkts caller asked to rsrv */
562     MUTEX_EXIT(&rx_stats_mutex);
563     /* *Slightly* random start time for the cid.  This is just to help
564      * out with the hashing function at the peer */
565     rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
566     rx_connHashTable = (struct rx_connection **)htable;
567     rx_peerHashTable = (struct rx_peer **)ptable;
568
569     rx_lastAckDelay.sec = 0;
570     rx_lastAckDelay.usec = 400000;      /* 400 milliseconds */
571     rx_hardAckDelay.sec = 0;
572     rx_hardAckDelay.usec = 100000;      /* 100 milliseconds */
573     rx_softAckDelay.sec = 0;
574     rx_softAckDelay.usec = 100000;      /* 100 milliseconds */
575
576     rxevent_Init(20, rxi_ReScheduleEvents);
577
578     /* Initialize various global queues */
579     queue_Init(&rx_idleServerQueue);
580     queue_Init(&rx_incomingCallQueue);
581     queue_Init(&rx_freeCallQueue);
582
583 #if defined(AFS_NT40_ENV) && !defined(KERNEL)
584     /* Initialize our list of usable IP addresses. */
585     rx_GetIFInfo();
586 #endif
587
588     /* Start listener process (exact function is dependent on the
589      * implementation environment--kernel or user space) */
590     rxi_StartListener();
591
592     USERPRI;
593     tmp_status = rxinit_status = 0;
594     UNLOCK_RX_INIT;
595     return tmp_status;
596 }
597
598 int
599 rx_Init(u_int port)
600 {
601     return rx_InitHost(htonl(INADDR_ANY), port);
602 }
603
604
605 /* called with unincremented nRequestsRunning to see if it is OK to start
606  * a new thread in this service.  Could be "no" for two reasons: over the
607  * max quota, or would prevent others from reaching their min quota.
608  */
609 #ifdef RX_ENABLE_LOCKS
610 /* This verion of QuotaOK reserves quota if it's ok while the
611  * rx_serverPool_lock is held.  Return quota using ReturnToServerPool().
612  */
613 static int
614 QuotaOK(register struct rx_service *aservice)
615 {
616     /* check if over max quota */
617     if (aservice->nRequestsRunning >= aservice->maxProcs) {
618         return 0;
619     }
620
621     /* under min quota, we're OK */
622     /* otherwise, can use only if there are enough to allow everyone
623      * to go to their min quota after this guy starts.
624      */
625     MUTEX_ENTER(&rx_stats_mutex);
626     if ((aservice->nRequestsRunning < aservice->minProcs)
627         || (rxi_availProcs > rxi_minDeficit)) {
628         aservice->nRequestsRunning++;
629         /* just started call in minProcs pool, need fewer to maintain
630          * guarantee */
631         if (aservice->nRequestsRunning <= aservice->minProcs)
632             rxi_minDeficit--;
633         rxi_availProcs--;
634         MUTEX_EXIT(&rx_stats_mutex);
635         return 1;
636     }
637     MUTEX_EXIT(&rx_stats_mutex);
638
639     return 0;
640 }
641
642 static void
643 ReturnToServerPool(register struct rx_service *aservice)
644 {
645     aservice->nRequestsRunning--;
646     MUTEX_ENTER(&rx_stats_mutex);
647     if (aservice->nRequestsRunning < aservice->minProcs)
648         rxi_minDeficit++;
649     rxi_availProcs++;
650     MUTEX_EXIT(&rx_stats_mutex);
651 }
652
653 #else /* RX_ENABLE_LOCKS */
654 static int
655 QuotaOK(register struct rx_service *aservice)
656 {
657     int rc = 0;
658     /* under min quota, we're OK */
659     if (aservice->nRequestsRunning < aservice->minProcs)
660         return 1;
661
662     /* check if over max quota */
663     if (aservice->nRequestsRunning >= aservice->maxProcs)
664         return 0;
665
666     /* otherwise, can use only if there are enough to allow everyone
667      * to go to their min quota after this guy starts.
668      */
669     if (rxi_availProcs > rxi_minDeficit)
670         rc = 1;
671     return rc;
672 }
673 #endif /* RX_ENABLE_LOCKS */
674
675 #ifndef KERNEL
676 /* Called by rx_StartServer to start up lwp's to service calls.
677    NExistingProcs gives the number of procs already existing, and which
678    therefore needn't be created. */
679 void
680 rxi_StartServerProcs(int nExistingProcs)
681 {
682     register struct rx_service *service;
683     register int i;
684     int maxdiff = 0;
685     int nProcs = 0;
686
687     /* For each service, reserve N processes, where N is the "minimum"
688      * number of processes that MUST be able to execute a request in parallel,
689      * at any time, for that process.  Also compute the maximum difference
690      * between any service's maximum number of processes that can run
691      * (i.e. the maximum number that ever will be run, and a guarantee
692      * that this number will run if other services aren't running), and its
693      * minimum number.  The result is the extra number of processes that
694      * we need in order to provide the latter guarantee */
695     for (i = 0; i < RX_MAX_SERVICES; i++) {
696         int diff;
697         service = rx_services[i];
698         if (service == (struct rx_service *)0)
699             break;
700         nProcs += service->minProcs;
701         diff = service->maxProcs - service->minProcs;
702         if (diff > maxdiff)
703             maxdiff = diff;
704     }
705     nProcs += maxdiff;          /* Extra processes needed to allow max number requested to run in any given service, under good conditions */
706     nProcs -= nExistingProcs;   /* Subtract the number of procs that were previously created for use as server procs */
707     for (i = 0; i < nProcs; i++) {
708         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
709     }
710 }
711 #endif /* KERNEL */
712
713 #ifdef AFS_NT40_ENV
714 /* This routine is only required on Windows */
715 void
716 rx_StartClientThread(void)
717 {
718 #ifdef AFS_PTHREAD_ENV
719     int pid;
720     pid = (int) pthread_self();
721 #endif /* AFS_PTHREAD_ENV */
722 }
723 #endif /* AFS_NT40_ENV */
724
725 /* This routine must be called if any services are exported.  If the
726  * donateMe flag is set, the calling process is donated to the server
727  * process pool */
728 void
729 rx_StartServer(int donateMe)
730 {
731     register struct rx_service *service;
732     register int i;
733     SPLVAR;
734     clock_NewTime();
735
736     NETPRI;
737     /* Start server processes, if necessary (exact function is dependent
738      * on the implementation environment--kernel or user space).  DonateMe
739      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
740      * case, one less new proc will be created rx_StartServerProcs.
741      */
742     rxi_StartServerProcs(donateMe);
743
744     /* count up the # of threads in minProcs, and add set the min deficit to
745      * be that value, too.
746      */
747     for (i = 0; i < RX_MAX_SERVICES; i++) {
748         service = rx_services[i];
749         if (service == (struct rx_service *)0)
750             break;
751         MUTEX_ENTER(&rx_stats_mutex);
752         rxi_totalMin += service->minProcs;
753         /* below works even if a thread is running, since minDeficit would
754          * still have been decremented and later re-incremented.
755          */
756         rxi_minDeficit += service->minProcs;
757         MUTEX_EXIT(&rx_stats_mutex);
758     }
759
760     /* Turn on reaping of idle server connections */
761     rxi_ReapConnections();
762
763     USERPRI;
764
765     if (donateMe) {
766 #ifndef AFS_NT40_ENV
767 #ifndef KERNEL
768         char name[32];
769         static int nProcs;
770 #ifdef AFS_PTHREAD_ENV
771         pid_t pid;
772         pid = (pid_t) pthread_self();
773 #else /* AFS_PTHREAD_ENV */
774         PROCESS pid;
775         LWP_CurrentProcess(&pid);
776 #endif /* AFS_PTHREAD_ENV */
777
778         sprintf(name, "srv_%d", ++nProcs);
779         if (registerProgram)
780             (*registerProgram) (pid, name);
781 #endif /* KERNEL */
782 #endif /* AFS_NT40_ENV */
783         rx_ServerProc();        /* Never returns */
784     }
785 #ifdef RX_ENABLE_TSFPQ
786     /* no use leaving packets around in this thread's local queue if
787      * it isn't getting donated to the server thread pool. 
788      */
789     rxi_FlushLocalPacketsTSFPQ();
790 #endif /* RX_ENABLE_TSFPQ */
791     return;
792 }
793
794 /*
795  * Now, rx_NewConnection is just a stub for rx_NewConnectionAddrs()
796  */
797
798 struct rx_connection *
799 rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
800                  register struct rx_securityClass *securityObject,
801                  int serviceSecurityIndex)
802 {
803     struct sockaddr_in sin;
804     int len = sizeof(sin), type = SOCK_DGRAM;
805
806     memset((void *) &sin, 0, sizeof(sin));
807
808     sin.sin_family = AF_INET;
809     sin.sin_addr.s_addr = shost;
810     sin.sin_port = sport;
811
812     return rx_NewConnectionAddrs((struct sockaddr_storage *) &sin, &type,
813                                  &len, 1, sservice, securityObject,
814                                  serviceSecurityIndex);
815 }
816
817 /* Create a new client connection to the specified service, using the
818  * specified security object to implement the security model for this
819  * connection
820  *
821  * This follows the same logic as rx_InitAddrs() for the first four
822  * arguments.
823  */
824 struct rx_connection *
825 rx_NewConnectionAddrs(struct sockaddr_storage *saddr, int *type, int *slen,
826                       int nelem, u_short sservice,
827                       struct rx_securityClass *securityObject,
828                       int serviceSecurityIndex)
829 {
830     int hashindex, i;
831     afs_int32 cid;
832     register struct rx_connection *conn;
833
834     SPLVAR;
835
836     clock_NewTime();
837     dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(rx_ss2v4addr(saddr)), ntohs(rx_ss2pn(saddr)), sservice, securityObject, serviceSecurityIndex));
838
839     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
840      * the case of kmem_alloc? */
841     conn = rxi_AllocConnection();
842 #ifdef  RX_ENABLE_LOCKS
843     MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
844     MUTEX_INIT(&conn->conn_data_lock, "conn call lock", MUTEX_DEFAULT, 0);
845     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
846 #endif
847     NETPRI;
848     MUTEX_ENTER(&rx_connHashTable_lock);
849     cid = (rx_nextCid += RX_MAXCALLS);
850     conn->type = RX_CLIENT_CONNECTION;
851     conn->cid = cid;
852     conn->epoch = rx_epoch;
853     /*
854      * Right now we're going to just call rxi_FindPeer for UDP connections
855      * We're only going to support one.
856      */
857     for (i = 0; i < nelem; i++) {
858         if (type[i] == SOCK_DGRAM) {
859             conn->peer = rxi_FindPeer(&saddr[i], slen[i], type[i], 0, 1);
860             break;
861         }
862     }
863     conn->serviceId = sservice;
864     conn->securityObject = securityObject;
865     /* This doesn't work in all compilers with void (they're buggy), so fake it
866      * with VOID */
867     conn->securityData = (VOID *) 0;
868     conn->securityIndex = serviceSecurityIndex;
869     rx_SetConnDeadTime(conn, rx_connDeadTime);
870     conn->ackRate = RX_FAST_ACK_RATE;
871     conn->nSpecific = 0;
872     conn->specific = NULL;
873     conn->challengeEvent = NULL;
874     conn->delayedAbortEvent = NULL;
875     conn->abortCount = 0;
876     conn->error = 0;
877
878     RXS_NewConnection(securityObject, conn);
879     hashindex =
880         CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
881
882     conn->refCount++;           /* no lock required since only this thread knows... */
883     conn->next = rx_connHashTable[hashindex];
884     rx_connHashTable[hashindex] = conn;
885     MUTEX_ENTER(&rx_stats_mutex);
886     rx_stats.nClientConns++;
887     MUTEX_EXIT(&rx_stats_mutex);
888
889     MUTEX_EXIT(&rx_connHashTable_lock);
890     USERPRI;
891     return conn;
892 }
893
894 void
895 rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
896 {
897     /* The idea is to set the dead time to a value that allows several
898      * keepalives to be dropped without timing out the connection. */
899     conn->secondsUntilDead = MAX(seconds, 6);
900     conn->secondsUntilPing = conn->secondsUntilDead / 6;
901 }
902
903 int rxi_lowPeerRefCount = 0;
904 int rxi_lowConnRefCount = 0;
905
906 /*
907  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
908  * NOTE: must not be called with rx_connHashTable_lock held.
909  */
910 void
911 rxi_CleanupConnection(struct rx_connection *conn)
912 {
913     /* Notify the service exporter, if requested, that this connection
914      * is being destroyed */
915     if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
916         (*conn->service->destroyConnProc) (conn);
917
918     /* Notify the security module that this connection is being destroyed */
919     RXS_DestroyConnection(conn->securityObject, conn);
920
921     /* If this is the last connection using the rx_peer struct, set its
922      * idle time to now. rxi_ReapConnections will reap it if it's still
923      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
924      */
925     MUTEX_ENTER(&rx_peerHashTable_lock);
926     if (conn->peer->refCount < 2) {
927         conn->peer->idleWhen = clock_Sec();
928         if (conn->peer->refCount < 1) {
929             conn->peer->refCount = 1;
930             MUTEX_ENTER(&rx_stats_mutex);
931             rxi_lowPeerRefCount++;
932             MUTEX_EXIT(&rx_stats_mutex);
933         }
934     }
935     conn->peer->refCount--;
936     MUTEX_EXIT(&rx_peerHashTable_lock);
937
938     MUTEX_ENTER(&rx_stats_mutex);
939     if (conn->type == RX_SERVER_CONNECTION)
940         rx_stats.nServerConns--;
941     else
942         rx_stats.nClientConns--;
943     MUTEX_EXIT(&rx_stats_mutex);
944
945 #ifndef KERNEL
946     if (conn->specific) {
947         int i;
948         for (i = 0; i < conn->nSpecific; i++) {
949             if (conn->specific[i] && rxi_keyCreate_destructor[i])
950                 (*rxi_keyCreate_destructor[i]) (conn->specific[i]);
951             conn->specific[i] = NULL;
952         }
953         free(conn->specific);
954     }
955     conn->specific = NULL;
956     conn->nSpecific = 0;
957 #endif /* !KERNEL */
958
959     MUTEX_DESTROY(&conn->conn_call_lock);
960     MUTEX_DESTROY(&conn->conn_data_lock);
961     CV_DESTROY(&conn->conn_call_cv);
962
963     rxi_FreeConnection(conn);
964 }
965
966 /* Destroy the specified connection */
967 void
968 rxi_DestroyConnection(register struct rx_connection *conn)
969 {
970     MUTEX_ENTER(&rx_connHashTable_lock);
971     rxi_DestroyConnectionNoLock(conn);
972     /* conn should be at the head of the cleanup list */
973     if (conn == rx_connCleanup_list) {
974         rx_connCleanup_list = rx_connCleanup_list->next;
975         MUTEX_EXIT(&rx_connHashTable_lock);
976         rxi_CleanupConnection(conn);
977     }
978 #ifdef RX_ENABLE_LOCKS
979     else {
980         MUTEX_EXIT(&rx_connHashTable_lock);
981     }
982 #endif /* RX_ENABLE_LOCKS */
983 }
984
985 static void
986 rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
987 {
988     register struct rx_connection **conn_ptr;
989     register int havecalls = 0;
990     struct rx_packet *packet;
991     int i;
992     SPLVAR;
993
994     clock_NewTime();
995
996     NETPRI;
997     MUTEX_ENTER(&conn->conn_data_lock);
998     if (conn->refCount > 0)
999         conn->refCount--;
1000     else {
1001         MUTEX_ENTER(&rx_stats_mutex);
1002         rxi_lowConnRefCount++;
1003         MUTEX_EXIT(&rx_stats_mutex);
1004     }
1005
1006     if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
1007         /* Busy; wait till the last guy before proceeding */
1008         MUTEX_EXIT(&conn->conn_data_lock);
1009         USERPRI;
1010         return;
1011     }
1012
1013     /* If the client previously called rx_NewCall, but it is still
1014      * waiting, treat this as a running call, and wait to destroy the
1015      * connection later when the call completes. */
1016     if ((conn->type == RX_CLIENT_CONNECTION)
1017         && (conn->flags & RX_CONN_MAKECALL_WAITING)) {
1018         conn->flags |= RX_CONN_DESTROY_ME;
1019         MUTEX_EXIT(&conn->conn_data_lock);
1020         USERPRI;
1021         return;
1022     }
1023     MUTEX_EXIT(&conn->conn_data_lock);
1024
1025     /* Check for extant references to this connection */
1026     for (i = 0; i < RX_MAXCALLS; i++) {
1027         register struct rx_call *call = conn->call[i];
1028         if (call) {
1029             havecalls = 1;
1030             if (conn->type == RX_CLIENT_CONNECTION) {
1031                 MUTEX_ENTER(&call->lock);
1032                 if (call->delayedAckEvent) {
1033                     /* Push the final acknowledgment out now--there
1034                      * won't be a subsequent call to acknowledge the
1035                      * last reply packets */
1036                     rxevent_Cancel(call->delayedAckEvent, call,
1037                                    RX_CALL_REFCOUNT_DELAY);
1038                     if (call->state == RX_STATE_PRECALL
1039                         || call->state == RX_STATE_ACTIVE) {
1040                         rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1041                     } else {
1042                         rxi_AckAll(NULL, call, 0);
1043                     }
1044                 }
1045                 MUTEX_EXIT(&call->lock);
1046             }
1047         }
1048     }
1049 #ifdef RX_ENABLE_LOCKS
1050     if (!havecalls) {
1051         if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
1052             MUTEX_EXIT(&conn->conn_data_lock);
1053         } else {
1054             /* Someone is accessing a packet right now. */
1055             havecalls = 1;
1056         }
1057     }
1058 #endif /* RX_ENABLE_LOCKS */
1059
1060     if (havecalls) {
1061         /* Don't destroy the connection if there are any call
1062          * structures still in use */
1063         MUTEX_ENTER(&conn->conn_data_lock);
1064         conn->flags |= RX_CONN_DESTROY_ME;
1065         MUTEX_EXIT(&conn->conn_data_lock);
1066         USERPRI;
1067         return;
1068     }
1069
1070     if (conn->delayedAbortEvent) {
1071         rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
1072         packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
1073         if (packet) {
1074             MUTEX_ENTER(&conn->conn_data_lock);
1075             rxi_SendConnectionAbort(conn, packet, 0, 1);
1076             MUTEX_EXIT(&conn->conn_data_lock);
1077             rxi_FreePacket(packet);
1078         }
1079     }
1080
1081     /* Remove from connection hash table before proceeding */
1082     conn_ptr =
1083         &rx_connHashTable[CONN_HASH
1084                           (peer->host, peer->port, conn->cid, conn->epoch,
1085                            conn->type)];
1086     for (; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
1087         if (*conn_ptr == conn) {
1088             *conn_ptr = conn->next;
1089             break;
1090         }
1091     }
1092     /* if the conn that we are destroying was the last connection, then we
1093      * clear rxLastConn as well */
1094     if (rxLastConn == conn)
1095         rxLastConn = 0;
1096
1097     /* Make sure the connection is completely reset before deleting it. */
1098     /* get rid of pending events that could zap us later */
1099     if (conn->challengeEvent)
1100         rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
1101     if (conn->checkReachEvent)
1102         rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
1103
1104     /* Add the connection to the list of destroyed connections that
1105      * need to be cleaned up. This is necessary to avoid deadlocks
1106      * in the routines we call to inform others that this connection is
1107      * being destroyed. */
1108     conn->next = rx_connCleanup_list;
1109     rx_connCleanup_list = conn;
1110 }
1111
1112 /* Externally available version */
1113 void
1114 rx_DestroyConnection(register struct rx_connection *conn)
1115 {
1116     SPLVAR;
1117
1118     NETPRI;
1119     rxi_DestroyConnection(conn);
1120     USERPRI;
1121 }
1122
1123 void
1124 rx_GetConnection(register struct rx_connection *conn)
1125 {
1126     SPLVAR;
1127
1128     NETPRI;
1129     MUTEX_ENTER(&conn->conn_data_lock);
1130     conn->refCount++;
1131     MUTEX_EXIT(&conn->conn_data_lock);
1132     USERPRI;
1133 }
1134
1135 /* Start a new rx remote procedure call, on the specified connection.
1136  * If wait is set to 1, wait for a free call channel; otherwise return
1137  * 0.  Maxtime gives the maximum number of seconds this call may take,
1138  * after rx_MakeCall returns.  After this time interval, a call to any
1139  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
1140  * For fine grain locking, we hold the conn_call_lock in order to 
1141  * to ensure that we don't get signalle after we found a call in an active
1142  * state and before we go to sleep.
1143  */
1144 struct rx_call *
1145 rx_NewCall(register struct rx_connection *conn)
1146 {
1147     register int i;
1148     register struct rx_call *call;
1149     struct clock queueTime;
1150     SPLVAR;
1151
1152     clock_NewTime();
1153     dpf(("rx_MakeCall(conn %x)\n", conn));
1154
1155     NETPRI;
1156     clock_GetTime(&queueTime);
1157     MUTEX_ENTER(&conn->conn_call_lock);
1158
1159     /*
1160      * Check if there are others waiting for a new call.
1161      * If so, let them go first to avoid starving them.
1162      * This is a fairly simple scheme, and might not be
1163      * a complete solution for large numbers of waiters.
1164      * 
1165      * makeCallWaiters keeps track of the number of 
1166      * threads waiting to make calls and the 
1167      * RX_CONN_MAKECALL_WAITING flag bit is used to 
1168      * indicate that there are indeed calls waiting.
1169      * The flag is set when the waiter is incremented.
1170      * It is only cleared in rx_EndCall when 
1171      * makeCallWaiters is 0.  This prevents us from 
1172      * accidently destroying the connection while it
1173      * is potentially about to be used.
1174      */
1175     MUTEX_ENTER(&conn->conn_data_lock);
1176     if (conn->makeCallWaiters) {
1177         conn->flags |= RX_CONN_MAKECALL_WAITING;
1178         conn->makeCallWaiters++;
1179         MUTEX_EXIT(&conn->conn_data_lock);
1180
1181 #ifdef  RX_ENABLE_LOCKS
1182         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1183 #else
1184         osi_rxSleep(conn);
1185 #endif
1186         MUTEX_ENTER(&conn->conn_data_lock);
1187         conn->makeCallWaiters--;
1188     } 
1189     MUTEX_EXIT(&conn->conn_data_lock);
1190
1191     for (;;) {
1192         for (i = 0; i < RX_MAXCALLS; i++) {
1193             call = conn->call[i];
1194             if (call) {
1195                 MUTEX_ENTER(&call->lock);
1196                 if (call->state == RX_STATE_DALLY) {
1197                     rxi_ResetCall(call, 0);
1198                     (*call->callNumber)++;
1199                     break;
1200                 }
1201                 MUTEX_EXIT(&call->lock);
1202             } else {
1203                 call = rxi_NewCall(conn, i);
1204                 break;
1205             }
1206         }
1207         if (i < RX_MAXCALLS) {
1208             break;
1209         }
1210         MUTEX_ENTER(&conn->conn_data_lock);
1211         conn->flags |= RX_CONN_MAKECALL_WAITING;
1212         conn->makeCallWaiters++;
1213         MUTEX_EXIT(&conn->conn_data_lock);
1214
1215 #ifdef  RX_ENABLE_LOCKS
1216         CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
1217 #else
1218         osi_rxSleep(conn);
1219 #endif
1220         MUTEX_ENTER(&conn->conn_data_lock);
1221         conn->makeCallWaiters--;
1222         MUTEX_EXIT(&conn->conn_data_lock);
1223     }
1224     /*
1225      * Wake up anyone else who might be giving us a chance to
1226      * run (see code above that avoids resource starvation).
1227      */
1228 #ifdef  RX_ENABLE_LOCKS
1229     CV_BROADCAST(&conn->conn_call_cv);
1230 #else
1231     osi_rxWakeup(conn);
1232 #endif
1233
1234     CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1235
1236     /* Client is initially in send mode */
1237     call->state = RX_STATE_ACTIVE;
1238     call->error = conn->error;
1239     if (call->error)
1240         call->mode = RX_MODE_ERROR;
1241     else
1242         call->mode = RX_MODE_SENDING;
1243     
1244     /* remember start time for call in case we have hard dead time limit */
1245     call->queueTime = queueTime;
1246     clock_GetTime(&call->startTime);
1247     hzero(call->bytesSent);
1248     hzero(call->bytesRcvd);
1249
1250     /* Turn on busy protocol. */
1251     rxi_KeepAliveOn(call);
1252
1253     MUTEX_EXIT(&call->lock);
1254     MUTEX_EXIT(&conn->conn_call_lock);
1255     USERPRI;
1256
1257 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1258     /* Now, if TQ wasn't cleared earlier, do it now. */
1259     MUTEX_ENTER(&call->lock);
1260     while (call->flags & RX_CALL_TQ_BUSY) {
1261         call->flags |= RX_CALL_TQ_WAIT;
1262         call->tqWaiters++;
1263 #ifdef RX_ENABLE_LOCKS
1264         osirx_AssertMine(&call->lock, "rxi_Start lock4");
1265         CV_WAIT(&call->cv_tq, &call->lock);
1266 #else /* RX_ENABLE_LOCKS */
1267         osi_rxSleep(&call->tq);
1268 #endif /* RX_ENABLE_LOCKS */
1269         call->tqWaiters--;
1270         if (call->tqWaiters == 0) {
1271             call->flags &= ~RX_CALL_TQ_WAIT;
1272         }
1273     }
1274     if (call->flags & RX_CALL_TQ_CLEARME) {
1275         rxi_ClearTransmitQueue(call, 0);
1276         queue_Init(&call->tq);
1277     }
1278     MUTEX_EXIT(&call->lock);
1279 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1280
1281     return call;
1282 }
1283
1284 int
1285 rxi_HasActiveCalls(register struct rx_connection *aconn)
1286 {
1287     register int i;
1288     register struct rx_call *tcall;
1289     SPLVAR;
1290
1291     NETPRI;
1292     for (i = 0; i < RX_MAXCALLS; i++) {
1293         if ((tcall = aconn->call[i])) {
1294             if ((tcall->state == RX_STATE_ACTIVE)
1295                 || (tcall->state == RX_STATE_PRECALL)) {
1296                 USERPRI;
1297                 return 1;
1298             }
1299         }
1300     }
1301     USERPRI;
1302     return 0;
1303 }
1304
1305 int
1306 rxi_GetCallNumberVector(register struct rx_connection *aconn,
1307                         register afs_int32 * aint32s)
1308 {
1309     register int i;
1310     register struct rx_call *tcall;
1311     SPLVAR;
1312
1313     NETPRI;
1314     for (i = 0; i < RX_MAXCALLS; i++) {
1315         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1316             aint32s[i] = aconn->callNumber[i] + 1;
1317         else
1318             aint32s[i] = aconn->callNumber[i];
1319     }
1320     USERPRI;
1321     return 0;
1322 }
1323
1324 int
1325 rxi_SetCallNumberVector(register struct rx_connection *aconn,
1326                         register afs_int32 * aint32s)
1327 {
1328     register int i;
1329     register struct rx_call *tcall;
1330     SPLVAR;
1331
1332     NETPRI;
1333     for (i = 0; i < RX_MAXCALLS; i++) {
1334         if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
1335             aconn->callNumber[i] = aint32s[i] - 1;
1336         else
1337             aconn->callNumber[i] = aint32s[i];
1338     }
1339     USERPRI;
1340     return 0;
1341 }
1342
1343 /* Advertise a new service.  A service is named locally by a UDP port
1344  * number plus a 16-bit service id.  Returns (struct rx_service *) 0
1345  * on a failure. 
1346  *
1347      char *serviceName;  Name for identification purposes (e.g. the
1348                          service name might be used for probing for
1349                          statistics) */
1350 struct rx_service *
1351 rx_NewService(u_short port, u_short serviceId, char *serviceName,
1352               struct rx_securityClass **securityObjects, int nSecurityObjects,
1353               afs_int32(*serviceProc) (struct rx_call * acall))
1354 {
1355     osi_socket socket = OSI_NULLSOCKET;
1356     register struct rx_service *tservice;
1357     register int i;
1358     SPLVAR;
1359
1360     clock_NewTime();
1361
1362     if (serviceId == 0) {
1363         (osi_Msg
1364          "rx_NewService:  service id for service %s is not non-zero.\n",
1365          serviceName);
1366         return 0;
1367     }
1368     if (port == 0) {
1369         if (rx_port == 0) {
1370             (osi_Msg
1371              "rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n",
1372              serviceName);
1373             return 0;
1374         }
1375         port = rx_port;
1376         socket = rx_socket;
1377     }
1378
1379     tservice = rxi_AllocService();
1380     NETPRI;
1381     for (i = 0; i < RX_MAX_SERVICES; i++) {
1382         register struct rx_service *service = rx_services[i];
1383         if (service) {
1384             if (port == service->servicePort) {
1385                 if (service->serviceId == serviceId) {
1386                     /* The identical service has already been
1387                      * installed; if the caller was intending to
1388                      * change the security classes used by this
1389                      * service, he/she loses. */
1390                     (osi_Msg
1391                      "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
1392                      serviceName, serviceId, service->serviceName);
1393                     USERPRI;
1394                     rxi_FreeService(tservice);
1395                     return service;
1396                 }
1397                 /* Different service, same port: re-use the socket
1398                  * which is bound to the same port */
1399                 socket = service->socket;
1400             }
1401         } else {
1402             if (socket == OSI_NULLSOCKET) {
1403                 /* If we don't already have a socket (from another
1404                  * service on same port) get a new one */
1405                 struct sockaddr_in sin;
1406
1407                 memset((void *) &sin, 0, sizeof(sin));
1408                 sin.sin_family = AF_INET;
1409                 sin.sin_addr.s_addr = htonl(INADDR_ANY);
1410                 sin.sin_port = port;
1411 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
1412                 sin.sin_len = sizeof(sin);
1413 #endif
1414                 socket = rxi_GetHostUDPSocket((struct sockaddr_storage *) &sin,
1415                                               sizeof(sin));
1416                 if (socket == OSI_NULLSOCKET) {
1417                     USERPRI;
1418                     rxi_FreeService(tservice);
1419                     return 0;
1420                 }
1421             }
1422             service = tservice;
1423             service->socket = socket;
1424             service->servicePort = port;
1425             service->serviceId = serviceId;
1426             service->serviceName = serviceName;
1427             service->nSecurityObjects = nSecurityObjects;
1428             service->securityObjects = securityObjects;
1429             service->minProcs = 0;
1430             service->maxProcs = 1;
1431             service->idleDeadTime = 60;
1432             service->connDeadTime = rx_connDeadTime;
1433             service->executeRequestProc = serviceProc;
1434             service->checkReach = 0;
1435             rx_services[i] = service;   /* not visible until now */
1436             USERPRI;
1437             return service;
1438         }
1439     }
1440     USERPRI;
1441     rxi_FreeService(tservice);
1442     (osi_Msg "rx_NewService: cannot support > %d services\n",
1443      RX_MAX_SERVICES);
1444     return 0;
1445 }
1446
1447 /* Generic request processing loop. This routine should be called
1448  * by the implementation dependent rx_ServerProc. If socketp is
1449  * non-null, it will be set to the file descriptor that this thread
1450  * is now listening on. If socketp is null, this routine will never
1451  * returns. */
1452 void
1453 rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket * socketp)
1454 {
1455     register struct rx_call *call;
1456     register afs_int32 code;
1457     register struct rx_service *tservice = NULL;
1458
1459     for (;;) {
1460         if (newcall) {
1461             call = newcall;
1462             newcall = NULL;
1463         } else {
1464             call = rx_GetCall(threadID, tservice, socketp);
1465             if (socketp && *socketp != OSI_NULLSOCKET) {
1466                 /* We are now a listener thread */
1467                 return;
1468             }
1469         }
1470
1471         /* if server is restarting( typically smooth shutdown) then do not
1472          * allow any new calls.
1473          */
1474
1475         if (rx_tranquil && (call != NULL)) {
1476             SPLVAR;
1477
1478             NETPRI;
1479             MUTEX_ENTER(&call->lock);
1480
1481             rxi_CallError(call, RX_RESTARTING);
1482             rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
1483
1484             MUTEX_EXIT(&call->lock);
1485             USERPRI;
1486         }
1487 #ifdef  KERNEL
1488         if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1489 #ifdef RX_ENABLE_LOCKS
1490             AFS_GLOCK();
1491 #endif /* RX_ENABLE_LOCKS */
1492             afs_termState = AFSOP_STOP_AFS;
1493             afs_osi_Wakeup(&afs_termState);
1494 #ifdef RX_ENABLE_LOCKS
1495             AFS_GUNLOCK();
1496 #endif /* RX_ENABLE_LOCKS */
1497             return;
1498         }
1499 #endif
1500
1501         tservice = call->conn->service;
1502
1503         if (tservice->beforeProc)
1504             (*tservice->beforeProc) (call);
1505
1506         code = call->conn->service->executeRequestProc(call);
1507
1508         if (tservice->afterProc)
1509             (*tservice->afterProc) (call, code);
1510
1511         rx_EndCall(call, code);
1512         MUTEX_ENTER(&rx_stats_mutex);
1513         rxi_nCalls++;
1514         MUTEX_EXIT(&rx_stats_mutex);
1515     }
1516 }
1517
1518
1519 void
1520 rx_WakeupServerProcs(void)
1521 {
1522     struct rx_serverQueueEntry *np, *tqp;
1523     SPLVAR;
1524
1525     NETPRI;
1526     MUTEX_ENTER(&rx_serverPool_lock);
1527
1528 #ifdef RX_ENABLE_LOCKS
1529     if (rx_waitForPacket)
1530         CV_BROADCAST(&rx_waitForPacket->cv);
1531 #else /* RX_ENABLE_LOCKS */
1532     if (rx_waitForPacket)
1533         osi_rxWakeup(rx_waitForPacket);
1534 #endif /* RX_ENABLE_LOCKS */
1535     MUTEX_ENTER(&freeSQEList_lock);
1536     for (np = rx_FreeSQEList; np; np = tqp) {
1537         tqp = *(struct rx_serverQueueEntry **)np;
1538 #ifdef RX_ENABLE_LOCKS
1539         CV_BROADCAST(&np->cv);
1540 #else /* RX_ENABLE_LOCKS */
1541         osi_rxWakeup(np);
1542 #endif /* RX_ENABLE_LOCKS */
1543     }
1544     MUTEX_EXIT(&freeSQEList_lock);
1545     for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
1546 #ifdef RX_ENABLE_LOCKS
1547         CV_BROADCAST(&np->cv);
1548 #else /* RX_ENABLE_LOCKS */
1549         osi_rxWakeup(np);
1550 #endif /* RX_ENABLE_LOCKS */
1551     }
1552     MUTEX_EXIT(&rx_serverPool_lock);
1553     USERPRI;
1554 }
1555
1556 /* meltdown:
1557  * One thing that seems to happen is that all the server threads get
1558  * tied up on some empty or slow call, and then a whole bunch of calls
1559  * arrive at once, using up the packet pool, so now there are more 
1560  * empty calls.  The most critical resources here are server threads
1561  * and the free packet pool.  The "doreclaim" code seems to help in
1562  * general.  I think that eventually we arrive in this state: there
1563  * are lots of pending calls which do have all their packets present,
1564  * so they won't be reclaimed, are multi-packet calls, so they won't
1565  * be scheduled until later, and thus are tying up most of the free 
1566  * packet pool for a very long time.
1567  * future options:
1568  * 1.  schedule multi-packet calls if all the packets are present.  
1569  * Probably CPU-bound operation, useful to return packets to pool. 
1570  * Do what if there is a full window, but the last packet isn't here?
1571  * 3.  preserve one thread which *only* runs "best" calls, otherwise
1572  * it sleeps and waits for that type of call.
1573  * 4.  Don't necessarily reserve a whole window for each thread.  In fact, 
1574  * the current dataquota business is badly broken.  The quota isn't adjusted
1575  * to reflect how many packets are presently queued for a running call.
1576  * So, when we schedule a queued call with a full window of packets queued
1577  * up for it, that *should* free up a window full of packets for other 2d-class
1578  * calls to be able to use from the packet pool.  But it doesn't.
1579  *
1580  * NB.  Most of the time, this code doesn't run -- since idle server threads
1581  * sit on the idle server queue and are assigned by "...ReceivePacket" as soon
1582  * as a new call arrives.
1583  */
1584 /* Sleep until a call arrives.  Returns a pointer to the call, ready
1585  * for an rx_Read. */
1586 #ifdef RX_ENABLE_LOCKS
1587 struct rx_call *
1588 rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
1589 {
1590     struct rx_serverQueueEntry *sq;
1591     register struct rx_call *call = (struct rx_call *)0;
1592     struct rx_service *service = NULL;
1593     SPLVAR;
1594
1595     MUTEX_ENTER(&freeSQEList_lock);
1596
1597     if ((sq = rx_FreeSQEList)) {
1598         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1599         MUTEX_EXIT(&freeSQEList_lock);
1600     } else {                    /* otherwise allocate a new one and return that */
1601         MUTEX_EXIT(&freeSQEList_lock);
1602         sq = (struct rx_serverQueueEntry *)
1603             rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1604         MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
1605         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1606     }
1607
1608     MUTEX_ENTER(&rx_serverPool_lock);
1609     if (cur_service != NULL) {
1610         ReturnToServerPool(cur_service);
1611     }
1612     while (1) {
1613         if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1614             register struct rx_call *tcall, *ncall, *choice2 = NULL;
1615
1616             /* Scan for eligible incoming calls.  A call is not eligible
1617              * if the maximum number of calls for its service type are
1618              * already executing */
1619             /* One thread will process calls FCFS (to prevent starvation),
1620              * while the other threads may run ahead looking for calls which
1621              * have all their input data available immediately.  This helps 
1622              * keep threads from blocking, waiting for data from the client. */
1623             for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1624                 service = tcall->conn->service;
1625                 if (!QuotaOK(service)) {
1626                     continue;
1627                 }
1628                 if (tno == rxi_fcfs_thread_num
1629                     || !tcall->queue_item_header.next) {
1630                     /* If we're the fcfs thread , then  we'll just use 
1631                      * this call. If we haven't been able to find an optimal 
1632                      * choice, and we're at the end of the list, then use a 
1633                      * 2d choice if one has been identified.  Otherwise... */
1634                     call = (choice2 ? choice2 : tcall);
1635                     service = call->conn->service;
1636                 } else if (!queue_IsEmpty(&tcall->rq)) {
1637                     struct rx_packet *rp;
1638                     rp = queue_First(&tcall->rq, rx_packet);
1639                     if (rp->header.seq == 1) {
1640                         if (!meltdown_1pkt
1641                             || (rp->header.flags & RX_LAST_PACKET)) {
1642                             call = tcall;
1643                         } else if (rxi_2dchoice && !choice2
1644                                    && !(tcall->flags & RX_CALL_CLEARED)
1645                                    && (tcall->rprev > rxi_HardAckRate)) {
1646                             choice2 = tcall;
1647                         } else
1648                             rxi_md2cnt++;
1649                     }
1650                 }
1651                 if (call) {
1652                     break;
1653                 } else {
1654                     ReturnToServerPool(service);
1655                 }
1656             }
1657         }
1658
1659         if (call) {
1660             queue_Remove(call);
1661             MUTEX_EXIT(&rx_serverPool_lock);
1662             MUTEX_ENTER(&call->lock);
1663
1664             if (call->flags & RX_CALL_WAIT_PROC) {
1665                 call->flags &= ~RX_CALL_WAIT_PROC;
1666                 MUTEX_ENTER(&rx_stats_mutex);
1667                 rx_nWaiting--;
1668                 MUTEX_EXIT(&rx_stats_mutex);
1669             }
1670
1671             if (call->state != RX_STATE_PRECALL || call->error) {
1672                 MUTEX_EXIT(&call->lock);
1673                 MUTEX_ENTER(&rx_serverPool_lock);
1674                 ReturnToServerPool(service);
1675                 call = NULL;
1676                 continue;
1677             }
1678
1679             if (queue_IsEmpty(&call->rq)
1680                 || queue_First(&call->rq, rx_packet)->header.seq != 1)
1681                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1682
1683             CLEAR_CALL_QUEUE_LOCK(call);
1684             break;
1685         } else {
1686             /* If there are no eligible incoming calls, add this process
1687              * to the idle server queue, to wait for one */
1688             sq->newcall = 0;
1689             sq->tno = tno;
1690             if (socketp) {
1691                 *socketp = OSI_NULLSOCKET;
1692             }
1693             sq->socketp = socketp;
1694             queue_Append(&rx_idleServerQueue, sq);
1695 #ifndef AFS_AIX41_ENV
1696             rx_waitForPacket = sq;
1697 #else
1698             rx_waitingForPacket = sq;
1699 #endif /* AFS_AIX41_ENV */
1700             do {
1701                 CV_WAIT(&sq->cv, &rx_serverPool_lock);
1702 #ifdef  KERNEL
1703                 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1704                     MUTEX_EXIT(&rx_serverPool_lock);
1705                     return (struct rx_call *)0;
1706                 }
1707 #endif
1708             } while (!(call = sq->newcall)
1709                      && !(socketp && *socketp != OSI_NULLSOCKET));
1710             MUTEX_EXIT(&rx_serverPool_lock);
1711             if (call) {
1712                 MUTEX_ENTER(&call->lock);
1713             }
1714             break;
1715         }
1716     }
1717
1718     MUTEX_ENTER(&freeSQEList_lock);
1719     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1720     rx_FreeSQEList = sq;
1721     MUTEX_EXIT(&freeSQEList_lock);
1722
1723     if (call) {
1724         clock_GetTime(&call->startTime);
1725         call->state = RX_STATE_ACTIVE;
1726         call->mode = RX_MODE_RECEIVING;
1727 #ifdef RX_KERNEL_TRACE
1728         if (ICL_SETACTIVE(afs_iclSetp)) {
1729             int glockOwner = ISAFS_GLOCK();
1730             if (!glockOwner)
1731                 AFS_GLOCK();
1732             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1733                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1734                        call);
1735             if (!glockOwner)
1736                 AFS_GUNLOCK();
1737         }
1738 #endif
1739
1740         rxi_calltrace(RX_CALL_START, call);
1741         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1742              call->conn->service->servicePort, call->conn->service->serviceId,
1743              call));
1744
1745         CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
1746         MUTEX_EXIT(&call->lock);
1747     } else {
1748         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1749     }
1750
1751     return call;
1752 }
1753 #else /* RX_ENABLE_LOCKS */
1754 struct rx_call *
1755 rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
1756 {
1757     struct rx_serverQueueEntry *sq;
1758     register struct rx_call *call = (struct rx_call *)0, *choice2;
1759     struct rx_service *service = NULL;
1760     SPLVAR;
1761
1762     NETPRI;
1763     MUTEX_ENTER(&freeSQEList_lock);
1764
1765     if ((sq = rx_FreeSQEList)) {
1766         rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
1767         MUTEX_EXIT(&freeSQEList_lock);
1768     } else {                    /* otherwise allocate a new one and return that */
1769         MUTEX_EXIT(&freeSQEList_lock);
1770         sq = (struct rx_serverQueueEntry *)
1771             rxi_Alloc(sizeof(struct rx_serverQueueEntry));
1772         MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
1773         CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
1774     }
1775     MUTEX_ENTER(&sq->lock);
1776
1777     if (cur_service != NULL) {
1778         cur_service->nRequestsRunning--;
1779         if (cur_service->nRequestsRunning < cur_service->minProcs)
1780             rxi_minDeficit++;
1781         rxi_availProcs++;
1782     }
1783     if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
1784         register struct rx_call *tcall, *ncall;
1785         /* Scan for eligible incoming calls.  A call is not eligible
1786          * if the maximum number of calls for its service type are
1787          * already executing */
1788         /* One thread will process calls FCFS (to prevent starvation),
1789          * while the other threads may run ahead looking for calls which
1790          * have all their input data available immediately.  This helps 
1791          * keep threads from blocking, waiting for data from the client. */
1792         choice2 = (struct rx_call *)0;
1793         for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
1794             service = tcall->conn->service;
1795             if (QuotaOK(service)) {
1796                 if (tno == rxi_fcfs_thread_num
1797                     || !tcall->queue_item_header.next) {
1798                     /* If we're the fcfs thread, then  we'll just use 
1799                      * this call. If we haven't been able to find an optimal 
1800                      * choice, and we're at the end of the list, then use a 
1801                      * 2d choice if one has been identified.  Otherwise... */
1802                     call = (choice2 ? choice2 : tcall);
1803                     service = call->conn->service;
1804                 } else if (!queue_IsEmpty(&tcall->rq)) {
1805                     struct rx_packet *rp;
1806                     rp = queue_First(&tcall->rq, rx_packet);
1807                     if (rp->header.seq == 1
1808                         && (!meltdown_1pkt
1809                             || (rp->header.flags & RX_LAST_PACKET))) {
1810                         call = tcall;
1811                     } else if (rxi_2dchoice && !choice2
1812                                && !(tcall->flags & RX_CALL_CLEARED)
1813                                && (tcall->rprev > rxi_HardAckRate)) {
1814                         choice2 = tcall;
1815                     } else
1816                         rxi_md2cnt++;
1817                 }
1818             }
1819             if (call)
1820                 break;
1821         }
1822     }
1823
1824     if (call) {
1825         queue_Remove(call);
1826         /* we can't schedule a call if there's no data!!! */
1827         /* send an ack if there's no data, if we're missing the
1828          * first packet, or we're missing something between first 
1829          * and last -- there's a "hole" in the incoming data. */
1830         if (queue_IsEmpty(&call->rq)
1831             || queue_First(&call->rq, rx_packet)->header.seq != 1
1832             || call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
1833             rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
1834
1835         call->flags &= (~RX_CALL_WAIT_PROC);
1836         service->nRequestsRunning++;
1837         /* just started call in minProcs pool, need fewer to maintain
1838          * guarantee */
1839         if (service->nRequestsRunning <= service->minProcs)
1840             rxi_minDeficit--;
1841         rxi_availProcs--;
1842         rx_nWaiting--;
1843         /* MUTEX_EXIT(&call->lock); */
1844     } else {
1845         /* If there are no eligible incoming calls, add this process
1846          * to the idle server queue, to wait for one */
1847         sq->newcall = 0;
1848         if (socketp) {
1849             *socketp = OSI_NULLSOCKET;
1850         }
1851         sq->socketp = socketp;
1852         queue_Append(&rx_idleServerQueue, sq);
1853         do {
1854             osi_rxSleep(sq);
1855 #ifdef  KERNEL
1856             if (afs_termState == AFSOP_STOP_RXCALLBACK) {
1857                 USERPRI;
1858                 rxi_Free(sq, sizeof(struct rx_serverQueueEntry));
1859                 return (struct rx_call *)0;
1860             }
1861 #endif
1862         } while (!(call = sq->newcall)
1863                  && !(socketp && *socketp != OSI_NULLSOCKET));
1864     }
1865     MUTEX_EXIT(&sq->lock);
1866
1867     MUTEX_ENTER(&freeSQEList_lock);
1868     *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
1869     rx_FreeSQEList = sq;
1870     MUTEX_EXIT(&freeSQEList_lock);
1871
1872     if (call) {
1873         clock_GetTime(&call->startTime);
1874         call->state = RX_STATE_ACTIVE;
1875         call->mode = RX_MODE_RECEIVING;
1876 #ifdef RX_KERNEL_TRACE
1877         if (ICL_SETACTIVE(afs_iclSetp)) {
1878             int glockOwner = ISAFS_GLOCK();
1879             if (!glockOwner)
1880                 AFS_GLOCK();
1881             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1882                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1883                        call);
1884             if (!glockOwner)
1885                 AFS_GUNLOCK();
1886         }
1887 #endif
1888
1889         rxi_calltrace(RX_CALL_START, call);
1890         dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
1891              call->conn->service->servicePort, call->conn->service->serviceId,
1892              call));
1893     } else {
1894         dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
1895     }
1896
1897     USERPRI;
1898
1899     return call;
1900 }
1901 #endif /* RX_ENABLE_LOCKS */
1902
1903
1904
1905 /* Establish a procedure to be called when a packet arrives for a
1906  * call.  This routine will be called at most once after each call,
1907  * and will also be called if there is an error condition on the or
1908  * the call is complete.  Used by multi rx to build a selection
1909  * function which determines which of several calls is likely to be a
1910  * good one to read from.  
1911  * NOTE: the way this is currently implemented it is probably only a
1912  * good idea to (1) use it immediately after a newcall (clients only)
1913  * and (2) only use it once.  Other uses currently void your warranty
1914  */
1915 void
1916 rx_SetArrivalProc(register struct rx_call *call,
1917                   register void (*proc) (register struct rx_call * call,
1918                                         register VOID * mh,
1919                                         register int index),
1920                   register VOID * handle, register int arg)
1921 {
1922     call->arrivalProc = proc;
1923     call->arrivalProcHandle = handle;
1924     call->arrivalProcArg = arg;
1925 }
1926
1927 /* Call is finished (possibly prematurely).  Return rc to the peer, if
1928  * appropriate, and return the final error code from the conversation
1929  * to the caller */
1930
1931 afs_int32
1932 rx_EndCall(register struct rx_call *call, afs_int32 rc)
1933 {
1934     register struct rx_connection *conn = call->conn;
1935     register struct rx_service *service;
1936     afs_int32 error;
1937     SPLVAR;
1938
1939
1940
1941     dpf(("rx_EndCall(call %x rc %d error %d abortCode %d)\n", call, rc, call->error, call->abortCode));
1942
1943     NETPRI;
1944     MUTEX_ENTER(&call->lock);
1945
1946     if (rc == 0 && call->error == 0) {
1947         call->abortCode = 0;
1948         call->abortCount = 0;
1949     }
1950
1951     call->arrivalProc = (void (*)())0;
1952     if (rc && call->error == 0) {
1953         rxi_CallError(call, rc);
1954         /* Send an abort message to the peer if this error code has
1955          * only just been set.  If it was set previously, assume the
1956          * peer has already been sent the error code or will request it 
1957          */
1958         rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
1959     }
1960     if (conn->type == RX_SERVER_CONNECTION) {
1961         /* Make sure reply or at least dummy reply is sent */
1962         if (call->mode == RX_MODE_RECEIVING) {
1963             rxi_WriteProc(call, 0, 0);
1964         }
1965         if (call->mode == RX_MODE_SENDING) {
1966             rxi_FlushWrite(call);
1967         }
1968         service = conn->service;
1969         rxi_calltrace(RX_CALL_END, call);
1970         /* Call goes to hold state until reply packets are acknowledged */
1971         if (call->tfirst + call->nSoftAcked < call->tnext) {
1972             call->state = RX_STATE_HOLD;
1973         } else {
1974             call->state = RX_STATE_DALLY;
1975             rxi_ClearTransmitQueue(call, 0);
1976             rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
1977             rxevent_Cancel(call->keepAliveEvent, call,
1978                            RX_CALL_REFCOUNT_ALIVE);
1979         }
1980     } else {                    /* Client connection */
1981         char dummy;
1982         /* Make sure server receives input packets, in the case where
1983          * no reply arguments are expected */
1984         if ((call->mode == RX_MODE_SENDING)
1985             || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
1986             (void)rxi_ReadProc(call, &dummy, 1);
1987         }
1988
1989         /* If we had an outstanding delayed ack, be nice to the server
1990          * and force-send it now.
1991          */
1992         if (call->delayedAckEvent) {
1993             rxevent_Cancel(call->delayedAckEvent, call,
1994                            RX_CALL_REFCOUNT_DELAY);
1995             call->delayedAckEvent = NULL;
1996             rxi_SendDelayedAck(NULL, call, NULL);
1997         }
1998
1999         /* We need to release the call lock since it's lower than the
2000          * conn_call_lock and we don't want to hold the conn_call_lock
2001          * over the rx_ReadProc call. The conn_call_lock needs to be held
2002          * here for the case where rx_NewCall is perusing the calls on
2003          * the connection structure. We don't want to signal until
2004          * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
2005          * have checked this call, found it active and by the time it
2006          * goes to sleep, will have missed the signal.
2007          *
2008          * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
2009          * there are threads waiting to use the conn object.
2010          */
2011         MUTEX_EXIT(&call->lock);
2012         MUTEX_ENTER(&conn->conn_call_lock);
2013         MUTEX_ENTER(&call->lock);
2014         MUTEX_ENTER(&conn->conn_data_lock);
2015         conn->flags |= RX_CONN_BUSY;
2016         if (conn->flags & RX_CONN_MAKECALL_WAITING) {
2017             if (conn->makeCallWaiters == 0)
2018                 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
2019             MUTEX_EXIT(&conn->conn_data_lock);
2020 #ifdef  RX_ENABLE_LOCKS
2021             CV_BROADCAST(&conn->conn_call_cv);
2022 #else
2023             osi_rxWakeup(conn);
2024 #endif
2025         }
2026 #ifdef RX_ENABLE_LOCKS
2027         else {
2028             MUTEX_EXIT(&conn->conn_data_lock);
2029         }
2030 #endif /* RX_ENABLE_LOCKS */
2031         call->state = RX_STATE_DALLY;
2032     }
2033     error = call->error;
2034
2035     /* currentPacket, nLeft, and NFree must be zeroed here, because
2036      * ResetCall cannot: ResetCall may be called at splnet(), in the
2037      * kernel version, and may interrupt the macros rx_Read or
2038      * rx_Write, which run at normal priority for efficiency. */
2039     if (call->currentPacket) {
2040         queue_Prepend(&call->iovq, call->currentPacket);
2041         call->currentPacket = (struct rx_packet *)0;
2042     }
2043         
2044     call->nLeft = call->nFree = call->curlen = 0;
2045
2046     /* Free any packets from the last call to ReadvProc/WritevProc */
2047     rxi_FreePackets(0, &call->iovq);
2048
2049     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
2050     MUTEX_EXIT(&call->lock);
2051     if (conn->type == RX_CLIENT_CONNECTION) {
2052         MUTEX_EXIT(&conn->conn_call_lock);
2053         conn->flags &= ~RX_CONN_BUSY;
2054     }
2055     USERPRI;
2056     /*
2057      * Map errors to the local host's errno.h format.
2058      */
2059     error = ntoh_syserr_conv(error);
2060     return error;
2061 }
2062
2063 #if !defined(KERNEL)
2064
2065 /* Call this routine when shutting down a server or client (especially
2066  * clients).  This will allow Rx to gracefully garbage collect server
2067  * connections, and reduce the number of retries that a server might
2068  * make to a dead client.
2069  * This is not quite right, since some calls may still be ongoing and
2070  * we can't lock them to destroy them. */
2071 void
2072 rx_Finalize(void)
2073 {
2074     register struct rx_connection **conn_ptr, **conn_end;
2075
2076     INIT_PTHREAD_LOCKS;
2077     LOCK_RX_INIT;
2078     if (rxinit_status == 1) {
2079         UNLOCK_RX_INIT;
2080         return;                 /* Already shutdown. */
2081     }
2082     rxi_DeleteCachedConnections();
2083     if (rx_connHashTable) {
2084         MUTEX_ENTER(&rx_connHashTable_lock);
2085         for (conn_ptr = &rx_connHashTable[0], conn_end =
2086              &rx_connHashTable[rx_hashTableSize]; conn_ptr < conn_end;
2087              conn_ptr++) {
2088             struct rx_connection *conn, *next;
2089             for (conn = *conn_ptr; conn; conn = next) {
2090                 next = conn->next;
2091                 if (conn->type == RX_CLIENT_CONNECTION) {
2092                     /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
2093                     conn->refCount++;
2094                     /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
2095 #ifdef RX_ENABLE_LOCKS
2096                     rxi_DestroyConnectionNoLock(conn);
2097 #else /* RX_ENABLE_LOCKS */
2098                     rxi_DestroyConnection(conn);
2099 #endif /* RX_ENABLE_LOCKS */
2100                 }
2101             }
2102         }
2103 #ifdef RX_ENABLE_LOCKS
2104         while (rx_connCleanup_list) {
2105             struct rx_connection *conn;
2106             conn = rx_connCleanup_list;
2107             rx_connCleanup_list = rx_connCleanup_list->next;
2108             MUTEX_EXIT(&rx_connHashTable_lock);
2109             rxi_CleanupConnection(conn);
2110             MUTEX_ENTER(&rx_connHashTable_lock);
2111         }
2112         MUTEX_EXIT(&rx_connHashTable_lock);
2113 #endif /* RX_ENABLE_LOCKS */
2114     }
2115     rxi_flushtrace();
2116
2117     rxinit_status = 1;
2118     UNLOCK_RX_INIT;
2119 }
2120 #endif
2121
2122 /* if we wakeup packet waiter too often, can get in loop with two
2123     AllocSendPackets each waking each other up (from ReclaimPacket calls) */
2124 void
2125 rxi_PacketsUnWait(void)
2126 {
2127     if (!rx_waitingForPackets) {
2128         return;
2129     }
2130 #ifdef KERNEL
2131     if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
2132         return;                 /* still over quota */
2133     }
2134 #endif /* KERNEL */
2135     rx_waitingForPackets = 0;
2136 #ifdef  RX_ENABLE_LOCKS
2137     CV_BROADCAST(&rx_waitingForPackets_cv);
2138 #else
2139     osi_rxWakeup(&rx_waitingForPackets);
2140 #endif
2141     return;
2142 }
2143
2144
2145 /* ------------------Internal interfaces------------------------- */
2146
2147 /* Return this process's service structure for the
2148  * specified socket and service */
2149 struct rx_service *
2150 rxi_FindService(register osi_socket socket, register u_short serviceId)
2151 {
2152     register struct rx_service **sp;
2153     for (sp = &rx_services[0]; *sp; sp++) {
2154         if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
2155             return *sp;
2156     }
2157     return 0;
2158 }
2159
2160 /* Allocate a call structure, for the indicated channel of the
2161  * supplied connection.  The mode and state of the call must be set by
2162  * the caller. Returns the call with mutex locked. */
2163 struct rx_call *
2164 rxi_NewCall(register struct rx_connection *conn, register int channel)
2165 {
2166     register struct rx_call *call;
2167 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2168     register struct rx_call *cp;        /* Call pointer temp */
2169     register struct rx_call *nxp;       /* Next call pointer, for queue_Scan */
2170 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2171
2172     /* Grab an existing call structure, or allocate a new one.
2173      * Existing call structures are assumed to have been left reset by
2174      * rxi_FreeCall */
2175     MUTEX_ENTER(&rx_freeCallQueue_lock);
2176
2177 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2178     /*
2179      * EXCEPT that the TQ might not yet be cleared out.
2180      * Skip over those with in-use TQs.
2181      */
2182     call = NULL;
2183     for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
2184         if (!(cp->flags & RX_CALL_TQ_BUSY)) {
2185             call = cp;
2186             break;
2187         }
2188     }
2189     if (call) {
2190 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2191     if (queue_IsNotEmpty(&rx_freeCallQueue)) {
2192         call = queue_First(&rx_freeCallQueue, rx_call);
2193 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2194         queue_Remove(call);
2195         MUTEX_ENTER(&rx_stats_mutex);
2196         rx_stats.nFreeCallStructs--;
2197         MUTEX_EXIT(&rx_stats_mutex);
2198         MUTEX_EXIT(&rx_freeCallQueue_lock);
2199         MUTEX_ENTER(&call->lock);
2200         CLEAR_CALL_QUEUE_LOCK(call);
2201 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2202         /* Now, if TQ wasn't cleared earlier, do it now. */
2203         if (call->flags & RX_CALL_TQ_CLEARME) {
2204             rxi_ClearTransmitQueue(call, 0);
2205             queue_Init(&call->tq);
2206         }
2207 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2208         /* Bind the call to its connection structure */
2209         call->conn = conn;
2210         rxi_ResetCall(call, 1);
2211     } else {
2212         call = (struct rx_call *)rxi_Alloc(sizeof(struct rx_call));
2213
2214         MUTEX_EXIT(&rx_freeCallQueue_lock);
2215         MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
2216         MUTEX_ENTER(&call->lock);
2217         CV_INIT(&call->cv_twind, "call twind", CV_DEFAULT, 0);
2218         CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
2219         CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
2220
2221         MUTEX_ENTER(&rx_stats_mutex);
2222         rx_stats.nCallStructs++;
2223         MUTEX_EXIT(&rx_stats_mutex);
2224         /* Initialize once-only items */
2225         queue_Init(&call->tq);
2226         queue_Init(&call->rq);
2227         queue_Init(&call->iovq);
2228         /* Bind the call to its connection structure (prereq for reset) */
2229         call->conn = conn;
2230         rxi_ResetCall(call, 1);
2231     }
2232     call->channel = channel;
2233     call->callNumber = &conn->callNumber[channel];
2234     /* Note that the next expected call number is retained (in
2235      * conn->callNumber[i]), even if we reallocate the call structure
2236      */
2237     conn->call[channel] = call;
2238     /* if the channel's never been used (== 0), we should start at 1, otherwise
2239      * the call number is valid from the last time this channel was used */
2240     if (*call->callNumber == 0)
2241         *call->callNumber = 1;
2242
2243     return call;
2244 }
2245
2246 /* A call has been inactive long enough that so we can throw away
2247  * state, including the call structure, which is placed on the call
2248  * free list.
2249  * Call is locked upon entry.
2250  * haveCTLock set if called from rxi_ReapConnections
2251  */
2252 #ifdef RX_ENABLE_LOCKS
2253 void
2254 rxi_FreeCall(register struct rx_call *call, int haveCTLock)
2255 #else /* RX_ENABLE_LOCKS */
2256 void
2257 rxi_FreeCall(register struct rx_call *call)
2258 #endif                          /* RX_ENABLE_LOCKS */
2259 {
2260     register int channel = call->channel;
2261     register struct rx_connection *conn = call->conn;
2262
2263
2264     if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
2265         (*call->callNumber)++;
2266     rxi_ResetCall(call, 0);
2267     call->conn->call[channel] = (struct rx_call *)0;
2268
2269     MUTEX_ENTER(&rx_freeCallQueue_lock);
2270     SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
2271 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2272     /* A call may be free even though its transmit queue is still in use.
2273      * Since we search the call list from head to tail, put busy calls at
2274      * the head of the list, and idle calls at the tail.
2275      */
2276     if (call->flags & RX_CALL_TQ_BUSY)
2277         queue_Prepend(&rx_freeCallQueue, call);
2278     else
2279         queue_Append(&rx_freeCallQueue, call);
2280 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2281     queue_Append(&rx_freeCallQueue, call);
2282 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2283     MUTEX_ENTER(&rx_stats_mutex);
2284     rx_stats.nFreeCallStructs++;
2285     MUTEX_EXIT(&rx_stats_mutex);
2286
2287     MUTEX_EXIT(&rx_freeCallQueue_lock);
2288
2289     /* Destroy the connection if it was previously slated for
2290      * destruction, i.e. the Rx client code previously called
2291      * rx_DestroyConnection (client connections), or
2292      * rxi_ReapConnections called the same routine (server
2293      * connections).  Only do this, however, if there are no
2294      * outstanding calls. Note that for fine grain locking, there appears
2295      * to be a deadlock in that rxi_FreeCall has a call locked and
2296      * DestroyConnectionNoLock locks each call in the conn. But note a
2297      * few lines up where we have removed this call from the conn.
2298      * If someone else destroys a connection, they either have no
2299      * call lock held or are going through this section of code.
2300      */
2301     if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
2302         MUTEX_ENTER(&conn->conn_data_lock);
2303         conn->refCount++;
2304         MUTEX_EXIT(&conn->conn_data_lock);
2305 #ifdef RX_ENABLE_LOCKS
2306         if (haveCTLock)
2307             rxi_DestroyConnectionNoLock(conn);
2308         else
2309             rxi_DestroyConnection(conn);
2310 #else /* RX_ENABLE_LOCKS */
2311         rxi_DestroyConnection(conn);
2312 #endif /* RX_ENABLE_LOCKS */
2313     }
2314 }
2315
2316 afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
2317 char *
2318 rxi_Alloc(register size_t size)
2319 {
2320     register char *p;
2321
2322     MUTEX_ENTER(&rx_stats_mutex);
2323     rxi_Alloccnt++;
2324     rxi_Allocsize += (afs_int32)size;
2325     MUTEX_EXIT(&rx_stats_mutex);
2326
2327     p = (char *)osi_Alloc(size);
2328
2329     if (!p)
2330         osi_Panic("rxi_Alloc error");
2331     memset(p, 0, size);
2332     return p;
2333 }
2334
2335 void
2336 rxi_Free(void *addr, register size_t size)
2337 {
2338     MUTEX_ENTER(&rx_stats_mutex);
2339     rxi_Alloccnt--;
2340     rxi_Allocsize -= (afs_int32)size;
2341     MUTEX_EXIT(&rx_stats_mutex);
2342
2343     osi_Free(addr, size);
2344 }
2345
2346 /* Find the peer process represented by the supplied (host,port)
2347  * combination.  If there is no appropriate active peer structure, a
2348  * new one will be allocated and initialized 
2349  * The origPeer, if set, is a pointer to a peer structure on which the
2350  * refcount will be be decremented. This is used to replace the peer
2351  * structure hanging off a connection structure */
2352 struct rx_peer *
2353 rxi_FindPeer(struct sockaddr_storage *saddr, int slen, int stype,
2354              struct rx_peer *origPeer, int create)
2355 {
2356     register struct rx_peer *pp;
2357     int hashIndex, i, j;
2358     for (i = 0, j = 0; i < slen; i++)
2359         j += ((unsigned char *) saddr)[i];
2360     hashIndex = j % rx_hashTableSize;
2361     MUTEX_ENTER(&rx_peerHashTable_lock);
2362     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
2363         if (memcmp(saddr, &pp->saddr, slen) == 0 && stype == pp->socktype)
2364             break;
2365     }
2366     if (!pp) {
2367         if (create) {
2368             pp = rxi_AllocPeer();       /* This bzero's *pp */
2369             memcpy(&pp->saddr, saddr, slen);
2370             pp->saddrlen = slen;
2371             pp->socktype = stype;
2372             switch (rx_ssfamily(saddr)) {
2373             case AF_INET:
2374                 /*
2375                  * Should be enough storage for a dotted quad
2376                  */
2377                 sprintf(pp->addrstring, "%d.%d.%d.%d",
2378                         rx_ss2addrp(saddr)[0], rx_ss2addrp(saddr)[1],
2379                         rx_ss2addrp(saddr)[2], rx_ss2addrp(saddr)[3]);
2380                 break;
2381 #ifdef AF_INET6
2382             case AF_INET6:
2383                 /*
2384                  * This gets more complicated, unfortunately
2385                  */
2386                 if (IN6_IS_ADDR_V4COMPAT(&(rx_ss2sin6(saddr)->sin6_addr))) {
2387                     sprintf(pp->addrstring, "%d.%d.%d.%d",
2388                             rx_ss2addrp(saddr)[12], rx_ss2addrp(saddr)[13],
2389                             rx_ss2addrp(saddr)[14], rx_ss2addrp(saddr)[15]);
2390                 } else {
2391                     sprintf(pp->addrstring, "%x:%x:%x:%x:%x:%x:%x:%x",
2392                             ntohs(rx_ss2addrp6(saddr)[0]),
2393                             ntohs(rx_ss2addrp6(saddr)[1]),
2394                             ntohs(rx_ss2addrp6(saddr)[2]),
2395                             ntohs(rx_ss2addrp6(saddr)[3]),
2396                             ntohs(rx_ss2addrp6(saddr)[4]),
2397                             ntohs(rx_ss2addrp6(saddr)[5]),
2398                             ntohs(rx_ss2addrp6(saddr)[6]),
2399                             ntohs(rx_ss2addrp6(saddr)[7]));
2400                 }
2401                 break;
2402 #endif /* AF_INET6 */
2403             default:
2404                 strcpy(pp->addrstring, "??.??.??.??");
2405                 break;
2406             }
2407             MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
2408             queue_Init(&pp->congestionQueue);
2409             queue_Init(&pp->rpcStats);
2410             pp->next = rx_peerHashTable[hashIndex];
2411             rx_peerHashTable[hashIndex] = pp;
2412             rxi_InitPeerParams(pp);
2413             MUTEX_ENTER(&rx_stats_mutex);
2414             rx_stats.nPeerStructs++;
2415             MUTEX_EXIT(&rx_stats_mutex);
2416         }
2417     }
2418     if (pp && create) {
2419         pp->refCount++;
2420     }
2421     if (origPeer)
2422         origPeer->refCount--;
2423     MUTEX_EXIT(&rx_peerHashTable_lock);
2424     return pp;
2425 }
2426
2427
2428 /* Find the connection at (host, port) started at epoch, and with the
2429  * given connection id.  Creates the server connection if necessary.
2430  * The type specifies whether a client connection or a server
2431  * connection is desired.  In both cases, (host, port) specify the
2432  * peer's (host, pair) pair.  Client connections are not made
2433  * automatically by this routine.  The parameter socket gives the
2434  * socket descriptor on which the packet was received.  This is used,
2435  * in the case of server connections, to check that *new* connections
2436  * come via a valid (port, serviceId).  Finally, the securityIndex
2437  * parameter must match the existing index for the connection.  If a
2438  * server connection is created, it will be created using the supplied
2439  * index, if the index is valid for this service */
2440 struct rx_connection *
2441 rxi_FindConnection(osi_socket socket, struct sockaddr_storage *saddr,
2442                    int slen, int socktype, u_short serviceId, afs_uint32 cid,
2443                    afs_uint32 epoch, int type, u_int securityIndex)
2444 {
2445     int hashindex, flag;
2446     register struct rx_connection *conn;
2447     hashindex = CONN_HASH(host, port, cid, epoch, type);
2448     MUTEX_ENTER(&rx_connHashTable_lock);
2449     rxLastConn ? (conn = rxLastConn, flag = 0) : (conn =
2450                                                   rx_connHashTable[hashindex],
2451                                                   flag = 1);
2452     for (; conn;) {
2453         if ((conn->type == type) && ((cid & RX_CIDMASK) == conn->cid)
2454             && (epoch == conn->epoch)) {
2455             register struct rx_peer *pp = conn->peer;
2456             if (securityIndex != conn->securityIndex) {
2457                 /* this isn't supposed to happen, but someone could forge a packet
2458                  * like this, and there seems to be some CM bug that makes this
2459                  * happen from time to time -- in which case, the fileserver
2460                  * asserts. */
2461                 MUTEX_EXIT(&rx_connHashTable_lock);
2462                 return (struct rx_connection *)0;
2463             }
2464             if (memcmp(&pp->saddr, saddr, slen) == 0 &&
2465                                                 socktype == pp->socktype)
2466                 break;
2467             if (type == RX_CLIENT_CONNECTION &&
2468                                         rx_ss2pn(&pp->saddr) == rx_ss2pn(saddr))
2469                 break;
2470             /* So what happens when it's a callback connection? */
2471             if (                /*type == RX_CLIENT_CONNECTION && */
2472                    (conn->epoch & 0x80000000))
2473                 break;
2474         }
2475         if (!flag) {
2476             /* the connection rxLastConn that was used the last time is not the
2477              ** one we are looking for now. Hence, start searching in the hash */
2478             flag = 1;
2479             conn = rx_connHashTable[hashindex];
2480         } else
2481             conn = conn->next;
2482     }
2483     if (!conn) {
2484         struct rx_service *service;
2485         if (type == RX_CLIENT_CONNECTION) {
2486             MUTEX_EXIT(&rx_connHashTable_lock);
2487             return (struct rx_connection *)0;
2488         }
2489         service = rxi_FindService(socket, serviceId);
2490         if (!service || (securityIndex >= service->nSecurityObjects)
2491             || (service->securityObjects[securityIndex] == 0)) {
2492             MUTEX_EXIT(&rx_connHashTable_lock);
2493             return (struct rx_connection *)0;
2494         }
2495         conn = rxi_AllocConnection();   /* This bzero's the connection */
2496         MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
2497         MUTEX_INIT(&conn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
2498         CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
2499         conn->next = rx_connHashTable[hashindex];
2500         rx_connHashTable[hashindex] = conn;
2501         conn->peer = rxi_FindPeer(saddr, slen, socktype, 0, 1);
2502         conn->type = RX_SERVER_CONNECTION;
2503         conn->lastSendTime = clock_Sec();       /* don't GC immediately */
2504         conn->epoch = epoch;
2505         conn->cid = cid & RX_CIDMASK;
2506         /* conn->serial = conn->lastSerial = 0; */
2507         /* conn->timeout = 0; */
2508         conn->ackRate = RX_FAST_ACK_RATE;
2509         conn->service = service;
2510         conn->serviceId = serviceId;
2511         conn->securityIndex = securityIndex;
2512         conn->securityObject = service->securityObjects[securityIndex];
2513         conn->nSpecific = 0;
2514         conn->specific = NULL;
2515         rx_SetConnDeadTime(conn, service->connDeadTime);
2516         rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
2517         /* Notify security object of the new connection */
2518         RXS_NewConnection(conn->securityObject, conn);
2519         /* XXXX Connection timeout? */
2520         if (service->newConnProc)
2521             (*service->newConnProc) (conn);
2522         MUTEX_ENTER(&rx_stats_mutex);
2523         rx_stats.nServerConns++;
2524         MUTEX_EXIT(&rx_stats_mutex);
2525     }
2526
2527     MUTEX_ENTER(&conn->conn_data_lock);
2528     conn->refCount++;
2529     MUTEX_EXIT(&conn->conn_data_lock);
2530
2531     rxLastConn = conn;          /* store this connection as the last conn used */
2532     MUTEX_EXIT(&rx_connHashTable_lock);
2533     return conn;
2534 }
2535
2536 /* There are two packet tracing routines available for testing and monitoring
2537  * Rx.  One is called just after every packet is received and the other is
2538  * called just before every packet is sent.  Received packets, have had their
2539  * headers decoded, and packets to be sent have not yet had their headers
2540  * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
2541  * containing the network address.  Both can be modified.  The return value, if
2542  * non-zero, indicates that the packet should be dropped.  */
2543
2544 int (*rx_justReceived) () = 0;
2545 int (*rx_almostSent) () = 0;
2546
2547 /* A packet has been received off the interface.  Np is the packet, socket is
2548  * the socket number it was received from (useful in determining which service
2549  * this packet corresponds to), and (host, port) reflect the host,port of the
2550  * sender.  This call returns the packet to the caller if it is finished with
2551  * it, rather than de-allocating it, just as a small performance hack */
2552
2553 struct rx_packet *
2554 rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
2555                   struct sockaddr_storage *saddr, int slen, int *tnop,
2556                   struct rx_call **newcallp)
2557 {
2558     register struct rx_call *call;
2559     register struct rx_connection *conn;
2560     int channel;
2561     afs_uint32 currentCallNumber;
2562     int type;
2563     int skew;
2564 #ifdef RXDEBUG
2565     char *packetType;
2566 #endif
2567     struct rx_packet *tnp;
2568
2569 #ifdef RXDEBUG
2570 /* We don't print out the packet until now because (1) the time may not be
2571  * accurate enough until now in the lwp implementation (rx_Listener only gets
2572  * the time after the packet is read) and (2) from a protocol point of view,
2573  * this is the first time the packet has been seen */
2574     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
2575         ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
2576     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
2577          np->header.serial, packetType, ntohl(rx_ss2v4addr(saddr)),
2578          ntohs(rx_ss2pn(saddr)), np->header.serviceId,
2579          np->header.epoch, np->header.cid, np->header.callNumber,
2580          np->header.seq, np->header.flags, np));
2581 #endif
2582
2583     if (np->header.type == RX_PACKET_TYPE_VERSION) {
2584         return rxi_ReceiveVersionPacket(np, socket, saddr, slen, 1);
2585     }
2586
2587     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
2588         return rxi_ReceiveDebugPacket(np, socket, saddr, slen, 1);
2589     }
2590 #ifdef RXDEBUG
2591     /* If an input tracer function is defined, call it with the packet and
2592      * network address.  Note this function may modify its arguments. */
2593     if (rx_justReceived) {
2594         struct sockaddr_in *addr = (struct sockaddr_in *) saddr;
2595         int drop;
2596         drop = (*rx_justReceived) (np, addr);
2597         /* drop packet if return value is non-zero */
2598         if (drop)
2599             return np;
2600     }
2601 #endif
2602
2603     /* If packet was not sent by the client, then *we* must be the client */
2604     type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
2605         ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
2606
2607     /* Find the connection (or fabricate one, if we're the server & if
2608      * necessary) associated with this packet */
2609     conn =
2610         rxi_FindConnection(socket, saddr, slen, SOCK_DGRAM,
2611                            np->header.serviceId, np->header.cid,
2612                            np->header.epoch, type, np->header.securityIndex);
2613
2614     if (!conn) {
2615         /* If no connection found or fabricated, just ignore the packet.
2616          * (An argument could be made for sending an abort packet for
2617          * the conn) */
2618         return np;
2619     }
2620
2621     MUTEX_ENTER(&conn->conn_data_lock);
2622     if (conn->maxSerial < np->header.serial)
2623         conn->maxSerial = np->header.serial;
2624     MUTEX_EXIT(&conn->conn_data_lock);
2625
2626     /* If the connection is in an error state, send an abort packet and ignore
2627      * the incoming packet */
2628     if (conn->error) {
2629         /* Don't respond to an abort packet--we don't want loops! */
2630         MUTEX_ENTER(&conn->conn_data_lock);
2631         if (np->header.type != RX_PACKET_TYPE_ABORT)
2632             np = rxi_SendConnectionAbort(conn, np, 1, 0);
2633         conn->refCount--;
2634         MUTEX_EXIT(&conn->conn_data_lock);
2635         return np;
2636     }
2637
2638     /* Check for connection-only requests (i.e. not call specific). */
2639     if (np->header.callNumber == 0) {
2640         switch (np->header.type) {
2641         case RX_PACKET_TYPE_ABORT: {
2642             /* What if the supplied error is zero? */
2643             afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
2644             dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
2645             rxi_ConnectionError(conn, errcode);
2646             MUTEX_ENTER(&conn->conn_data_lock);
2647             conn->refCount--;
2648             MUTEX_EXIT(&conn->conn_data_lock);
2649             return np;
2650         }
2651         case RX_PACKET_TYPE_CHALLENGE:
2652             tnp = rxi_ReceiveChallengePacket(conn, np, 1);
2653             MUTEX_ENTER(&conn->conn_data_lock);
2654             conn->refCount--;
2655             MUTEX_EXIT(&conn->conn_data_lock);
2656             return tnp;
2657         case RX_PACKET_TYPE_RESPONSE:
2658             tnp = rxi_ReceiveResponsePacket(conn, np, 1);
2659             MUTEX_ENTER(&conn->conn_data_lock);
2660             conn->refCount--;
2661             MUTEX_EXIT(&conn->conn_data_lock);
2662             return tnp;
2663         case RX_PACKET_TYPE_PARAMS:
2664         case RX_PACKET_TYPE_PARAMS + 1:
2665         case RX_PACKET_TYPE_PARAMS + 2:
2666             /* ignore these packet types for now */
2667             MUTEX_ENTER(&conn->conn_data_lock);
2668             conn->refCount--;
2669             MUTEX_EXIT(&conn->conn_data_lock);
2670             return np;
2671
2672
2673         default:
2674             /* Should not reach here, unless the peer is broken: send an
2675              * abort packet */
2676             rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
2677             MUTEX_ENTER(&conn->conn_data_lock);
2678             tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
2679             conn->refCount--;
2680             MUTEX_EXIT(&conn->conn_data_lock);
2681             return tnp;
2682         }
2683     }
2684
2685     channel = np->header.cid & RX_CHANNELMASK;
2686     call = conn->call[channel];
2687 #ifdef  RX_ENABLE_LOCKS
2688     if (call)
2689         MUTEX_ENTER(&call->lock);
2690     /* Test to see if call struct is still attached to conn. */
2691     if (call != conn->call[channel]) {
2692         if (call)
2693             MUTEX_EXIT(&call->lock);
2694         if (type == RX_SERVER_CONNECTION) {
2695             call = conn->call[channel];
2696             /* If we started with no call attached and there is one now,
2697              * another thread is also running this routine and has gotten
2698              * the connection channel. We should drop this packet in the tests
2699              * below. If there was a call on this connection and it's now
2700              * gone, then we'll be making a new call below.
2701              * If there was previously a call and it's now different then
2702              * the old call was freed and another thread running this routine
2703              * has created a call on this channel. One of these two threads
2704              * has a packet for the old call and the code below handles those
2705              * cases.
2706              */
2707             if (call)
2708                 MUTEX_ENTER(&call->lock);
2709         } else {
2710             /* This packet can't be for this call. If the new call address is
2711              * 0 then no call is running on this channel. If there is a call
2712              * then, since this is a client connection we're getting data for
2713              * it must be for the previous call.
2714              */
2715             MUTEX_ENTER(&rx_stats_mutex);
2716             rx_stats.spuriousPacketsRead++;
2717             MUTEX_EXIT(&rx_stats_mutex);
2718             MUTEX_ENTER(&conn->conn_data_lock);
2719             conn->refCount--;
2720             MUTEX_EXIT(&conn->conn_data_lock);
2721             return np;
2722         }
2723     }
2724 #endif
2725     currentCallNumber = conn->callNumber[channel];
2726
2727     if (type == RX_SERVER_CONNECTION) { /* We're the server */
2728         if (np->header.callNumber < currentCallNumber) {
2729             MUTEX_ENTER(&rx_stats_mutex);
2730             rx_stats.spuriousPacketsRead++;
2731             MUTEX_EXIT(&rx_stats_mutex);
2732 #ifdef  RX_ENABLE_LOCKS
2733             if (call)
2734                 MUTEX_EXIT(&call->lock);
2735 #endif
2736             MUTEX_ENTER(&conn->conn_data_lock);
2737             conn->refCount--;
2738             MUTEX_EXIT(&conn->conn_data_lock);
2739             return np;
2740         }
2741         if (!call) {
2742             MUTEX_ENTER(&conn->conn_call_lock);
2743             call = rxi_NewCall(conn, channel);
2744             MUTEX_EXIT(&conn->conn_call_lock);
2745             *call->callNumber = np->header.callNumber;
2746             if (np->header.callNumber == 0) 
2747                 dpf(("RecPacket call 0 %d %s: %s.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], rx_AddrStringOf(conn->peer), ntohs(rx_PortOf(conn->peer)), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
2748
2749             call->state = RX_STATE_PRECALL;
2750             clock_GetTime(&call->queueTime);
2751             hzero(call->bytesSent);
2752             hzero(call->bytesRcvd);
2753             /*
2754              * If the number of queued calls exceeds the overload
2755              * threshold then abort this call.
2756              */
2757             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2758                 struct rx_packet *tp;
2759                 
2760                 rxi_CallError(call, rx_BusyError);
2761                 tp = rxi_SendCallAbort(call, np, 1, 0);
2762                 MUTEX_EXIT(&call->lock);
2763                 MUTEX_ENTER(&conn->conn_data_lock);
2764                 conn->refCount--;
2765                 MUTEX_EXIT(&conn->conn_data_lock);
2766                 MUTEX_ENTER(&rx_stats_mutex);
2767                 rx_stats.nBusies++;
2768                 MUTEX_EXIT(&rx_stats_mutex);
2769                 return tp;
2770             }
2771             rxi_KeepAliveOn(call);
2772         } else if (np->header.callNumber != currentCallNumber) {
2773             /* Wait until the transmit queue is idle before deciding
2774              * whether to reset the current call. Chances are that the
2775              * call will be in ether DALLY or HOLD state once the TQ_BUSY
2776              * flag is cleared.
2777              */
2778 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
2779             while ((call->state == RX_STATE_ACTIVE)
2780                    && (call->flags & RX_CALL_TQ_BUSY)) {
2781                 call->flags |= RX_CALL_TQ_WAIT;
2782                 call->tqWaiters++;
2783 #ifdef RX_ENABLE_LOCKS
2784                 osirx_AssertMine(&call->lock, "rxi_Start lock3");
2785                 CV_WAIT(&call->cv_tq, &call->lock);
2786 #else /* RX_ENABLE_LOCKS */
2787                 osi_rxSleep(&call->tq);
2788 #endif /* RX_ENABLE_LOCKS */
2789                 call->tqWaiters--;
2790                 if (call->tqWaiters == 0)
2791                     call->flags &= ~RX_CALL_TQ_WAIT;
2792             }
2793 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2794             /* If the new call cannot be taken right now send a busy and set
2795              * the error condition in this call, so that it terminates as
2796              * quickly as possible */
2797             if (call->state == RX_STATE_ACTIVE) {
2798                 struct rx_packet *tp;
2799
2800                 rxi_CallError(call, RX_CALL_DEAD);
2801                 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
2802                                      NULL, 0, 1);
2803                 MUTEX_EXIT(&call->lock);
2804                 MUTEX_ENTER(&conn->conn_data_lock);
2805                 conn->refCount--;
2806                 MUTEX_EXIT(&conn->conn_data_lock);
2807                 return tp;
2808             }
2809             rxi_ResetCall(call, 0);
2810             *call->callNumber = np->header.callNumber;
2811             if (np->header.callNumber == 0) 
2812                 dpf(("RecPacket call 0 %d %s: %s.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], rx_AddrStringOf(conn->peer), ntohs(rx_PortOf(conn->peer)), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
2813
2814             call->state = RX_STATE_PRECALL;
2815             clock_GetTime(&call->queueTime);
2816             hzero(call->bytesSent);
2817             hzero(call->bytesRcvd);
2818             /*
2819              * If the number of queued calls exceeds the overload
2820              * threshold then abort this call.
2821              */
2822             if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
2823                 struct rx_packet *tp;
2824
2825                 rxi_CallError(call, rx_BusyError);
2826                 tp = rxi_SendCallAbort(call, np, 1, 0);
2827                 MUTEX_EXIT(&call->lock);
2828                 MUTEX_ENTER(&conn->conn_data_lock);
2829                 conn->refCount--;
2830                 MUTEX_EXIT(&conn->conn_data_lock);
2831                 MUTEX_ENTER(&rx_stats_mutex);
2832                 rx_stats.nBusies++;
2833                 MUTEX_EXIT(&rx_stats_mutex);
2834                 return tp;
2835             }
2836             rxi_KeepAliveOn(call);
2837         } else {
2838             /* Continuing call; do nothing here. */
2839         }
2840     } else {                    /* we're the client */
2841         /* Ignore all incoming acknowledgements for calls in DALLY state */
2842         if (call && (call->state == RX_STATE_DALLY)
2843             && (np->header.type == RX_PACKET_TYPE_ACK)) {
2844             MUTEX_ENTER(&rx_stats_mutex);
2845             rx_stats.ignorePacketDally++;
2846             MUTEX_EXIT(&rx_stats_mutex);
2847 #ifdef  RX_ENABLE_LOCKS
2848             if (call) {
2849                 MUTEX_EXIT(&call->lock);
2850             }
2851 #endif
2852             MUTEX_ENTER(&conn->conn_data_lock);
2853             conn->refCount--;
2854             MUTEX_EXIT(&conn->conn_data_lock);
2855             return np;
2856         }
2857
2858         /* Ignore anything that's not relevant to the current call.  If there
2859          * isn't a current call, then no packet is relevant. */
2860         if (!call || (np->header.callNumber != currentCallNumber)) {
2861             MUTEX_ENTER(&rx_stats_mutex);
2862             rx_stats.spuriousPacketsRead++;
2863             MUTEX_EXIT(&rx_stats_mutex);
2864 #ifdef  RX_ENABLE_LOCKS
2865             if (call) {
2866                 MUTEX_EXIT(&call->lock);
2867             }
2868 #endif
2869             MUTEX_ENTER(&conn->conn_data_lock);
2870             conn->refCount--;
2871             MUTEX_EXIT(&conn->conn_data_lock);
2872             return np;
2873         }
2874         /* If the service security object index stamped in the packet does not
2875          * match the connection's security index, ignore the packet */
2876         if (np->header.securityIndex != conn->securityIndex) {
2877 #ifdef  RX_ENABLE_LOCKS
2878             MUTEX_EXIT(&call->lock);
2879 #endif
2880             MUTEX_ENTER(&conn->conn_data_lock);
2881             conn->refCount--;
2882             MUTEX_EXIT(&conn->conn_data_lock);
2883             return np;
2884         }
2885
2886         /* If we're receiving the response, then all transmit packets are
2887          * implicitly acknowledged.  Get rid of them. */
2888         if (np->header.type == RX_PACKET_TYPE_DATA) {
2889 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
2890             /* XXX Hack. Because we must release the global rx lock when
2891              * sending packets (osi_NetSend) we drop all acks while we're
2892              * traversing the tq in rxi_Start sending packets out because
2893              * packets may move to the freePacketQueue as result of being here!
2894              * So we drop these packets until we're safely out of the
2895              * traversing. Really ugly! 
2896              * For fine grain RX locking, we set the acked field in the
2897              * packets and let rxi_Start remove them from the transmit queue.
2898              */
2899             if (call->flags & RX_CALL_TQ_BUSY) {
2900 #ifdef  RX_ENABLE_LOCKS
2901                 rxi_SetAcksInTransmitQueue(call);
2902 #else
2903                 conn->refCount--;
2904                 return np;      /* xmitting; drop packet */
2905 #endif
2906             } else {
2907                 rxi_ClearTransmitQueue(call, 0);
2908             }
2909 #else /* AFS_GLOBAL_RXLOCK_KERNEL */
2910             rxi_ClearTransmitQueue(call, 0);
2911 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
2912         } else {
2913             if (np->header.type == RX_PACKET_TYPE_ACK) {
2914                 /* now check to see if this is an ack packet acknowledging that the
2915                  * server actually *lost* some hard-acked data.  If this happens we
2916                  * ignore this packet, as it may indicate that the server restarted in
2917                  * the middle of a call.  It is also possible that this is an old ack
2918                  * packet.  We don't abort the connection in this case, because this
2919                  * *might* just be an old ack packet.  The right way to detect a server
2920                  * restart in the midst of a call is to notice that the server epoch
2921                  * changed, btw.  */
2922                 /* XXX I'm not sure this is exactly right, since tfirst **IS**
2923                  * XXX unacknowledged.  I think that this is off-by-one, but
2924                  * XXX I don't dare change it just yet, since it will
2925                  * XXX interact badly with the server-restart detection 
2926                  * XXX code in receiveackpacket.  */
2927                 if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
2928                     MUTEX_ENTER(&rx_stats_mutex);
2929                     rx_stats.spuriousPacketsRead++;
2930                     MUTEX_EXIT(&rx_stats_mutex);
2931                     MUTEX_EXIT(&call->lock);
2932                     MUTEX_ENTER(&conn->conn_data_lock);
2933                     conn->refCount--;
2934                     MUTEX_EXIT(&conn->conn_data_lock);
2935                     return np;
2936                 }
2937             }
2938         }                       /* else not a data packet */
2939     }
2940
2941     osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
2942     /* Set remote user defined status from packet */
2943     call->remoteStatus = np->header.userStatus;
2944
2945     /* Note the gap between the expected next packet and the actual
2946      * packet that arrived, when the new packet has a smaller serial number
2947      * than expected.  Rioses frequently reorder packets all by themselves,
2948      * so this will be quite important with very large window sizes.
2949      * Skew is checked against 0 here to avoid any dependence on the type of
2950      * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
2951      * true! 
2952      * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
2953      * see CalculateRoundTripTime for an example of how to keep smoothed values.
2954      * I think using a beta of 1/8 is probably appropriate.  93.04.21
2955      */
2956     MUTEX_ENTER(&conn->conn_data_lock);
2957     skew = conn->lastSerial - np->header.serial;
2958     conn->lastSerial = np->header.serial;
2959     MUTEX_EXIT(&conn->conn_data_lock);
2960     if (skew > 0) {
2961         register struct rx_peer *peer;
2962         peer = conn->peer;
2963         if (skew > peer->inPacketSkew) {
2964             dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew,
2965                  skew));
2966             peer->inPacketSkew = skew;
2967         }
2968     }
2969
2970     /* Now do packet type-specific processing */
2971     switch (np->header.type) {
2972     case RX_PACKET_TYPE_DATA:
2973         np = rxi_ReceiveDataPacket(call, np, 1, socket, saddr, slen, tnop,
2974                                    newcallp);
2975         break;
2976     case RX_PACKET_TYPE_ACK:
2977         /* Respond immediately to ack packets requesting acknowledgement
2978          * (ping packets) */
2979         if (np->header.flags & RX_REQUEST_ACK) {
2980             if (call->error)
2981                 (void)rxi_SendCallAbort(call, 0, 1, 0);
2982             else
2983                 (void)rxi_SendAck(call, 0, np->header.serial,
2984                                   RX_ACK_PING_RESPONSE, 1);
2985         }
2986         np = rxi_ReceiveAckPacket(call, np, 1);
2987         break;
2988     case RX_PACKET_TYPE_ABORT: {
2989         /* An abort packet: reset the call, passing the error up to the user. */
2990         /* What if error is zero? */
2991         /* What if the error is -1? the application will treat it as a timeout. */
2992         afs_int32 errdata = ntohl(*(afs_int32 *) rx_DataOf(np));
2993         dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
2994         rxi_CallError(call, errdata);
2995         MUTEX_EXIT(&call->lock);
2996         MUTEX_ENTER(&conn->conn_data_lock);
2997         conn->refCount--;
2998         MUTEX_EXIT(&conn->conn_data_lock);
2999         return np;              /* xmitting; drop packet */
3000     }
3001     case RX_PACKET_TYPE_BUSY:
3002         /* XXXX */
3003         break;
3004     case RX_PACKET_TYPE_ACKALL:
3005         /* All packets acknowledged, so we can drop all packets previously
3006          * readied for sending */
3007 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
3008         /* XXX Hack. We because we can't release the global rx lock when
3009          * sending packets (osi_NetSend) we drop all ack pkts while we're
3010          * traversing the tq in rxi_Start sending packets out because
3011          * packets may move to the freePacketQueue as result of being
3012          * here! So we drop these packets until we're safely out of the
3013          * traversing. Really ugly! 
3014          * For fine grain RX locking, we set the acked field in the packets
3015          * and let rxi_Start remove the packets from the transmit queue.
3016          */
3017         if (call->flags & RX_CALL_TQ_BUSY) {
3018 #ifdef  RX_ENABLE_LOCKS
3019             rxi_SetAcksInTransmitQueue(call);
3020             break;
3021 #else /* RX_ENABLE_LOCKS */
3022             MUTEX_EXIT(&call->lock);
3023             MUTEX_ENTER(&conn->conn_data_lock);
3024             conn->refCount--;
3025             MUTEX_EXIT(&conn->conn_data_lock);
3026             return np;          /* xmitting; drop packet */
3027 #endif /* RX_ENABLE_LOCKS */
3028         }
3029 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
3030         rxi_ClearTransmitQueue(call, 0);
3031         break;
3032     default:
3033         /* Should not reach here, unless the peer is broken: send an abort
3034          * packet */
3035         rxi_CallError(call, RX_PROTOCOL_ERROR);
3036         np = rxi_SendCallAbort(call, np, 1, 0);
3037         break;
3038     };
3039     /* Note when this last legitimate packet was received, for keep-alive
3040      * processing.  Note, we delay getting the time until now in the hope that
3041      * the packet will be delivered to the user before any get time is required
3042      * (if not, then the time won't actually be re-evaluated here). */
3043     call->lastReceiveTime = clock_Sec();
3044     MUTEX_EXIT(&call->lock);
3045     MUTEX_ENTER(&conn->conn_data_lock);
3046     conn->refCount--;
3047     MUTEX_EXIT(&conn->conn_data_lock);
3048     return np;
3049 }
3050
3051 /* return true if this is an "interesting" connection from the point of view
3052     of someone trying to debug the system */
3053 int
3054 rxi_IsConnInteresting(struct rx_connection *aconn)
3055 {
3056     register int i;
3057     register struct rx_call *tcall;
3058
3059     if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
3060         return 1;
3061     for (i = 0; i < RX_MAXCALLS; i++) {
3062         tcall = aconn->call[i];
3063         if (tcall) {
3064             if ((tcall->state == RX_STATE_PRECALL)
3065                 || (tcall->state == RX_STATE_ACTIVE))
3066                 return 1;
3067             if ((tcall->mode == RX_MODE_SENDING)
3068                 || (tcall->mode == RX_MODE_RECEIVING))
3069                 return 1;
3070         }
3071     }
3072     return 0;
3073 }
3074
3075 #ifdef KERNEL
3076 /* if this is one of the last few packets AND it wouldn't be used by the
3077    receiving call to immediately satisfy a read request, then drop it on
3078    the floor, since accepting it might prevent a lock-holding thread from
3079    making progress in its reading. If a call has been cleared while in
3080    the precall state then ignore all subsequent packets until the call
3081    is assigned to a thread. */
3082
3083 static int
3084 TooLow(struct rx_packet *ap, struct rx_call *acall)
3085 {
3086     int rc = 0;
3087     MUTEX_ENTER(&rx_stats_mutex);
3088     if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED)
3089          && (acall->state == RX_STATE_PRECALL))
3090         || ((rx_nFreePackets < rxi_dataQuota + 2)
3091             && !((ap->header.seq < acall->rnext + rx_initSendWindow)
3092                  && (acall->flags & RX_CALL_READER_WAIT)))) {
3093         rc = 1;
3094     }
3095     MUTEX_EXIT(&rx_stats_mutex);
3096     return rc;
3097 }
3098 #endif /* KERNEL */
3099
3100 static void
3101 rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn,
3102                     struct rx_call *acall)
3103 {
3104     struct rx_call *call = acall;
3105     struct clock when;
3106     int i, waiting;
3107
3108     MUTEX_ENTER(&conn->conn_data_lock);
3109     conn->checkReachEvent = NULL;
3110     waiting = conn->flags & RX_CONN_ATTACHWAIT;
3111     if (event)
3112         conn->refCount--;
3113     MUTEX_EXIT(&conn->conn_data_lock);
3114
3115     if (waiting) {
3116         if (!call) {
3117             MUTEX_ENTER(&conn->conn_call_lock);
3118             MUTEX_ENTER(&conn->conn_data_lock);
3119             for (i = 0; i < RX_MAXCALLS; i++) {
3120                 struct rx_call *tc = conn->call[i];
3121                 if (tc && tc->state == RX_STATE_PRECALL) {
3122                     call = tc;
3123                     break;
3124                 }
3125             }
3126             if (!call)
3127                 /* Indicate that rxi_CheckReachEvent is no longer running by
3128                  * clearing the flag.  Must be atomic under conn_data_lock to
3129                  * avoid a new call slipping by: rxi_CheckConnReach holds
3130                  * conn_data_lock while checking RX_CONN_ATTACHWAIT.
3131                  */
3132                 conn->flags &= ~RX_CONN_ATTACHWAIT;
3133             MUTEX_EXIT(&conn->conn_data_lock);
3134             MUTEX_EXIT(&conn->conn_call_lock);
3135         }
3136
3137         if (call) {
3138             if (call != acall)
3139                 MUTEX_ENTER(&call->lock);
3140             rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
3141             if (call != acall)
3142                 MUTEX_EXIT(&call->lock);
3143
3144             clock_GetTime(&when);
3145             when.sec += RX_CHECKREACH_TIMEOUT;
3146             MUTEX_ENTER(&conn->conn_data_lock);
3147             if (!conn->checkReachEvent) {
3148                 conn->refCount++;
3149                 conn->checkReachEvent =
3150                     rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
3151             }
3152             MUTEX_EXIT(&conn->conn_data_lock);
3153         }
3154     }
3155 }
3156
3157 static int
3158 rxi_CheckConnReach(struct rx_connection *conn, struct rx_call *call)
3159 {
3160     struct rx_service *service = conn->service;
3161     struct rx_peer *peer = conn->peer;
3162     afs_uint32 now, lastReach;
3163
3164     if (service->checkReach == 0)
3165         return 0;
3166
3167     now = clock_Sec();
3168     MUTEX_ENTER(&peer->peer_lock);
3169     lastReach = peer->lastReachTime;
3170     MUTEX_EXIT(&peer->peer_lock);
3171     if (now - lastReach < RX_CHECKREACH_TTL)
3172         return 0;
3173
3174     MUTEX_ENTER(&conn->conn_data_lock);
3175     if (conn->flags & RX_CONN_ATTACHWAIT) {
3176         MUTEX_EXIT(&conn->conn_data_lock);
3177         return 1;
3178     }
3179     conn->flags |= RX_CONN_ATTACHWAIT;
3180     MUTEX_EXIT(&conn->conn_data_lock);
3181     if (!conn->checkReachEvent)
3182         rxi_CheckReachEvent(NULL, conn, call);
3183
3184     return 1;
3185 }
3186
3187 /* try to attach call, if authentication is complete */
3188 static void
3189 TryAttach(register struct rx_call *acall, register osi_socket socket,
3190           register int *tnop, register struct rx_call **newcallp,
3191           int reachOverride)
3192 {
3193     struct rx_connection *conn = acall->conn;
3194
3195     if (conn->type == RX_SERVER_CONNECTION
3196         && acall->state == RX_STATE_PRECALL) {
3197         /* Don't attach until we have any req'd. authentication. */
3198         if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
3199             if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
3200                 rxi_AttachServerProc(acall, socket, tnop, newcallp);
3201             /* Note:  this does not necessarily succeed; there
3202              * may not any proc available
3203              */
3204         } else {
3205             rxi_ChallengeOn(acall->conn);
3206         }
3207     }
3208 }
3209
3210 /* A data packet has been received off the interface.  This packet is
3211  * appropriate to the call (the call is in the right state, etc.).  This
3212  * routine can return a packet to the caller, for re-use */
3213
3214 struct rx_packet *
3215 rxi_ReceiveDataPacket(register struct rx_call *call,
3216                       register struct rx_packet *np, int istack,
3217                       osi_socket socket, struct sockaddr_storage *saddr,
3218                       int slen, int *tnop, struct rx_call **newcallp)
3219 {
3220     int ackNeeded = 0;          /* 0 means no, otherwise ack_reason */
3221     int newPackets = 0;
3222     int didHardAck = 0;
3223     int haveLast = 0;
3224     afs_uint32 seq, serial, flags;
3225     int isFirst;
3226     struct rx_packet *tnp;
3227     struct clock when;
3228     MUTEX_ENTER(&rx_stats_mutex);
3229     rx_stats.dataPacketsRead++;
3230     MUTEX_EXIT(&rx_stats_mutex);
3231
3232 #ifdef KERNEL
3233     /* If there are no packet buffers, drop this new packet, unless we can find
3234      * packet buffers from inactive calls */
3235     if (!call->error
3236         && (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call))) {
3237         MUTEX_ENTER(&rx_freePktQ_lock);
3238         rxi_NeedMorePackets = TRUE;
3239         MUTEX_EXIT(&rx_freePktQ_lock);
3240         MUTEX_ENTER(&rx_stats_mutex);
3241         rx_stats.noPacketBuffersOnRead++;
3242         MUTEX_EXIT(&rx_stats_mutex);
3243         call->rprev = np->header.serial;
3244         rxi_calltrace(RX_TRACE_DROP, call);
3245         dpf(("packet %x dropped on receipt - quota problems", np));
3246         if (rxi_doreclaim)
3247             rxi_ClearReceiveQueue(call);
3248         clock_GetTime(&when);
3249         clock_Add(&when, &rx_softAckDelay);
3250         if (!call->delayedAckEvent
3251             || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3252             rxevent_Cancel(call->delayedAckEvent, call,
3253                            RX_CALL_REFCOUNT_DELAY);
3254             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3255             call->delayedAckEvent =
3256                 rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
3257         }
3258         /* we've damaged this call already, might as well do it in. */
3259         return np;
3260     }
3261 #endif /* KERNEL */
3262
3263     /*
3264      * New in AFS 3.5, if the RX_JUMBO_PACKET flag is set then this
3265      * packet is one of several packets transmitted as a single
3266      * datagram. Do not send any soft or hard acks until all packets
3267      * in a jumbogram have been processed. Send negative acks right away.
3268      */
3269     for (isFirst = 1, tnp = NULL; isFirst || tnp; isFirst = 0) {
3270         /* tnp is non-null when there are more packets in the
3271          * current jumbo gram */
3272         if (tnp) {
3273             if (np)
3274                 rxi_FreePacket(np);
3275             np = tnp;
3276         }
3277
3278         seq = np->header.seq;
3279         serial = np->header.serial;
3280         flags = np->header.flags;
3281
3282         /* If the call is in an error state, send an abort message */
3283         if (call->error)
3284             return rxi_SendCallAbort(call, np, istack, 0);
3285
3286         /* The RX_JUMBO_PACKET is set in all but the last packet in each
3287          * AFS 3.5 jumbogram. */
3288         if (flags & RX_JUMBO_PACKET) {
3289             tnp = rxi_SplitJumboPacket(np, saddr, slen, isFirst);
3290         } else {
3291             tnp = NULL;
3292         }
3293
3294         if (np->header.spare != 0) {
3295             MUTEX_ENTER(&call->conn->conn_data_lock);
3296             call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
3297             MUTEX_EXIT(&call->conn->conn_data_lock);
3298         }
3299
3300         /* The usual case is that this is the expected next packet */
3301         if (seq == call->rnext) {
3302
3303             /* Check to make sure it is not a duplicate of one already queued */
3304             if (queue_IsNotEmpty(&call->rq)
3305                 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
3306                 MUTEX_ENTER(&rx_stats_mutex);
3307                 rx_stats.dupPacketsRead++;
3308                 MUTEX_EXIT(&rx_stats_mutex);
3309                 dpf(("packet %x dropped on receipt - duplicate", np));
3310                 rxevent_Cancel(call->delayedAckEvent, call,
3311                                RX_CALL_REFCOUNT_DELAY);
3312                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3313                 ackNeeded = 0;
3314                 call->rprev = seq;
3315                 continue;
3316             }
3317
3318             /* It's the next packet. Stick it on the receive queue
3319              * for this call. Set newPackets to make sure we wake
3320              * the reader once all packets have been processed */
3321             queue_Prepend(&call->rq, np);
3322             call->nSoftAcks++;
3323             np = NULL;          /* We can't use this anymore */
3324             newPackets = 1;
3325
3326             /* If an ack is requested then set a flag to make sure we
3327              * send an acknowledgement for this packet */
3328             if (flags & RX_REQUEST_ACK) {
3329                 ackNeeded = RX_ACK_REQUESTED;
3330             }
3331
3332             /* Keep track of whether we have received the last packet */
3333             if (flags & RX_LAST_PACKET) {
3334                 call->flags |= RX_CALL_HAVE_LAST;
3335                 haveLast = 1;
3336             }
3337
3338             /* Check whether we have all of the packets for this call */
3339             if (call->flags & RX_CALL_HAVE_LAST) {
3340                 afs_uint32 tseq;        /* temporary sequence number */
3341                 struct rx_packet *tp;   /* Temporary packet pointer */
3342                 struct rx_packet *nxp;  /* Next pointer, for queue_Scan */
3343
3344                 for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3345                     if (tseq != tp->header.seq)
3346                         break;
3347                     if (tp->header.flags & RX_LAST_PACKET) {
3348                         call->flags |= RX_CALL_RECEIVE_DONE;
3349                         break;
3350                     }
3351                     tseq++;
3352                 }
3353             }
3354
3355             /* Provide asynchronous notification for those who want it
3356              * (e.g. multi rx) */
3357             if (call->arrivalProc) {
3358                 (*call->arrivalProc) (call, call->arrivalProcHandle,
3359                                       call->arrivalProcArg);
3360                 call->arrivalProc = (void (*)())0;
3361             }
3362
3363             /* Update last packet received */
3364             call->rprev = seq;
3365
3366             /* If there is no server process serving this call, grab
3367              * one, if available. We only need to do this once. If a
3368              * server thread is available, this thread becomes a server
3369              * thread and the server thread becomes a listener thread. */
3370             if (isFirst) {
3371                 TryAttach(call, socket, tnop, newcallp, 0);
3372             }
3373         }
3374         /* This is not the expected next packet. */
3375         else {
3376             /* Determine whether this is a new or old packet, and if it's
3377              * a new one, whether it fits into the current receive window.
3378              * Also figure out whether the packet was delivered in sequence.
3379              * We use the prev variable to determine whether the new packet
3380              * is the successor of its immediate predecessor in the
3381              * receive queue, and the missing flag to determine whether
3382              * any of this packets predecessors are missing.  */
3383
3384             afs_uint32 prev;    /* "Previous packet" sequence number */
3385             struct rx_packet *tp;       /* Temporary packet pointer */
3386             struct rx_packet *nxp;      /* Next pointer, for queue_Scan */
3387             int missing;        /* Are any predecessors missing? */
3388
3389             /* If the new packet's sequence number has been sent to the
3390              * application already, then this is a duplicate */
3391             if (seq < call->rnext) {
3392                 MUTEX_ENTER(&rx_stats_mutex);
3393                 rx_stats.dupPacketsRead++;
3394                 MUTEX_EXIT(&rx_stats_mutex);
3395                 rxevent_Cancel(call->delayedAckEvent, call,
3396                                RX_CALL_REFCOUNT_DELAY);
3397                 np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
3398                 ackNeeded = 0;
3399                 call->rprev = seq;
3400                 continue;
3401             }
3402
3403             /* If the sequence number is greater than what can be
3404              * accomodated by the current window, then send a negative
3405              * acknowledge and drop the packet */
3406             if ((call->rnext + call->rwind) <= seq) {
3407                 rxevent_Cancel(call->delayedAckEvent, call,
3408                                RX_CALL_REFCOUNT_DELAY);
3409                 np = rxi_SendAck(call, np, serial, RX_ACK_EXCEEDS_WINDOW,
3410                                  istack);
3411                 ackNeeded = 0;
3412                 call->rprev = seq;
3413                 continue;
3414             }
3415
3416             /* Look for the packet in the queue of old received packets */
3417             for (prev = call->rnext - 1, missing =
3418                  0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3419                 /*Check for duplicate packet */
3420                 if (seq == tp->header.seq) {
3421                     MUTEX_ENTER(&rx_stats_mutex);
3422                     rx_stats.dupPacketsRead++;
3423                     MUTEX_EXIT(&rx_stats_mutex);
3424                     rxevent_Cancel(call->delayedAckEvent, call,
3425                                    RX_CALL_REFCOUNT_DELAY);
3426                     np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
3427                                      istack);
3428                     ackNeeded = 0;
3429                     call->rprev = seq;
3430                     goto nextloop;
3431                 }
3432                 /* If we find a higher sequence packet, break out and
3433                  * insert the new packet here. */
3434                 if (seq < tp->header.seq)
3435                     break;
3436                 /* Check for missing packet */
3437                 if (tp->header.seq != prev + 1) {
3438                     missing = 1;
3439                 }
3440
3441                 prev = tp->header.seq;
3442             }
3443
3444             /* Keep track of whether we have received the last packet. */
3445             if (flags & RX_LAST_PACKET) {
3446                 call->flags |= RX_CALL_HAVE_LAST;
3447             }
3448
3449             /* It's within the window: add it to the the receive queue.
3450              * tp is left by the previous loop either pointing at the
3451              * packet before which to insert the new packet, or at the
3452              * queue head if the queue is empty or the packet should be
3453              * appended. */
3454             queue_InsertBefore(tp, np);
3455             call->nSoftAcks++;
3456             np = NULL;
3457
3458             /* Check whether we have all of the packets for this call */
3459             if ((call->flags & RX_CALL_HAVE_LAST)
3460                 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
3461                 afs_uint32 tseq;        /* temporary sequence number */
3462
3463                 for (tseq =
3464                      call->rnext, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
3465                     if (tseq != tp->header.seq)
3466                         break;
3467                     if (tp->header.flags & RX_LAST_PACKET) {
3468                         call->flags |= RX_CALL_RECEIVE_DONE;
3469                         break;
3470                     }
3471                     tseq++;
3472                 }
3473             }
3474
3475             /* We need to send an ack of the packet is out of sequence, 
3476              * or if an ack was requested by the peer. */
3477             if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
3478                 ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
3479             }
3480
3481             /* Acknowledge the last packet for each call */
3482             if (flags & RX_LAST_PACKET) {
3483                 haveLast = 1;
3484             }
3485
3486             call->rprev = seq;
3487         }
3488       nextloop:;
3489     }
3490
3491     if (newPackets) {
3492         /*
3493          * If the receiver is waiting for an iovec, fill the iovec
3494          * using the data from the receive queue */
3495         if (call->flags & RX_CALL_IOVEC_WAIT) {
3496             didHardAck = rxi_FillReadVec(call, serial);
3497             /* the call may have been aborted */
3498             if (call->error) {
3499                 return NULL;
3500             }
3501             if (didHardAck) {
3502                 ackNeeded = 0;
3503             }
3504         }
3505
3506         /* Wakeup the reader if any */
3507         if ((call->flags & RX_CALL_READER_WAIT)
3508             && (!(call->flags & RX_CALL_IOVEC_WAIT) || !(call->iovNBytes)
3509                 || (call->iovNext >= call->iovMax)
3510                 || (call->flags & RX_CALL_RECEIVE_DONE))) {
3511             call->flags &= ~RX_CALL_READER_WAIT;
3512 #ifdef  RX_ENABLE_LOCKS
3513             CV_BROADCAST(&call->cv_rq);
3514 #else
3515             osi_rxWakeup(&call->rq);
3516 #endif
3517         }
3518     }
3519
3520     /*
3521      * Send an ack when requested by the peer, or once every
3522      * rxi_SoftAckRate packets until the last packet has been
3523      * received. Always send a soft ack for the last packet in
3524      * the server's reply. */
3525     if (ackNeeded) {
3526         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3527         np = rxi_SendAck(call, np, serial, ackNeeded, istack);
3528     } else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
3529         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3530         np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
3531     } else if (call->nSoftAcks) {
3532         clock_GetTime(&when);
3533         if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
3534             clock_Add(&when, &rx_lastAckDelay);
3535         } else {
3536             clock_Add(&when, &rx_softAckDelay);
3537         }
3538         if (!call->delayedAckEvent
3539             || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
3540             rxevent_Cancel(call->delayedAckEvent, call,
3541                            RX_CALL_REFCOUNT_DELAY);
3542             CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
3543             call->delayedAckEvent =
3544                 rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
3545         }
3546     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
3547         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
3548     }
3549
3550     return np;
3551 }
3552
3553 #ifdef  ADAPT_WINDOW
3554 static void rxi_ComputeRate();
3555 #endif
3556
3557 static void
3558 rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
3559 {
3560     struct rx_peer *peer = conn->peer;
3561
3562     MUTEX_ENTER(&peer->peer_lock);
3563     peer->lastReachTime = clock_Sec();
3564     MUTEX_EXIT(&peer->peer_lock);
3565
3566     MUTEX_ENTER(&conn->conn_data_lock);
3567     if (conn->flags & RX_CONN_ATTACHWAIT) {
3568         int i;
3569
3570         conn->flags &= ~RX_CONN_ATTACHWAIT;
3571         MUTEX_EXIT(&conn->conn_data_lock);
3572
3573         for (i = 0; i < RX_MAXCALLS; i++) {
3574             struct rx_call *call = conn->call[i];
3575             if (call) {
3576                 if (call != acall)
3577                     MUTEX_ENTER(&call->lock);
3578                 /* tnop can be null if newcallp is null */
3579                 TryAttach(call, (osi_socket) - 1, NULL, NULL, 1);
3580                 if (call != acall)
3581                     MUTEX_EXIT(&call->lock);
3582             }
3583         }
3584     } else
3585         MUTEX_EXIT(&conn->conn_data_lock);
3586 }
3587
3588 static const char *
3589 rx_ack_reason(int reason)
3590 {
3591     switch (reason) {
3592     case RX_ACK_REQUESTED:
3593         return "requested";
3594     case RX_ACK_DUPLICATE:
3595         return "duplicate";
3596     case RX_ACK_OUT_OF_SEQUENCE:
3597         return "sequence";
3598     case RX_ACK_EXCEEDS_WINDOW:
3599         return "window";
3600     case RX_ACK_NOSPACE:
3601         return "nospace";
3602     case RX_ACK_PING:
3603         return "ping";
3604     case RX_ACK_PING_RESPONSE:
3605         return "response";
3606     case RX_ACK_DELAY:
3607         return "delay";
3608     case RX_ACK_IDLE:
3609         return "idle";
3610     default:
3611         return "unknown!!";
3612     }
3613 }
3614
3615
3616 /* rxi_ComputePeerNetStats
3617  *
3618  * Called exclusively by rxi_ReceiveAckPacket to compute network link
3619  * estimates (like RTT and throughput) based on ack packets.  Caller
3620  * must ensure that the packet in question is the right one (i.e.
3621  * serial number matches).
3622  */
3623 static void
3624 rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
3625                         struct rx_ackPacket *ap, struct rx_packet *np)
3626 {
3627     struct rx_peer *peer = call->conn->peer;
3628
3629     /* Use RTT if not delayed by client. */
3630     if (ap->reason != RX_ACK_DELAY)
3631         rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
3632 #ifdef ADAPT_WINDOW
3633     rxi_ComputeRate(peer, call, p, np, ap->reason);
3634 #endif
3635 }
3636
3637 /* The real smarts of the whole thing.  */
3638 struct rx_packet *
3639 rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
3640                      int istack)
3641 {
3642     struct rx_ackPacket *ap;
3643     int nAcks;
3644     register struct rx_packet *tp;
3645     register struct rx_packet *nxp;     /* Next packet pointer for queue_Scan */
3646     register struct rx_connection *conn = call->conn;
3647     struct rx_peer *peer = conn->peer;
3648     afs_uint32 first;
3649     afs_uint32 serial;
3650     /* because there are CM's that are bogus, sending weird values for this. */
3651     afs_uint32 skew = 0;
3652     int nbytes;
3653     int missing;
3654     int acked;
3655     int nNacked = 0;
3656     int newAckCount = 0;
3657     u_short maxMTU = 0;         /* Set if peer supports AFS 3.4a jumbo datagrams */
3658     int maxDgramPackets = 0;    /* Set if peer supports AFS 3.5 jumbo datagrams */
3659
3660     MUTEX_ENTER(&rx_stats_mutex);
3661     rx_stats.ackPacketsRead++;
3662     MUTEX_EXIT(&rx_stats_mutex);
3663     ap = (struct rx_ackPacket *)rx_DataOf(np);
3664     nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap);
3665     if (nbytes < 0)
3666         return np;              /* truncated ack packet */
3667
3668     /* depends on ack packet struct */
3669     nAcks = MIN((unsigned)nbytes, (unsigned)ap->nAcks);
3670     first = ntohl(ap->firstPacket);
3671     serial = ntohl(ap->serial);
3672     /* temporarily disabled -- needs to degrade over time 
3673      * skew = ntohs(ap->maxSkew); */
3674
3675     /* Ignore ack packets received out of order */
3676     if (first < call->tfirst) {
3677         return np;
3678     }
3679
3680     if (np->header.flags & RX_SLOW_START_OK) {
3681         call->flags |= RX_CALL_SLOW_START_OK;
3682     }
3683
3684     if (ap->reason == RX_ACK_PING_RESPONSE)
3685         rxi_UpdatePeerReach(conn, call);
3686
3687 #ifdef RXDEBUG
3688 #ifdef AFS_NT40_ENV
3689     if (rxdebug_active) {
3690         char msg[512];
3691         size_t len;
3692
3693         len = _snprintf(msg, sizeof(msg),
3694                         "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %