struct rx_call *acall)
{
struct rx_call *call = acall;
- struct clock when;
+ struct clock when, now;
int i, waiting;
MUTEX_ENTER(&conn->conn_data_lock);
if (call != acall)
MUTEX_EXIT(&call->lock);
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
when.sec += RX_CHECKREACH_TIMEOUT;
MUTEX_ENTER(&conn->conn_data_lock);
if (!conn->checkReachEvent) {
conn->refCount++;
conn->checkReachEvent =
- rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
+ rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn,
+ NULL);
}
MUTEX_EXIT(&conn->conn_data_lock);
}
afs_uint32 seq, serial, flags;
int isFirst;
struct rx_packet *tnp;
- struct clock when;
+ struct clock when, now;
rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex);
#ifdef KERNEL
dpf(("packet %x dropped on receipt - quota problems", np));
if (rxi_doreclaim)
rxi_ClearReceiveQueue(call);
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
clock_Add(&when, &rx_softAckDelay);
if (!call->delayedAckEvent
|| clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
/* we've damaged this call already, might as well do it in. */
return np;
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
} else if (call->nSoftAcks) {
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
clock_Add(&when, &rx_lastAckDelay);
} else {
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
} else if (call->flags & RX_CALL_RECEIVE_DONE) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
int istack, int force)
{
afs_int32 error;
- struct clock when;
+ struct clock when, now;
if (!call->error)
return packet;
rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
(char *)&error, sizeof(error), istack);
} else if (!call->delayedAbortEvent) {
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
clock_Addmsec(&when, rxi_callAbortDelay);
CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
call->delayedAbortEvent =
- rxevent_Post(&when, rxi_SendDelayedCallAbort, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedCallAbort, call, 0);
}
return packet;
}
struct rx_packet *packet, int istack, int force)
{
afs_int32 error;
- struct clock when;
+ struct clock when, now;
if (!conn->error)
return packet;
sizeof(error), istack);
MUTEX_ENTER(&conn->conn_data_lock);
} else if (!conn->delayedAbortEvent) {
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
clock_Addmsec(&when, rxi_connAbortDelay);
conn->delayedAbortEvent =
- rxevent_Post(&when, rxi_SendDelayedConnAbort, conn, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedConnAbort, conn, 0);
}
return packet;
}
struct rx_packet *p;
register struct rx_packet *nxp; /* Next pointer for queue_Scan */
struct rx_peer *peer = call->conn->peer;
- struct clock now, retryTime;
+ struct clock now, usenow, retryTime;
int haveEvent;
int nXmitPackets;
int maxXmitPackets;
* in this burst. Note, if we back off, it's reasonable to
* back off all of the packets in the same manner, even if
* some of them have been retransmitted more times than more
- * recent additions */
- clock_GetTime(&now);
- retryTime = now; /* initialize before use */
+ * recent additions.
+ * Do a dance to avoid blocking after setting now. */
+ clock_Zero(&retryTime);
MUTEX_ENTER(&peer->peer_lock);
clock_Add(&retryTime, &peer->timeout);
MUTEX_EXIT(&peer->peer_lock);
-
+ clock_GetTime(&now);
+ clock_Add(&retryTime, &now);
+ usenow = now;
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
osi_Panic("rxi_Start: xmit queue clobbered");
}
if (p->flags & RX_PKTFLAG_ACKED) {
+ /* Since we may block, don't trust this */
+ usenow.sec = usenow.usec = 0;
rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
continue; /* Ignore this packet if it has been acknowledged */
}
#ifdef RX_ENABLE_LOCKS
CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
call->resendEvent =
- rxevent_Post2(&retryTime, rxi_StartUnlocked,
- (void *)call, 0, istack);
+ rxevent_PostNow2(&retryTime, &usenow,
+ rxi_StartUnlocked,
+ (void *)call, 0, istack);
#else /* RX_ENABLE_LOCKS */
call->resendEvent =
- rxevent_Post2(&retryTime, rxi_Start, (void *)call,
- 0, istack);
+ rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
+ (void *)call, 0, istack);
#endif /* RX_ENABLE_LOCKS */
}
}
rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
{
if (!call->keepAliveEvent) {
- struct clock when;
- clock_GetTime(&when);
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
when.sec += call->conn->secondsUntilPing;
CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
call->keepAliveEvent =
- rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
+ rxevent_PostNow(&when, &now, rxi_KeepAliveEvent, call, 0);
}
}
conn->challengeEvent = NULL;
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
register struct rx_packet *packet;
- struct clock when;
+ struct clock when, now;
if (tries <= 0) {
/* We've failed to authenticate for too long.
RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
rxi_FreePacket(packet);
}
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
when.sec += RX_CHALLENGE_TIMEOUT;
conn->challengeEvent =
- rxevent_Post2(&when, rxi_ChallengeEvent, conn, 0,
+ rxevent_PostNow2(&when, &now, rxi_ChallengeEvent, conn, 0,
(tries - 1));
}
}
void
rxi_ReapConnections(void)
{
- struct clock now;
+ struct clock now, when;
clock_GetTime(&now);
/* Find server connection structures that haven't been used for
}
MUTEX_EXIT(&rx_freePktQ_lock);
- now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
- rxevent_Post(&now, rxi_ReapConnections, 0, 0);
+ when = now;
+ when.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
+ rxevent_Post(&when, rxi_ReapConnections, 0, 0);
}
static struct xfreelist *xfreemallocs = 0, *xsp = 0;
struct clock rxevent_nextRaiseEvents; /* Time of next call to raise events */
+struct clock rxevent_lastEvent; /* backwards time detection */
int rxevent_raiseScheduled; /* true if raise events is scheduled */
#ifdef RX_ENABLE_LOCKS
#endif /* AFS_PTHREAD_ENV */
+/*
+ * Compensate for the wall clock having stepped backwards by *adjTime:
+ * every queued event whose eventTime exceeds the adjustment is pulled
+ * earlier by that amount, and each epoch's second marker is shifted
+ * the same way.  Returns the number of individual events adjusted.
+ * Both visible callers (_rxevent_Post and the raise-events scan)
+ * invoke this while holding rxevent_lock.
+ */
+int
+rxevent_adjTimes(struct clock *adjTime)
+{
+ /* backwards clock correction */
+ int nAdjusted = 0;
+ struct rxepoch *qep, *nqep;
+ struct rxevent *qev, *nqev;
+
+ for (queue_Scan(&rxepoch_queue, qep, nqep, rxepoch)) {
+ for (queue_Scan(&qep->events, qev, nqev, rxevent)) {
+ if (clock_Gt(&qev->eventTime, adjTime)) {
+ clock_Sub(&qev->eventTime, adjTime);
+ nAdjusted++;
+ }
+ }
+ if (qep->epochSec > adjTime->sec) {
+ qep->epochSec -= adjTime->sec;
+ }
+ }
+ return nAdjusted;
+}
+
/* Pass in the number of events to allocate at a time */
int rxevent_initialized = 0;
void
rxevent_ScheduledEarlierEvent = scheduler;
rxevent_initialized = 1;
clock_Zero(&rxevent_nextRaiseEvents);
+ clock_Zero(&rxevent_lastEvent);
rxevent_raiseScheduled = 0;
UNLOCK_EV_INIT;
}
* "when" argument specifies when "func" should be called, in clock (clock.h)
* units. */
-#if 0
-struct rxevent *
-rxevent_Post(struct clock *when,
- void (*func) (struct rxevent * event,
- struct rx_connection * conn,
- struct rx_call * acall), void *arg, void *arg1)
-#else
static struct rxevent *
-_rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1,
- int arg2, int newargs)
-#endif
+_rxevent_Post(struct clock *when, struct clock *now, void (*func) (),
+ void *arg, void *arg1, int arg2, int newargs)
{
register struct rxevent *ev, *evqe, *evqpr;
register struct rxepoch *ep, *epqe, *epqpr;
MUTEX_ENTER(&rxevent_lock);
#ifdef RXDEBUG
if (rx_Log_event) {
- struct clock now;
- clock_GetTime(&now);
+ struct clock now1;
+ clock_GetTime(&now1);
fprintf(rx_Log_event, "%d.%d: rxevent_Post(%d.%d, %lp, %lp, %lp, %d)\n",
- (int)now.sec, (int)now.usec, (int)when->sec, (int)when->usec,
+ (int)now1.sec, (int)now1.usec, (int)when->sec, (int)when->usec,
func, arg,
arg1, arg2);
}
#endif
-
+ /* If a time was provided, check for consistency */
+ if (now->sec) {
+ if (clock_Gt(&rxevent_lastEvent, now)) {
+ struct clock adjTime = rxevent_lastEvent;
+ clock_Sub(&adjTime, now);
+ rxevent_adjTimes(&adjTime);
+ }
+ rxevent_lastEvent = *now;
+ }
/* Get a pointer to the epoch for this event, if none is found then
* create a new epoch and insert it into the sorted list */
for (ep = NULL, queue_ScanBackwards(&rxepoch_queue, epqe, epqpr, rxepoch)) {
struct rxevent *
rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1)
{
- return _rxevent_Post(when, func, arg, arg1, 0, 0);
+ /* Legacy entry point with no current-time hint: a zeroed "now"
+ * makes _rxevent_Post skip its backwards-clock check (that check
+ * only runs when now->sec is nonzero). */
+ struct clock now;
+ clock_Zero(&now);
+ return _rxevent_Post(when, &now, func, arg, arg1, 0, 0);
}
struct rxevent *
rxevent_Post2(struct clock *when, void (*func) (), void *arg, void *arg1,
int arg2)
{
- return _rxevent_Post(when, func, arg, arg1, arg2, 1);
+ /* As rxevent_Post, but carries an extra integer argument (the
+ * "newargs" form of _rxevent_Post).  The zeroed "now" disables
+ * the backwards-clock check for this legacy entry point. */
+ struct clock now;
+ clock_Zero(&now);
+ return _rxevent_Post(when, &now, func, arg, arg1, arg2, 1);
+}
+
+/* Post an event using a caller-supplied current time ("now"), letting
+ * _rxevent_Post detect and correct a wall clock that has stepped
+ * backwards since the last event was posted. */
+struct rxevent *
+rxevent_PostNow(struct clock *when, struct clock *now, void (*func) (),
+ void *arg, void *arg1)
+{
+ return _rxevent_Post(when, now, func, arg, arg1, 0, 0);
+}
+
+/* As rxevent_PostNow, but carries an extra integer argument (the
+ * "newargs" form of _rxevent_Post). */
+struct rxevent *
+rxevent_PostNow2(struct clock *when, struct clock *now, void (*func) (),
+ void *arg, void *arg1, int arg2)
+{
+ return _rxevent_Post(when, now, func, arg, arg1, arg2, 1);
+}
/* Cancel an event by moving it from the event queue to the free list.
register struct rxepoch *ep;
register struct rxevent *ev;
volatile struct clock now;
-
MUTEX_ENTER(&rxevent_lock);
/* Events are sorted by time, so only scan until an event is found that has
continue;
}
do {
+ reraise:
ev = queue_First(&ep->events, rxevent);
if (clock_Lt(&now, &ev->eventTime)) {
clock_GetTime(&now);
- if (clock_Lt(&now, &ev->eventTime)) {
- *next = rxevent_nextRaiseEvents = ev->eventTime;
- rxevent_raiseScheduled = 1;
- clock_Sub(next, &now);
- MUTEX_EXIT(&rxevent_lock);
- return 1;
+ if (clock_Gt(&rxevent_lastEvent, &now)) {
+ struct clock adjTime = rxevent_lastEvent;
+ int adjusted;
+ clock_Sub(&adjTime, &now);
+ adjusted = rxevent_adjTimes(&adjTime);
+ rxevent_lastEvent = now;
+ if (adjusted > 0)
+ goto reraise;
}
- }
+ if (clock_Lt(&now, &ev->eventTime)) {
+ *next = rxevent_nextRaiseEvents = ev->eventTime;
+ rxevent_raiseScheduled = 1;
+ clock_Sub(next, &now);
+ MUTEX_EXIT(&rxevent_lock);
+ return 1;
+ }
+ }
queue_Remove(ev);
rxevent_nPosted--;
MUTEX_EXIT(&rxevent_lock);
void *arg, void *arg1);
extern struct rxevent *rxevent_Post2(struct clock *when, void (*func) (),
void *arg, void *arg1, int arg2);
+extern struct rxevent *rxevent_PostNow(struct clock *when, struct clock *now,
+ void (*func) (), void *arg, void *arg1);
+extern struct rxevent *rxevent_PostNow2(struct clock *when, struct clock *now,
+ void (*func) (), void *arg,
+ void *arg1, int arg2);
#endif
extern void shutdown_rxevent(void);
extern struct rxepoch *rxepoch_Allocate(struct clock *when);
RX_CALL_REFCOUNT_DELAY);
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
} else {
- struct clock when;
- clock_GetTime(&when);
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
/* Delay to consolidate ack packets */
clock_Add(&when, &rx_hardAckDelay);
if (!call->delayedAckEvent
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when,
+ rxevent_PostNow(&when, &now,
rxi_SendDelayedAck, call,
0);
}
rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
didHardAck = 1;
} else {
- struct clock when;
- clock_GetTime(&when);
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
/* Delay to consolidate ack packets */
clock_Add(&when, &rx_hardAckDelay);
if (!call->delayedAckEvent
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
}
}