/* Local static routines */
static void rxi_DestroyConnectionNoLock(struct rx_connection *conn);
+static void rxi_ComputeRoundTripTime(struct rx_packet *, struct rx_ackPacket *,
+ struct rx_peer *, struct clock *);
+
#ifdef RX_ENABLE_LOCKS
static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
#endif
#endif
#ifdef AFS_PTHREAD_ENV
-#include <assert.h>
/*
* Use procedural initialization of mutexes/condition variables
MUTEX_INIT(&rxkad_random_mutex, "rxkad random", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_debug_mutex, "debug", MUTEX_DEFAULT, 0);
- assert(pthread_cond_init
+ osi_Assert(pthread_cond_init
(&rx_event_handler_cond, (const pthread_condattr_t *)0) == 0);
- assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
+ osi_Assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
== 0);
- assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
- assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
+ osi_Assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
+ osi_Assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
rxkad_global_stats_init();
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
-#define INIT_PTHREAD_LOCKS \
-assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
+#define INIT_PTHREAD_LOCKS osi_Assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
/*
* The rx_stats_mutex mutex protects the following global variables:
* rxi_lowConnRefCount
return conn;
}
+/**
+ * Ensure a connection's timeout values are valid.
+ *
+ * @param[in] conn The connection to check
+ *
+ * @post conn->secondsUntilDead <= conn->idleDeadTime <= conn->hardDeadTime,
+ * unless idleDeadTime and/or hardDeadTime are not set
+ * @internal
+ */
+static void
+rxi_CheckConnTimeouts(struct rx_connection *conn)
+{
+    /* A connection's timeouts must satisfy the relationship
+     * deadTime <= idleDeadTime <= hardDeadTime.  Otherwise, for example, a
+     * total loss of network to a peer may cause an idle timeout instead of
+     * a dead timeout, simply because the idle timeout fires first.  Also
+     * enforce a minimum deadTime of 6 seconds, to ensure it doesn't get
+     * set too low.  The logic is slightly complicated by the fact that
+     * idleDeadTime and/or hardDeadTime may not be set at all. */
+ conn->secondsUntilDead = MAX(conn->secondsUntilDead, 6);
+ if (conn->idleDeadTime) {
+ conn->idleDeadTime = MAX(conn->idleDeadTime, conn->secondsUntilDead);
+ }
+ if (conn->hardDeadTime) {
+ if (conn->idleDeadTime) {
+ conn->hardDeadTime = MAX(conn->idleDeadTime, conn->hardDeadTime);
+ } else {
+ conn->hardDeadTime = MAX(conn->secondsUntilDead, conn->hardDeadTime);
+ }
+ }
+}
+
void
rx_SetConnDeadTime(struct rx_connection *conn, int seconds)
{
/* The idea is to set the dead time to a value that allows several
* keepalives to be dropped without timing out the connection. */
- conn->secondsUntilDead = MAX(seconds, 6);
+ conn->secondsUntilDead = seconds;
+ rxi_CheckConnTimeouts(conn);
conn->secondsUntilPing = conn->secondsUntilDead / 6;
}
+void
+rx_SetConnHardDeadTime(struct rx_connection *conn, int seconds)
+{
+ conn->hardDeadTime = seconds;
+ rxi_CheckConnTimeouts(conn);
+}
+
+void
+rx_SetConnIdleDeadTime(struct rx_connection *conn, int seconds)
+{
+ conn->idleDeadTime = seconds;
+ rxi_CheckConnTimeouts(conn);
+}
+
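+/*
+ * Illustrative sketch (not part of the code): because each setter above
+ * calls rxi_CheckConnTimeouts, the deadTime <= idleDeadTime <= hardDeadTime
+ * invariant holds no matter what order an application sets the values in.
+ * For example:
+ *
+ *     rx_SetConnHardDeadTime(conn, 120);
+ *     rx_SetConnIdleDeadTime(conn, 60);
+ *     rx_SetConnDeadTime(conn, 2);
+ *
+ * leaves secondsUntilDead == 6 (the enforced minimum), idleDeadTime == 60,
+ * hardDeadTime == 120 and secondsUntilPing == 1.
+ */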
int rxi_lowPeerRefCount = 0;
int rxi_lowConnRefCount = 0;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Wait for the transmit queue to no longer be busy.
* requires the call->lock to be held */
-static void rxi_WaitforTQBusy(struct rx_call *call) {
- while (call->flags & RX_CALL_TQ_BUSY) {
+void
+rxi_WaitforTQBusy(struct rx_call *call) {
+ while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
call->arrivalProc = (void (*)())0;
if (rc && call->error == 0) {
rxi_CallError(call, rc);
+ call->mode = RX_MODE_ERROR;
/* Send an abort message to the peer if this error code has
* only just been set. If it was set previously, assume the
* peer has already been sent the error code or will request it
if (conn->type == RX_SERVER_CONNECTION) {
/* Make sure reply or at least dummy reply is sent */
if (call->mode == RX_MODE_RECEIVING) {
+ MUTEX_EXIT(&call->lock);
rxi_WriteProc(call, 0, 0);
+ MUTEX_ENTER(&call->lock);
}
if (call->mode == RX_MODE_SENDING) {
+ MUTEX_EXIT(&call->lock);
rxi_FlushWrite(call);
+ MUTEX_ENTER(&call->lock);
}
rxi_calltrace(RX_CALL_END, call);
/* Call goes to hold state until reply packets are acknowledged */
* no reply arguments are expected */
if ((call->mode == RX_MODE_SENDING)
|| (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
+ MUTEX_EXIT(&call->lock);
(void)rxi_ReadProc(call, &dummy, 1);
+ MUTEX_ENTER(&call->lock);
}
/* If we had an outstanding delayed ack, be nice to the server
call->allNextp = rx_allCallsp;
rx_allCallsp = call;
call->call_id =
+ rx_atomic_inc_and_read(&rx_stats.nCallStructs);
+#else /* RXDEBUG_PACKET */
+ rx_atomic_inc(&rx_stats.nCallStructs);
#endif /* RXDEBUG_PACKET */
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
* this is the first time the packet has been seen */
packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
- dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT,
+ dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT"\n",
np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
np->header.epoch, np->header.cid, np->header.callNumber,
np->header.seq, np->header.flags, np));
case RX_PACKET_TYPE_ABORT: {
/* What if the supplied error is zero? */
afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
- dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
+ dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d\n", errcode));
rxi_ConnectionError(conn, errcode);
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount--;
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
-	dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d",
-	    np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
-	    np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
-	    np->header.flags, np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+	dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d\n",
+	    np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
+	    np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
+	    np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
* flag is cleared.
*/
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- while ((call->state == RX_STATE_ACTIVE)
- && (call->flags & RX_CALL_TQ_BUSY)) {
- call->flags |= RX_CALL_TQ_WAIT;
- call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start lock3");
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- if (call->tqWaiters == 0)
- call->flags &= ~RX_CALL_TQ_WAIT;
- }
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_WaitforTQBusy(call);
+	    /*
+	     * If we entered an error state while waiting, we must call
+	     * rxi_CallError so that rxi_ResetCall can proceed once the
+	     * tqWaiter count hits zero.
+	     */
+ if (call->error) {
+ rxi_CallError(call, call->error);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ conn->refCount--;
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ return np;
+ }
+ }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set
* the error condition in this call, so that it terminates as
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d\n",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
/* What if error is zero? */
/* What if the error is -1? the application will treat it as a timeout. */
afs_int32 errdata = ntohl(*(afs_int32 *) rx_DataOf(np));
- dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
+ dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d\n", errdata));
rxi_CallError(call, errdata);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&rx_refcnt_mutex);
rx_atomic_inc(&rx_stats.noPacketBuffersOnRead);
call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
- dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems", np));
+ dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems\n", np));
if (rxi_doreclaim)
rxi_ClearReceiveQueue(call);
clock_GetTime(&now);
&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
- dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate", np));
+ dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
* Send an ack when requested by the peer, or once every
* rxi_SoftAckRate packets until the last packet has been
* received. Always send a soft ack for the last packet in
- * the server's reply. */
- if (ackNeeded) {
+ * the server's reply.
+ *
+     * If we have received all of the packets for the call,
+ * immediately send an RX_PACKET_TYPE_ACKALL packet so that
+ * the peer can empty its packet queue and cancel all resend
+ * events.
+ */
+ if (call->flags & RX_CALL_RECEIVE_DONE) {
+ rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_AckAll(NULL, call, 0);
+ } else if (ackNeeded) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, ackNeeded, istack);
} else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
call->delayedAckEvent =
rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
- } else if (call->flags & RX_CALL_RECEIVE_DONE) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
}
return np;
#endif
-/* rxi_ComputePeerNetStats
- *
- * Called exclusively by rxi_ReceiveAckPacket to compute network link
- * estimates (like RTT and throughput) based on ack packets. Caller
- * must ensure that the packet in question is the right one (i.e.
- * serial number matches).
- */
-static void
-rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
- struct rx_ackPacket *ap, struct rx_packet *np)
-{
- struct rx_peer *peer = call->conn->peer;
-
- /* Use RTT if not delayed by client and
- * ignore packets that were retransmitted. */
- if (!(p->flags & RX_PKTFLAG_ACKED) &&
- ap->reason != RX_ACK_DELAY &&
- clock_Eq(&p->timeSent, &p->firstSent))
- rxi_ComputeRoundTripTime(p, &p->timeSent, peer);
-#ifdef ADAPT_WINDOW
- rxi_ComputeRate(peer, call, p, np, ap->reason);
-#endif
-}
-
/* The real smarts of the whole thing. */
struct rx_packet *
rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
+ struct clock now; /* Current time, for RTT calculations */
afs_uint32 first;
+ afs_uint32 prev;
afs_uint32 serial;
/* because there are CM's that are bogus, sending weird values for this. */
afs_uint32 skew = 0;
/* depends on ack packet struct */
nAcks = MIN((unsigned)nbytes, (unsigned)ap->nAcks);
first = ntohl(ap->firstPacket);
+ prev = ntohl(ap->previousPacket);
serial = ntohl(ap->serial);
/* temporarily disabled -- needs to degrade over time
* skew = ntohs(ap->maxSkew); */
/* Ignore ack packets received out of order */
- if (first < call->tfirst) {
+ if (first < call->tfirst ||
+ (first == call->tfirst && prev < call->tprev)) {
return np;
}
+ call->tprev = prev;
+
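+    /*
+     * Worked example of the check above (illustrative only): suppose an
+     * ack arrives with firstPacket 4 and previousPacket 10, and is then
+     * followed by a delayed ack with firstPacket 4 and previousPacket 8.
+     * The second ack carries older information (first == call->tfirst but
+     * prev < call->tprev), so it is dropped rather than being allowed to
+     * roll back our view of the receiver's state.
+     */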
if (np->header.flags & RX_SLOW_START_OK) {
call->flags |= RX_CALL_SLOW_START_OK;
}
* acknowledged as having been sent to the peer's upper level.
* All other packets must be retained. So only packets with
* sequence numbers < ap->firstPacket are candidates. */
+
+ clock_GetTime(&now);
+
for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
if (tp->header.seq >= first)
break;
call->tfirst = tp->header.seq + 1;
- rxi_ComputePeerNetStats(call, tp, ap, np);
+
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
+
+ rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
}
+
+#ifdef ADAPT_WINDOW
+	rxi_ComputeRate(call->conn->peer, call, tp, np, ap->reason);
+#endif
+
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we have to release the global rx lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
call->nSoftAcked = 0;
for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
- /* Update round trip time if the ack was stimulated on receipt
- * of this packet */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-#ifdef RX_ENABLE_LOCKS
- if (tp->header.seq >= first)
-#endif /* RX_ENABLE_LOCKS */
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxi_ComputePeerNetStats(call, tp, ap, np);
- /* Set the acknowledge flag per packet based on the
+ /* Set the acknowledge flag per packet based on the
	 * information in the ack packet. An acknowledged packet can
* be downgraded when the server has discarded a packet it
* soacked previously, or when an ack packet is received
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
tp->flags |= RX_PKTFLAG_ACKED;
+
+ rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
+#ifdef ADAPT_WINDOW
+ rxi_ComputeRate(call->conn->peer, call, tp, np,
+ ap->reason);
+#endif
}
if (missing) {
nNacked++;
missing = 1;
}
} else {
- tp->flags &= ~RX_PKTFLAG_ACKED;
- missing = 1;
+ if (tp->flags & RX_PKTFLAG_ACKED) {
+ tp->flags &= ~RX_PKTFLAG_ACKED;
+ missing = 1;
+ }
}
/*
maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
maxDgramPackets =
MIN(maxDgramPackets, (int)(peer->ifDgramPackets));
- maxDgramPackets = MIN(maxDgramPackets, tSize);
if (maxDgramPackets > 1) {
peer->maxDgramPackets = maxDgramPackets;
call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
#ifdef RXDEBUG_PACKET
call->rqc -= count;
if ( call->rqc != 0 )
- dpf(("rxi_ClearReceiveQueue call %"AFS_PTR_FMT" rqc %u != 0", call, call->rqc));
+ dpf(("rxi_ClearReceiveQueue call %"AFS_PTR_FMT" rqc %u != 0\n", call, call->rqc));
#endif
call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
}
if (error) {
int i;
- dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d", conn, error));
+ dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d\n", conn, error));
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->challengeEvent)
}
}
+/**
+ * Interrupt an in-progress call with the specified error and wake up waiters.
+ *
+ * @param[in] call The call to interrupt
+ * @param[in] error The error code to send to the peer
+ */
+void
+rx_InterruptCall(struct rx_call *call, afs_int32 error)
+{
+ MUTEX_ENTER(&call->lock);
+ rxi_CallError(call, error);
+ rxi_SendCallAbort(call, NULL, 0, 1);
+ MUTEX_EXIT(&call->lock);
+}
+
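+/*
+ * Example use (illustrative only; RX_CALL_DEAD is just one plausible error
+ * code): a thread that wants to abandon a call that another thread may be
+ * blocked on could do
+ *
+ *     rx_InterruptCall(call, RX_CALL_DEAD);
+ *
+ * which puts the call into an error state, sends an abort to the peer, and
+ * lets any waiters see the error the next time they test call->error.
+ */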
void
rxi_CallError(struct rx_call *call, afs_int32 error)
{
#ifdef DEBUG
osirx_AssertMine(&call->lock, "rxi_CallError");
#endif
- dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d", call, error, call->error));
+ dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d\n", call, error, call->error));
if (call->error)
error = call->error;
rxi_ResetCall(call, 0);
#endif
call->error = error;
- call->mode = RX_MODE_ERROR;
}
/* Reset various fields in a call structure, and wakeup waiting
call->nHardAcks = 0;
call->tfirst = call->rnext = call->tnext = 1;
+ call->tprev = 0;
call->rprev = 0;
call->lastAcked = 0;
call->localStatus = call->remoteStatus = 0;
return optionalPacket; /* Return packet for re-use by caller */
}
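+/**
+ * A batch of packets to be transmitted as a single datagram.
+ *
+ * 'list' points at the first packet in the batch, 'len' is the number of
+ * packets in it, and 'resending' is set if any packet in the batch has
+ * already been assigned a serial number (i.e. is a retransmission).
+ */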
+struct xmitlist {
+ struct rx_packet **list;
+ int len;
+ int resending;
+};
+
/* Send all of the packets in the list in a single datagram */
static void
-rxi_SendList(struct rx_call *call, struct rx_packet **list, int len,
+rxi_SendList(struct rx_call *call, struct xmitlist *xmit,
int istack, int moreFlag, struct clock *now,
- struct clock *retryTime, int resending)
+ struct clock *retryTime)
{
int i;
int requestAck = 0;
struct rx_peer *peer = conn->peer;
MUTEX_ENTER(&peer->peer_lock);
- peer->nSent += len;
- if (resending)
- peer->reSends += len;
+ peer->nSent += xmit->len;
+ if (xmit->resending)
+ peer->reSends += xmit->len;
MUTEX_EXIT(&peer->peer_lock);
if (rx_stats_active) {
- if (resending)
- rx_atomic_add(&rx_stats.dataPacketsReSent, len);
+ if (xmit->resending)
+ rx_atomic_add(&rx_stats.dataPacketsReSent, xmit->len);
else
- rx_atomic_add(&rx_stats.dataPacketsSent, len);
+ rx_atomic_add(&rx_stats.dataPacketsSent, xmit->len);
}
- if (list[len - 1]->header.flags & RX_LAST_PACKET) {
+ if (xmit->list[xmit->len - 1]->header.flags & RX_LAST_PACKET) {
lastPacket = 1;
}
/* Set the packet flags and schedule the resend events */
/* Only request an ack for the last packet in the list */
- for (i = 0; i < len; i++) {
- list[i]->retryTime = *retryTime;
- if (list[i]->header.serial) {
+ for (i = 0; i < xmit->len; i++) {
+ struct rx_packet *packet = xmit->list[i];
+
+ packet->retryTime = *retryTime;
+ if (packet->header.serial) {
/* Exponentially backoff retry times */
- if (list[i]->backoff < MAXBACKOFF) {
+ if (packet->backoff < MAXBACKOFF) {
/* so it can't stay == 0 */
- list[i]->backoff = (list[i]->backoff << 1) + 1;
+ packet->backoff = (packet->backoff << 1) + 1;
} else
- list[i]->backoff++;
- clock_Addmsec(&(list[i]->retryTime),
- ((afs_uint32) list[i]->backoff) << 8);
+ packet->backoff++;
+ clock_Addmsec(&(packet->retryTime),
+ ((afs_uint32) packet->backoff) << 8);
}
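+	/*
+	 * Worked example of the backoff above (illustrative only): backoff
+	 * follows the sequence 1, 3, 7, 15, ... and is scaled by << 8, so
+	 * successive retransmissions push retryTime out by roughly 256ms,
+	 * 768ms, 1.8s, 3.8s, ... until backoff reaches MAXBACKOFF.
+	 */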
/* Wait a little extra for the ack on the last packet */
- if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
- clock_Addmsec(&(list[i]->retryTime), 400);
+ if (lastPacket
+ && !(packet->header.flags & RX_CLIENT_INITIATED)) {
+ clock_Addmsec(&(packet->retryTime), 400);
}
/* Record the time sent */
- list[i]->timeSent = *now;
+ packet->timeSent = *now;
/* Ask for an ack on retransmitted packets, on every other packet
* if the peer doesn't support slow start. Ask for an ack on every
* packet until the congestion window reaches the ack rate. */
- if (list[i]->header.serial) {
+ if (packet->header.serial) {
requestAck = 1;
} else {
/* improved RTO calculation- not Karn */
- list[i]->firstSent = *now;
+ packet->firstSent = *now;
if (!lastPacket && (call->cwind <= (u_short) (conn->ackRate + 1)
|| (!(call->flags & RX_CALL_SLOW_START_OK)
- && (list[i]->header.seq & 1)))) {
+ && (packet->header.seq & 1)))) {
requestAck = 1;
}
}
/* Tag this packet as not being the last in this group,
* for the receiver's benefit */
- if (i < len - 1 || moreFlag) {
- list[i]->header.flags |= RX_MORE_PACKETS;
+ if (i < xmit->len - 1 || moreFlag) {
+ packet->header.flags |= RX_MORE_PACKETS;
}
/* Install the new retransmit time for the packet, and
* record the time sent */
- list[i]->timeSent = *now;
+ packet->timeSent = *now;
}
if (requestAck) {
- list[len - 1]->header.flags |= RX_REQUEST_ACK;
+ xmit->list[xmit->len - 1]->header.flags |= RX_REQUEST_ACK;
}
/* Since we're about to send a data packet to the peer, it's
MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
MUTEX_EXIT(&rx_refcnt_mutex);
- if (len > 1) {
- rxi_SendPacketList(call, conn, list, len, istack);
+ if (xmit->len > 1) {
+ rxi_SendPacketList(call, conn, xmit->list, xmit->len, istack);
} else {
- rxi_SendPacket(call, conn, list[0], istack);
+ rxi_SendPacket(call, conn, xmit->list[0], istack);
}
MUTEX_ENTER(&call->lock);
MUTEX_ENTER(&rx_refcnt_mutex);
* idle connections) */
conn->lastSendTime = call->lastSendTime = clock_Sec();
/* Let a set of retransmits trigger an idle timeout */
- if (!resending)
+ if (!xmit->resending)
call->lastSendData = call->lastSendTime;
}
* We always keep the last list we should have sent so we
* can set the RX_MORE_PACKETS flags correctly.
*/
+
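+/*
+ * Sketch of the two-list scheme used below (descriptive): packets are
+ * accumulated into 'working'; whenever a datagram boundary is reached,
+ * any previously completed 'last' list is sent with moreFlag set and
+ * 'working' rolls into 'last'.  Holding one completed list back like this
+ * means the final datagram really sent can carry moreFlag == 0.
+ */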
static void
rxi_SendXmitList(struct rx_call *call, struct rx_packet **list, int len,
- int istack, struct clock *now, struct clock *retryTime,
- int resending)
+ int istack, struct clock *now, struct clock *retryTime)
{
- int i, cnt, lastCnt = 0;
- struct rx_packet **listP, **lastP = 0;
+ int i;
+ struct xmitlist working;
+ struct xmitlist last;
+
struct rx_peer *peer = call->conn->peer;
int morePackets = 0;
- for (cnt = 0, listP = &list[0], i = 0; i < len; i++) {
+ memset(&last, 0, sizeof(struct xmitlist));
+ working.list = &list[0];
+ working.len = 0;
+ working.resending = 0;
+
+ for (i = 0; i < len; i++) {
/* Does the current packet force us to flush the current list? */
- if (cnt > 0
+ if (working.len > 0
&& (list[i]->header.serial || (list[i]->flags & RX_PKTFLAG_ACKED)
|| list[i]->length > RX_JUMBOBUFFERSIZE)) {
- if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime,
- resending);
+
+ /* This sends the 'last' list and then rolls the current working
+ * set into the 'last' one, and resets the working set */
+
+ if (last.len > 0) {
+ rxi_SendList(call, &last, istack, 1, now, retryTime);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
return;
}
- lastP = listP;
- lastCnt = cnt;
- listP = &list[i];
- cnt = 0;
+ last = working;
+ working.len = 0;
+ working.resending = 0;
+ working.list = &list[i];
}
/* Add the current packet to the list if it hasn't been acked.
* Otherwise adjust the list pointer to skip the current packet. */
if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
- cnt++;
+ working.len++;
+
+ if (list[i]->header.serial)
+ working.resending = 1;
+
/* Do we need to flush the list? */
- if (cnt >= (int)peer->maxDgramPackets
- || cnt >= (int)call->nDgramPackets || cnt >= (int)call->cwind
+ if (working.len >= (int)peer->maxDgramPackets
+ || working.len >= (int)call->nDgramPackets
+ || working.len >= (int)call->cwind
|| list[i]->header.serial
|| list[i]->length != RX_JUMBOBUFFERSIZE) {
- if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, 1, now,
- retryTime, resending);
+ if (last.len > 0) {
+ rxi_SendList(call, &last, istack, 1, now, retryTime);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
if (call->error
|| (call->flags & RX_CALL_FAST_RECOVER_WAIT))
return;
}
- lastP = listP;
- lastCnt = cnt;
- listP = &list[i + 1];
- cnt = 0;
+ last = working;
+ working.len = 0;
+ working.resending = 0;
+ working.list = &list[i + 1];
}
} else {
- if (cnt != 0) {
+ if (working.len != 0) {
osi_Panic("rxi_SendList error");
}
- listP = &list[i + 1];
+ working.list = &list[i + 1];
}
}
* an acked packet. Since we always send retransmissions
* in a separate packet, we only need to check the first
* packet in the list */
- if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
+ if (working.len > 0 && !(working.list[0]->flags & RX_PKTFLAG_ACKED)) {
morePackets = 1;
}
- if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, morePackets, now,
- retryTime, resending);
+ if (last.len > 0) {
+ rxi_SendList(call, &last, istack, morePackets, now, retryTime);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
return;
}
if (morePackets) {
- rxi_SendList(call, listP, cnt, istack, 0, now, retryTime,
- resending);
+ rxi_SendList(call, &working, istack, 0, now, retryTime);
}
- } else if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime,
- resending);
+ } else if (last.len > 0) {
+ rxi_SendList(call, &last, istack, 0, now, retryTime);
+	/* Packets still in 'working' are not sent by this invocation */
}
}
int haveEvent;
int nXmitPackets;
int maxXmitPackets;
- struct rx_packet **xmitList;
- int resending = 0;
/* If rxi_Start is being called as a result of a resend event,
* then make sure that the event pointer is removed from the call
CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
MUTEX_EXIT(&rx_refcnt_mutex);
call->resendEvent = NULL;
- resending = 1;
if (queue_IsEmpty(&call->tq)) {
/* Nothing to do */
return;
rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
- call->flags |= RX_CALL_FAST_RECOVER;
- if (peer->maxDgramPackets > 1) {
- call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
- } else {
- call->MTU = MIN(peer->natMTU, peer->maxMTU);
- }
- call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
- call->nDgramPackets = 1;
- call->cwind = 1;
- call->nextCwind = 1;
- call->nAcks = 0;
- call->nNacks = 0;
- MUTEX_ENTER(&peer->peer_lock);
- peer->MTU = call->MTU;
- peer->cwind = call->cwind;
- peer->nDgramPackets = 1;
- peer->congestSeq++;
- call->congestSeq = peer->congestSeq;
- MUTEX_EXIT(&peer->peer_lock);
- /* Clear retry times on packets. Otherwise, it's possible for
- * some packets in the queue to force resends at rates faster
- * than recovery rates.
- */
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (!(p->flags & RX_PKTFLAG_ACKED)) {
- clock_Zero(&p->retryTime);
- }
- }
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+ if (call->error) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
+ return;
+ }
+#endif
+ call->flags |= RX_CALL_FAST_RECOVER;
+
+ if (peer->maxDgramPackets > 1) {
+ call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
+ } else {
+ call->MTU = MIN(peer->natMTU, peer->maxMTU);
+ }
+ call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
+ call->nDgramPackets = 1;
+ call->cwind = 1;
+ call->nextCwind = 1;
+ call->nAcks = 0;
+ call->nNacks = 0;
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->MTU = call->MTU;
+ peer->cwind = call->cwind;
+ peer->nDgramPackets = 1;
+ peer->congestSeq++;
+ call->congestSeq = peer->congestSeq;
+ MUTEX_EXIT(&peer->peer_lock);
+ /* Clear retry times on packets. Otherwise, it's possible for
+ * some packets in the queue to force resends at rates faster
+ * than recovery rates.
+ */
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (!(p->flags & RX_PKTFLAG_ACKED)) {
+ clock_Zero(&p->retryTime);
+ }
+ }
}
if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
- xmitList = (struct rx_packet **)
-#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV)
- /* XXXX else we must drop any mtx we hold */
- afs_osi_Alloc_NoSleep(maxXmitPackets * sizeof(struct rx_packet *));
-#else
- osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
-#endif
- if (xmitList == NULL)
- osi_Panic("rxi_Start, failed to allocate xmit list");
for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
/* We shouldn't be sending packets if a thread is waiting
/* Transmit the packet if it needs to be sent. */
if (!clock_Lt(&now, &p->retryTime)) {
if (nXmitPackets == maxXmitPackets) {
- rxi_SendXmitList(call, xmitList, nXmitPackets,
- istack, &now, &retryTime,
- resending);
- osi_Free(xmitList, maxXmitPackets *
- sizeof(struct rx_packet *));
+ rxi_SendXmitList(call, call->xmitList,
+ nXmitPackets, istack, &now,
+ &retryTime);
goto restart;
}
dpf(("call %d xmit packet %"AFS_PTR_FMT" now %u.%06u retryTime %u.%06u nextRetry %u.%06u\n",
now.sec, now.usec,
p->retryTime.sec, p->retryTime.usec,
retryTime.sec, retryTime.usec));
- xmitList[nXmitPackets++] = p;
+ call->xmitList[nXmitPackets++] = p;
}
}
	    /* xmitList now holds pointers to all of the packets that are
* ready to send. Now we loop to send the packets */
if (nXmitPackets > 0) {
- rxi_SendXmitList(call, xmitList, nXmitPackets, istack,
- &now, &retryTime, resending);
+ rxi_SendXmitList(call, call->xmitList, nXmitPackets,
+ istack, &now, &retryTime);
}
- osi_Free(xmitList,
- maxXmitPackets * sizeof(struct rx_packet *));
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/*
{
struct rx_connection *conn = call->conn;
afs_uint32 now;
- afs_uint32 deadTime;
+ afs_uint32 deadTime, idleDeadTime = 0, hardDeadTime = 0;
+ afs_uint32 fudgeFactor;
int cerror = 0;
int newmtu = 0;
return 0;
}
#endif
- /* dead time + RTT + 8*MDEV, rounded up to next second. */
- deadTime =
- (((afs_uint32) conn->secondsUntilDead << 10) +
- ((afs_uint32) conn->peer->rtt >> 3) +
- ((afs_uint32) conn->peer->rtt_dev << 1) + 1023) >> 10;
+ /* RTT + 8*MDEV, rounded up to the next second. */
+ fudgeFactor = (((afs_uint32) conn->peer->rtt >> 3) +
+ ((afs_uint32) conn->peer->rtt_dev << 1) + 1023) >> 10;
+
+ deadTime = conn->secondsUntilDead + fudgeFactor;
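+    /*
+     * Numeric sketch (illustrative only): with a smoothed RTT of 100ms
+     * (peer->rtt == 800, scaled by 8) and a mean deviation of 25ms
+     * (peer->rtt_dev == 100, scaled by 4), fudgeFactor is
+     * (100 + 200 + 1023) >> 10 == 1 second, so secondsUntilDead == 12
+     * yields a deadTime of 13 seconds.
+     */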
now = clock_Sec();
/* These are computed to the second (+- 1 second). But that's
* good enough for these values, which should be a significant
* to pings; active calls are simply flagged in error, so the
* attached process can die reasonably gracefully. */
}
+
+ if (conn->idleDeadTime) {
+ idleDeadTime = conn->idleDeadTime + fudgeFactor;
+ }
+
/* see if we have a non-activity timeout */
- if (call->startWait && conn->idleDeadTime
- && ((call->startWait + conn->idleDeadTime) < now) &&
+ if (call->startWait && idleDeadTime
+ && ((call->startWait + idleDeadTime) < now) &&
(call->flags & RX_CALL_READER_WAIT)) {
if (call->state == RX_STATE_ACTIVE) {
cerror = RX_CALL_TIMEOUT;
goto mtuout;
}
}
- if (call->lastSendData && conn->idleDeadTime && (conn->idleDeadErr != 0)
- && ((call->lastSendData + conn->idleDeadTime) < now)) {
+ if (call->lastSendData && idleDeadTime && (conn->idleDeadErr != 0)
+ && ((call->lastSendData + idleDeadTime) < now)) {
if (call->state == RX_STATE_ACTIVE) {
cerror = conn->idleDeadErr;
goto mtuout;
}
}
+
+    if (conn->hardDeadTime) {
+ hardDeadTime = conn->hardDeadTime + fudgeFactor;
+ }
+
/* see if we have a hard timeout */
- if (conn->hardDeadTime
- && (now > (conn->hardDeadTime + call->startTime.sec))) {
+ if (hardDeadTime
+ && (now > (hardDeadTime + call->startTime.sec))) {
if (call->state == RX_STATE_ACTIVE)
rxi_CallError(call, RX_CALL_TIMEOUT);
return -1;
}
-/* Compute round trip time of the packet provided, in *rttp.
- */
-
/* rxi_ComputeRoundTripTime is called with peer locked. */
-/* sentp and/or peer may be null */
-void
+/* peer may be null */
+static void
rxi_ComputeRoundTripTime(struct rx_packet *p,
- struct clock *sentp,
- struct rx_peer *peer)
+ struct rx_ackPacket *ack,
+ struct rx_peer *peer,
+ struct clock *now)
{
- struct clock thisRtt, *rttp = &thisRtt;
-
+ struct clock thisRtt, *sentp;
int rtt_timeout;
+ int serial;
- clock_GetTime(rttp);
+ /* If the ACK is delayed, then do nothing */
+ if (ack->reason == RX_ACK_DELAY)
+ return;
- if (clock_Lt(rttp, sentp)) {
- clock_Zero(rttp);
- return; /* somebody set the clock back, don't count this time. */
+ /* On the wire, jumbograms are a single UDP packet. We shouldn't count
+ * their RTT multiple times, so only include the RTT of the last packet
+ * in a jumbogram */
+ if (p->flags & RX_JUMBO_PACKET)
+ return;
+
+ /* Use the serial number to determine which transmission the ACK is for,
+ * and set the sent time to match this. If we have no serial number, then
+ * only use the ACK for RTT calculations if the packet has not been
+ * retransmitted
+ */
+
+ serial = ntohl(ack->serial);
+ if (serial) {
+ if (serial == p->header.serial) {
+ sentp = &p->timeSent;
+ } else if (serial == p->firstSerial) {
+ sentp = &p->firstSent;
+ } else if (clock_Eq(&p->timeSent, &p->firstSent)) {
+ sentp = &p->firstSent;
+ } else
+ return;
+ } else {
+ if (clock_Eq(&p->timeSent, &p->firstSent)) {
+ sentp = &p->firstSent;
+ } else
+ return;
}
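+    /*
+     * Summary of the selection above: an ack whose serial matches a
+     * specific transmission gives an unambiguous send time, so even
+     * retransmitted packets can contribute RTT samples; without a usable
+     * serial we fall back to Karn's rule and only sample packets that were
+     * never retransmitted (timeSent == firstSent).
+     */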
- clock_Sub(rttp, sentp);
+
+ thisRtt = *now;
+
+ if (clock_Lt(&thisRtt, sentp))
+ return; /* somebody set the clock back, don't count this time. */
+
+ clock_Sub(&thisRtt, sentp);
dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rttp=%d.%06d sec)\n",
- p->header.callNumber, p, rttp->sec, rttp->usec));
+ p->header.callNumber, p, thisRtt.sec, thisRtt.usec));
- if (rttp->sec == 0 && rttp->usec == 0) {
+ if (clock_IsZero(&thisRtt)) {
/*
* The actual round trip time is shorter than the
* clock_GetTime resolution. It is most likely 1ms or 100ns.
* Since we can't tell which at the moment we will assume 1ms.
*/
- rttp->usec = 1000;
+ thisRtt.usec = 1000;
}
if (rx_stats_active) {
MUTEX_ENTER(&rx_stats_mutex);
- if (clock_Lt(rttp, &rx_stats.minRtt))
- rx_stats.minRtt = *rttp;
- if (clock_Gt(rttp, &rx_stats.maxRtt)) {
- if (rttp->sec > 60) {
+ if (clock_Lt(&thisRtt, &rx_stats.minRtt))
+ rx_stats.minRtt = thisRtt;
+ if (clock_Gt(&thisRtt, &rx_stats.maxRtt)) {
+ if (thisRtt.sec > 60) {
MUTEX_EXIT(&rx_stats_mutex);
return; /* somebody set the clock ahead */
}
- rx_stats.maxRtt = *rttp;
+ rx_stats.maxRtt = thisRtt;
}
- clock_Add(&rx_stats.totalRtt, rttp);
+ clock_Add(&rx_stats.totalRtt, &thisRtt);
rx_atomic_inc(&rx_stats.nRttSamples);
MUTEX_EXIT(&rx_stats_mutex);
}
* srtt' = srtt + (rtt - srtt)/8
*/
- delta = _8THMSEC(rttp) - peer->rtt;
+ delta = _8THMSEC(&thisRtt) - peer->rtt;
peer->rtt += (delta >> 3);
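+    /*
+     * Numeric sketch (illustrative only): with srtt == 80ms
+     * (peer->rtt == 640, scaled by 8) and a 120ms sample,
+     * delta == 960 - 640 == 320, so peer->rtt becomes 640 + 40 == 680,
+     * i.e. 85ms: one eighth of the way towards the new sample.
+     */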
/*
* little, and I set deviance to half the rtt. In practice,
* deviance tends to approach something a little less than
* half the smoothed rtt. */
- peer->rtt = _8THMSEC(rttp) + 8;
+ peer->rtt = _8THMSEC(&thisRtt) + 8;
peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
}
- /* the timeout is RTT + 4*MDEV but no less than rx_minPeerTimeout msec.
+ /* the timeout is RTT + 4*MDEV + rx_minPeerTimeout msec.
* This is because one end or the other of these connections is usually
* in a user process, and can be switched and/or swapped out. So on fast,
* reliable networks, the timeout would otherwise be too short. */
- rtt_timeout = MAX(((peer->rtt >> 3) + peer->rtt_dev), rx_minPeerTimeout);
+ rtt_timeout = ((peer->rtt >> 3) + peer->rtt_dev) + rx_minPeerTimeout;
clock_Zero(&(peer->timeout));
clock_Addmsec(&(peer->timeout), rtt_timeout);
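+    /*
+     * Continuing the sketch above (illustrative; 350ms is the assumed
+     * default rx_minPeerTimeout): with srtt == 85ms (peer->rtt == 680)
+     * and peer->rtt_dev == 100 (4*MDEV == 100ms), rtt_timeout is
+     * 85 + 100 + 350 == 535ms.
+     */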
peer->backedOff = 0;
dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%06d sec)\n",
- p->header.callNumber, p, MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
+ p->header.callNumber, p, MSEC(&thisRtt), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
}
return;
}
- dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %d.%06d, rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %d.%06d, rtt %u, ps %u)\n",
ntohl(peer->host), ntohs(peer->port), (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt, peer->ifMTU));
* one packet exchange */
if (clock_Gt(&newTO, &peer->timeout)) {
- dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)",
+ dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)\n",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec,
newTO.sec, newTO.usec, peer->smRtt));
else if (minTime > rx_maxSendWindow)
minTime = rx_maxSendWindow;
/* if (minTime != peer->maxWindow) {
- dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)",
+ dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)\n",
ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
peer->timeout.sec, peer->timeout.usec, peer->smRtt));
peer->maxWindow = minTime;
/* calculate estimate for transmission interval in milliseconds */
minTime = rx_maxSendWindow * peer->smRtt;
if (minTime < 1000) {
- dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)",
+ dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)\n",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
peer->timeout.usec, peer->smRtt));
void
rx_StatsOnOff(int on)
{
-#ifdef RXDEBUG
rx_stats_active = on;
-#endif
}
if (len > 0) {
len = _vsnprintf(msg, sizeof(msg)-2, tformat, ap);
- if (len > 0) {
- if (msg[len-1] != '\n') {
- msg[len] = '\n';
- msg[len+1] = '\0';
- }
+ if (len > 0)
OutputDebugString(msg);
- }
}
va_end(ap);
#else
fprintf(rx_Log, " %d.%06d:", (unsigned int)now.sec,
(unsigned int)now.usec);
vfprintf(rx_Log, format, ap);
- putc('\n', rx_Log);
va_end(ap);
#endif
#endif