#endif
#ifdef AFS_PTHREAD_ENV
-#include <assert.h>
/*
* Use procedural initialization of mutexes/condition variables
MUTEX_INIT(&rxkad_random_mutex, "rxkad random", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_debug_mutex, "debug", MUTEX_DEFAULT, 0);
- assert(pthread_cond_init
+ osi_Assert(pthread_cond_init
(&rx_event_handler_cond, (const pthread_condattr_t *)0) == 0);
- assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
+ osi_Assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
== 0);
- assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
- assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
+ osi_Assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
+ osi_Assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
rxkad_global_stats_init();
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
-#define INIT_PTHREAD_LOCKS \
-assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
+#define INIT_PTHREAD_LOCKS osi_Assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
/*
* The rx_stats_mutex mutex protects the following global variables:
* rxi_lowConnRefCount
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Wait for the transmit queue to no longer be busy.
* requires the call->lock to be held */
-static void rxi_WaitforTQBusy(struct rx_call *call) {
- while (call->flags & RX_CALL_TQ_BUSY) {
+void
+rxi_WaitforTQBusy(struct rx_call *call) {
+ while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
* this is the first time the packet has been seen */
packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
- dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT,
+ dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT"\n",
np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
np->header.epoch, np->header.cid, np->header.callNumber,
np->header.seq, np->header.flags, np));
case RX_PACKET_TYPE_ABORT: {
/* What if the supplied error is zero? */
afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
- dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
+ dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d\n", errcode));
rxi_ConnectionError(conn, errcode);
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount--;
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d\n",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
np->header.flags, np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
* flag is cleared.
*/
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- while ((call->state == RX_STATE_ACTIVE)
- && (call->flags & RX_CALL_TQ_BUSY)) {
- call->flags |= RX_CALL_TQ_WAIT;
- call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start lock3");
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- if (call->tqWaiters == 0)
- call->flags &= ~RX_CALL_TQ_WAIT;
- }
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_WaitforTQBusy(call);
+ /*
+ * If we entered error state while waiting,
+ * must call rxi_CallError to permit rxi_ResetCall
+ * to processed when the tqWaiter count hits zero.
+ */
+ if (call->error) {
+ rxi_CallError(call, call->error);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ conn->refCount--;
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ return np;
+ }
+ }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set
* the error condition in this call, so that it terminates as
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d\n",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
/* What if error is zero? */
/* What if the error is -1? the application will treat it as a timeout. */
afs_int32 errdata = ntohl(*(afs_int32 *) rx_DataOf(np));
- dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
+ dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d\n", errdata));
rxi_CallError(call, errdata);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&rx_refcnt_mutex);
rx_atomic_inc(&rx_stats.noPacketBuffersOnRead);
call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
- dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems", np));
+ dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems\n", np));
if (rxi_doreclaim)
rxi_ClearReceiveQueue(call);
clock_GetTime(&now);
&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
- dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate", np));
+ dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
#endif
-/* rxi_ComputePeerNetStats
- *
- * Called exclusively by rxi_ReceiveAckPacket to compute network link
- * estimates (like RTT and throughput) based on ack packets. Caller
- * must ensure that the packet in question is the right one (i.e.
- * serial number matches).
- */
-static void
-rxi_ComputePeerNetStats(struct rx_call *call, struct rx_packet *p,
- struct rx_ackPacket *ap, struct rx_packet *np,
- struct clock *now)
-{
- struct rx_peer *peer = call->conn->peer;
-
- /* Use RTT if not delayed by client and
- * ignore packets that were retransmitted. */
- if (!(p->flags & RX_PKTFLAG_ACKED) &&
- ap->reason != RX_ACK_DELAY &&
- clock_Eq(&p->timeSent, &p->firstSent))
- rxi_ComputeRoundTripTime(p, &p->timeSent, peer, now);
-#ifdef ADAPT_WINDOW
- rxi_ComputeRate(peer, call, p, np, ap->reason);
-#endif
-}
-
/* The real smarts of the whole thing. */
struct rx_packet *
rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
struct rx_peer *peer = conn->peer;
struct clock now; /* Current time, for RTT calculations */
afs_uint32 first;
+ afs_uint32 prev;
afs_uint32 serial;
/* because there are CM's that are bogus, sending weird values for this. */
afs_uint32 skew = 0;
/* depends on ack packet struct */
nAcks = MIN((unsigned)nbytes, (unsigned)ap->nAcks);
first = ntohl(ap->firstPacket);
+ prev = ntohl(ap->previousPacket);
serial = ntohl(ap->serial);
/* temporarily disabled -- needs to degrade over time
* skew = ntohs(ap->maxSkew); */
/* Ignore ack packets received out of order */
- if (first < call->tfirst) {
+ if (first < call->tfirst ||
+ (first == call->tfirst && prev < call->tprev)) {
return np;
}
+ call->tprev = prev;
+
if (np->header.flags & RX_SLOW_START_OK) {
call->flags |= RX_CALL_SLOW_START_OK;
}
if (tp->header.seq >= first)
break;
call->tfirst = tp->header.seq + 1;
- rxi_ComputePeerNetStats(call, tp, ap, np, &now);
+
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
+ if (ap->reason != RX_ACK_DELAY &&
+ clock_Eq(&tp->timeSent, &tp->firstSent)) {
+ rxi_ComputeRoundTripTime(tp, &tp->timeSent, call->conn->peer,
+ &now);
+ }
}
+
+#ifdef ADAPT_WINDOW
+ rxi_ComputeRate(call->conn->peer, call, p, np, ap->reason);
+#endif
+
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we have to release the global rx lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
call->nSoftAcked = 0;
for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
- /* Update round trip time if the ack was stimulated on receipt
- * of this packet */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-#ifdef RX_ENABLE_LOCKS
- if (tp->header.seq >= first)
-#endif /* RX_ENABLE_LOCKS */
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxi_ComputePeerNetStats(call, tp, ap, np, &now);
- /* Set the acknowledge flag per packet based on the
+ /* Set the acknowledge flag per packet based on the
* information in the ack packet. An acknowlegded packet can
* be downgraded when the server has discarded a packet it
* soacked previously, or when an ack packet is received
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
tp->flags |= RX_PKTFLAG_ACKED;
+
+ if (ap->reason != RX_ACK_DELAY &&
+ clock_Eq(&tp->timeSent, &tp->firstSent)) {
+ rxi_ComputeRoundTripTime(tp, &tp->timeSent,
+ call->conn->peer, &now);
+ }
+#ifdef ADAPT_WINDOW
+ rxi_ComputeRate(call->conn->peer, call, tp, np,
+ ap->reason);
+#endif
}
if (missing) {
nNacked++;
missing = 1;
}
} else {
- tp->flags &= ~RX_PKTFLAG_ACKED;
- missing = 1;
+ if (tp->flags & RX_PKTFLAG_ACKED) {
+ tp->flags &= ~RX_PKTFLAG_ACKED;
+ missing = 1;
+ }
}
/*
#ifdef RXDEBUG_PACKET
call->rqc -= count;
if ( call->rqc != 0 )
- dpf(("rxi_ClearReceiveQueue call %"AFS_PTR_FMT" rqc %u != 0", call, call->rqc));
+ dpf(("rxi_ClearReceiveQueue call %"AFS_PTR_FMT" rqc %u != 0\n", call, call->rqc));
#endif
call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
}
if (error) {
int i;
- dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d", conn, error));
+ dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d\n", conn, error));
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->challengeEvent)
}
}
+/**
+ * Interrupt an in-progress call with the specified error and wakeup waiters.
+ *
+ * @param[in] call The call to interrupt
+ * @param[in] error The error code to send to the peer
+ */
+void
+rx_InterruptCall(struct rx_call *call, afs_int32 error)
+{
+ MUTEX_ENTER(&call->lock);
+ rxi_CallError(call, error);
+ rxi_SendCallAbort(call, NULL, 0, 1);
+ MUTEX_EXIT(&call->lock);
+}
+
void
rxi_CallError(struct rx_call *call, afs_int32 error)
{
#ifdef DEBUG
osirx_AssertMine(&call->lock, "rxi_CallError");
#endif
- dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d", call, error, call->error));
+ dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d\n", call, error, call->error));
if (call->error)
error = call->error;
call->nHardAcks = 0;
call->tfirst = call->rnext = call->tnext = 1;
+ call->tprev = 0;
call->rprev = 0;
call->lastAcked = 0;
call->localStatus = call->remoteStatus = 0;
rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
- call->flags |= RX_CALL_FAST_RECOVER;
- if (peer->maxDgramPackets > 1) {
- call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
- } else {
- call->MTU = MIN(peer->natMTU, peer->maxMTU);
- }
- call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
- call->nDgramPackets = 1;
- call->cwind = 1;
- call->nextCwind = 1;
- call->nAcks = 0;
- call->nNacks = 0;
- MUTEX_ENTER(&peer->peer_lock);
- peer->MTU = call->MTU;
- peer->cwind = call->cwind;
- peer->nDgramPackets = 1;
- peer->congestSeq++;
- call->congestSeq = peer->congestSeq;
- MUTEX_EXIT(&peer->peer_lock);
- /* Clear retry times on packets. Otherwise, it's possible for
- * some packets in the queue to force resends at rates faster
- * than recovery rates.
- */
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (!(p->flags & RX_PKTFLAG_ACKED)) {
- clock_Zero(&p->retryTime);
- }
- }
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+ if (call->error) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
+ return;
+ }
+#endif
+ call->flags |= RX_CALL_FAST_RECOVER;
+
+ if (peer->maxDgramPackets > 1) {
+ call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
+ } else {
+ call->MTU = MIN(peer->natMTU, peer->maxMTU);
+ }
+ call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
+ call->nDgramPackets = 1;
+ call->cwind = 1;
+ call->nextCwind = 1;
+ call->nAcks = 0;
+ call->nNacks = 0;
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->MTU = call->MTU;
+ peer->cwind = call->cwind;
+ peer->nDgramPackets = 1;
+ peer->congestSeq++;
+ call->congestSeq = peer->congestSeq;
+ MUTEX_EXIT(&peer->peer_lock);
+ /* Clear retry times on packets. Otherwise, it's possible for
+ * some packets in the queue to force resends at rates faster
+ * than recovery rates.
+ */
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (!(p->flags & RX_PKTFLAG_ACKED)) {
+ clock_Zero(&p->retryTime);
+ }
+ }
}
if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
return;
}
- dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %d.%06d, rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %d.%06d, rtt %u, ps %u)\n",
ntohl(peer->host), ntohs(peer->port), (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt, peer->ifMTU));
* one packet exchange */
if (clock_Gt(&newTO, &peer->timeout)) {
- dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)",
+ dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)\n",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec,
newTO.sec, newTO.usec, peer->smRtt));
else if (minTime > rx_maxSendWindow)
minTime = rx_maxSendWindow;
/* if (minTime != peer->maxWindow) {
- dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)",
+ dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)\n",
ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
peer->timeout.sec, peer->timeout.usec, peer->smRtt));
peer->maxWindow = minTime;
/* calculate estimate for transmission interval in milliseconds */
minTime = rx_maxSendWindow * peer->smRtt;
if (minTime < 1000) {
- dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)",
+ dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)\n",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
peer->timeout.usec, peer->smRtt));
if (len > 0) {
len = _vsnprintf(msg, sizeof(msg)-2, tformat, ap);
- if (len > 0) {
- if (msg[len-1] != '\n') {
- msg[len] = '\n';
- msg[len+1] = '\0';
- }
+ if (len > 0)
OutputDebugString(msg);
- }
}
va_end(ap);
#else
fprintf(rx_Log, " %d.%06d:", (unsigned int)now.sec,
(unsigned int)now.usec);
vfprintf(rx_Log, format, ap);
- putc('\n', rx_Log);
va_end(ap);
#endif
#endif