# include "h/socket.h"
# endif
# include "netinet/in.h"
-# ifdef AFS_SUN58_ENV
+# ifdef AFS_SUN5_ENV
# include "netinet/ip6.h"
-# endif
-# ifdef AFS_SUN57_ENV
# include "inet/common.h"
# include "inet/ip.h"
# include "inet/ip_ire.h"
/* Local static routines */
static void rxi_DestroyConnectionNoLock(struct rx_connection *conn);
static void rxi_ComputeRoundTripTime(struct rx_packet *, struct rx_ackPacket *,
- struct rx_peer *, struct clock *);
+ struct rx_call *, struct rx_peer *,
+ struct clock *);
+static void rxi_Resend(struct rxevent *event, void *arg0, void *arg1,
+ int istack);
#ifdef RX_ENABLE_LOCKS
static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
* to manipulate the queue.
*/
-#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
+#if defined(RX_ENABLE_LOCKS)
static afs_kmutex_t rx_rpc_stats;
-static void rxi_StartUnlocked(struct rxevent *event, void *call,
- void *arg1, int istack);
#endif
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
return rx_InitHost(htonl(INADDR_ANY), port);
}
+/* RTT Timer
+ * ---------
+ *
+ * The rxi_rto functions implement a TCP (RFC2988) style algorithm for
+ * maintaining the round trip timer.
+ *
+ */
+
+/*!
+ * Start a new RTT timer for a given call and packet.
+ *
+ * The timer is set to expire one retransmission timeout (call->rto) after
+ * the current time. When it fires, rxi_Resend is run with the call as its
+ * first argument. Before the event is posted, a reference of type
+ * RX_CALL_REFCOUNT_RESEND is taken on the call under rx_refcnt_mutex.
+ *
+ * There must be no resendEvent already listed for this call, otherwise this
+ * will leak events - intended for internal use within the RTO code only
+ *
+ * @param[in] call
+ * the RX call to start the timer for
+ * @param[in] lastPacket
+ * a flag indicating whether the last packet has been sent or not; when it
+ * is set and the call belongs to a client connection, 400ms is added to
+ * the timeout
+ * @param[in] istack
+ * passed through unchanged to rxevent_PostNow2 for the posted event
+ *
+ * @pre call must be locked before calling this function
+ *
+ */
+static_inline void
+rxi_rto_startTimer(struct rx_call *call, int lastPacket, int istack)
+{
+ struct clock now, retryTime;
+
+ clock_GetTime(&now);
+ retryTime = now;
+
+ clock_Add(&retryTime, &call->rto);
+
+ /* If we're sending the last packet, and we're the client, then the server
+ * may wait for an additional 400ms before returning the ACK, wait for it
+ * rather than hitting a timeout */
+ if (lastPacket && call->conn->type == RX_CLIENT_CONNECTION)
+ clock_Addmsec(&retryTime, 400);
+
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ call->resendEvent = rxevent_PostNow2(&retryTime, &now, rxi_Resend,
+ call, 0, istack);
+}
+
+/*!
+ * Cancel an RTT timer for a given call.
+ *
+ * If the call has no resendEvent pending this is a no-op. Otherwise the
+ * event is handed to rxevent_Cancel together with the call and
+ * RX_CALL_REFCOUNT_RESEND (presumably so that the reference taken when the
+ * timer was started is dropped - confirm against rxevent_Cancel).
+ *
+ * @param[in] call
+ * the RX call to cancel the timer for
+ *
+ * @pre call must be locked before calling this function
+ *
+ */
+
+static_inline void
+rxi_rto_cancel(struct rx_call *call)
+{
+ if (!call->resendEvent)
+ return;
+
+ rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+}
+
+/*!
+ * Tell the RTO timer that we have sent a packet.
+ *
+ * If the timer isn't already running, then start it. If the timer is running,
+ * then do nothing. The timer is considered to be running whenever
+ * call->resendEvent is non-NULL.
+ *
+ * @param[in] call
+ * the RX call that the packet has been sent on
+ * @param[in] lastPacket
+ * A flag which is true if this is the last packet for the call
+ * @param[in] istack
+ * passed through unchanged to rxi_rto_startTimer
+ *
+ * @pre The call must be locked before calling this function
+ *
+ */
+
+static_inline void
+rxi_rto_packet_sent(struct rx_call *call, int lastPacket, int istack)
+{
+ if (call->resendEvent)
+ return;
+
+ rxi_rto_startTimer(call, lastPacket, istack);
+}
+
+/*!
+ * Tell the RTO timer that we have received a new ACK message
+ *
+ * This function should be called whenever a call receives an ACK that
+ * acknowledges new packets. Whatever happens, we stop the current timer.
+ * If there are unacked packets in the queue which have been sent, then
+ * we restart the timer from now. Otherwise, we leave it stopped.
+ *
+ * The transmit queue (call->tq) is scanned from the front. The scan gives
+ * up, leaving the timer stopped, as soon as it meets a packet whose
+ * sequence number is greater than call->tfirst + call->twind. The timer is
+ * restarted for the first packet found that has RX_PKTFLAG_SENT set and
+ * RX_PKTFLAG_ACKED clear; that packet's RX_LAST_PACKET header flag is
+ * passed on as the lastPacket argument of rxi_rto_startTimer.
+ *
+ * NOTE(review): the bound test is '>' rather than '>='; confirm that a
+ * packet with seq == tfirst + twind is meant to be treated as being inside
+ * the window.
+ *
+ * @param[in] call
+ * the RX call that the ACK has been received on
+ * @param[in] istack
+ * passed through unchanged to rxi_rto_startTimer
+ *
+ * @pre call must be locked, as required by rxi_rto_cancel and
+ * rxi_rto_startTimer, both of which are called from here
+ */
+
+static_inline void
+rxi_rto_packet_acked(struct rx_call *call, int istack)
+{
+ struct rx_packet *p, *nxp;
+
+ rxi_rto_cancel(call);
+
+ if (queue_IsEmpty(&call->tq))
+ return;
+
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (p->header.seq > call->tfirst + call->twind)
+ return;
+
+ if (!(p->flags & RX_PKTFLAG_ACKED) && p->flags & RX_PKTFLAG_SENT) {
+ rxi_rto_startTimer(call, p->header.flags & RX_LAST_PACKET, istack);
+ return;
+ }
+ }
+}
+
+
+/**
+ * Set an initial round trip timeout for a peer connection
+ *
+ * The value is stored in peer->rtt as secs * 8000. Elsewhere the rtt field
+ * is shifted right by 3 before being used as a millisecond count, so the
+ * factor presumably converts seconds into units of 1/8 millisecond -
+ * confirm against the definition of struct rx_peer. Only peer->rtt is
+ * written, and no lock is taken here.
+ *
+ * @param[in] peer The peer whose round trip time is to be set
+ * @param[in] secs The timeout to set in seconds
+ */
+
+void
+rx_rto_setPeerTimeoutSecs(struct rx_peer *peer, int secs) {
+ peer->rtt = secs * 8000;
+}
+
/**
* Sets the error generated when a busy call channel is detected.
*
MUTEX_EXIT(&conn->conn_data_lock);
/* Check for extant references to this connection */
+ MUTEX_ENTER(&conn->conn_call_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
}
}
}
+ MUTEX_EXIT(&conn->conn_call_lock);
+
#ifdef RX_ENABLE_LOCKS
if (!havecalls) {
if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
} else {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
rxevent_Cancel(call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
}
call->state = RX_STATE_RESET;
MUTEX_EXIT(&rx_refcnt_mutex);
rxi_ResetCall(call, 0);
- call->conn->call[channel] = (struct rx_call *)0;
+
+ MUTEX_ENTER(&conn->conn_call_lock);
+ if (call->conn->call[channel] == call)
+ call->conn->call[channel] = 0;
+ MUTEX_EXIT(&conn->conn_call_lock);
MUTEX_ENTER(&rx_freeCallQueue_lock);
SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
channel = np->header.cid & RX_CHANNELMASK;
call = conn->call[channel];
-#ifdef RX_ENABLE_LOCKS
- if (call)
- MUTEX_ENTER(&call->lock);
- /* Test to see if call struct is still attached to conn. */
- if (call != conn->call[channel]) {
- if (call)
- MUTEX_EXIT(&call->lock);
- if (type == RX_SERVER_CONNECTION) {
- call = conn->call[channel];
- /* If we started with no call attached and there is one now,
- * another thread is also running this routine and has gotten
- * the connection channel. We should drop this packet in the tests
- * below. If there was a call on this connection and it's now
- * gone, then we'll be making a new call below.
- * If there was previously a call and it's now different then
- * the old call was freed and another thread running this routine
- * has created a call on this channel. One of these two threads
- * has a packet for the old call and the code below handles those
- * cases.
- */
- if (call)
- MUTEX_ENTER(&call->lock);
- } else {
- /* This packet can't be for this call. If the new call address is
- * 0 then no call is running on this channel. If there is a call
- * then, since this is a client connection we're getting data for
- * it must be for the previous call.
- */
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- }
-#endif
- currentCallNumber = conn->callNumber[channel];
- if (type == RX_SERVER_CONNECTION) { /* We're the server */
- if (np->header.callNumber < currentCallNumber) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-#ifdef RX_ENABLE_LOCKS
- if (call)
- MUTEX_EXIT(&call->lock);
-#endif
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- if (!call) {
- MUTEX_ENTER(&conn->conn_call_lock);
- call = rxi_NewCall(conn, channel);
- MUTEX_EXIT(&conn->conn_call_lock);
- *call->callNumber = np->header.callNumber;
+ if (call) {
+ MUTEX_ENTER(&call->lock);
+ currentCallNumber = conn->callNumber[channel];
+ } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+ if (call) {
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+ currentCallNumber = conn->callNumber[channel];
+ } else {
+ call = rxi_NewCall(conn, channel); /* returns locked call */
+ MUTEX_EXIT(&conn->conn_call_lock);
+ *call->callNumber = currentCallNumber = np->header.callNumber;
#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
+ np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
+ np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
+ np->header.flags, np, np->length));
#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ hzero(call->bytesSent);
+ hzero(call->bytesRcvd);
+ /*
+ * If the number of queued calls exceeds the overload
+ * threshold then abort this call.
+ */
+ if ((rx_BusyThreshold > 0) &&
+ (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+ struct rx_packet *tp;
+
+ rxi_CallError(call, rx_BusyError);
+ tp = rxi_SendCallAbort(call, np, 1, 0);
+ MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
+ conn->refCount--;
MUTEX_EXIT(&rx_refcnt_mutex);
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- } else if (np->header.callNumber != currentCallNumber) {
+ return tp;
+ }
+ rxi_KeepAliveOn(call);
+ }
+ } else { /* RX_CLIENT_CONNECTION and No call allocated */
+ /* This packet can't be for this call. If the new call address is
+ * 0 then no call is running on this channel. If there is a call
+ * then, since this is a client connection we're getting data for
+ * it must be for the previous call.
+ */
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ conn->refCount--;
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ return np;
+ }
+
+ /* There is a non-NULL locked call at this point */
+ if (type == RX_SERVER_CONNECTION) { /* We're the server */
+ if (np->header.callNumber < currentCallNumber) {
+ MUTEX_EXIT(&call->lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ conn->refCount--;
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ return np;
+ } else if (np->header.callNumber != currentCallNumber) {
/* Wait until the transmit queue is idle before deciding
* whether to reset the current call. Chances are that the
* call will be in either DALLY or HOLD state once the TQ_BUSY
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d\n",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
+ np->header.flags, np, np->length));
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
}
} else { /* we're the client */
/* Ignore all incoming acknowledgements for calls in DALLY state */
- if (call && (call->state == RX_STATE_DALLY)
+ if ((call->state == RX_STATE_DALLY)
&& (np->header.type == RX_PACKET_TYPE_ACK)) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.ignorePacketDally);
-#ifdef RX_ENABLE_LOCKS
- if (call) {
- MUTEX_EXIT(&call->lock);
- }
-#endif
+ MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount--;
MUTEX_EXIT(&rx_refcnt_mutex);
/* Ignore anything that's not relevant to the current call. If there
* isn't a current call, then no packet is relevant. */
- if (!call || (np->header.callNumber != currentCallNumber)) {
+ if (np->header.callNumber != currentCallNumber) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-#ifdef RX_ENABLE_LOCKS
- if (call) {
- MUTEX_EXIT(&call->lock);
- }
-#endif
+ MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount--;
MUTEX_EXIT(&rx_refcnt_mutex);
/* If the service security object index stamped in the packet does not
* match the connection's security index, ignore the packet */
if (np->header.securityIndex != conn->securityIndex) {
-#ifdef RX_ENABLE_LOCKS
MUTEX_EXIT(&call->lock);
-#endif
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount--;
MUTEX_EXIT(&rx_refcnt_mutex);
}
#endif /* KERNEL */
+/*!
+ * Clear the attach wait flag on a connection and proceed.
+ *
+ * Any processing waiting for a connection to be attached should be
+ * unblocked. We clear the flag and do any other needed tasks.
+ *
+ * The only other task at present is the deferred NAT keepalive: if
+ * RX_CONN_NAT_PING is set on the connection (rxi_NatKeepAliveOn sets it
+ * instead of scheduling the event while RX_CONN_ATTACHWAIT is still set),
+ * that flag is cleared too and rxi_ScheduleNatKeepAliveEvent is called.
+ *
+ * @param[in] conn
+ * the conn to unmark waiting for attach
+ *
+ * @pre conn's conn_data_lock must be locked before calling this function
+ *
+ */
+static void
+rxi_ConnClearAttachWait(struct rx_connection *conn)
+{
+ /* Indicate that rxi_CheckReachEvent is no longer running by
+ * clearing the flag. Must be atomic under conn_data_lock to
+ * avoid a new call slipping by: rxi_CheckConnReach holds
+ * conn_data_lock while checking RX_CONN_ATTACHWAIT.
+ */
+ conn->flags &= ~RX_CONN_ATTACHWAIT;
+ if (conn->flags & RX_CONN_NAT_PING) {
+ conn->flags &= ~RX_CONN_NAT_PING;
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ }
+}
+
static void
rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2)
{
}
}
if (!call)
- /* Indicate that rxi_CheckReachEvent is no longer running by
- * clearing the flag. Must be atomic under conn_data_lock to
- * avoid a new call slipping by: rxi_CheckConnReach holds
- * conn_data_lock while checking RX_CONN_ATTACHWAIT.
- */
- conn->flags &= ~RX_CONN_ATTACHWAIT;
+ rxi_ConnClearAttachWait(conn);
MUTEX_EXIT(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_call_lock);
}
* Send an ack when requested by the peer, or once every
* rxi_SoftAckRate packets until the last packet has been
* received. Always send a soft ack for the last packet in
- * the server's reply.
- *
- * If there was more than one packet received for the call
- * and we have received all of them, immediately send an
- * RX_PACKET_TYPE_ACKALL packet so that the peer can empty
- * its packet transmit queue and cancel all resend events.
- *
- * When there is only one packet in the call there is a
- * chance that we can race with Ping ACKs sent as part of
- * connection establishment if the udp packets are delivered
- * out of order. When the race occurs, a two second delay
- * will occur while waiting for a new Ping ACK to be sent.
- */
- if (!isFirst && (call->flags & RX_CALL_RECEIVE_DONE)) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
- rxi_AckAll(NULL, call, 0);
- } else if (ackNeeded) {
+ * the server's reply. */
+ if (ackNeeded) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, ackNeeded, istack);
} else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
call->delayedAckEvent =
rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
+ } else if (call->flags & RX_CALL_RECEIVE_DONE) {
+ rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
}
return np;
if (conn->flags & RX_CONN_ATTACHWAIT) {
int i;
- conn->flags &= ~RX_CONN_ATTACHWAIT;
+ rxi_ConnClearAttachWait(conn);
MUTEX_EXIT(&conn->conn_data_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
* by the peer.
*
* The fourth section is packets which have not yet been transmitted.
- * These packets will have a retryTime of 0.
+ * These packets will have a header.serial of 0.
*/
/* First section - implicitly acknowledged packets that can be
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
- rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
+ rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
}
#ifdef ADAPT_WINDOW
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
tp->flags |= RX_PKTFLAG_ACKED;
-
- rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
+ rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
#ifdef ADAPT_WINDOW
rxi_ComputeRate(call->conn->peer, call, tp, np, ap->reason);
#endif
missing = 1;
}
- /*
- * Following the suggestion of Phil Kern, we back off the peer's
- * timeout value for future packets until a successful response
- * is received for an initial transmission.
- */
- if (missing && !peer->backedOff) {
- struct clock c = peer->timeout;
- struct clock max_to = {3, 0};
-
- clock_Add(&peer->timeout, &c);
- if (clock_Gt(&peer->timeout, &max_to))
- peer->timeout = max_to;
- peer->backedOff = 1;
- }
-
- /* If packet isn't yet acked, and it has been transmitted at least
- * once, reset retransmit time using latest timeout
- * ie, this should readjust the retransmit timer for all outstanding
- * packets... So we don't just retransmit when we should know better*/
-
- if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
- tp->retryTime = tp->timeSent;
- clock_Add(&tp->retryTime, &peer->timeout);
- /* shift by eight because one quarter-sec ~ 256 milliseconds */
- clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
- }
-
tp = queue_Next(tp, rx_packet);
}
- /* The third case, packets which the ack packet tells us
- * nothing about at all. We just need to adjust the retryTime to match
- * any new timeouts that have been calculated for this peer.
- * We use the fact that we send in order to terminate this loop as soon as
- * we find a packet that has not been sent.
+ /* We don't need to take any action with the 3rd or 4th section in the
+ * queue - they're not addressed by the contents of this ACK packet.
*/
- while (!queue_IsEnd(&call->tq, tp) && !clock_IsZero(&tp->retryTime)) {
- tp->retryTime = tp->timeSent;
- clock_Add(&tp->retryTime, &peer->timeout);
- clock_Addmsec(&tp->retryTime, ((afs_uint32) tp->backoff) << 8);
- tp = queue_Next(tp, rx_packet);
- }
-
- /* The fourth set of packets - those which have yet to be transmitted,
- * we don't care about at all here */
-
/* If the window has been extended by this acknowledge packet,
* then wakeup a sender waiting in alloc for window space, or try
* sending packets now, if he's been sitting on packets due to
call->nNacks = 0;
}
+ /* If the packet contained new acknowledgements, rather than just
+ * being a duplicate of one we have previously seen, then we can restart
+ * the RTT timer
+ */
+ if (newAckCount > 0)
+ rxi_rto_packet_acked(call, istack);
+
if (call->flags & RX_CALL_FAST_RECOVER) {
- if (nNacked) {
+ if (newAckCount == 0) {
call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
} else {
call->flags &= ~RX_CALL_FAST_RECOVER;
call->nCwindAcks = 0;
} else if (nNacked && call->nNacks >= (u_short) rx_nackThreshold) {
/* Three negative acks in a row trigger congestion recovery */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- MUTEX_EXIT(&peer->peer_lock);
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- /* someone else is waiting to start recovery */
- return np;
- }
- call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- rxi_WaitforTQBusy(call);
- MUTEX_ENTER(&peer->peer_lock);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
call->flags |= RX_CALL_FAST_RECOVER;
call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
call->cwind =
peer->nDgramPackets = call->nDgramPackets;
peer->congestSeq++;
call->congestSeq = peer->congestSeq;
+
/* Reset the resend times on the packets that were nacked
- * so we will retransmit as soon as the window permits*/
+ * so we will retransmit as soon as the window permits
+ */
+
for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
if (acked) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
- clock_Zero(&tp->retryTime);
+ tp->flags &= ~RX_PKTFLAG_SENT;
}
} else if (tp->flags & RX_PKTFLAG_ACKED) {
acked = 1;
rxi_ClearTransmitQueue(call, 0);
rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
} else if (!queue_IsEmpty(&call->tq)) {
- rxi_Start(0, call, 0, istack);
+ rxi_Start(call, istack);
}
return np;
}
queue_Append(&rx_incomingCallQueue, call);
}
} else {
- sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
+ sq = queue_Last(&rx_idleServerQueue, rx_serverQueueEntry);
/* If hot threads are enabled, and both newcallp and sq->socketp
* are non-null, then this thread will process the call, and the
call->flags |= RX_CALL_TQ_SOME_ACKED;
}
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
+
call->tfirst = call->tnext;
call->nSoftAcked = 0;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
call->nSoftAcked = 0;
if (conn->checkReachEvent) {
rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
conn->checkReachEvent = 0;
- conn->flags &= ~RX_CONN_ATTACHWAIT;
+ conn->flags &= ~(RX_CONN_ATTACHWAIT|RX_CONN_NAT_PING);
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount--;
MUTEX_EXIT(&rx_refcnt_mutex);
call->ssthresh = rx_maxSendWindow;
call->nDgramPackets = peer->nDgramPackets;
call->congestSeq = peer->congestSeq;
+ call->rtt = peer->rtt;
+ call->rtt_dev = peer->rtt_dev;
+ clock_Zero(&call->rto);
+ clock_Addmsec(&call->rto,
+ MAX(((call->rtt >> 3) + call->rtt_dev), rx_minPeerTimeout) + 200);
MUTEX_EXIT(&peer->peer_lock);
flags = call->flags;
int i;
int requestAck = 0;
int lastPacket = 0;
- struct clock now, retryTime;
+ struct clock now;
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
peer->nSent += xmit->len;
if (xmit->resending)
peer->reSends += xmit->len;
- retryTime = peer->timeout;
MUTEX_EXIT(&peer->peer_lock);
if (rx_stats_active) {
}
clock_GetTime(&now);
- clock_Add(&retryTime, &now);
if (xmit->list[xmit->len - 1]->header.flags & RX_LAST_PACKET) {
lastPacket = 1;
for (i = 0; i < xmit->len; i++) {
struct rx_packet *packet = xmit->list[i];
- packet->retryTime = retryTime;
- if (packet->header.serial) {
- /* Exponentially backoff retry times */
- if (packet->backoff < MAXBACKOFF) {
- /* so it can't stay == 0 */
- packet->backoff = (packet->backoff << 1) + 1;
- } else
- packet->backoff++;
- clock_Addmsec(&(packet->retryTime),
- ((afs_uint32) packet->backoff) << 8);
- }
-
- /* Wait a little extra for the ack on the last packet */
- if (lastPacket
- && !(packet->header.flags & RX_CLIENT_INITIATED)) {
- clock_Addmsec(&(packet->retryTime), 400);
- }
-
/* Record the time sent */
packet->timeSent = now;
+ packet->flags |= RX_PKTFLAG_SENT;
/* Ask for an ack on retransmitted packets, on every other packet
* if the peer doesn't support slow start. Ask for an ack on every
if (packet->header.serial) {
requestAck = 1;
} else {
- /* improved RTO calculation- not Karn */
packet->firstSent = now;
if (!lastPacket && (call->cwind <= (u_short) (conn->ackRate + 1)
|| (!(call->flags & RX_CALL_SLOW_START_OK)
CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
MUTEX_EXIT(&rx_refcnt_mutex);
+ /* Tell the RTO calculation engine that we have sent a packet, and
+ * if it was the last one */
+ rxi_rto_packet_sent(call, lastPacket, istack);
+
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
int istack)
{
int i;
+ int recovery;
struct xmitlist working;
struct xmitlist last;
working.len = 0;
working.resending = 0;
+ recovery = call->flags & RX_CALL_FAST_RECOVER;
+
for (i = 0; i < len; i++) {
/* Does the current packet force us to flush the current list? */
if (working.len > 0
rxi_SendList(call, &last, istack, 1);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
- if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
+ if (call->error
+ || (!recovery && (call->flags & RX_CALL_FAST_RECOVER)))
return;
}
last = working;
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
if (call->error
- || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
+ || (!recovery && (call->flags & RX_CALL_FAST_RECOVER)))
return;
}
last = working;
rxi_SendList(call, &last, istack, morePackets);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
- if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
+ if (call->error
+ || (!recovery && (call->flags & RX_CALL_FAST_RECOVER)))
return;
}
if (morePackets) {
}
}
-#ifdef RX_ENABLE_LOCKS
-/* Call rxi_Start, below, but with the call lock held. */
-void
-rxi_StartUnlocked(struct rxevent *event,
- void *arg0, void *arg1, int istack)
+static void
+rxi_Resend(struct rxevent *event, void *arg0, void *arg1, int istack)
{
struct rx_call *call = arg0;
+ struct rx_peer *peer;
+ struct rx_packet *p, *nxp;
+ struct clock maxTimeout = { 60, 0 };
MUTEX_ENTER(&call->lock);
- rxi_Start(event, call, arg1, istack);
+
+ peer = call->conn->peer;
+
+ /* Make sure that the event pointer is removed from the call
+ * structure, since there is no longer a per-call retransmission
+ * event pending. The RX_CALL_REFCOUNT_RESEND reference released here
+ * is the one taken by rxi_rto_startTimer when it posted the event. */
+ if (event == call->resendEvent) {
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ call->resendEvent = NULL;
+ }
+
+ if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
+ rxi_CheckBusy(call);
+ }
+
+ if (queue_IsEmpty(&call->tq)) {
+ /* Nothing to do. This means that we've been raced, and that an
+ * ACK has come in between when we were triggered, and when we
+ * actually got to run. */
+ goto out;
+ }
+
+ /* We're in loss recovery */
+ call->flags |= RX_CALL_FAST_RECOVER;
+
+ /* Mark all of the pending packets in the queue as being lost.
+ * Clearing RX_PKTFLAG_SENT on an unacknowledged packet is what makes
+ * rxi_Start pick it up for transmission again. */
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (!(p->flags & RX_PKTFLAG_ACKED))
+ p->flags &= ~RX_PKTFLAG_SENT;
+ }
+
+ /* We're resending, so we double the timeout of the call. This will be
+ * dropped back down by the first successful ACK that we receive.
+ *
+ * We apply a maximum value here of 60 seconds
+ */
+ clock_Add(&call->rto, &call->rto);
+ if (clock_Gt(&call->rto, &maxTimeout))
+ call->rto = maxTimeout;
+
+ /* Packet loss is most likely due to congestion, so drop our window size
+ * and start again from the beginning.
+ *
+ * NOTE(review): inside the maxDgramPackets test below, the first
+ * assignment to call->MTU is immediately overwritten by the second, so
+ * only MIN(natMTU, maxMTU) ever takes effect - confirm whether an
+ * if/else was intended. */
+ if (peer->maxDgramPackets >1) {
+ call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
+ call->MTU = MIN(peer->natMTU, peer->maxMTU);
+ }
+ call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
+ call->nDgramPackets = 1;
+ call->cwind = 1;
+ call->nextCwind = 1;
+ call->nAcks = 0;
+ call->nNacks = 0;
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->MTU = call->MTU;
+ peer->cwind = call->cwind;
+ peer->nDgramPackets = 1;
+ peer->congestSeq++;
+ call->congestSeq = peer->congestSeq;
+ MUTEX_EXIT(&peer->peer_lock);
+
+ rxi_Start(call, istack);
+
+out:
MUTEX_EXIT(&call->lock);
}
-#endif /* RX_ENABLE_LOCKS */
/* This routine is called when new packets are readied for
* transmission and when retransmission may be necessary, or when the
* better optimized for new packets, the usual case, now that we've
* got rid of queues of send packets. XXXXXXXXXXX */
void
-rxi_Start(struct rxevent *event,
- void *arg0, void *arg1, int istack)
+rxi_Start(struct rx_call *call, int istack)
{
- struct rx_call *call = arg0;
struct rx_packet *p;
struct rx_packet *nxp; /* Next pointer for queue_Scan */
- struct clock now, usenow, retryTime;
- int haveEvent;
int nXmitPackets;
int maxXmitPackets;
- /* If rxi_Start is being called as a result of a resend event,
- * then make sure that the event pointer is removed from the call
- * structure, since there is no longer a per-call retransmission
- * event pending. */
- if (event && event == call->resendEvent) {
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent = NULL;
-
- if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
- rxi_CheckBusy(call);
- }
-
- if (queue_IsEmpty(&call->tq)) {
- /* Nothing to do */
- return;
- }
- }
-
if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (rx_stats_active)
if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
- clock_GetTime(&now);
- usenow = now;
-
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- /* We shouldn't be sending packets if a thread is waiting
- * to initiate congestion recovery */
- dpf(("call %d waiting to initiate fast recovery\n",
- *(call->callNumber)));
- break;
- }
- if ((nXmitPackets)
- && (call->flags & RX_CALL_FAST_RECOVER)) {
- /* Only send one packet during fast recovery */
- dpf(("call %d restricted to one packet per send during fast recovery\n",
- *(call->callNumber)));
- break;
- }
#ifdef RX_TRACK_PACKETS
if ((p->flags & RX_PKTFLAG_FREE)
|| (!queue_IsEnd(&call->tq, nxp)
#endif
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
- usenow.sec = usenow.usec = 0;
if (rx_stats_active)
rx_atomic_inc(&rx_stats.ignoreAckedPacket);
continue; /* Ignore this packet if it has been acknowledged */
}
/* Transmit the packet if it needs to be sent. */
- if (!clock_Lt(&now, &p->retryTime)) {
+ if (!(p->flags & RX_PKTFLAG_SENT)) {
if (nXmitPackets == maxXmitPackets) {
rxi_SendXmitList(call, call->xmitList,
nXmitPackets, istack);
goto restart;
}
- dpf(("call %d xmit packet %"AFS_PTR_FMT" now %u.%06u retryTime %u.%06u\n",
- *(call->callNumber), p,
- now.sec, now.usec,
- p->retryTime.sec, p->retryTime.usec));
+ dpf(("call %d xmit packet %"AFS_PTR_FMT"\n",
+ *(call->callNumber), p));
call->xmitList[nXmitPackets++] = p;
}
}
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /*
- * TQ references no longer protected by this flag; they must remain
- * protected by the global lock.
- */
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- call->flags &= ~RX_CALL_TQ_BUSY;
- rxi_WakeUpTransmitQueue(call);
- return;
- }
if (call->error) {
/* We went into the error state while sending packets. Now is
* the time to reset the call. This will also inform the using
call->flags |= RX_CALL_TQ_CLEARME;
}
#endif /* RX_ENABLE_LOCKS */
- /* Don't bother doing retransmits if the TQ is cleared. */
- if (call->flags & RX_CALL_TQ_CLEARME) {
+ if (call->flags & RX_CALL_TQ_CLEARME)
rxi_ClearTransmitQueue(call, 1);
- } else
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- {
-
- /* Always post a resend event, if there is anything in the
- * queue, and resend is possible. There should be at least
- * one unacknowledged packet in the queue ... otherwise none
- * of these packets should be on the queue in the first place.
- */
- if (call->resendEvent) {
- /* Cancel the existing event and post a new one */
- rxevent_Cancel(call->resendEvent, call,
- RX_CALL_REFCOUNT_RESEND);
- }
-
- /* The retry time is the retry time on the first unacknowledged
- * packet inside the current window */
- for (haveEvent =
- 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
- /* Don't set timers for packets outside the window */
- if (p->header.seq >= call->tfirst + call->twind) {
- break;
- }
-
- if (!(p->flags & RX_PKTFLAG_ACKED)
- && !clock_IsZero(&p->retryTime)) {
- haveEvent = 1;
- retryTime = p->retryTime;
- break;
- }
- }
-
- /* Post a new event to re-run rxi_Start when retries may be needed */
- if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
-#ifdef RX_ENABLE_LOCKS
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow,
- rxi_StartUnlocked,
- (void *)call, 0, istack);
-#else /* RX_ENABLE_LOCKS */
- call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
- (void *)call, 0, istack);
-#endif /* RX_ENABLE_LOCKS */
- }
- }
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
} while (call->flags & RX_CALL_NEED_START);
/*
* TQ references no longer protected by this flag; they must remain
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
} else {
- if (call->resendEvent) {
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- }
+ rxi_rto_cancel(call);
}
}
}
#endif
/* RTT + 8*MDEV, rounded up to the next second. */
- fudgeFactor = (((afs_uint32) conn->peer->rtt >> 3) +
- ((afs_uint32) conn->peer->rtt_dev << 1) + 1023) >> 10;
+ fudgeFactor = (((afs_uint32) call->rtt >> 3) +
+ ((afs_uint32) call->rtt_dev << 1) + 1023) >> 10;
deadTime = conn->secondsUntilDead + fudgeFactor;
now = clock_Sec();
if (now > (call->lastReceiveTime + deadTime)) {
if (call->state == RX_STATE_ACTIVE) {
#ifdef ADAPT_PMTU
-#if defined(KERNEL) && defined(AFS_SUN57_ENV)
+#if defined(KERNEL) && defined(AFS_SUN5_ENV)
ire_t *ire;
#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
/* Cancel pending events */
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
rxevent_Cancel(call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
if (call->growMTUEvent)
{
MUTEX_ENTER(&conn->conn_data_lock);
conn->secondsUntilNatPing = seconds;
- if (seconds != 0)
- rxi_ScheduleNatKeepAliveEvent(conn);
+ if (seconds != 0) {
+ if (!(conn->flags & RX_CONN_ATTACHWAIT))
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ else
+ conn->flags |= RX_CONN_NAT_PING;
+ }
MUTEX_EXIT(&conn->conn_data_lock);
}
rxi_NatKeepAliveOn(struct rx_connection *conn)
{
MUTEX_ENTER(&conn->conn_data_lock);
- rxi_ScheduleNatKeepAliveEvent(conn);
+ /* schedule now unless still in ATTACHWAIT; otherwise set RX_CONN_NAT_PING */
+ if (!(conn->flags & RX_CONN_ATTACHWAIT))
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ else
+ conn->flags |= RX_CONN_NAT_PING;
MUTEX_EXIT(&conn->conn_data_lock);
}
static void
rxi_ComputeRoundTripTime(struct rx_packet *p,
struct rx_ackPacket *ack,
+ struct rx_call *call,
struct rx_peer *peer,
struct clock *now)
{
/* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
/* Apply VanJacobson round-trip estimations */
- if (peer->rtt) {
+ if (call->rtt) {
int delta;
/*
- * srtt (peer->rtt) is in units of one-eighth-milliseconds.
+ * srtt (call->rtt) is in units of one-eighth-milliseconds.
* srtt is stored as fixed point with 3 bits after the binary
* point (i.e., scaled by 8). The following magic is
* equivalent to the smoothing algorithm in rfc793 with an
* srtt' = srtt + (rtt - srtt)/8
*/
- delta = _8THMSEC(&thisRtt) - peer->rtt;
- peer->rtt += (delta >> 3);
+ delta = _8THMSEC(&thisRtt) - call->rtt;
+ call->rtt += (delta >> 3);
/*
* We accumulate a smoothed rtt variance (actually, a smoothed
if (delta < 0)
delta = -delta;
- delta -= (peer->rtt_dev << 1);
- peer->rtt_dev += (delta >> 3);
+ delta -= (call->rtt_dev << 1);
+ call->rtt_dev += (delta >> 3);
} else {
/* I don't have a stored RTT so I start with this value. Since I'm
* probably just starting a call, and will be pushing more data down
* little, and I set deviance to half the rtt. In practice,
* deviance tends to approach something a little less than
* half the smoothed rtt. */
- peer->rtt = _8THMSEC(&thisRtt) + 8;
- peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
+ call->rtt = _8THMSEC(&thisRtt) + 8;
+ call->rtt_dev = call->rtt >> 2; /* rtt/2: they're scaled differently */
}
- /* the timeout is RTT + 4*MDEV + rx_minPeerTimeout msec.
- * This is because one end or the other of these connections is usually
- * in a user process, and can be switched and/or swapped out. So on fast,
- * reliable networks, the timeout would otherwise be too short. */
- rtt_timeout = ((peer->rtt >> 3) + peer->rtt_dev) + rx_minPeerTimeout;
- clock_Zero(&(peer->timeout));
- clock_Addmsec(&(peer->timeout), rtt_timeout);
+ /* the retransmission timeout (RTO) is RTT + 4*MDEV
+ *
+ * We allow a user specified minimum to be set for this, to allow clamping
+ * at a minimum value in the same way as TCP. In addition, we have to allow
+ * for the possibility that this packet is answered by a delayed ACK, so we
+ * add on a fixed 200ms to account for that timer expiring.
+ */
+
+ rtt_timeout = MAX(((call->rtt >> 3) + call->rtt_dev),
+ rx_minPeerTimeout) + 200;
+ clock_Zero(&call->rto);
+ clock_Addmsec(&call->rto, rtt_timeout);
- /* Reset the backedOff flag since we just computed a new timeout value */
- peer->backedOff = 0;
+ /* Update the peer, so any new calls start with our values */
+ peer->rtt_dev = call->rtt_dev;
+ peer->rtt = call->rtt;
dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%06d sec)\n",
- p->header.callNumber, p, MSEC(&thisRtt), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
+ p->header.callNumber, p, MSEC(&thisRtt), call->rtt >> 3, call->rtt_dev >> 2, (call->rto.sec), (call->rto.usec)));
}
case RX_ACK_REQUESTED:
xferSize =
p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize;
- xferMs = peer->rtt;
+ xferMs = call->rtt;
break;
case RX_ACK_PING_RESPONSE:
(int)peer->burstWait.sec, (int)peer->burstWait.usec);
fprintf(file,
- " Rtt %d, " "retry time %u.%06d, " "total sent %d, "
- "resent %d\n", peer->rtt, (int)peer->timeout.sec,
- (int)peer->timeout.usec, peer->nSent, peer->reSends);
+ " Rtt %d, " "total sent %d, " "resent %d\n",
+ peer->rtt, peer->nSent, peer->reSends);
fprintf(file,
" Packet size %d, " "max in packet skew %d, "
peer->burstWait.usec = ntohl(peer->burstWait.usec);
peer->rtt = ntohl(peer->rtt);
peer->rtt_dev = ntohl(peer->rtt_dev);
- peer->timeout.sec = ntohl(peer->timeout.sec);
- peer->timeout.usec = ntohl(peer->timeout.usec);
+ peer->timeout.sec = 0;
+ peer->timeout.usec = 0;
peer->nSent = ntohl(peer->nSent);
peer->reSends = ntohl(peer->reSends);
peer->inPacketSkew = ntohl(peer->inPacketSkew);
peerStats->burstWait.usec = tp->burstWait.usec;
peerStats->rtt = tp->rtt;
peerStats->rtt_dev = tp->rtt_dev;
- peerStats->timeout.sec = tp->timeout.sec;
- peerStats->timeout.usec = tp->timeout.usec;
+ peerStats->timeout.sec = 0;
+ peerStats->timeout.usec = 0;
peerStats->nSent = tp->nSent;
peerStats->reSends = tp->reSends;
peerStats->inPacketSkew = tp->inPacketSkew;