From 3a84b02172800e7c172e04718fb109d1f5f1c014 Mon Sep 17 00:00:00 2001 From: Derrick Brashear Date: Mon, 17 Mar 2008 15:38:15 +0000 Subject: [PATCH] rx-clock-backward-detection-20080317 LICENSE IPL10 if the clock goes backwards, detect it and reset any rx events to run in a timely manner --- src/rx/rx.c | 78 +++++++++++++++++++++++---------------- src/rx/rx_event.c | 99 ++++++++++++++++++++++++++++++++++++++------------ src/rx/rx_prototypes.h | 5 +++ src/rx/rx_rdwr.c | 14 ++++--- 4 files changed, 134 insertions(+), 62 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index c2c1a14..cc783a9 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -2958,7 +2958,7 @@ rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn, struct rx_call *acall) { struct rx_call *call = acall; - struct clock when; + struct clock when, now; int i, waiting; MUTEX_ENTER(&conn->conn_data_lock); @@ -2997,13 +2997,15 @@ rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn, if (call != acall) MUTEX_EXIT(&call->lock); - clock_GetTime(&when); + clock_GetTime(&now); + when = now; when.sec += RX_CHECKREACH_TIMEOUT; MUTEX_ENTER(&conn->conn_data_lock); if (!conn->checkReachEvent) { conn->refCount++; conn->checkReachEvent = - rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL); + rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn, + NULL); } MUTEX_EXIT(&conn->conn_data_lock); } @@ -3080,7 +3082,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, afs_uint32 seq, serial, flags; int isFirst; struct rx_packet *tnp; - struct clock when; + struct clock when, now; rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex); #ifdef KERNEL @@ -3097,7 +3099,8 @@ rxi_ReceiveDataPacket(register struct rx_call *call, dpf(("packet %x dropped on receipt - quota problems", np)); if (rxi_doreclaim) rxi_ClearReceiveQueue(call); - clock_GetTime(&when); + clock_GetTime(&now); + when = now; clock_Add(&when, &rx_softAckDelay); if (!call->delayedAckEvent || clock_Gt(&call->delayedAckEvent->eventTime, &when)) { @@ -3105,7 +3108,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, RX_CALL_REFCOUNT_DELAY); CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY); call->delayedAckEvent = - rxevent_Post(&when, rxi_SendDelayedAck, call, 0); + rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0); } /* we've damaged this call already, might as well do it in. */ return np; @@ -3377,7 +3380,8 @@ rxi_ReceiveDataPacket(register struct rx_call *call, rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack); } else if (call->nSoftAcks) { - clock_GetTime(&when); + clock_GetTime(&now); + when = now; if (haveLast && !(flags & RX_CLIENT_INITIATED)) { clock_Add(&when, &rx_lastAckDelay); } else { @@ -3389,7 +3393,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, RX_CALL_REFCOUNT_DELAY); CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY); call->delayedAckEvent = - rxevent_Post(&when, rxi_SendDelayedAck, call, 0); + rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0); } } else if (call->flags & RX_CALL_RECEIVE_DONE) { rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); @@ -4265,7 +4269,7 @@ rxi_SendCallAbort(register struct rx_call *call, struct rx_packet *packet, int istack, int force) { afs_int32 error; - struct clock when; + struct clock when, now; if (!call->error) return packet; @@ -4291,11 +4295,12 @@ rxi_SendCallAbort(register struct rx_call *call, struct rx_packet *packet, rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT, (char *)&error, sizeof(error), istack); } else if (!call->delayedAbortEvent) { - clock_GetTime(&when); + clock_GetTime(&now); + when = now; clock_Addmsec(&when, rxi_callAbortDelay); CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT); call->delayedAbortEvent = - rxevent_Post(&when, rxi_SendDelayedCallAbort, call, 0); + rxevent_PostNow(&when, &now, rxi_SendDelayedCallAbort, call, 0); } return packet; } @@ -4314,7 +4319,7 @@ rxi_SendConnectionAbort(register struct rx_connection *conn, struct rx_packet *packet, int istack, int force) { afs_int32 error; - struct clock when; + struct clock when, now; if (!conn->error) return packet; @@ -4337,10 +4342,11 @@ rxi_SendConnectionAbort(register struct rx_connection *conn, sizeof(error), istack); MUTEX_ENTER(&conn->conn_data_lock); } else if (!conn->delayedAbortEvent) { - clock_GetTime(&when); + clock_GetTime(&now); + when = now; clock_Addmsec(&when, rxi_connAbortDelay); conn->delayedAbortEvent = - rxevent_Post(&when, rxi_SendDelayedConnAbort, conn, 0); + rxevent_PostNow(&when, &now, rxi_SendDelayedConnAbort, conn, 0); } return packet; } @@ -5026,7 +5032,7 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, struct rx_packet *p; register struct rx_packet *nxp; /* Next pointer for queue_Scan */ struct rx_peer *peer = call->conn->peer; - struct clock now, retryTime; + struct clock now, usenow, retryTime; int haveEvent; int nXmitPackets; int maxXmitPackets; @@ -5096,13 +5102,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, * in this burst. Note, if we back off, it's reasonable to * back off all of the packets in the same manner, even if * some of them have been retransmitted more times than more - * recent additions */ - clock_GetTime(&now); - retryTime = now; /* initialize before use */ + * recent additions. + * Do a dance to avoid blocking after setting now. */ + clock_Zero(&retryTime); MUTEX_ENTER(&peer->peer_lock); clock_Add(&retryTime, &peer->timeout); MUTEX_EXIT(&peer->peer_lock); - + clock_GetTime(&now); + clock_Add(&retryTime, &now); + usenow = now; /* Send (or resend) any packets that need it, subject to * window restrictions and congestion burst control * restrictions. Ask for an ack on the last packet sent in @@ -5152,6 +5160,8 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, osi_Panic("rxi_Start: xmit queue clobbered"); } if (p->flags & RX_PKTFLAG_ACKED) { + /* Since we may block, don't trust this */ + usenow.sec = usenow.usec = 0; rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex); continue; /* Ignore this packet if it has been acknowledged */ } @@ -5293,12 +5303,13 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, #ifdef RX_ENABLE_LOCKS CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND); call->resendEvent = - rxevent_Post2(&retryTime, rxi_StartUnlocked, - (void *)call, 0, istack); + rxevent_PostNow2(&retryTime, &usenow, + rxi_StartUnlocked, + (void *)call, 0, istack); #else /* RX_ENABLE_LOCKS */ call->resendEvent = - rxevent_Post2(&retryTime, rxi_Start, (void *)call, - 0, istack); + rxevent_PostNow2(&retryTime, &usenow, rxi_Start, + (void *)call, 0, istack); #endif /* RX_ENABLE_LOCKS */ } } @@ -5495,12 +5506,13 @@ void rxi_ScheduleKeepAliveEvent(register struct rx_call *call) { if (!call->keepAliveEvent) { - struct clock when; - clock_GetTime(&when); + struct clock when, now; + clock_GetTime(&now); + when = now; when.sec += call->conn->secondsUntilPing; CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE); call->keepAliveEvent = - rxevent_Post(&when, rxi_KeepAliveEvent, call, 0); + rxevent_PostNow(&when, &now, rxi_KeepAliveEvent, call, 0); } } @@ -5576,7 +5588,7 @@ rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn, conn->challengeEvent = NULL; if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) { register struct rx_packet *packet; - struct clock when; + struct clock when, now; if (tries <= 0) { /* We've failed to authenticate for too long. @@ -5609,10 +5621,11 @@ rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn, RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0); rxi_FreePacket(packet); } - clock_GetTime(&when); + clock_GetTime(&now); + when = now; when.sec += RX_CHALLENGE_TIMEOUT; conn->challengeEvent = - rxevent_Post2(&when, rxi_ChallengeEvent, conn, 0, + rxevent_PostNow2(&when, &now, rxi_ChallengeEvent, conn, 0, (tries - 1)); } } @@ -5734,7 +5747,7 @@ rxi_ComputeRoundTripTime(register struct rx_packet *p, void rxi_ReapConnections(void) { - struct clock now; + struct clock now, when; clock_GetTime(&now); /* Find server connection structures that haven't been used for @@ -5883,8 +5896,9 @@ rxi_ReapConnections(void) } MUTEX_EXIT(&rx_freePktQ_lock); - now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */ - rxevent_Post(&now, rxi_ReapConnections, 0, 0); + when = now; + when.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */ + rxevent_Post(&when, rxi_ReapConnections, 0, 0); } diff --git a/src/rx/rx_event.c b/src/rx/rx_event.c index d209a2d..56b90d7 100644 --- a/src/rx/rx_event.c +++ b/src/rx/rx_event.c @@ -90,6 +90,7 @@ struct xfreelist { static struct xfreelist *xfreemallocs = 0, *xsp = 0; struct clock rxevent_nextRaiseEvents; /* Time of next call to raise events */ +struct clock rxevent_lastEvent; /* backwards time detection */ int rxevent_raiseScheduled; /* true if raise events is scheduled */ #ifdef RX_ENABLE_LOCKS @@ -117,6 +118,28 @@ pthread_mutex_t rx_event_mutex; #endif /* AFS_PTHREAD_ENV */ +int +rxevent_adjTimes(struct clock *adjTime) +{ + /* backwards clock correction */ + int nAdjusted = 0; + struct rxepoch *qep, *nqep; + struct rxevent *qev, *nqev; + + for (queue_Scan(&rxepoch_queue, qep, nqep, rxepoch)) { + for (queue_Scan(&qep->events, qev, nqev, rxevent)) { + if (clock_Gt(&qev->eventTime, adjTime)) { + clock_Sub(&qev->eventTime, adjTime); + nAdjusted++; + } + } + if (qep->epochSec > adjTime->sec) { + qep->epochSec -= adjTime->sec; + } + } + return nAdjusted; +} + /* Pass in the number of events to allocate at a time */ int rxevent_initialized = 0; void @@ -139,6 +162,7 @@ rxevent_Init(int nEvents, void (*scheduler) (void)) rxevent_ScheduledEarlierEvent = scheduler; rxevent_initialized = 1; clock_Zero(&rxevent_nextRaiseEvents); + clock_Zero(&rxevent_lastEvent); rxevent_raiseScheduled = 0; UNLOCK_EV_INIT; } @@ -181,17 +205,9 @@ rxepoch_Allocate(struct clock *when) * "when" argument specifies when "func" should be called, in clock (clock.h) * units. */ -#if 0 -struct rxevent * -rxevent_Post(struct clock *when, - void (*func) (struct rxevent * event, - struct rx_connection * conn, - struct rx_call * acall), void *arg, void *arg1) -#else static struct rxevent * -_rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1, - int arg2, int newargs) -#endif +_rxevent_Post(struct clock *when, struct clock *now, void (*func) (), + void *arg, void *arg1, int arg2, int newargs) { register struct rxevent *ev, *evqe, *evqpr; register struct rxepoch *ep, *epqe, *epqpr; @@ -200,15 +216,23 @@ _rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1, MUTEX_ENTER(&rxevent_lock); #ifdef RXDEBUG if (rx_Log_event) { - struct clock now; - clock_GetTime(&now); + struct clock now1; + clock_GetTime(&now1); fprintf(rx_Log_event, "%d.%d: rxevent_Post(%d.%d, %lp, %lp, %lp, %d)\n", - (int)now.sec, (int)now.usec, (int)when->sec, (int)when->usec, + (int)now1.sec, (int)now1.usec, (int)when->sec, (int)when->usec, func, arg, arg1, arg2); } #endif - + /* If a time was provided, check for consistency */ + if (now->sec) { + if (clock_Gt(&rxevent_lastEvent, now)) { + struct clock adjTime = rxevent_lastEvent; + clock_Sub(&adjTime, now); + rxevent_adjTimes(&adjTime); + } + rxevent_lastEvent = *now; + } /* Get a pointer to the epoch for this event, if none is found then * create a new epoch and insert it into the sorted list */ for (ep = NULL, queue_ScanBackwards(&rxepoch_queue, epqe, epqpr, rxepoch)) { @@ -297,14 +321,32 @@ _rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1, struct rxevent * rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1) { - return _rxevent_Post(when, func, arg, arg1, 0, 0); + struct clock now; + clock_Zero(&now); + return _rxevent_Post(when, &now, func, arg, arg1, 0, 0); } struct rxevent * rxevent_Post2(struct clock *when, void (*func) (), void *arg, void *arg1, int arg2) { - return _rxevent_Post(when, func, arg, arg1, arg2, 1); + struct clock now; + clock_Zero(&now); + return _rxevent_Post(when, &now, func, arg, arg1, arg2, 1); +} + +struct rxevent * +rxevent_PostNow(struct clock *when, struct clock *now, void (*func) (), + void *arg, void *arg1) +{ + return _rxevent_Post(when, now, func, arg, arg1, 0, 0); +} + +struct rxevent * +rxevent_PostNow2(struct clock *when, struct clock *now, void (*func) (), + void *arg, void *arg1, int arg2) +{ + return _rxevent_Post(when, now, func, arg, arg1, arg2, 1); } /* Cancel an event by moving it from the event queue to the free list. @@ -378,7 +420,6 @@ rxevent_RaiseEvents(struct clock *next) register struct rxepoch *ep; register struct rxevent *ev; volatile struct clock now; - MUTEX_ENTER(&rxevent_lock); /* Events are sorted by time, so only scan until an event is found that has @@ -394,17 +435,27 @@ rxevent_RaiseEvents(struct clock *next) continue; } do { + reraise: ev = queue_First(&ep->events, rxevent); if (clock_Lt(&now, &ev->eventTime)) { clock_GetTime(&now); - if (clock_Lt(&now, &ev->eventTime)) { - *next = rxevent_nextRaiseEvents = ev->eventTime; - rxevent_raiseScheduled = 1; - clock_Sub(next, &now); - MUTEX_EXIT(&rxevent_lock); - return 1; + if (clock_Gt(&rxevent_lastEvent, &now)) { + struct clock adjTime = rxevent_lastEvent; + int adjusted; + clock_Sub(&adjTime, &now); + adjusted = rxevent_adjTimes(&adjTime); + rxevent_lastEvent = now; + if (adjusted > 0) + goto reraise; } - } + if (clock_Lt(&now, &ev->eventTime)) { + *next = rxevent_nextRaiseEvents = ev->eventTime; + rxevent_raiseScheduled = 1; + clock_Sub(next, &now); + MUTEX_EXIT(&rxevent_lock); + return 1; + } + } queue_Remove(ev); rxevent_nPosted--; MUTEX_EXIT(&rxevent_lock); diff --git a/src/rx/rx_prototypes.h b/src/rx/rx_prototypes.h index 22ba2f7..842419a 100644 --- a/src/rx/rx_prototypes.h +++ b/src/rx/rx_prototypes.h @@ -304,6 +304,11 @@ extern struct rxevent *rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1); extern struct rxevent *rxevent_Post2(struct clock *when, void (*func) (), void *arg, void *arg1, int arg2); +extern struct rxevent *rxevent_PostNow(struct clock *when, struct clock *now, + void (*func) (), void *arg, void *arg1); +extern struct rxevent *rxevent_PostNow2(struct clock *when, struct clock *now, + void (*func) (), void *arg, + void *arg1, int arg2); #endif extern void shutdown_rxevent(void); extern struct rxepoch *rxepoch_Allocate(struct clock *when); diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index 2e3bf0f..0c019c2 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -192,8 +192,9 @@ rxi_ReadProc(register struct rx_call *call, register char *buf, RX_CALL_REFCOUNT_DELAY); rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0); } else { - struct clock when; - clock_GetTime(&when); + struct clock when, now; + clock_GetTime(&now); + when = now; /* Delay to consolidate ack packets */ clock_Add(&when, &rx_hardAckDelay); if (!call->delayedAckEvent @@ -204,7 +205,7 @@ rxi_ReadProc(register struct rx_call *call, register char *buf, RX_CALL_REFCOUNT_DELAY); CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY); call->delayedAckEvent = - rxevent_Post(&when, + rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0); } @@ -521,8 +522,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0); didHardAck = 1; } else { - struct clock when; - clock_GetTime(&when); + struct clock when, now; + clock_GetTime(&now); + when = now; /* Delay to consolidate ack packets */ clock_Add(&when, &rx_hardAckDelay); if (!call->delayedAckEvent @@ -531,7 +533,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) RX_CALL_REFCOUNT_DELAY); CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY); call->delayedAckEvent = - rxevent_Post(&when, rxi_SendDelayedAck, call, 0); + rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0); } } } -- 1.9.4