/*
* Copyright 2000, International Business Machines Corporation and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
#undef kmem_free
#undef mem_alloc
#undef mem_free
-#undef register
#endif /* AFS_OSF_ENV */
#else /* !UKERNEL */
#include "afs/sysincludes.h"
#include "rx.h"
#include "rx_globals.h"
#include "rx_trace.h"
+#include "rx_atomic.h"
#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
#define AFSOP_STOP_AFS 211 /* Stop AFS process */
#define AFSOP_STOP_BKG 212 /* Stop BKG process */
#include "sys/lockl.h"
#include "sys/lock_def.h"
#endif /* AFS_AIX41_ENV */
-# include "rxgen_consts.h"
+# include "afs/rxgen_consts.h"
#else /* KERNEL */
# include <sys/types.h>
# include <string.h>
# include <stdarg.h>
# include <errno.h>
+# ifdef HAVE_STDINT_H
+# include <stdint.h>
+# endif
#ifdef AFS_NT40_ENV
# include <stdlib.h>
# include <fcntl.h>
# include "rx_user.h"
# include "rx_clock.h"
# include "rx_queue.h"
+# include "rx_atomic.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include <afs/rxgen_consts.h>
* rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
* currently allocated within rx. This number is used to allocate the
* memory required to return the statistics when queried.
+ * Protected by the rx_rpc_stats mutex.
*/
static unsigned int rxi_rpc_peer_stat_cnt;
* rxi_rpc_process_stat_cnt counts the total number of local process stat
* structures currently allocated within rx. The number is used to allocate
* the memory required to return the statistics when queried.
+ * Protected by the rx_rpc_stats mutex.
*/
static unsigned int rxi_rpc_process_stat_cnt;
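/*
 * Illustrative sketch (not part of this change): both stat counters above
 * are assumed to be updated only while holding the rx_rpc_stats mutex, e.g.
 *
 *     MUTEX_ENTER(&rx_rpc_stats);
 *     rxi_rpc_process_stat_cnt++;
 *     MUTEX_EXIT(&rx_rpc_stats);
 *
 * so that the size computed when the statistics are queried matches the
 * number of stat structures actually allocated.
 */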
#include <stddef.h> /* for definition of offsetof() */
#endif
+#ifdef RX_ENABLE_LOCKS
+afs_kmutex_t rx_atomic_mutex;
+#endif
+
#ifdef AFS_PTHREAD_ENV
#include <assert.h>
MUTEX_INIT(&rx_clock_mutex, "clock", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_stats_mutex, "stats", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_waiting_mutex, "waiting", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_atomic_mutex, "atomic", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "quota", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "pthread", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "packets", MUTEX_DEFAULT, 0);
== 0);
assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
-
+
rxkad_global_stats_init();
MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
* rxi_totalMin
*/
-/*
+/*
* The rx_freePktQ_lock protects the following global variables:
- * rx_nFreePackets
+ * rx_nFreePackets
*/
/*
* are locked. To this end, the code has been modified under #ifdef
* RX_ENABLE_LOCKS so that quota checks and reservation occur at the
* same time. A new function, ReturnToServerPool() returns the allocation.
- *
+ *
* A call can be on several queues (but only one at a time). When
* rxi_ResetCall wants to remove the call from a queue, it has to ensure
* that no one else is touching the queue. To this end, we store the address
void *arg1, int istack);
#endif
-/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
-** pretty good that the next packet coming in is from the same connection
+/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
+** pretty good that the next packet coming in is from the same connection
** as the last packet, since we're sending multiple packets in a transmit window.
*/
struct rx_connection *rxLastConn = 0;
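/*
 * Illustrative sketch (not part of this change): the cached pointer lets
 * rxi_FindConnection try a cheap match before walking the hash table,
 * presumably along the lines of
 *
 *     conn = rxLastConn;
 *     if (conn && conn->peer->host == host && conn->peer->port == port)
 *         return conn;        (cache hit: skip the hash table lookup)
 *
 * where the real check would also have to match cid/epoch/serviceId before
 * trusting the cached connection.
 */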
* freeSQEList_lock
*
* serverQueueEntry->lock
- * rx_rpc_stats
* rx_peerHashTable_lock - locked under rx_connHashTable_lock
+ * rx_rpc_stats
* peer->lock - locks peer data fields.
* conn_data_lock - that more than one thread is not updating a conn data
* field at the same time.
* multi_handle->lock
* rxevent_lock
* rx_stats_mutex
+ * rx_atomic_mutex
*
* Do we need a lock to protect the peer field in the conn structure?
* conn->peer was previously a constant for all intents and so has no
* lock protecting this field. The multihomed client delta introduced
* a RX code change : change the peer field in the connection structure
- * to that remote inetrface from which the last packet for this
+ * to that remote interface from which the last packet for this
* connection was sent out. This may become an issue if further changes
* are made.
*/
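/*
 * Illustrative sketch (not part of this change): code honouring the lock
 * hierarchy above always acquires the outer lock first, e.g.
 *
 *     MUTEX_ENTER(&rx_connHashTable_lock);
 *     MUTEX_ENTER(&rx_peerHashTable_lock);
 *     ... look up or update the peer ...
 *     MUTEX_EXIT(&rx_peerHashTable_lock);
 *     MUTEX_EXIT(&rx_connHashTable_lock);
 *
 * and never the reverse order, which could deadlock against another thread.
 */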
#endif /* KERNEL */
char *htable, *ptable;
int tmp_status;
-
+
SPLVAR;
-
+
INIT_PTHREAD_LOCKS;
LOCK_RX_INIT;
if (rxinit_status == 0) {
rxi_nCalls = 0;
rx_connDeadTime = 12;
rx_tranquil = 0; /* reset flag */
- memset((char *)&rx_stats, 0, sizeof(struct rx_statistics));
+ memset(&rx_stats, 0, sizeof(struct rx_statistics));
htable = (char *)
osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
rx_nFreePackets = 0;
queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
+ rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */
+
+ /* enforce a minimum number of allocated packets */
+ if (rx_extraPackets < rxi_nSendFrags * rx_maxSendWindow)
+ rx_extraPackets = rxi_nSendFrags * rx_maxSendWindow;
+
+ /* allocate the initial free packet pool */
#ifdef RX_ENABLE_TSFPQ
- rx_nPackets = 0; /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
#else /* RX_ENABLE_TSFPQ */
- rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
- rxi_MorePackets(rx_nPackets);
+ rxi_MorePackets(rx_extraPackets + RX_MAX_QUOTA + 2); /* fudge */
#endif /* RX_ENABLE_TSFPQ */
rx_CheckPackets();
#else
socklen_t addrlen = sizeof(addr);
#endif
- if (getsockname((int)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
+ if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
rx_Finalize();
return -1;
}
/* otherwise, can use only if there are enough to allow everyone
* to go to their min quota after this guy starts.
*/
+ MUTEX_ENTER(&rx_quota_mutex);
if (rxi_availProcs > rxi_minDeficit)
rc = 1;
+ MUTEX_EXIT(&rx_quota_mutex);
return rc;
}
#endif /* RX_ENABLE_LOCKS */
static int nProcs;
#ifdef AFS_PTHREAD_ENV
pid_t pid;
- pid = (pid_t) pthread_self();
+ pid = afs_pointer_to_int(pthread_self());
#else /* AFS_PTHREAD_ENV */
PROCESS pid;
LWP_CurrentProcess(&pid);
}
#ifdef RX_ENABLE_TSFPQ
/* no use leaving packets around in this thread's local queue if
- * it isn't getting donated to the server thread pool.
+ * it isn't getting donated to the server thread pool.
*/
rxi_FlushLocalPacketsTSFPQ();
#endif /* RX_ENABLE_TSFPQ */
SPLVAR;
clock_NewTime();
- dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
+ dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %p, "
+ "serviceSecurityIndex %d)\n",
+ ntohl(shost), ntohs(sport), sservice, securityObject,
+ serviceSecurityIndex));
/* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
* the case of kmem_alloc? */
conn->securityData = (void *) 0;
conn->securityIndex = serviceSecurityIndex;
rx_SetConnDeadTime(conn, rx_connDeadTime);
+ rx_SetConnSecondsUntilNatPing(conn, 0);
conn->ackRate = RX_FAST_ACK_RATE;
conn->nSpecific = 0;
conn->specific = NULL;
* waiting, treat this as a running call, and wait to destroy the
* connection later when the call completes. */
if ((conn->type == RX_CLIENT_CONNECTION)
- && (conn->flags & RX_CONN_MAKECALL_WAITING)) {
+ && (conn->flags & (RX_CONN_MAKECALL_WAITING|RX_CONN_MAKECALL_ACTIVE))) {
conn->flags |= RX_CONN_DESTROY_ME;
MUTEX_EXIT(&conn->conn_data_lock);
USERPRI;
return;
}
+ if (conn->natKeepAliveEvent) {
+ rxi_NatKeepAliveOff(conn);
+ }
+
if (conn->delayedAbortEvent) {
rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
if (conn->checkReachEvent)
rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
+ if (conn->natKeepAliveEvent)
+ rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
/* Add the connection to the list of destroyed connections that
* need to be cleaned up. This is necessary to avoid deadlocks
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-/* Wait for the transmit queue to no longer be busy.
+/* Wait for the transmit queue to no longer be busy.
* requires the call->lock to be held */
static void rxi_WaitforTQBusy(struct rx_call *call) {
while (call->flags & RX_CALL_TQ_BUSY) {
* 0. Maxtime gives the maximum number of seconds this call may take,
* after rx_NewCall returns. After this time interval, a call to any
* of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
- * For fine grain locking, we hold the conn_call_lock in order to
+ * For fine grain locking, we hold the conn_call_lock in order to
* ensure that we don't get signalled after we find a call in an active
* state and before we go to sleep.
*/
struct rx_call *
rx_NewCall(struct rx_connection *conn)
{
- int i;
+ int i, wait;
struct rx_call *call;
struct clock queueTime;
SPLVAR;
clock_NewTime();
- dpf(("rx_NewCall(conn %x)\n", conn));
+ dpf(("rx_NewCall(conn %"AFS_PTR_FMT")\n", conn));
NETPRI;
clock_GetTime(&queueTime);
- MUTEX_ENTER(&conn->conn_call_lock);
-
/*
* Check if there are others waiting for a new call.
* If so, let them go first to avoid starving them.
* This is a fairly simple scheme, and might not be
* a complete solution for large numbers of waiters.
- *
- * makeCallWaiters keeps track of the number of
- * threads waiting to make calls and the
- * RX_CONN_MAKECALL_WAITING flag bit is used to
+ *
+ * makeCallWaiters keeps track of the number of
+ * threads waiting to make calls and the
+ * RX_CONN_MAKECALL_WAITING flag bit is used to
* indicate that there are indeed calls waiting.
* The flag is set when the waiter is incremented.
- * It is only cleared in rx_EndCall when
- * makeCallWaiters is 0. This prevents us from
- * accidently destroying the connection while it
- * is potentially about to be used.
+ * It is only cleared when makeCallWaiters is 0.
+ * This prevents us from accidentally destroying the
+ * connection while it is potentially about to be used.
*/
+ MUTEX_ENTER(&conn->conn_call_lock);
MUTEX_ENTER(&conn->conn_data_lock);
- if (conn->makeCallWaiters) {
- conn->flags |= RX_CONN_MAKECALL_WAITING;
+ while (conn->flags & RX_CONN_MAKECALL_ACTIVE) {
+ conn->flags |= RX_CONN_MAKECALL_WAITING;
conn->makeCallWaiters++;
MUTEX_EXIT(&conn->conn_data_lock);
#endif
MUTEX_ENTER(&conn->conn_data_lock);
conn->makeCallWaiters--;
- }
+ if (conn->makeCallWaiters == 0)
+ conn->flags &= ~RX_CONN_MAKECALL_WAITING;
+ }
+
+ /* We are now the active thread in rx_NewCall */
+ conn->flags |= RX_CONN_MAKECALL_ACTIVE;
MUTEX_EXIT(&conn->conn_data_lock);
for (;;) {
+ wait = 1;
+
for (i = 0; i < RX_MAXCALLS; i++) {
call = conn->call[i];
if (call) {
- MUTEX_ENTER(&call->lock);
if (call->state == RX_STATE_DALLY) {
- rxi_ResetCall(call, 0);
- (*call->callNumber)++;
- break;
- }
- MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&call->lock);
+ if (call->state == RX_STATE_DALLY) {
+ /*
+ * We are setting the state to RX_STATE_RESET to
+ * ensure that no one else will attempt to use this
+ * call once we drop the conn->conn_call_lock and
+ * call->lock. We must drop the conn->conn_call_lock
+ * before calling rxi_ResetCall because the process
+ * of clearing the transmit queue can block for an
+ * extended period of time. If we block while holding
+ * the conn->conn_call_lock, then all rx_EndCall
+ * processing will block as well. This has a detrimental
+ * effect on overall system performance.
+ */
+ call->state = RX_STATE_RESET;
+ CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
+ MUTEX_EXIT(&conn->conn_call_lock);
+ rxi_ResetCall(call, 0);
+ (*call->callNumber)++;
+ if (MUTEX_TRYENTER(&conn->conn_call_lock))
+ break;
+
+ /*
+ * If we failed to be able to safely obtain the
+ * conn->conn_call_lock we will have to drop the
+ * call->lock to avoid a deadlock. When the call->lock
+ * is released the state of the call can change. If it
+ * is no longer RX_STATE_RESET then some other thread is
+ * using the call.
+ */
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_call_lock);
+ MUTEX_ENTER(&call->lock);
+
+ if (call->state == RX_STATE_RESET)
+ break;
+
+ /*
+ * If we get here it means that after dropping
+ * the conn->conn_call_lock and call->lock that
+ * the call is no longer ours. If we can't find
+ * a free call in the remaining slots we should
+ * not go immediately to RX_CONN_MAKECALL_WAITING
+ * because by dropping the conn->conn_call_lock
+ * we have given up synchronization with rx_EndCall.
+ * Instead, cycle through one more time to see if
+ * we can find a call that we can call our own.
+ */
+ CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
+ wait = 0;
+ }
+ MUTEX_EXIT(&call->lock);
+ }
} else {
+ /* rxi_NewCall returns with mutex locked */
call = rxi_NewCall(conn, i);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
break;
}
}
if (i < RX_MAXCALLS) {
break;
}
+ if (!wait)
+ continue;
+
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_MAKECALL_WAITING;
conn->makeCallWaiters++;
#endif
MUTEX_ENTER(&conn->conn_data_lock);
conn->makeCallWaiters--;
+ if (conn->makeCallWaiters == 0)
+ conn->flags &= ~RX_CONN_MAKECALL_WAITING;
MUTEX_EXIT(&conn->conn_data_lock);
}
- /*
- * Wake up anyone else who might be giving us a chance to
- * run (see code above that avoids resource starvation).
- */
-#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&conn->conn_call_cv);
-#else
- osi_rxWakeup(conn);
-#endif
-
- CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
-
/* Client is initially in send mode */
call->state = RX_STATE_ACTIVE;
call->error = conn->error;
call->mode = RX_MODE_ERROR;
else
call->mode = RX_MODE_SENDING;
-
+
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
/* Turn on busy protocol. */
rxi_KeepAliveOn(call);
- MUTEX_EXIT(&call->lock);
+ /* Attempt MTU discovery */
+ rxi_GrowMTUOn(call);
+
+ /*
+ * We are no longer the active thread in rx_NewCall
+ */
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->flags &= ~RX_CONN_MAKECALL_ACTIVE;
+ MUTEX_EXIT(&conn->conn_data_lock);
+
+ /*
+ * Wake up anyone else who might be giving us a chance to
+ * run (see code above that avoids resource starvation).
+ */
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&conn->conn_call_cv);
+#else
+ osi_rxWakeup(conn);
+#endif
MUTEX_EXIT(&conn->conn_call_lock);
- USERPRI;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* Now, if TQ wasn't cleared earlier, do it now. */
- MUTEX_ENTER(&call->lock);
- rxi_WaitforTQBusy(call);
- if (call->flags & RX_CALL_TQ_CLEARME) {
- rxi_ClearTransmitQueue(call, 1);
- /*queue_Init(&call->tq);*/
+ if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
+ osi_Panic("rx_NewCall call about to be used without an empty tq");
}
- MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- dpf(("rx_NewCall(call %x)\n", call));
+ MUTEX_EXIT(&call->lock);
+ USERPRI;
+
+ dpf(("rx_NewCall(call %"AFS_PTR_FMT")\n", call));
return call;
}
/* Advertise a new service. A service is named locally by a UDP port
* number plus a 16-bit service id. Returns (struct rx_service *) 0
- * on a failure.
+ * on a failure.
*
char *serviceName; Name for identification purposes (e.g. the
service name might be used for probing for
statistics) */
struct rx_service *
-rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId,
+rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId,
char *serviceName, struct rx_securityClass **securityObjects,
- int nSecurityObjects,
+ int nSecurityObjects,
afs_int32(*serviceProc) (struct rx_call * acall))
{
osi_socket socket = OSI_NULLSOCKET;
tservice = rxi_AllocService();
NETPRI;
+
+#ifdef RX_ENABLE_LOCKS
+ MUTEX_INIT(&tservice->svc_data_lock, "svc data lock", MUTEX_DEFAULT, 0);
+#endif
+
for (i = 0; i < RX_MAX_SERVICES; i++) {
struct rx_service *service = rx_services[i];
if (service) {
service->connDeadTime = rx_connDeadTime;
service->executeRequestProc = serviceProc;
service->checkReach = 0;
+ service->nSpecific = 0;
+ service->specific = NULL;
rx_services[i] = service; /* not visible until now */
USERPRI;
return service;
/* Set configuration options for all of a service's security objects */
-afs_int32
-rx_SetSecurityConfiguration(struct rx_service *service,
+afs_int32
+rx_SetSecurityConfiguration(struct rx_service *service,
rx_securityConfigVariables type,
void *value)
{
int i;
for (i = 0; i<service->nSecurityObjects; i++) {
if (service->securityObjects[i]) {
- RXS_SetConfiguration(service->securityObjects[i], NULL, type,
+ RXS_SetConfiguration(service->securityObjects[i], NULL, type,
value, NULL);
}
}
if (tservice->beforeProc)
(*tservice->beforeProc) (call);
- code = call->conn->service->executeRequestProc(call);
+ code = tservice->executeRequestProc(call);
if (tservice->afterProc)
(*tservice->afterProc) (call, code);
/* meltdown:
* One thing that seems to happen is that all the server threads get
* tied up on some empty or slow call, and then a whole bunch of calls
- * arrive at once, using up the packet pool, so now there are more
+ * arrive at once, using up the packet pool, so now there are more
* empty calls. The most critical resources here are server threads
* and the free packet pool. The "doreclaim" code seems to help in
* general. I think that eventually we arrive in this state: there
* are lots of pending calls which do have all their packets present,
* so they won't be reclaimed, are multi-packet calls, so they won't
- * be scheduled until later, and thus are tying up most of the free
+ * be scheduled until later, and thus are tying up most of the free
* packet pool for a very long time.
* future options:
- * 1. schedule multi-packet calls if all the packets are present.
- * Probably CPU-bound operation, useful to return packets to pool.
+ * 1. schedule multi-packet calls if all the packets are present.
+ * Probably CPU-bound operation, useful to return packets to pool.
* What do we do if there is a full window, but the last packet isn't here?
* 3. preserve one thread which *only* runs "best" calls, otherwise
* it sleeps and waits for that type of call.
- * 4. Don't necessarily reserve a whole window for each thread. In fact,
+ * 4. Don't necessarily reserve a whole window for each thread. In fact,
* the current dataquota business is badly broken. The quota isn't adjusted
* to reflect how many packets are presently queued for a running call.
* So, when we schedule a queued call with a full window of packets queued
MUTEX_EXIT(&freeSQEList_lock);
} else { /* otherwise allocate a new one and return that */
MUTEX_EXIT(&freeSQEList_lock);
- sq = (struct rx_serverQueueEntry *)
- rxi_Alloc(sizeof(struct rx_serverQueueEntry));
+ sq = rxi_Alloc(sizeof(struct rx_serverQueueEntry));
MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
}
* already executing */
/* One thread will process calls FCFS (to prevent starvation),
* while the other threads may run ahead looking for calls which
- * have all their input data available immediately. This helps
+ * have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
service = tcall->conn->service;
if (tno == rxi_fcfs_thread_num
|| !tcall->queue_item_header.next) {
MUTEX_EXIT(&rx_pthread_mutex);
- /* If we're the fcfs thread , then we'll just use
- * this call. If we haven't been able to find an optimal
- * choice, and we're at the end of the list, then use a
+ /* If we're the fcfs thread, then we'll just use
+ * this call. If we haven't been able to find an optimal
+ * choice, and we're at the end of the list, then use a
* 2d choice if one has been identified. Otherwise... */
call = (choice2 ? choice2 : tcall);
service = call->conn->service;
#endif
rxi_calltrace(RX_CALL_START, call);
- dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
+ dpf(("rx_GetCall(port=%d, service=%d) ==> call %"AFS_PTR_FMT"\n",
call->conn->service->servicePort, call->conn->service->serviceId,
call));
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
MUTEX_EXIT(&call->lock);
} else {
- dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
+ dpf(("rx_GetCall(socketp=%p, *socketp=0x%x)\n", socketp, *socketp));
}
return call;
MUTEX_EXIT(&freeSQEList_lock);
} else { /* otherwise allocate a new one and return that */
MUTEX_EXIT(&freeSQEList_lock);
- sq = (struct rx_serverQueueEntry *)
- rxi_Alloc(sizeof(struct rx_serverQueueEntry));
+ sq = rxi_Alloc(sizeof(struct rx_serverQueueEntry));
MUTEX_INIT(&sq->lock, "server Queue lock", MUTEX_DEFAULT, 0);
CV_INIT(&sq->cv, "server Queue lock", CV_DEFAULT, 0);
}
if (cur_service != NULL) {
cur_service->nRequestsRunning--;
+ MUTEX_ENTER(&rx_quota_mutex);
if (cur_service->nRequestsRunning < cur_service->minProcs)
rxi_minDeficit++;
rxi_availProcs++;
+ MUTEX_EXIT(&rx_quota_mutex);
}
if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
struct rx_call *tcall, *ncall;
* already executing */
/* One thread will process calls FCFS (to prevent starvation),
* while the other threads may run ahead looking for calls which
- * have all their input data available immediately. This helps
+ * have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
choice2 = (struct rx_call *)0;
for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
if (tno == rxi_fcfs_thread_num
|| !tcall->queue_item_header.next) {
MUTEX_EXIT(&rx_pthread_mutex);
- /* If we're the fcfs thread, then we'll just use
- * this call. If we haven't been able to find an optimal
- * choice, and we're at the end of the list, then use a
+ /* If we're the fcfs thread, then we'll just use
+ * this call. If we haven't been able to find an optimal
+ * choice, and we're at the end of the list, then use a
* 2d choice if one has been identified. Otherwise... */
call = (choice2 ? choice2 : tcall);
service = call->conn->service;
queue_Remove(call);
/* we can't schedule a call if there's no data!!! */
/* send an ack if there's no data, if we're missing the
- * first packet, or we're missing something between first
+ * first packet, or we're missing something between first
* and last -- there's a "hole" in the incoming data. */
if (queue_IsEmpty(&call->rq)
|| queue_First(&call->rq, rx_packet)->header.seq != 1
service->nRequestsRunning++;
/* just started call in minProcs pool, need fewer to maintain
* guarantee */
+ MUTEX_ENTER(&rx_quota_mutex);
if (service->nRequestsRunning <= service->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
+ MUTEX_EXIT(&rx_quota_mutex);
rx_nWaiting--;
/* MUTEX_EXIT(&call->lock); */
} else {
#endif
rxi_calltrace(RX_CALL_START, call);
- dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
+ dpf(("rx_GetCall(port=%d, service=%d) ==> call %p\n",
call->conn->service->servicePort, call->conn->service->serviceId,
call));
} else {
- dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
+ dpf(("rx_GetCall(socketp=%p, *socketp=0x%x)\n", socketp, *socketp));
}
USERPRI;
* and will also be called if there is an error condition on the or
* the call is complete. Used by multi rx to build a selection
* function which determines which of several calls is likely to be a
- * good one to read from.
+ * good one to read from.
* NOTE: the way this is currently implemented, it is probably only a
* good idea to (1) use it immediately after a newcall (clients only)
* and (2) only use it once. Other uses currently void your warranty
rx_EndCall(struct rx_call *call, afs_int32 rc)
{
struct rx_connection *conn = call->conn;
- struct rx_service *service;
afs_int32 error;
SPLVAR;
-
-
- dpf(("rx_EndCall(call %x rc %d error %d abortCode %d)\n", call, rc, call->error, call->abortCode));
+ dpf(("rx_EndCall(call %"AFS_PTR_FMT" rc %d error %d abortCode %d)\n",
+ call, rc, call->error, call->abortCode));
NETPRI;
MUTEX_ENTER(&call->lock);
rxi_CallError(call, rc);
/* Send an abort message to the peer if this error code has
* only just been set. If it was set previously, assume the
- * peer has already been sent the error code or will request it
+ * peer has already been sent the error code or will request it
*/
rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
}
if (call->mode == RX_MODE_SENDING) {
rxi_FlushWrite(call);
}
- service = conn->service;
rxi_calltrace(RX_CALL_END, call);
/* Call goes to hold state until reply packets are acknowledged */
if (call->tfirst + call->nSoftAcked < call->tnext) {
* rx_NewCall is in a stable state. Otherwise, rx_NewCall may
* have checked this call, found it active and by the time it
* goes to sleep, will have missed the signal.
- *
- * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
- * there are threads waiting to use the conn object.
*/
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&conn->conn_call_lock);
- MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_call_lock);
+ MUTEX_ENTER(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_BUSY;
if (conn->flags & RX_CONN_MAKECALL_WAITING) {
- if (conn->makeCallWaiters == 0)
- conn->flags &= (~RX_CONN_MAKECALL_WAITING);
MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&conn->conn_call_cv);
* kernel version, and may interrupt the macros rx_Read or
* rx_Write, which run at normal priority for efficiency. */
if (call->currentPacket) {
+#ifdef RX_TRACK_PACKETS
call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
rxi_FreePacket(call->currentPacket);
call->currentPacket = (struct rx_packet *)0;
}
-
+
call->nLeft = call->nFree = call->curlen = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
MUTEX_EXIT(&call->lock);
if (conn->type == RX_CLIENT_CONNECTION) {
- MUTEX_EXIT(&conn->conn_call_lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
conn->flags &= ~RX_CONN_BUSY;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
}
USERPRI;
/*
return 0;
}
-#ifdef DEBUG
+#ifdef RXDEBUG_PACKET
#ifdef KDUMP_RX_LOCK
static struct rx_call_rx_lock *rx_allCallsp = 0;
#else
static struct rx_call *rx_allCallsp = 0;
#endif
-#endif /* DEBUG */
+#endif /* RXDEBUG_PACKET */
/* Allocate a call structure, for the indicated channel of the
* supplied connection. The mode and state of the call must be set by
struct rx_call *nxp; /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
+ dpf(("rxi_NewCall(conn %"AFS_PTR_FMT", channel %d)\n", conn, channel));
/* Grab an existing call structure, or allocate a new one.
* Existing call structures are assumed to have been left reset by
CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Now, if TQ wasn't cleared earlier, do it now. */
+ rxi_WaitforTQBusy(call);
if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 1);
/*queue_Init(&call->tq);*/
rxi_ResetCall(call, 1);
} else {
- call = (struct rx_call *)rxi_Alloc(sizeof(struct rx_call));
+ call = rxi_Alloc(sizeof(struct rx_call));
#ifdef RXDEBUG_PACKET
call->allNextp = rx_allCallsp;
rx_allCallsp = call;
- call->call_id =
+ call->call_id =
#endif /* RXDEBUG_PACKET */
rx_MutexIncrement(rx_stats.nCallStructs, rx_stats_mutex);
-
+
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
MUTEX_ENTER(&call->lock);
* If someone else destroys a connection, they either have no
* call lock held or are going through this section of code.
*/
+ MUTEX_ENTER(&conn->conn_data_lock);
if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
- MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount++;
MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
#else /* RX_ENABLE_LOCKS */
rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
+ } else {
+ MUTEX_EXIT(&conn->conn_data_lock);
}
}
afs_int32 rxi_Alloccnt = 0, rxi_Allocsize = 0;
-char *
+void *
rxi_Alloc(size_t size)
{
char *p;
osi_Free(addr, size);
}
-void
-rxi_SetPeerMtu(afs_uint32 host, afs_uint32 port, int mtu)
+void
+rxi_SetPeerMtu(struct rx_peer *peer, afs_uint32 host, afs_uint32 port, int mtu)
{
- struct rx_peer **peer_ptr, **peer_end;
+ struct rx_peer **peer_ptr = NULL, **peer_end = NULL;
+ struct rx_peer *next = NULL;
int hashIndex;
- MUTEX_ENTER(&rx_peerHashTable_lock);
- if (port == 0) {
- for (peer_ptr = &rx_peerHashTable[0], peer_end =
- &rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
- peer_ptr++) {
- struct rx_peer *peer, *next;
- for (peer = *peer_ptr; peer; peer = next) {
- next = peer->next;
- if (host == peer->host) {
- MUTEX_ENTER(&peer->peer_lock);
- peer->ifMTU=MIN(mtu, peer->ifMTU);
- peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
- MUTEX_EXIT(&peer->peer_lock);
- }
- }
- }
+ if (!peer) {
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (port == 0) {
+ peer_ptr = &rx_peerHashTable[0];
+ peer_end = &rx_peerHashTable[rx_hashTableSize];
+ next = NULL;
+ resume:
+ for ( ; peer_ptr < peer_end; peer_ptr++) {
+ if (!peer)
+ peer = *peer_ptr;
+ for ( ; peer; peer = next) {
+ next = peer->next;
+ if (host == peer->host)
+ break;
+ }
+ }
+ } else {
+ hashIndex = PEER_HASH(host, port);
+ for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
+ if ((peer->host == host) && (peer->port == port))
+ break;
+ }
+ }
} else {
- struct rx_peer *peer;
- hashIndex = PEER_HASH(host, port);
- for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
- if ((peer->host == host) && (peer->port == port)) {
- MUTEX_ENTER(&peer->peer_lock);
- peer->ifMTU=MIN(mtu, peer->ifMTU);
- peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
- MUTEX_EXIT(&peer->peer_lock);
- }
- }
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ }
+
+ if (peer) {
+ peer->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ MUTEX_ENTER(&peer->peer_lock);
+ /* We don't handle dropping below the minimum, so don't go below it */
+ mtu = MAX(mtu, RX_MIN_PACKET_SIZE);
+ peer->ifMTU=MIN(mtu, peer->ifMTU);
+ peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
+ /* if we tweaked this down, need to tune our peer MTU too */
+ peer->MTU = MIN(peer->MTU, peer->natMTU);
+ /* if we discovered a sub-1500 mtu, degrade */
+ if (peer->ifMTU < OLD_MAX_PACKET_SIZE)
+ peer->maxDgramPackets = 1;
+ /* We no longer have valid peer packet information */
+ if (peer->maxPacketSize-RX_IPUDP_SIZE > peer->ifMTU)
+ peer->maxPacketSize = 0;
+ MUTEX_EXIT(&peer->peer_lock);
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ peer->refCount--;
+ if (host && !port) {
+ peer = next;
+ /* pick up where we left off */
+ goto resume;
+ }
}
MUTEX_EXIT(&rx_peerHashTable_lock);
}
/* Find the peer process represented by the supplied (host,port)
* combination. If there is no appropriate active peer structure, a
- * new one will be allocated and initialized
+ * new one will be allocated and initialized
* The origPeer, if set, is a pointer to a peer structure on which the
* refcount will be decremented. This is used to replace the peer
* structure hanging off a connection structure */
* server connection is created, it will be created using the supplied
* index, if the index is valid for this service */
struct rx_connection *
-rxi_FindConnection(osi_socket socket, afs_int32 host,
+rxi_FindConnection(osi_socket socket, afs_uint32 host,
u_short port, u_short serviceId, afs_uint32 cid,
afs_uint32 epoch, int type, u_int securityIndex)
{
* this is the first time the packet has been seen */
packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
- dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
+ dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT,
np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
np->header.epoch, np->header.cid, np->header.callNumber,
np->header.seq, np->header.flags, np));
MUTEX_EXIT(&conn->conn_call_lock);
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d",
+ np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
+ np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
+ np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
*/
if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
struct rx_packet *tp;
-
+
rxi_CallError(call, rx_BusyError);
tp = rxi_SendCallAbort(call, np, 1, 0);
MUTEX_EXIT(&call->lock);
rxi_ResetCall(call, 0);
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d",
+ np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
+ np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
+ np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
* traversing the tq in rxi_Start sending packets out because
* packets may move to the freePacketQueue as result of being here!
* So we drop these packets until we're safely out of the
- * traversing. Really ugly!
+ * traversing. Really ugly!
* For fine grain RX locking, we set the acked field in the
* packets and let rxi_Start remove them from the transmit queue.
*/
/* XXX I'm not sure this is exactly right, since tfirst **IS**
* XXX unacknowledged. I think that this is off-by-one, but
* XXX I don't dare change it just yet, since it will
- * XXX interact badly with the server-restart detection
+ * XXX interact badly with the server-restart detection
* XXX code in receiveackpacket. */
if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
if (rx_stats_active)
* so this will be quite important with very large window sizes.
* Skew is checked against 0 here to avoid any dependence on the type of
* inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
- * true!
+ * true!
* The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
* see CalculateRoundTripTime for an example of how to keep smoothed values.
* I think using a beta of 1/8 is probably appropriate. 93.04.21
struct rx_peer *peer;
peer = conn->peer;
if (skew > peer->inPacketSkew) {
- dpf(("*** In skew changed from %d to %d\n", peer->inPacketSkew,
- skew));
+ dpf(("*** In skew changed from %d to %d\n",
+ peer->inPacketSkew, skew));
peer->inPacketSkew = skew;
}
}
* traversing the tq in rxi_Start sending packets out because
* packets may move to the freePacketQueue as result of being
* here! So we drop these packets until we're safely out of the
- * traversing. Really ugly!
+ * traversing. Really ugly!
* For fine grain RX locking, we set the acked field in the packets
* and let rxi_Start remove the packets from the transmit queue.
*/
if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
return 1;
+
for (i = 0; i < RX_MAXCALLS; i++) {
tcall = aconn->call[i];
if (tcall) {
if (!conn->checkReachEvent) {
conn->refCount++;
conn->checkReachEvent =
- rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn,
+ rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn,
NULL);
}
MUTEX_EXIT(&conn->conn_data_lock);
int newPackets = 0;
int didHardAck = 0;
int haveLast = 0;
- afs_uint32 seq;
+ afs_uint32 seq;
afs_uint32 serial=0, flags=0;
int isFirst;
struct rx_packet *tnp;
rx_MutexIncrement(rx_stats.noPacketBuffersOnRead, rx_stats_mutex);
call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
- dpf(("packet %x dropped on receipt - quota problems", np));
+ dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems", np));
if (rxi_doreclaim)
rxi_ClearReceiveQueue(call);
clock_GetTime(&now);
&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
if (rx_stats_active)
rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
- dpf(("packet %x dropped on receipt - duplicate", np));
+ dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate", np));
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
/* It's the next packet. Stick it on the receive queue
* for this call. Set newPackets to make sure we wake
* the reader once all packets have been processed */
+#ifdef RX_TRACK_PACKETS
np->flags |= RX_PKTFLAG_RQ;
+#endif
queue_Prepend(&call->rq, np);
#ifdef RXDEBUG_PACKET
call->rqc++;
* packet before which to insert the new packet, or at the
* queue head if the queue is empty or the packet should be
* appended. */
+#ifdef RX_TRACK_PACKETS
np->flags |= RX_PKTFLAG_RQ;
+#endif
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
}
}
- /* We need to send an ack of the packet is out of sequence,
- /* We need to send an ack of the packet is out of sequence,
+ /* We need to send an ack if the packet is out of sequence,
 * or if an ack was requested by the peer. */
if (seq != prev + 1 || missing) {
ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
MUTEX_EXIT(&conn->conn_data_lock);
}
+#if defined(RXDEBUG) && defined(AFS_NT40_ENV)
static const char *
rx_ack_reason(int reason)
{
return "unknown!!";
}
}
+#endif
/* rxi_ComputePeerNetStats
{
struct rx_peer *peer = call->conn->peer;
- /* Use RTT if not delayed by client. */
- if (ap->reason != RX_ACK_DELAY)
+ /* Use RTT if not delayed by client and
+ * ignore packets that were retransmitted. */
+ if (!(p->flags & RX_PKTFLAG_ACKED) &&
+ ap->reason != RX_ACK_DELAY &&
+ clock_Eq(&p->timeSent, &p->firstSent))
rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
#ifdef ADAPT_WINDOW
rxi_ComputeRate(peer, call, p, np, ap->reason);
int acked;
int nNacked = 0;
int newAckCount = 0;
- u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
+ int pktsize = 0; /* Set if we need to update the peer mtu */
+ int conn_data_locked = 0;
if (rx_stats_active)
rx_MutexIncrement(rx_stats.ackPacketsRead, rx_stats_mutex);
nAcks = MIN((unsigned)nbytes, (unsigned)ap->nAcks);
first = ntohl(ap->firstPacket);
serial = ntohl(ap->serial);
- /* temporarily disabled -- needs to degrade over time
+ /* temporarily disabled -- needs to degrade over time
* skew = ntohs(ap->maxSkew); */
/* Ignore ack packets received out of order */
if (ap->reason == RX_ACK_PING_RESPONSE)
rxi_UpdatePeerReach(conn, call);
+ if (conn->lastPacketSizeSeq) {
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn_data_locked = 1;
+ if ((first > conn->lastPacketSizeSeq) && (conn->lastPacketSize)) {
+ pktsize = conn->lastPacketSize;
+ conn->lastPacketSize = conn->lastPacketSizeSeq = 0;
+ }
+ }
+ if ((ap->reason == RX_ACK_PING_RESPONSE) && (conn->lastPingSizeSer)) {
+ if (!conn_data_locked) {
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn_data_locked = 1;
+ }
+ if ((conn->lastPingSizeSer == serial) && (conn->lastPingSize)) {
+ /* process mtu ping ack */
+ pktsize = conn->lastPingSize;
+ conn->lastPingSizeSer = conn->lastPingSize = 0;
+ }
+ }
+
+ if (conn_data_locked) {
+ MUTEX_EXIT(&conn->conn_data_lock);
+ conn_data_locked = 0;
+ }
#ifdef RXDEBUG
#ifdef AFS_NT40_ENV
if (rxdebug_active) {
len = _snprintf(msg, sizeof(msg),
"tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
- GetCurrentThreadId(), rx_ack_reason(ap->reason),
+ GetCurrentThreadId(), rx_ack_reason(ap->reason),
ntohl(ap->serial), ntohl(ap->previousPacket),
- (unsigned int)np->header.seq, (unsigned int)skew,
+ (unsigned int)np->header.seq, (unsigned int)skew,
ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
if (nAcks) {
int offset;
- for (offset = 0; offset < nAcks && len < sizeof(msg); offset++)
+ for (offset = 0; offset < nAcks && len < sizeof(msg); offset++)
msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
}
msg[len++]='\n';
#endif /* AFS_NT40_ENV */
#endif
+ MUTEX_ENTER(&peer->peer_lock);
+ if (pktsize) {
+ /*
+ * Start somewhere. Can't assume we can send what we can receive,
+ * but we are clearly receiving.
+ */
+ if (!peer->maxPacketSize)
+ peer->maxPacketSize = RX_MIN_PACKET_SIZE+RX_IPUDP_SIZE;
+
+ if (pktsize > peer->maxPacketSize) {
+ peer->maxPacketSize = pktsize;
+ if ((pktsize-RX_IPUDP_SIZE > peer->ifMTU)) {
+ peer->ifMTU=pktsize-RX_IPUDP_SIZE;
+ peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
+ rxi_ScheduleGrowMTUEvent(call, 1);
+ }
+ }
+ }
+
/* Update the outgoing packet skew value to the latest value of
* the peer's incoming packet skew value. The ack packet, of
* course, could arrive out of order, but that won't affect things
* much */
- MUTEX_ENTER(&peer->peer_lock);
peer->outPacketSkew = skew;
/* Check for packets that no longer need to be transmitted, and
if (tp->header.seq >= first)
break;
call->tfirst = tp->header.seq + 1;
- if (serial
- && (tp->header.serial == serial || tp->firstSerial == serial))
- rxi_ComputePeerNetStats(call, tp, ap, np);
+ rxi_ComputePeerNetStats(call, tp, ap, np);
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
}
* packets (osi_NetSend) we drop all acks while we're traversing the tq
* in rxi_Start sending packets out because packets may move to the
* freePacketQueue as result of being here! So we drop these packets until
- * we're safely out of the traversing. Really ugly!
+ * we're safely out of the traversing. Really ugly!
* To make it even uglier, if we're using fine grain locking, we can
* set the ack bits in the packets and have rxi_Start remove the packets
* when it's done transmitting.
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
{
queue_Remove(tp);
+#ifdef RX_TRACK_PACKETS
tp->flags &= ~RX_PKTFLAG_TQ;
+#endif
#ifdef RXDEBUG_PACKET
call->tqc--;
#endif /* RXDEBUG_PACKET */
if (tp->header.seq >= first)
#endif /* RX_ENABLE_LOCKS */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- if (serial
- && (tp->header.serial == serial || tp->firstSerial == serial))
- rxi_ComputePeerNetStats(call, tp, ap, np);
+ rxi_ComputePeerNetStats(call, tp, ap, np);
/* Set the acknowledge flag per packet based on the
* information in the ack packet. An acknowlegded packet can
missing = 1;
}
- /* If packet isn't yet acked, and it has been transmitted at least
- * once, reset retransmit time using latest timeout
- * ie, this should readjust the retransmit timer for all outstanding
+ /*
+ * Following the suggestion of Phil Kern, we back off the peer's
+ * timeout value for future packets until a successful response
+ * is received for an initial transmission.
+ */
+ if (missing && !peer->backedOff) {
+ struct clock c = peer->timeout;
+ struct clock max_to = {3, 0};
+
+ clock_Add(&peer->timeout, &c);
+ if (clock_Gt(&peer->timeout, &max_to))
+ peer->timeout = max_to;
+ peer->backedOff = 1;
+ }
+
+ /* If packet isn't yet acked, and it has been transmitted at least
+ * once, reset retransmit time using latest timeout
+ * ie, this should readjust the retransmit timer for all outstanding
* packets... So we don't just retransmit when we should know better*/
if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
- tp->retryTime = tp->timeSent;
+ tp->retryTime = tp->timeSent;
clock_Add(&tp->retryTime, &peer->timeout);
/* shift by eight because one quarter-sec ~ 256 milliseconds */
clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
if (np->length >= rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32)) {
afs_uint32 tSize;
- /* If the ack packet has a "recommended" size that is less than
+ /* If the ack packet has a "recommended" size that is less than
* what I am using now, reduce my size to match */
- rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
+ rx_packetread(np, rx_AckDataSize(ap->nAcks) + (int)sizeof(afs_int32),
(int)sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);
peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
/* sanity check - peer might have restarted with different params.
- * If peer says "send less", dammit, send less... Peer should never
+ * If peer says "send less", dammit, send less... Peer should never
* be unable to accept packets of the size that prior AFS versions would
* send without asking. */
if (peer->maxMTU != tSize) {
if (np->length == rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32)) {
/* AFS 3.4a */
rx_packetread(np,
- rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
+ rx_AckDataSize(ap->nAcks) + 2 * (int)sizeof(afs_int32),
(int)sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
if (tSize < call->twind) { /* smaller than our send */
* network MTU confused with the loopback MTU. Calculate the
* maximum MTU here for use in the slow start code below.
*/
- maxMTU = peer->maxMTU;
/* Did peer restart with older RX version? */
if (peer->maxDgramPackets > 1) {
peer->maxDgramPackets = 1;
rx_AckDataSize(ap->nAcks) + 4 * sizeof(afs_int32)) {
/* AFS 3.5 */
rx_packetread(np,
- rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
+ rx_AckDataSize(ap->nAcks) + 2 * (int)sizeof(afs_int32),
sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);
/*
- * As of AFS 3.5 we set the send window to match the receive window.
+ * As of AFS 3.5 we set the send window to match the receive window.
*/
if (tSize < call->twind) {
call->twind = tSize;
* larger than the natural MTU.
*/
rx_packetread(np,
- rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32),
- sizeof(afs_int32), &tSize);
+ rx_AckDataSize(ap->nAcks) + 3 * (int)sizeof(afs_int32),
+ (int)sizeof(afs_int32), &tSize);
maxDgramPackets = (afs_uint32) ntohl(tSize);
maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
maxDgramPackets =
}
call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
} else if (call->MTU < peer->maxMTU) {
- call->MTU += peer->natMTU;
- call->MTU = MIN(call->MTU, peer->maxMTU);
+ /* don't upgrade if we can't handle it */
+ if ((call->nDgramPackets == 1) && (call->MTU >= peer->ifMTU))
+ call->MTU = peer->ifMTU;
+ else {
+ call->MTU += peer->natMTU;
+ call->MTU = MIN(call->MTU, peer->maxMTU);
+ }
}
call->nAcks = 0;
}
call->flags &= ~RX_CALL_WAIT_PROC;
if (queue_IsOnQueue(call)) {
queue_Remove(call);
-
+
MUTEX_ENTER(&rx_waiting_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_waiting_mutex);
CV_SIGNAL(&sq->cv);
#else
service->nRequestsRunning++;
+ MUTEX_ENTER(&rx_quota_mutex);
if (service->nRequestsRunning <= service->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
+ MUTEX_EXIT(&rx_quota_mutex);
osi_rxWakeup(sq);
#endif
}
call->tqc -=
#endif /* RXDEBUG_PACKET */
rxi_FreePackets(0, &call->tq);
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_TQ_CLEARME;
}
{
if (queue_IsNotEmpty(&call->rq)) {
u_short count;
-
+
count = rxi_FreePackets(0, &call->rq);
rx_packetReclaims += count;
#ifdef RXDEBUG_PACKET
call->rqc -= count;
- if ( call->rqc != 0 )
- dpf(("rxi_ClearReceiveQueue call %x rqc %u != 0", call, call->rqc));
+ if ( call->rqc != 0 )
+ dpf(("rxi_ClearReceiveQueue call %"AFS_PTR_FMT" rqc %u != 0", call, call->rqc));
#endif
call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
}
if (error) {
int i;
- dpf(("rxi_ConnectionError conn %x error %d", conn, error));
+ dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d", conn, error));
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
+ if (conn->natKeepAliveEvent)
+ rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
if (conn->checkReachEvent) {
rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
conn->checkReachEvent = 0;
#ifdef DEBUG
osirx_AssertMine(&call->lock, "rxi_CallError");
#endif
- dpf(("rxi_CallError call %x error %d call->error %d", call, error, call->error));
+ dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d", call, error, call->error));
if (call->error)
error = call->error;
#ifdef DEBUG
osirx_AssertMine(&call->lock, "rxi_ResetCall");
#endif
- dpf(("rxi_ResetCall(call %x, newcall %d)\n", call, newcall));
+ dpf(("rxi_ResetCall(call %"AFS_PTR_FMT", newcall %d)\n", call, newcall));
/* Notify anyone who is waiting for asynchronous packet arrival */
if (call->arrivalProc) {
flags = call->flags;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (flags & RX_CALL_TQ_BUSY) {
- call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
- call->flags |= (flags & RX_CALL_TQ_WAIT);
- } else
+ rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- {
- rxi_ClearTransmitQueue(call, 1);
- /* why init the queue if you just emptied it? queue_Init(&call->tq); */
- if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
- dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
- }
- call->flags = 0;
- while (call->tqWaiters) {
-#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- }
+
+ rxi_ClearTransmitQueue(call, 1);
+ if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+ dpf(("rcall %"AFS_PTR_FMT" has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
}
+ call->flags = 0;
rxi_ClearReceiveQueue(call);
/* why init the queue if you just emptied it? queue_Init(&call->rq); */
-
- if (call->currentPacket) {
- call->currentPacket->flags &= ~RX_PKTFLAG_CP;
- call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
- queue_Prepend(&call->iovq, call->currentPacket);
-#ifdef RXDEBUG_PACKET
- call->iovqc++;
-#endif /* RXDEBUG_PACKET */
- call->currentPacket = (struct rx_packet *)0;
- }
- call->curlen = call->nLeft = call->nFree = 0;
-#ifdef RXDEBUG_PACKET
- call->iovqc -=
-#endif
- rxi_FreePackets(0, &call->iovq);
call->error = 0;
call->twind = call->conn->twind[call->channel];
if (queue_IsOnQueue(call)) {
queue_Remove(call);
if (flags & RX_CALL_WAIT_PROC) {
-
+
MUTEX_ENTER(&rx_waiting_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_waiting_mutex);
* higher level yet (unless, of course, the sender decides to abort
* the call altogether). Any of p, seq, serial, pflags, or reason may
* be set to zero without ill effect. That is, if they are zero, they
- * will not convey any information.
+ * will not convey any information.
* NOW there is a trailer field, after the ack where it will safely be
- * ignored by mundanes, which indicates the maximum size packet this
+ * ignored by mundanes, which indicates the maximum size packet this
* host can swallow. */
/*
- struct rx_packet *optionalPacket; use to send ack (or null)
- int seq; Sequence number of the packet we are acking
- int serial; Serial number of the packet
- int pflags; Flags field from packet header
- int reason; Reason an acknowledge was prompted
+ struct rx_packet *optionalPacket; use to send ack (or null)
+ int seq; Sequence number of the packet we are acking
+ int serial; Serial number of the packet
+ int pflags; Flags field from packet header
+ int reason; Reason an acknowledge was prompted
*/
struct rx_packet *
struct rx_packet *p;
u_char offset;
afs_int32 templ;
+ afs_uint32 padbytes = 0;
#ifdef RX_ENABLE_TSFPQ
struct rx_ts_info_t * rx_ts_info;
#endif
call->conn->rwind[call->channel] = call->rwind = rx_maxReceiveWindow;
}
+ /* Don't attempt to grow MTU if this is a critical ping */
+ if (reason == RX_ACK_MTU) {
+ /* keep track of per-call attempts, if we're over max, do in small
+ * otherwise in larger? set a size to increment by, decrease
+ * on failure, here?
+ */
+ if (call->conn->peer->maxPacketSize &&
+ (call->conn->peer->maxPacketSize < OLD_MAX_PACKET_SIZE
+ +RX_IPUDP_SIZE))
+ padbytes = call->conn->peer->maxPacketSize+16;
+ else
+ padbytes = call->conn->peer->maxMTU + 128;
+
+ /* always try at least a minimum-size ping */
+ padbytes = MAX(padbytes, RX_MIN_PACKET_SIZE+RX_IPUDP_SIZE+4);
+
+ /* subtract the ack payload */
+ padbytes -= (rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32));
+ reason = RX_ACK_PING;
+ }
+
call->nHardAcks = 0;
call->nSoftAcks = 0;
if (call->rnext > call->lastAcked)
}
#endif
- templ =
+ templ = padbytes +
rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32) -
rx_GetDataSize(p);
if (templ > 0) {
ap->previousPacket = htonl(call->rprev); /* Previous packet received */
/* No fear of running out of ack packet here because there can only be at most
- * one window full of unacknowledged packets. The window size must be constrained
+ * one window full of unacknowledged packets. The window size must be constrained
* to be less than the maximum ack size, of course. Also, an ack should always
* fit into a single packet -- it should not ever be fragmented. */
for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
#ifdef ADAPT_WINDOW
clock_GetTime(&call->pingRequestTime);
#endif
+ if (padbytes) {
+ p->length = padbytes +
+ rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32);
+
+ while (padbytes--)
+ /* not fast but we can potentially use this if truncated
+ * fragments are delivered to figure out the mtu.
+ */
+ rx_packetwrite(p, rx_AckDataSize(offset) + 4 *
+ sizeof(afs_int32), sizeof(afs_int32),
+ &padbytes);
+ }
}
if (call->conn->type == RX_CLIENT_CONNECTION)
p->header.flags |= RX_CLIENT_INITIATED;
len = _snprintf(msg, sizeof(msg),
"tid[%d] SACK: reason %s serial %u previous %u seq %u first %u acks %u space %u ",
- GetCurrentThreadId(), rx_ack_reason(ap->reason),
+ GetCurrentThreadId(), rx_ack_reason(ap->reason),
ntohl(ap->serial), ntohl(ap->previousPacket),
(unsigned int)p->header.seq, ntohl(ap->firstPacket),
ap->nAcks, ntohs(ap->bufferSpace) );
if (ap->nAcks) {
int offset;
- for (offset = 0; offset < ap->nAcks && len < sizeof(msg); offset++)
+ for (offset = 0; offset < ap->nAcks && len < sizeof(msg); offset++)
msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
}
msg[len++]='\n';
peer->nSent += len;
if (resending)
peer->reSends += len;
- if (rx_stats_active)
- rx_MutexAdd(rx_stats.dataPacketsSent, len, rx_stats_mutex);
MUTEX_EXIT(&peer->peer_lock);
+ if (rx_stats_active) {
+ if (resending)
+ rx_MutexAdd(rx_stats.dataPacketsReSent, len, rx_stats_mutex);
+ else
+ rx_MutexAdd(rx_stats.dataPacketsSent, len, rx_stats_mutex);
+ }
+
if (list[len - 1]->header.flags & RX_LAST_PACKET) {
lastPacket = 1;
}
* packet until the congestion window reaches the ack rate. */
if (list[i]->header.serial) {
requestAck = 1;
- if (rx_stats_active)
- rx_MutexIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex);
} else {
/* improved RTO calculation- not Karn */
list[i]->firstSent = *now;
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
- call->lastSendData = conn->lastSendTime = call->lastSendTime = clock_Sec();
+ conn->lastSendTime = call->lastSendTime = clock_Sec();
+ /* Let a set of retransmits trigger an idle timeout */
+ if (!resending)
+ call->lastSendData = call->lastSendTime;
}
/* When sending packets we need to follow these rules:
#ifdef RX_ENABLE_LOCKS
/* Call rxi_Start, below, but with the call lock held. */
void
-rxi_StartUnlocked(struct rxevent *event,
+rxi_StartUnlocked(struct rxevent *event,
void *arg0, void *arg1, int istack)
{
struct rx_call *call = arg0;
-
+
MUTEX_ENTER(&call->lock);
rxi_Start(event, call, arg1, istack);
MUTEX_EXIT(&call->lock);
* better optimized for new packets, the usual case, now that we've
* got rid of queues of send packets. XXXXXXXXXXX */
void
-rxi_Start(struct rxevent *event,
+rxi_Start(struct rxevent *event,
void *arg0, void *arg1, int istack)
{
struct rx_call *call = arg0;
-
+
struct rx_packet *p;
struct rx_packet *nxp; /* Next pointer for queue_Scan */
struct rx_peer *peer = call->conn->peer;
* some of them have been retransmitted more times than more
* recent additions.
* Do a dance to avoid blocking after setting now. */
- clock_Zero(&retryTime);
MUTEX_ENTER(&peer->peer_lock);
- clock_Add(&retryTime, &peer->timeout);
+ retryTime = peer->timeout;
MUTEX_EXIT(&peer->peer_lock);
+
clock_GetTime(&now);
clock_Add(&retryTime, &now);
usenow = now;
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
/* We shouldn't be sending packets if a thread is waiting
* to initiate congestion recovery */
+ dpf(("call %d waiting to initiate fast recovery\n",
+ *(call->callNumber)));
break;
}
if ((nXmitPackets)
&& (call->flags & RX_CALL_FAST_RECOVER)) {
/* Only send one packet during fast recovery */
+ dpf(("call %d restricted to one packet per send during fast recovery\n",
+ *(call->callNumber)));
break;
}
+#ifdef RX_TRACK_PACKETS
if ((p->flags & RX_PKTFLAG_FREE)
|| (!queue_IsEnd(&call->tq, nxp)
&& (nxp->flags & RX_PKTFLAG_FREE))
|| (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
osi_Panic("rxi_Start: xmit queue clobbered");
}
+#endif
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
usenow.sec = usenow.usec = 0;
/* Note: if we're waiting for more window space, we can
* still send retransmits; hence we don't return here, but
* break out to schedule a retransmit event */
- dpf(("call %d waiting for window",
- *(call->callNumber)));
+ dpf(("call %d waiting for window (seq %d, twind %d, nSoftAcked %d, cwind %d)\n",
+ *(call->callNumber), p->header.seq, call->twind, call->nSoftAcked,
+ call->cwind));
break;
}
/* Transmit the packet if it needs to be sent. */
if (!clock_Lt(&now, &p->retryTime)) {
if (nXmitPackets == maxXmitPackets) {
- rxi_SendXmitList(call, xmitList, nXmitPackets,
- istack, &now, &retryTime,
+ rxi_SendXmitList(call, xmitList, nXmitPackets,
+ istack, &now, &retryTime,
resending);
- osi_Free(xmitList, maxXmitPackets *
+ osi_Free(xmitList, maxXmitPackets *
sizeof(struct rx_packet *));
goto restart;
}
+ dpf(("call %d xmit packet %"AFS_PTR_FMT" now %u.%06u retryTime %u.%06u nextRetry %u.%06u\n",
+ *(call->callNumber), p,
+ now.sec, now.usec,
+ p->retryTime.sec, p->retryTime.usec,
+ retryTime.sec, retryTime.usec));
xmitList[nXmitPackets++] = p;
}
}
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
call->flags &= ~RX_CALL_TQ_BUSY;
if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
- dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
+ call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start start");
CV_BROADCAST(&call->cv_tq);
rx_MutexIncrement(rx_tq_debug.rxi_start_aborted, rx_stats_mutex);
call->flags &= ~RX_CALL_TQ_BUSY;
if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
- dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ dpf(("call error %d while xmit %p has %d waiters and flags %d\n",
+ call->error, call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start middle");
CV_BROADCAST(&call->cv_tq);
if (p->header.seq < call->tfirst
&& (p->flags & RX_PKTFLAG_ACKED)) {
queue_Remove(p);
+#ifdef RX_TRACK_PACKETS
p->flags &= ~RX_PKTFLAG_TQ;
+#endif
#ifdef RXDEBUG_PACKET
call->tqc--;
#endif
#ifdef RX_ENABLE_LOCKS
CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow,
+ rxevent_PostNow2(&retryTime, &usenow,
rxi_StartUnlocked,
(void *)call, 0, istack);
#else /* RX_ENABLE_LOCKS */
call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
+ rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
(void *)call, 0, istack);
#endif /* RX_ENABLE_LOCKS */
}
*/
call->flags &= ~RX_CALL_TQ_BUSY;
if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
- dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
+ call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start end");
CV_BROADCAST(&call->cv_tq);
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
- conn->lastSendTime = call->lastSendTime = clock_Sec();
- /* Don't count keepalives here, so idleness can be tracked. */
- if ((p->header.type != RX_PACKET_TYPE_ACK) || (((struct rx_ackPacket *)rx_DataOf(p))->reason != RX_ACK_PING))
- call->lastSendData = call->lastSendTime;
+ if ((p->header.type != RX_PACKET_TYPE_ACK) ||
+ (((struct rx_ackPacket *)rx_DataOf(p))->reason == RX_ACK_PING) ||
+ (p->length <= (rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32))))
+ {
+ conn->lastSendTime = call->lastSendTime = clock_Sec();
+ /* Don't count keepalive ping/acks here, so idleness can be tracked. */
+ if ((p->header.type != RX_PACKET_TYPE_ACK) ||
+ ((((struct rx_ackPacket *)rx_DataOf(p))->reason != RX_ACK_PING) &&
+ (((struct rx_ackPacket *)rx_DataOf(p))->reason !=
+ RX_ACK_PING_RESPONSE)))
+ call->lastSendData = call->lastSendTime;
+ }
}
-
/* Check if a call needs to be destroyed. Called by keep-alive code to ensure
* that things are fine. Also called periodically to guarantee that nothing
* falls through the cracks (e.g. (error + dally) connections have keepalive
struct rx_connection *conn = call->conn;
afs_uint32 now;
afs_uint32 deadTime;
+ int cerror = 0;
+ int newmtu = 0;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (call->flags & RX_CALL_TQ_BUSY) {
netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
ip_stack_t *ipst = ns->netstack_ip;
#endif
- ire = ire_cache_lookup(call->conn->peer->host
+ ire = ire_cache_lookup(conn->peer->host
#if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
, ALL_ZONES
#if defined(AFS_SUN510_ENV) && (defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID))
#endif
#endif
);
-
+
if (ire && ire->ire_max_frag > 0)
- rxi_SetPeerMtu(call->conn->peer->host, 0, ire->ire_max_frag);
+ rxi_SetPeerMtu(NULL, conn->peer->host, 0,
+ ire->ire_max_frag);
#if defined(GLOBAL_NETSTACKID)
netstack_rele(ns);
#endif
#endif
#endif /* ADAPT_PMTU */
- rxi_CallError(call, RX_CALL_DEAD);
- return -1;
+ cerror = RX_CALL_DEAD;
+ goto mtuout;
} else {
#ifdef RX_ENABLE_LOCKS
/* Cancel pending events */
}
/* see if we have a non-activity timeout */
if (call->startWait && conn->idleDeadTime
- && ((call->startWait + conn->idleDeadTime) < now)) {
+ && ((call->startWait + conn->idleDeadTime) < now) &&
+ (call->flags & RX_CALL_READER_WAIT)) {
if (call->state == RX_STATE_ACTIVE) {
- rxi_CallError(call, RX_CALL_TIMEOUT);
- return -1;
+ cerror = RX_CALL_TIMEOUT;
+ goto mtuout;
}
}
if (call->lastSendData && conn->idleDeadTime && (conn->idleDeadErr != 0)
&& ((call->lastSendData + conn->idleDeadTime) < now)) {
if (call->state == RX_STATE_ACTIVE) {
- rxi_CallError(call, conn->idleDeadErr);
- return -1;
+ cerror = conn->idleDeadErr;
+ goto mtuout;
}
}
/* see if we have a hard timeout */
return -1;
}
return 0;
+mtuout:
+ if (conn->msgsizeRetryErr && cerror != RX_CALL_TIMEOUT) {
+ int oldMTU = conn->peer->ifMTU;
+
+ /* if we thought we could send more, perhaps things got worse */
+ if (call->conn->peer->maxPacketSize > conn->lastPacketSize)
+ /* maxpacketsize will be cleared in rxi_SetPeerMtu */
+ newmtu = MAX(conn->peer->maxPacketSize-RX_IPUDP_SIZE,
+ conn->lastPacketSize-(128+RX_IPUDP_SIZE));
+ else
+ newmtu = conn->lastPacketSize-(128+RX_IPUDP_SIZE);
+
+ /* minimum capped in SetPeerMtu */
+ rxi_SetPeerMtu(conn->peer, 0, 0, newmtu);
+
+ /* clean up */
+ conn->lastPacketSize = 0;
+
+ /* needed so ResetCall doesn't clobber us. */
+ call->MTU = conn->peer->ifMTU;
+
+ /* if we never succeeded, let the error pass out as-is */
+ if (conn->peer->maxPacketSize && oldMTU != conn->peer->ifMTU)
+ cerror = conn->msgsizeRetryErr;
+
+ }
+ rxi_CallError(call, cerror);
+ return -1;
}
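/*
 * Editorial note, not part of the change above: a worked example of the
 * mtuout back-off, with the packet sizes below chosen purely for
 * illustration.  Suppose the largest datagram attempted on the call was
 * conn->lastPacketSize = 1444 bytes and the peer never confirmed a larger
 * size (conn->peer->maxPacketSize == 0).  RX_IPUDP_SIZE is the IP+UDP
 * header overhead (28 bytes for IPv4), so
 *
 *     newmtu = lastPacketSize - (128 + RX_IPUDP_SIZE)
 *            = 1444 - (128 + 28) = 1288
 *
 * and rxi_SetPeerMtu() is asked to shrink the interface MTU to 1288
 * (it applies its own minimum).  If the peer had confirmed a larger
 * maxPacketSize, the MAX() keeps the new MTU at least
 * maxPacketSize - RX_IPUDP_SIZE.
 */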
+void
+rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
+{
+ struct rx_connection *conn = arg1;
+ struct rx_header theader;
+ char tbuffer[1500];
+ struct sockaddr_in taddr;
+ char *tp;
+ char a[1] = { 0 };
+ struct iovec tmpiov[2];
+ osi_socket socket =
+ (conn->type ==
+ RX_CLIENT_CONNECTION ? rx_socket : conn->service->socket);
+
+ tp = &tbuffer[sizeof(struct rx_header)];
+ taddr.sin_family = AF_INET;
+ taddr.sin_port = rx_PortOf(rx_PeerOf(conn));
+ taddr.sin_addr.s_addr = rx_HostOf(rx_PeerOf(conn));
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+ taddr.sin_len = sizeof(struct sockaddr_in);
+#endif
+ memset(&theader, 0, sizeof(theader));
+ theader.epoch = htonl(999);
+ theader.cid = 0;
+ theader.callNumber = 0;
+ theader.seq = 0;
+ theader.serial = 0;
+ theader.type = RX_PACKET_TYPE_VERSION;
+ theader.flags = RX_LAST_PACKET;
+ theader.serviceId = 0;
+
+ memcpy(tbuffer, &theader, sizeof(theader));
+ memcpy(tp, &a, sizeof(a));
+ tmpiov[0].iov_base = tbuffer;
+ tmpiov[0].iov_len = 1 + sizeof(struct rx_header);
+
+ osi_NetSend(socket, &taddr, tmpiov, 1, 1 + sizeof(struct rx_header), 1);
+
+ MUTEX_ENTER(&conn->conn_data_lock);
+ /* Only reschedule ourselves if the connection would not be destroyed */
+ if (conn->refCount <= 1) {
+ conn->natKeepAliveEvent = NULL;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ rx_DestroyConnection(conn); /* drop the reference for this */
+ } else {
+ conn->natKeepAliveEvent = NULL;
+ conn->refCount--; /* drop the reference for this */
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ MUTEX_EXIT(&conn->conn_data_lock);
+ }
+}
+
+void
+rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn)
+{
+ if (!conn->natKeepAliveEvent && conn->secondsUntilNatPing) {
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
+ when.sec += conn->secondsUntilNatPing;
+ conn->refCount++; /* hold a reference for this */
+ conn->natKeepAliveEvent =
+ rxevent_PostNow(&when, &now, rxi_NatKeepAliveEvent, conn, 0);
+ }
+}
+
+void
+rx_SetConnSecondsUntilNatPing(struct rx_connection *conn, afs_int32 seconds)
+{
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->secondsUntilNatPing = seconds;
+ if (seconds != 0)
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ MUTEX_EXIT(&conn->conn_data_lock);
+}
+
+void
+rxi_NatKeepAliveOn(struct rx_connection *conn)
+{
+ MUTEX_ENTER(&conn->conn_data_lock);
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ MUTEX_EXIT(&conn->conn_data_lock);
+}
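/*
 * Usage sketch, not part of the change above: a client behind a NAT can
 * ask rx to emit the tiny version-probe datagram built in
 * rxi_NatKeepAliveEvent() on an otherwise idle connection, so the NAT's
 * UDP mapping does not expire.  The helper name and the 20 second
 * interval are assumptions for the example; it is kept out of the build.
 */
#if 0
static void
example_enable_nat_keepalive(struct rx_connection *conn)
{
    /* send one keepalive roughly every 20 seconds while the call is idle */
    rx_SetConnSecondsUntilNatPing(conn, 20);
}
#endif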
/* When a call is in progress, this routine is called occasionally to
* make sure that some traffic has arrived (or been sent to) the peer.
conn = call->conn;
if ((now - call->lastSendTime) > conn->secondsUntilPing) {
/* Don't try to send keepalives if there is unacknowledged data */
- /* the rexmit code should be good enough, this little hack
+ /* the rexmit code should be good enough, this little hack
* doesn't quite work XXX */
(void)rxi_SendAck(call, NULL, 0, RX_ACK_PING, 0);
}
MUTEX_EXIT(&call->lock);
}
+/* Periodically probe whether the path can carry larger packets, so the MTU can grow. */
+void
+rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy)
+{
+ struct rx_call *call = arg1;
+ struct rx_connection *conn;
+
+ MUTEX_ENTER(&call->lock);
+ CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
+ if (event == call->growMTUEvent)
+ call->growMTUEvent = NULL;
+
+#ifdef RX_ENABLE_LOCKS
+ if (rxi_CheckCall(call, 0)) {
+ MUTEX_EXIT(&call->lock);
+ return;
+ }
+#else /* RX_ENABLE_LOCKS */
+ if (rxi_CheckCall(call))
+ return;
+#endif /* RX_ENABLE_LOCKS */
+
+ /* Don't bother with dallying calls */
+ if (call->state == RX_STATE_DALLY) {
+ MUTEX_EXIT(&call->lock);
+ return;
+ }
+
+ conn = call->conn;
+
+ /*
+     * Keep the event scheduled, but only send a probe if we are not
+     * already at the maximum packet size and the connection is set up
+     * to handle the probe responses (an idle-dead timeout is required).
+     */
+ if ((conn->peer->maxPacketSize != 0) &&
+ (conn->peer->natMTU < RX_MAX_PACKET_SIZE) &&
+ (conn->idleDeadErr))
+ (void)rxi_SendAck(call, NULL, 0, RX_ACK_MTU, 0);
+ rxi_ScheduleGrowMTUEvent(call, 0);
+ MUTEX_EXIT(&call->lock);
+}
void
rxi_ScheduleKeepAliveEvent(struct rx_call *call)
}
}
+void
+rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs)
+{
+ if (!call->growMTUEvent) {
+ struct clock when, now;
+
+ clock_GetTime(&now);
+ when = now;
+ if (!secs) {
+ if (call->conn->secondsUntilPing)
+ secs = (6*call->conn->secondsUntilPing)-1;
+
+ if (call->conn->secondsUntilDead)
+ secs = MIN(secs, (call->conn->secondsUntilDead-1));
+ }
+
+ when.sec += secs;
+ CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
+ call->growMTUEvent =
+ rxevent_PostNow(&when, &now, rxi_GrowMTUEvent, call, 0);
+ }
+}
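/*
 * Editorial note, not part of the change above: with the (assumed) values
 * secondsUntilPing = 6 and secondsUntilDead = 50, the probe interval
 * computed above is
 *
 *     secs = MIN(6 * 6 - 1, 50 - 1) = 35 seconds,
 *
 * i.e. the MTU probe fires well inside the dead-time window but far less
 * often than the ordinary keep-alive ping.
 */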
+
/* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
void
rxi_KeepAliveOn(struct rx_call *call)
rxi_ScheduleKeepAliveEvent(call);
}
+void
+rxi_GrowMTUOn(struct rx_call *call)
+{
+ struct rx_connection *conn = call->conn;
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->lastPingSizeSer = conn->lastPingSize = 0;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ rxi_ScheduleGrowMTUEvent(call, 1);
+}
+
/* This routine is called to send connection abort messages
* that have been delayed to throttle looping clients. */
void
void *arg1, void *unused)
{
struct rx_connection *conn = arg1;
-
+
afs_int32 error;
struct rx_packet *packet;
/* This routine is called to send call abort messages
* that have been delayed to throttle looping clients. */
void
-rxi_SendDelayedCallAbort(struct rxevent *event,
+rxi_SendDelayedCallAbort(struct rxevent *event,
void *arg1, void *dummy)
{
struct rx_call *call = arg1;
-
+
afs_int32 error;
struct rx_packet *packet;
* issues a challenge to the client, which is obtained from the
* security object associated with the connection */
void
-rxi_ChallengeEvent(struct rxevent *event,
+rxi_ChallengeEvent(struct rxevent *event,
void *arg0, void *arg1, int tries)
{
struct rx_connection *conn = arg0;
-
+
conn->challengeEvent = NULL;
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
struct rx_packet *packet;
return; /* somebody set the clock back, don't count this time. */
}
clock_Sub(rttp, sentp);
+ dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rttp=%d.%06d sec)\n",
+ p->header.callNumber, p, rttp->sec, rttp->usec));
+
+ if (rttp->sec == 0 && rttp->usec == 0) {
+ /*
+         * The measured round trip time is smaller than the resolution
+         * of clock_GetTime, which is most likely either 1ms or 100ns.
+         * Since we can't tell which, assume the coarser 1ms so that the
+         * sample is never zero.
+ */
+ rttp->usec = 1000;
+ }
+
if (rx_stats_active) {
MUTEX_ENTER(&rx_stats_mutex);
if (clock_Lt(rttp, &rx_stats.minRtt))
* srtt is stored as fixed point with 3 bits after the binary
* point (i.e., scaled by 8). The following magic is
* equivalent to the smoothing algorithm in rfc793 with an
- * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
- * srtt*8 = srtt*8 + rtt - srtt
- * srtt = srtt + rtt/8 - srtt/8
+ * alpha of .875 (srtt' = rtt/8 + srtt*7/8 in fixed point).
+ * srtt'*8 = rtt + srtt*7
+ * srtt'*8 = srtt*8 + rtt - srtt
+ * srtt' = srtt + rtt/8 - srtt/8
+ * srtt' = srtt + (rtt - srtt)/8
*/
- delta = MSEC(rttp) - (peer->rtt >> 3);
- peer->rtt += delta;
+ delta = _8THMSEC(rttp) - peer->rtt;
+ peer->rtt += (delta >> 3);
/*
* We accumulate a smoothed rtt variance (actually, a smoothed
* rttvar is stored as
* fixed point with 2 bits after the binary point (scaled by
* 4). The following is equivalent to rfc793 smoothing with
- * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4). This
- * replaces rfc793's wired-in beta.
+ * an alpha of .75 (rttvar' = rttvar*3/4 + |delta| / 4).
+ * rttvar'*4 = rttvar*3 + |delta|
+ * rttvar'*4 = rttvar*4 + |delta| - rttvar
+ * rttvar' = rttvar + |delta|/4 - rttvar/4
+ * rttvar' = rttvar + (|delta| - rttvar)/4
+ * This replaces rfc793's wired-in beta.
* dev*4 = dev*4 + (|actual - expected| - dev)
*/
if (delta < 0)
delta = -delta;
- delta -= (peer->rtt_dev >> 2);
- peer->rtt_dev += delta;
+ delta -= (peer->rtt_dev << 1);
+ peer->rtt_dev += (delta >> 3);
} else {
/* I don't have a stored RTT so I start with this value. Since I'm
* probably just starting a call, and will be pushing more data down
- * this, I expect congestion to increase rapidly. So I fudge a
+ * this, I expect congestion to increase rapidly. So I fudge a
* little, and I set deviance to half the rtt. In practice,
* deviance tends to approach something a little less than
* half the smoothed rtt. */
- peer->rtt = (MSEC(rttp) << 3) + 8;
+ peer->rtt = _8THMSEC(rttp) + 8;
peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
}
- /* the timeout is RTT + 4*MDEV + 0.35 sec This is because one end or
- * the other of these connections is usually in a user process, and can
- * be switched and/or swapped out. So on fast, reliable networks, the
- * timeout would otherwise be too short.
- */
- rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 350;
+ /* the timeout is RTT + 4*MDEV but no less than rx_minPeerTimeout msec.
+ * This is because one end or the other of these connections is usually
+ * in a user process, and can be switched and/or swapped out. So on fast,
+ * reliable networks, the timeout would otherwise be too short. */
+ rtt_timeout = MAX(((peer->rtt >> 3) + peer->rtt_dev), rx_minPeerTimeout);
clock_Zero(&(peer->timeout));
clock_Addmsec(&(peer->timeout), rtt_timeout);
- dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n", MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
+ /* Reset the backedOff flag since we just computed a new timeout value */
+ peer->backedOff = 0;
+
+ dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%06d sec)\n",
+ p->header.callNumber, p, MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
}
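/*
 * A minimal sketch of the fixed-point smoothing above, kept out of the
 * build; the struct, helper name and sample values are assumptions for
 * illustration.  srtt is held scaled by 8 and the mean deviation scaled
 * by 4, so the RFC 793 style updates
 *     srtt'   = srtt   + (rtt - srtt) / 8
 *     rttvar' = rttvar + (|rtt - srtt| - rttvar) / 4
 * reduce to shifts, and the timeout is srtt + 4 * rttvar.
 */
#if 0
struct example_rtt_state {
    int srtt8;  /* smoothed rtt, in 1/8 ms units (scaled by 8) */
    int dev4;   /* smoothed mean deviation, in 1/4 ms units (scaled by 4) */
};

static int
example_rtt_update(struct example_rtt_state *s, int sample_ms)
{
    int delta = (sample_ms << 3) - s->srtt8;    /* (rtt - srtt), scaled by 8 */

    s->srtt8 += (delta >> 3);                   /* srtt += (rtt - srtt) / 8 */

    if (delta < 0)
        delta = -delta;
    delta -= (s->dev4 << 1);                    /* (|delta| - rttvar), scaled by 8 */
    s->dev4 += (delta >> 3);                    /* rttvar += (|delta| - rttvar) / 4 */

    /* Example: srtt 40ms (srtt8 = 320), rttvar 5ms (dev4 = 20) and a 56ms
     * sample give srtt8 = 336 (42ms), dev4 = 31 (7.75ms) and a timeout of
     * 42 + 31 = 73ms, before the rx_minPeerTimeout floor is applied. */
    return (s->srtt8 >> 3) + s->dev4;           /* srtt + 4 * rttvar, in ms */
}
#endif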
{
struct rx_peer **peer_ptr, **peer_end;
int code;
- MUTEX_ENTER(&rx_rpc_stats);
- MUTEX_ENTER(&rx_peerHashTable_lock);
+
+ /*
+ * Why do we need to hold the rx_peerHashTable_lock across
+ * the incrementing of peer_ptr since the rx_peerHashTable
+ * array is not changing? We don't.
+ *
+ * By dropping the lock periodically we can permit other
+ * activities to be performed while a rxi_ReapConnections
+ * call is in progress. The goal of reap connections
+ * is to clean up quickly without causing large amounts
+ * of contention. Therefore, it is important that global
+ * mutexes not be held for extended periods of time.
+ */
for (peer_ptr = &rx_peerHashTable[0], peer_end =
&rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
peer_ptr++) {
struct rx_peer *peer, *next, *prev;
- for (prev = peer = *peer_ptr; peer; peer = next) {
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ for (prev = peer = *peer_ptr; peer; peer = next) {
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if ((code) && (peer->refCount == 0)
&& ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+
+ /*
+                 * We now know that this peer object is one to be
+                 * removed from the hash table.  Once it is removed
+                 * it can't be referenced by other threads.
+                 * Remove it first and decrement the
+                 * rx_stats.nPeerStructs count.
+ */
+ if (peer == *peer_ptr) {
+ *peer_ptr = next;
+ prev = next;
+ } else
+ prev->next = next;
+
+ if (rx_stats_active)
+ rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
+
+ /*
+ * Now if we hold references on 'prev' and 'next'
+ * we can safely drop the rx_peerHashTable_lock
+ * while we destroy this 'peer' object.
+ */
+ if (next)
+ next->refCount++;
+ if (prev)
+ prev->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
MUTEX_EXIT(&peer->peer_lock);
MUTEX_DESTROY(&peer->peer_lock);
for (queue_Scan
sizeof(rx_function_entry_v1_t);
rxi_Free(rpc_stat, space);
+
+ MUTEX_ENTER(&rx_rpc_stats);
rxi_rpc_peer_stat_cnt -= num_funcs;
+ MUTEX_EXIT(&rx_rpc_stats);
}
rxi_FreePeer(peer);
- if (rx_stats_active)
- rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
- if (peer == *peer_ptr) {
- *peer_ptr = next;
- prev = next;
- } else
- prev->next = next;
+
+ /*
+ * Regain the rx_peerHashTable_lock and
+ * decrement the reference count on 'prev'
+ * and 'next'.
+ */
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (next)
+ next->refCount--;
+ if (prev)
+ prev->refCount--;
} else {
if (code) {
MUTEX_EXIT(&peer->peer_lock);
prev = peer;
}
}
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
- MUTEX_EXIT(&rx_peerHashTable_lock);
- MUTEX_EXIT(&rx_rpc_stats);
}
/* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
} else {
return;
}
- xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
+ xferSize = rx_AckDataSize(rx_maxSendWindow) + RX_HEADER_SIZE;
break;
default:
return;
}
- dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, rtt %u, ps %u)", ntohl(peer->host), ntohs(peer->port), (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"), xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt, peer->ifMTU));
+ dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %d.%06d, rtt %u, ps %u)",
+ ntohl(peer->host), ntohs(peer->port), (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
+ xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt, peer->ifMTU));
/* Track only packets that are big enough. */
if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
* one packet exchange */
if (clock_Gt(&newTO, &peer->timeout)) {
- dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, ps %u)", ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt, peer->packetSize));
+ dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)",
+ ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec,
+ newTO.sec, newTO.usec, peer->smRtt));
peer->timeout = newTO;
}
/* Now, convert to the number of full packets that could fit in a
* reasonable fraction of that interval */
minTime /= (peer->smRtt << 1);
+ minTime = MAX(minTime, rx_minPeerTimeout);
xferSize = minTime; /* (make a copy) */
/* Now clamp the size to reasonable bounds. */
if (minTime <= 1)
minTime = 1;
- else if (minTime > rx_Window)
- minTime = rx_Window;
+ else if (minTime > rx_maxSendWindow)
+ minTime = rx_maxSendWindow;
/* if (minTime != peer->maxWindow) {
- dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)",
ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
- peer->timeout.sec, peer->timeout.usec, peer->smRtt,
- peer->packetSize));
+ peer->timeout.sec, peer->timeout.usec, peer->smRtt));
peer->maxWindow = minTime;
- elide... call->twind = minTime;
+ elide... call->twind = minTime;
}
*/
/* Cut back on the peer timeout if it had earlier grown unreasonably.
     * Discern this by calculating the timeout necessary for rx_maxSendWindow
* packets. */
- if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
+ if ((xferSize > rx_maxSendWindow) && (peer->timeout.sec >= 3)) {
/* calculate estimate for transmission interval in milliseconds */
- minTime = rx_Window * peer->smRtt;
+ minTime = rx_maxSendWindow * peer->smRtt;
if (minTime < 1000) {
- dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
- peer->timeout.usec, peer->smRtt, peer->packetSize));
+ peer->timeout.usec, peer->smRtt));
newTO.sec = 0; /* cut back on timeout by half a second */
newTO.usec = 500000;
va_end(ap);
#else
struct clock now;
-
+
va_start(ap, format);
clock_GetTime(&now);
- fprintf(rx_Log, " %u.%.3u:", (unsigned int)now.sec,
- (unsigned int)now.usec / 1000);
+ fprintf(rx_Log, " %d.%06d:", (unsigned int)now.sec,
+ (unsigned int)now.usec);
vfprintf(rx_Log, format, ap);
putc('\n', rx_Log);
va_end(ap);
rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
afs_int32 freePackets, char version)
{
-#ifdef RXDEBUG
int i;
if (size != sizeof(struct rx_statistics)) {
fprintf(file,
- "Unexpected size of stats structure: was %d, expected %lud\n",
+ "Unexpected size of stats structure: was %d, expected %" AFS_SIZET_FMT "\n",
size, sizeof(struct rx_statistics));
}
#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
fprintf(file, " %d clock updates\n", clock_nUpdates);
#endif
-#else
- fprintf(file, "ERROR: compiled without RXDEBUG\n");
-#endif
}
/* for backward compatibility */
void
rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
{
- fprintf(file, "Peer %x.%d. " "Burst size %d, " "burst wait %u.%d.\n",
- ntohl(peer->host), (int)peer->port, (int)peer->burstSize,
+ fprintf(file, "Peer %x.%d. " "Burst size %d, " "burst wait %d.%06d.\n",
+ ntohl(peer->host), (int)ntohs(peer->port), (int)peer->burstSize,
(int)peer->burstWait.sec, (int)peer->burstWait.usec);
fprintf(file,
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
-#ifdef RXDEBUG
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
static int
MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
u_char type, void *inputData, size_t inputLength,
void *outputData, size_t outputLength)
{
static afs_int32 counter = 100;
- time_t waitTime, waitCount, startTime;
+ time_t waitTime, waitCount;
struct rx_header theader;
char tbuffer[1500];
afs_int32 code;
fd_set imask;
char *tp;
- startTime = time(0);
waitTime = 1;
waitCount = 5;
LOCK_RX_DEBUG;
tv_delta.tv_sec = tv_wake.tv_sec;
tv_delta.tv_usec = tv_wake.tv_usec;
gettimeofday(&tv_now, 0);
-
+
if (tv_delta.tv_usec < tv_now.tv_usec) {
/* borrow */
tv_delta.tv_usec += 1000000;
tv_delta.tv_sec--;
}
tv_delta.tv_usec -= tv_now.tv_usec;
-
+
if (tv_delta.tv_sec < tv_now.tv_sec) {
/* time expired */
break;
}
tv_delta.tv_sec -= tv_now.tv_sec;
-
+
+#ifdef AFS_NT40_ENV
+ code = select(0, &imask, 0, 0, &tv_delta);
+#else /* AFS_NT40_ENV */
code = select(socket + 1, &imask, 0, 0, &tv_delta);
+#endif /* AFS_NT40_ENV */
if (code == 1 && FD_ISSET(socket, &imask)) {
/* now receive a packet */
faddrLen = sizeof(struct sockaddr_in);
code =
recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
(struct sockaddr *)&faddr, &faddrLen);
-
+
if (code > 0) {
memcpy(&theader, tbuffer, sizeof(struct rx_header));
if (counter == ntohl(theader.callNumber))
}
waitTime <<= 1;
}
-
+
success:
code -= sizeof(struct rx_header);
if (code > outputLength)
afs_uint16 remotePort, struct rx_debugStats * stat,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
- afs_int32 *lp = (afs_int32 *) stat;
*supportedValues = 0;
in.type = htonl(RX_DEBUGI_GETSTATS);
stat->nWaited = ntohl(stat->nWaited);
stat->nPackets = ntohl(stat->nPackets);
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
afs_uint16 remotePort, struct rx_statistics * stat,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
afs_int32 *lp = (afs_int32 *) stat;
*lp = ntohl(*lp);
}
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
afs_uint16 remotePort, size_t version_length,
char *version)
{
-#ifdef RXDEBUG
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
char a[1] = { 0 };
return MakeDebugCall(socket, remoteAddr, remotePort,
RX_PACKET_TYPE_VERSION, a, 1, version,
struct rx_debugConn * conn,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
int i;
conn->epoch = ntohl(conn->epoch);
conn->natMTU = ntohl(conn->natMTU);
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
afs_uint32 debugSupportedValues, struct rx_debugPeer * peer,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
+afs_int32
+rx_GetLocalPeers(afs_uint32 peerHost, afs_uint16 peerPort,
+ struct rx_debugPeer * peerStats)
+{
+ struct rx_peer *tp;
+ afs_int32 error = 1; /* default to "did not succeed" */
+ afs_uint32 hashValue = PEER_HASH(peerHost, peerPort);
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ for(tp = rx_peerHashTable[hashValue];
+ tp != NULL; tp = tp->next) {
+ if (tp->host == peerHost)
+ break;
+ }
+
+ if (tp) {
+ tp->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ error = 0;
+
+ MUTEX_ENTER(&tp->peer_lock);
+ peerStats->host = tp->host;
+ peerStats->port = tp->port;
+ peerStats->ifMTU = tp->ifMTU;
+ peerStats->idleWhen = tp->idleWhen;
+ peerStats->refCount = tp->refCount;
+ peerStats->burstSize = tp->burstSize;
+ peerStats->burst = tp->burst;
+ peerStats->burstWait.sec = tp->burstWait.sec;
+ peerStats->burstWait.usec = tp->burstWait.usec;
+ peerStats->rtt = tp->rtt;
+ peerStats->rtt_dev = tp->rtt_dev;
+ peerStats->timeout.sec = tp->timeout.sec;
+ peerStats->timeout.usec = tp->timeout.usec;
+ peerStats->nSent = tp->nSent;
+ peerStats->reSends = tp->reSends;
+ peerStats->inPacketSkew = tp->inPacketSkew;
+ peerStats->outPacketSkew = tp->outPacketSkew;
+ peerStats->rateFlag = tp->rateFlag;
+ peerStats->natMTU = tp->natMTU;
+ peerStats->maxMTU = tp->maxMTU;
+ peerStats->maxDgramPackets = tp->maxDgramPackets;
+ peerStats->ifDgramPackets = tp->ifDgramPackets;
+ peerStats->MTU = tp->MTU;
+ peerStats->cwind = tp->cwind;
+ peerStats->nDgramPackets = tp->nDgramPackets;
+ peerStats->congestSeq = tp->congestSeq;
+ peerStats->bytesSent.high = tp->bytesSent.high;
+ peerStats->bytesSent.low = tp->bytesSent.low;
+ peerStats->bytesReceived.high = tp->bytesReceived.high;
+ peerStats->bytesReceived.low = tp->bytesReceived.low;
+ MUTEX_EXIT(&tp->peer_lock);
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ tp->refCount--;
+ }
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ return error;
+}
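/*
 * Usage sketch, not part of the change above: rx_GetLocalPeers() reads the
 * locally cached peer statistics without issuing a debug RPC over the
 * wire.  Host and port are passed in network byte order, matching how rx
 * stores them; the helper name and output format are assumptions for the
 * example, and it is kept out of the build.
 */
#if 0
static void
example_print_peer_rtt(afs_uint32 host_nbo, afs_uint16 port_nbo)
{
    struct rx_debugPeer ps;

    if (rx_GetLocalPeers(host_nbo, port_nbo, &ps) == 0)
        printf("peer %x/%u: rtt %u, resends %u\n",
               (unsigned int)ntohl(ps.host), (unsigned int)ntohs(ps.port),
               (unsigned int)ps.rtt, (unsigned int)ps.reSends);
}
#endif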
+
void
shutdown_rx(void)
{
&rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
peer_ptr++) {
struct rx_peer *peer, *next;
- for (peer = *peer_ptr; peer; peer = next) {
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ for (peer = *peer_ptr; peer; peer = next) {
rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ MUTEX_ENTER(&peer->peer_lock);
for (queue_Scan
(&peer->rpcStats, rpc_stat, nrpc_stat,
rx_interface_stat)) {
sizeof(rx_function_entry_v1_t);
rxi_Free(rpc_stat, space);
- MUTEX_ENTER(&rx_rpc_stats);
+
+ /* rx_rpc_stats must be held */
rxi_rpc_peer_stat_cnt -= num_funcs;
- MUTEX_EXIT(&rx_rpc_stats);
}
+ MUTEX_EXIT(&peer->peer_lock);
+ MUTEX_EXIT(&rx_rpc_stats);
+
next = peer->next;
rxi_FreePeer(peer);
if (rx_stats_active)
rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
}
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
}
for (i = 0; i < RX_MAX_SERVICES; i++) {
MUTEX_EXIT(&conn->conn_data_lock);
}
+void
+rx_SetServiceSpecific(struct rx_service *svc, int key, void *ptr)
+{
+ int i;
+ MUTEX_ENTER(&svc->svc_data_lock);
+ if (!svc->specific) {
+ svc->specific = (void **)malloc((key + 1) * sizeof(void *));
+ for (i = 0; i < key; i++)
+ svc->specific[i] = NULL;
+ svc->nSpecific = key + 1;
+ svc->specific[key] = ptr;
+ } else if (key >= svc->nSpecific) {
+ svc->specific = (void **)
+ realloc(svc->specific, (key + 1) * sizeof(void *));
+ for (i = svc->nSpecific; i < key; i++)
+ svc->specific[i] = NULL;
+ svc->nSpecific = key + 1;
+ svc->specific[key] = ptr;
+ } else {
+ if (svc->specific[key] && rxi_keyCreate_destructor[key])
+ (*rxi_keyCreate_destructor[key]) (svc->specific[key]);
+ svc->specific[key] = ptr;
+ }
+ MUTEX_EXIT(&svc->svc_data_lock);
+}
+
void *
rx_GetSpecific(struct rx_connection *conn, int key)
{
return ptr;
}
+void *
+rx_GetServiceSpecific(struct rx_service *svc, int key)
+{
+ void *ptr;
+ MUTEX_ENTER(&svc->svc_data_lock);
+ if (key >= svc->nSpecific)
+ ptr = NULL;
+ else
+ ptr = svc->specific[key];
+ MUTEX_EXIT(&svc->svc_data_lock);
+ return ptr;
+}
+
+
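/*
 * Usage sketch, not part of the change above: per-service data uses the
 * same key space as rx_SetSpecific()/rx_GetSpecific(), so a key obtained
 * from rx_KeyCreate() works for both.  The struct and helper names are
 * assumptions for the example, and it is kept out of the build.
 */
#if 0
struct example_server_ctx {
    int verbose;
};

static int example_key;

static void
example_attach_ctx(struct rx_service *svc, struct example_server_ctx *ctx)
{
    example_key = rx_KeyCreate((rx_destructor_t) 0);
    rx_SetServiceSpecific(svc, example_key, ctx);
}

static struct example_server_ctx *
example_get_ctx(struct rx_service *svc)
{
    return (struct example_server_ctx *)
        rx_GetServiceSpecific(svc, example_key);
}
#endif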
#endif /* !KERNEL */
/*
sizeof(rx_interface_stat_t) +
totalFunc * sizeof(rx_function_entry_v1_t);
- rpc_stat = (rx_interface_stat_p) rxi_Alloc(space);
+ rpc_stat = rxi_Alloc(space);
if (rpc_stat == NULL) {
rc = 1;
goto fail;
return;
MUTEX_ENTER(&rx_rpc_stats);
- MUTEX_ENTER(&peer->peer_lock);
if (rxi_monitor_peerStats) {
+ MUTEX_ENTER(&peer->peer_lock);
rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
queueTime, execTime, bytesSent, bytesRcvd, isServer,
peer->host, peer->port, 1, &rxi_rpc_peer_stat_cnt);
+ MUTEX_EXIT(&peer->peer_lock);
}
if (rxi_monitor_processStats) {
0xffffffff, 0xffffffff, 0, &rxi_rpc_process_stat_cnt);
}
- MUTEX_EXIT(&peer->peer_lock);
MUTEX_EXIT(&rx_rpc_stats);
}
if (space > (size_t) 0) {
*allocSize = space;
- ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
+ ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
rx_interface_stat_p rpc_stat, nrpc_stat;
if (space > (size_t) 0) {
*allocSize = space;
- ptr = *stats = (afs_uint32 *) rxi_Alloc(space);
+ ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
rx_interface_stat_p rpc_stat, nrpc_stat;
struct rx_peer **peer_ptr, **peer_end;
int code;
- MUTEX_ENTER(&rx_rpc_stats);
-
/*
* Turn off peer statistics and if process stats is also off, turn
* off everything
rx_enable_stats = 0;
}
- MUTEX_ENTER(&rx_peerHashTable_lock);
for (peer_ptr = &rx_peerHashTable[0], peer_end =
&rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
peer_ptr++) {
struct rx_peer *peer, *next, *prev;
- for (prev = peer = *peer_ptr; peer; peer = next) {
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ MUTEX_ENTER(&rx_rpc_stats);
+ for (prev = peer = *peer_ptr; peer; peer = next) {
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if (code) {
rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
- for (queue_Scan
+
+ if (prev == *peer_ptr) {
+ *peer_ptr = next;
+ prev = next;
+ } else
+ prev->next = next;
+
+ if (next)
+ next->refCount++;
+ if (prev)
+ prev->refCount++;
+ peer->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ for (queue_Scan
(&peer->rpcStats, rpc_stat, nrpc_stat,
rx_interface_stat)) {
unsigned int num_funcs = 0;
rxi_rpc_peer_stat_cnt -= num_funcs;
}
MUTEX_EXIT(&peer->peer_lock);
- if (prev == *peer_ptr) {
- *peer_ptr = next;
- prev = next;
- } else
- prev->next = next;
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (next)
+ next->refCount--;
+ if (prev)
+ prev->refCount--;
+ peer->refCount--;
} else {
prev = peer;
}
}
+ MUTEX_EXIT(&rx_rpc_stats);
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
- MUTEX_EXIT(&rx_peerHashTable_lock);
- MUTEX_EXIT(&rx_rpc_stats);
}
/*
return FALSE;
}
}
-#ifdef AFS_NT40_ENV
+#endif /* AFS_NT40_ENV */
+#ifndef KERNEL
int rx_DumpCalls(FILE *outputFile, char *cookie)
{
#ifdef RXDEBUG_PACKET
- int zilch;
#ifdef KDUMP_RX_LOCK
struct rx_call_rx_lock *c;
#else
struct rx_call *c;
#endif
+#ifdef AFS_NT40_ENV
+ int zilch;
char output[2048];
+#define RXDPRINTF sprintf
+#define RXDPRINTOUT output
+#else
+#define RXDPRINTF fprintf
+#define RXDPRINTOUT outputFile
+#endif
- sprintf(output, "%s - Start dumping all Rx Calls - count=%u\r\n", cookie, rx_stats.nCallStructs);
+ RXDPRINTF(RXDPRINTOUT, "%s - Start dumping all Rx Calls - count=%u\r\n", cookie, rx_stats.nCallStructs);
+#ifdef AFS_NT40_ENV
WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
+#endif
for (c = rx_allCallsp; c; c = c->allNextp) {
u_short rqc, tqc, iovqc;
queue_Count(&c->tq, p, np, rx_packet, tqc);
queue_Count(&c->iovq, p, np, rx_packet, iovqc);
- sprintf(output, "%s - call=0x%p, id=%u, state=%u, mode=%u, conn=%p, epoch=%u, cid=%u, callNum=%u, connFlags=0x%x, flags=0x%x, "
+ RXDPRINTF(RXDPRINTOUT, "%s - call=0x%p, id=%u, state=%u, mode=%u, conn=%p, epoch=%u, cid=%u, callNum=%u, connFlags=0x%x, flags=0x%x, "
"rqc=%u,%u, tqc=%u,%u, iovqc=%u,%u, "
"lstatus=%u, rstatus=%u, error=%d, timeout=%u, "
"resendEvent=%d, timeoutEvt=%d, keepAliveEvt=%d, delayedAckEvt=%d, delayedAbortEvt=%d, abortCode=%d, abortCount=%d, "
"\r\n",
cookie, c, c->call_id, (afs_uint32)c->state, (afs_uint32)c->mode, c->conn, c->conn?c->conn->epoch:0, c->conn?c->conn->cid:0,
c->callNumber?*c->callNumber:0, c->conn?c->conn->flags:0, c->flags,
- (afs_uint32)c->rqc, (afs_uint32)rqc, (afs_uint32)c->tqc, (afs_uint32)tqc, (afs_uint32)c->iovqc, (afs_uint32)iovqc,
- (afs_uint32)c->localStatus, (afs_uint32)c->remoteStatus, c->error, c->timeout,
+ (afs_uint32)c->rqc, (afs_uint32)rqc, (afs_uint32)c->tqc, (afs_uint32)tqc, (afs_uint32)c->iovqc, (afs_uint32)iovqc,
+ (afs_uint32)c->localStatus, (afs_uint32)c->remoteStatus, c->error, c->timeout,
c->resendEvent?1:0, c->timeoutEvent?1:0, c->keepAliveEvent?1:0, c->delayedAckEvent?1:0, c->delayedAbortEvent?1:0,
c->abortCode, c->abortCount, c->lastSendTime, c->lastReceiveTime, c->lastSendData
#ifdef RX_ENABLE_LOCKS
);
MUTEX_EXIT(&c->lock);
+#ifdef AFS_NT40_ENV
WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
+#endif
}
- sprintf(output, "%s - End dumping all Rx Calls\r\n", cookie);
+ RXDPRINTF(RXDPRINTOUT, "%s - End dumping all Rx Calls\r\n", cookie);
+#ifdef AFS_NT40_ENV
WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
+#endif
#endif /* RXDEBUG_PACKET */
return 0;
}
-#endif /* AFS_NT40_ENV */
#endif
-