rx-clock-backward-detection-20080317
authorDerrick Brashear <shadow@dementia.org>
Mon, 17 Mar 2008 15:38:15 +0000 (15:38 +0000)
committerDerrick Brashear <shadow@dementia.org>
Mon, 17 Mar 2008 15:38:15 +0000 (15:38 +0000)
LICENSE IPL10

if the clock goes backwards, detect it and reset any rx events to run in a timely manner

src/rx/rx.c
src/rx/rx_event.c
src/rx/rx_prototypes.h
src/rx/rx_rdwr.c

index c2c1a14..cc783a9 100644 (file)
@@ -2958,7 +2958,7 @@ rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn,
                    struct rx_call *acall)
 {
     struct rx_call *call = acall;
-    struct clock when;
+    struct clock when, now;
     int i, waiting;
 
     MUTEX_ENTER(&conn->conn_data_lock);
@@ -2997,13 +2997,15 @@ rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn,
            if (call != acall)
                MUTEX_EXIT(&call->lock);
 
-           clock_GetTime(&when);
+           clock_GetTime(&now);
+           when = now;
            when.sec += RX_CHECKREACH_TIMEOUT;
            MUTEX_ENTER(&conn->conn_data_lock);
            if (!conn->checkReachEvent) {
                conn->refCount++;
                conn->checkReachEvent =
-                   rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
+                   rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn, 
+                                   NULL);
            }
            MUTEX_EXIT(&conn->conn_data_lock);
        }
@@ -3080,7 +3082,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
     afs_uint32 seq, serial, flags;
     int isFirst;
     struct rx_packet *tnp;
-    struct clock when;
+    struct clock when, now;
     rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex);
 
 #ifdef KERNEL
@@ -3097,7 +3099,8 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
        dpf(("packet %x dropped on receipt - quota problems", np));
        if (rxi_doreclaim)
            rxi_ClearReceiveQueue(call);
-       clock_GetTime(&when);
+       clock_GetTime(&now);
+       when = now;
        clock_Add(&when, &rx_softAckDelay);
        if (!call->delayedAckEvent
            || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
@@ -3105,7 +3108,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent =
-               rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+               rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
        }
        /* we've damaged this call already, might as well do it in. */
        return np;
@@ -3377,7 +3380,8 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
        np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
     } else if (call->nSoftAcks) {
-       clock_GetTime(&when);
+       clock_GetTime(&now);
+       when = now;
        if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
            clock_Add(&when, &rx_lastAckDelay);
        } else {
@@ -3389,7 +3393,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
                           RX_CALL_REFCOUNT_DELAY);
            CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
            call->delayedAckEvent =
-               rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+               rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
        }
     } else if (call->flags & RX_CALL_RECEIVE_DONE) {
        rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
@@ -4265,7 +4269,7 @@ rxi_SendCallAbort(register struct rx_call *call, struct rx_packet *packet,
                  int istack, int force)
 {
     afs_int32 error;
-    struct clock when;
+    struct clock when, now;
 
     if (!call->error)
        return packet;
@@ -4291,11 +4295,12 @@ rxi_SendCallAbort(register struct rx_call *call, struct rx_packet *packet,
            rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
                            (char *)&error, sizeof(error), istack);
     } else if (!call->delayedAbortEvent) {
-       clock_GetTime(&when);
+       clock_GetTime(&now);
+       when = now;
        clock_Addmsec(&when, rxi_callAbortDelay);
        CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
        call->delayedAbortEvent =
-           rxevent_Post(&when, rxi_SendDelayedCallAbort, call, 0);
+           rxevent_PostNow(&when, &now, rxi_SendDelayedCallAbort, call, 0);
     }
     return packet;
 }
@@ -4314,7 +4319,7 @@ rxi_SendConnectionAbort(register struct rx_connection *conn,
                        struct rx_packet *packet, int istack, int force)
 {
     afs_int32 error;
-    struct clock when;
+    struct clock when, now;
 
     if (!conn->error)
        return packet;
@@ -4337,10 +4342,11 @@ rxi_SendConnectionAbort(register struct rx_connection *conn,
                            sizeof(error), istack);
        MUTEX_ENTER(&conn->conn_data_lock);
     } else if (!conn->delayedAbortEvent) {
-       clock_GetTime(&when);
+       clock_GetTime(&now);
+       when = now;
        clock_Addmsec(&when, rxi_connAbortDelay);
        conn->delayedAbortEvent =
-           rxevent_Post(&when, rxi_SendDelayedConnAbort, conn, 0);
+           rxevent_PostNow(&when, &now, rxi_SendDelayedConnAbort, conn, 0);
     }
     return packet;
 }
@@ -5026,7 +5032,7 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
     struct rx_packet *p;
     register struct rx_packet *nxp;    /* Next pointer for queue_Scan */
     struct rx_peer *peer = call->conn->peer;
-    struct clock now, retryTime;
+    struct clock now, usenow, retryTime;
     int haveEvent;
     int nXmitPackets;
     int maxXmitPackets;
@@ -5096,13 +5102,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
         * in this burst.  Note, if we back off, it's reasonable to
         * back off all of the packets in the same manner, even if
         * some of them have been retransmitted more times than more
-        * recent additions */
-       clock_GetTime(&now);
-       retryTime = now;        /* initialize before use */
+        * recent additions.
+        * Do a dance to avoid blocking after setting now. */
+       clock_Zero(&retryTime);
        MUTEX_ENTER(&peer->peer_lock);
        clock_Add(&retryTime, &peer->timeout);
        MUTEX_EXIT(&peer->peer_lock);
-
+       clock_GetTime(&now);
+       clock_Add(&retryTime, &now);
+       usenow = now;
        /* Send (or resend) any packets that need it, subject to
         * window restrictions and congestion burst control
         * restrictions.  Ask for an ack on the last packet sent in
@@ -5152,6 +5160,8 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
                        osi_Panic("rxi_Start: xmit queue clobbered");
                    }
                    if (p->flags & RX_PKTFLAG_ACKED) {
+                       /* Since we may block, don't trust this */
+                       usenow.sec = usenow.usec = 0;
                         rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
                        continue;       /* Ignore this packet if it has been acknowledged */
                    }
@@ -5293,12 +5303,13 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
 #ifdef RX_ENABLE_LOCKS
                        CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
                        call->resendEvent =
-                           rxevent_Post2(&retryTime, rxi_StartUnlocked,
-                                        (void *)call, 0, istack);
+                           rxevent_PostNow2(&retryTime, &usenow, 
+                                            rxi_StartUnlocked,
+                                            (void *)call, 0, istack);
 #else /* RX_ENABLE_LOCKS */
                        call->resendEvent =
-                           rxevent_Post2(&retryTime, rxi_Start, (void *)call,
-                                        0, istack);
+                           rxevent_PostNow2(&retryTime, &usenow, rxi_Start, 
+                                            (void *)call, 0, istack);
 #endif /* RX_ENABLE_LOCKS */
                    }
                }
@@ -5495,12 +5506,13 @@ void
 rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
 {
     if (!call->keepAliveEvent) {
-       struct clock when;
-       clock_GetTime(&when);
+       struct clock when, now;
+       clock_GetTime(&now);
+       when = now;
        when.sec += call->conn->secondsUntilPing;
        CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
        call->keepAliveEvent =
-           rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
+           rxevent_PostNow(&when, &now, rxi_KeepAliveEvent, call, 0);
     }
 }
 
@@ -5576,7 +5588,7 @@ rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
     conn->challengeEvent = NULL;
     if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
        register struct rx_packet *packet;
-       struct clock when;
+       struct clock when, now;
 
        if (tries <= 0) {
            /* We've failed to authenticate for too long.
@@ -5609,10 +5621,11 @@ rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
                            RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
            rxi_FreePacket(packet);
        }
-       clock_GetTime(&when);
+       clock_GetTime(&now);
+       when = now;
        when.sec += RX_CHALLENGE_TIMEOUT;
        conn->challengeEvent =
-           rxevent_Post2(&when, rxi_ChallengeEvent, conn, 0,
+           rxevent_PostNow2(&when, &now, rxi_ChallengeEvent, conn, 0,
                         (tries - 1));
     }
 }
@@ -5734,7 +5747,7 @@ rxi_ComputeRoundTripTime(register struct rx_packet *p,
 void
 rxi_ReapConnections(void)
 {
-    struct clock now;
+    struct clock now, when;
     clock_GetTime(&now);
 
     /* Find server connection structures that haven't been used for
@@ -5883,8 +5896,9 @@ rxi_ReapConnections(void)
     }
     MUTEX_EXIT(&rx_freePktQ_lock);
 
-    now.sec += RX_REAP_TIME;   /* Check every RX_REAP_TIME seconds */
-    rxevent_Post(&now, rxi_ReapConnections, 0, 0);
+    when = now;
+    when.sec += RX_REAP_TIME;  /* Check every RX_REAP_TIME seconds */
+    rxevent_Post(&when, rxi_ReapConnections, 0, 0);
 }
 
 
index d209a2d..56b90d7 100644 (file)
@@ -90,6 +90,7 @@ struct xfreelist {
 static struct xfreelist *xfreemallocs = 0, *xsp = 0;
 
 struct clock rxevent_nextRaiseEvents;  /* Time of next call to raise events */
+struct clock rxevent_lastEvent;        /* backwards time detection */
 int rxevent_raiseScheduled;    /* true if raise events is scheduled */
 
 #ifdef RX_ENABLE_LOCKS
@@ -117,6 +118,28 @@ pthread_mutex_t rx_event_mutex;
 #endif /* AFS_PTHREAD_ENV */
 
 
+int
+rxevent_adjTimes(struct clock *adjTime)
+{
+    /* backwards clock correction */
+    int nAdjusted = 0;
+    struct rxepoch *qep, *nqep;
+    struct rxevent *qev, *nqev;
+    
+    for (queue_Scan(&rxepoch_queue, qep, nqep, rxepoch)) {
+       for (queue_Scan(&qep->events, qev, nqev, rxevent)) {
+           if (clock_Gt(&qev->eventTime, adjTime)) {
+               clock_Sub(&qev->eventTime, adjTime); 
+               nAdjusted++;
+           }
+       }
+       if (qep->epochSec > adjTime->sec) {
+           qep->epochSec -= adjTime->sec;
+       }
+    }
+    return nAdjusted;
+}
+
 /* Pass in the number of events to allocate at a time */
 int rxevent_initialized = 0;
 void
@@ -139,6 +162,7 @@ rxevent_Init(int nEvents, void (*scheduler) (void))
     rxevent_ScheduledEarlierEvent = scheduler;
     rxevent_initialized = 1;
     clock_Zero(&rxevent_nextRaiseEvents);
+    clock_Zero(&rxevent_lastEvent);
     rxevent_raiseScheduled = 0;
     UNLOCK_EV_INIT;
 }
@@ -181,17 +205,9 @@ rxepoch_Allocate(struct clock *when)
  * "when" argument specifies when "func" should be called, in clock (clock.h)
  * units. */
 
-#if 0
-struct rxevent *
-rxevent_Post(struct clock *when,
-            void (*func) (struct rxevent * event,
-                          struct rx_connection * conn,
-                          struct rx_call * acall), void *arg, void *arg1)
-#else
 static struct rxevent *
-_rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1,
-             int arg2, int newargs)
-#endif
+_rxevent_Post(struct clock *when, struct clock *now, void (*func) (), 
+             void *arg, void *arg1, int arg2, int newargs)
 {
     register struct rxevent *ev, *evqe, *evqpr;
     register struct rxepoch *ep, *epqe, *epqpr;
@@ -200,15 +216,23 @@ _rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1,
     MUTEX_ENTER(&rxevent_lock);
 #ifdef RXDEBUG
     if (rx_Log_event) {
-       struct clock now;
-       clock_GetTime(&now);
+       struct clock now1;
+       clock_GetTime(&now1);
        fprintf(rx_Log_event, "%d.%d: rxevent_Post(%d.%d, %lp, %lp, %lp, %d)\n",
-               (int)now.sec, (int)now.usec, (int)when->sec, (int)when->usec,
+               (int)now1.sec, (int)now1.usec, (int)when->sec, (int)when->usec,
                func, arg,
                arg1, arg2);
     }
 #endif
-
+    /* If a time was provided, check for consistency */
+    if (now->sec) {
+       if (clock_Gt(&rxevent_lastEvent, now)) {
+           struct clock adjTime = rxevent_lastEvent;
+           clock_Sub(&adjTime, now);
+           rxevent_adjTimes(&adjTime);
+       }
+       rxevent_lastEvent = *now;
+    }
     /* Get a pointer to the epoch for this event, if none is found then
      * create a new epoch and insert it into the sorted list */
     for (ep = NULL, queue_ScanBackwards(&rxepoch_queue, epqe, epqpr, rxepoch)) {
@@ -297,14 +321,32 @@ _rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1,
 struct rxevent *
 rxevent_Post(struct clock *when, void (*func) (), void *arg, void *arg1)
 {
-    return _rxevent_Post(when, func, arg, arg1, 0, 0);
+    struct clock now;
+    clock_Zero(&now);
+    return _rxevent_Post(when, &now, func, arg, arg1, 0, 0);
 }
 
 struct rxevent *
 rxevent_Post2(struct clock *when, void (*func) (), void *arg, void *arg1,
              int arg2)
 {
-    return _rxevent_Post(when, func, arg, arg1, arg2, 1);
+    struct clock now;
+    clock_Zero(&now);
+    return _rxevent_Post(when, &now, func, arg, arg1, arg2, 1);
+}
+
+struct rxevent *
+rxevent_PostNow(struct clock *when, struct clock *now, void (*func) (), 
+               void *arg, void *arg1)
+{
+    return _rxevent_Post(when, now, func, arg, arg1, 0, 0);
+}
+
+struct rxevent *
+rxevent_PostNow2(struct clock *when, struct clock *now, void (*func) (), 
+                void *arg, void *arg1, int arg2)
+{
+    return _rxevent_Post(when, now, func, arg, arg1, arg2, 1);
 }
 
 /* Cancel an event by moving it from the event queue to the free list.
@@ -378,7 +420,6 @@ rxevent_RaiseEvents(struct clock *next)
     register struct rxepoch *ep;
     register struct rxevent *ev;
     volatile struct clock now;
-
     MUTEX_ENTER(&rxevent_lock);
 
     /* Events are sorted by time, so only scan until an event is found that has
@@ -394,17 +435,27 @@ rxevent_RaiseEvents(struct clock *next)
            continue;
        }
        do {
+       reraise:
            ev = queue_First(&ep->events, rxevent);
            if (clock_Lt(&now, &ev->eventTime)) {
                clock_GetTime(&now);
-               if (clock_Lt(&now, &ev->eventTime)) {
-                   *next = rxevent_nextRaiseEvents = ev->eventTime;
-                   rxevent_raiseScheduled = 1;
-                   clock_Sub(next, &now);
-                   MUTEX_EXIT(&rxevent_lock);
-                   return 1;
+               if (clock_Gt(&rxevent_lastEvent, &now)) {
+                   struct clock adjTime = rxevent_lastEvent;
+                   int adjusted;
+                   clock_Sub(&adjTime, &now);
+                   adjusted = rxevent_adjTimes(&adjTime);
+                   rxevent_lastEvent = now;
+                   if (adjusted > 0)
+                       goto reraise;
                }
-           }
+               if (clock_Lt(&now, &ev->eventTime)) {
+                    *next = rxevent_nextRaiseEvents = ev->eventTime;
+                    rxevent_raiseScheduled = 1;
+                    clock_Sub(next, &now);
+                    MUTEX_EXIT(&rxevent_lock);
+                    return 1;
+                }
+            }
            queue_Remove(ev);
            rxevent_nPosted--;
            MUTEX_EXIT(&rxevent_lock);
index 22ba2f7..842419a 100644 (file)
@@ -304,6 +304,11 @@ extern struct rxevent *rxevent_Post(struct clock *when, void (*func) (),
                                    void *arg, void *arg1);
 extern struct rxevent *rxevent_Post2(struct clock *when, void (*func) (),
                                    void *arg, void *arg1, int arg2);
+extern struct rxevent *rxevent_PostNow(struct clock *when, struct clock *now,
+                                      void (*func) (), void *arg, void *arg1);
+extern struct rxevent *rxevent_PostNow2(struct clock *when, struct clock *now,
+                                       void (*func) (), void *arg, 
+                                       void *arg1, int arg2);
 #endif
 extern void shutdown_rxevent(void);
 extern struct rxepoch *rxepoch_Allocate(struct clock *when);
index 2e3bf0f..0c019c2 100644 (file)
@@ -192,8 +192,9 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                                               RX_CALL_REFCOUNT_DELAY);
                                rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
                            } else {
-                               struct clock when;
-                               clock_GetTime(&when);
+                               struct clock when, now;
+                               clock_GetTime(&now);
+                               when = now;
                                /* Delay to consolidate ack packets */
                                clock_Add(&when, &rx_hardAckDelay);
                                if (!call->delayedAckEvent
@@ -204,7 +205,7 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
                                                   RX_CALL_REFCOUNT_DELAY);
                                    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
                                    call->delayedAckEvent =
-                                       rxevent_Post(&when,
+                                     rxevent_PostNow(&when, &now,
                                                     rxi_SendDelayedAck, call,
                                                     0);
                                }
@@ -521,8 +522,9 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
            rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
            didHardAck = 1;
        } else {
-           struct clock when;
-           clock_GetTime(&when);
+           struct clock when, now;
+           clock_GetTime(&now);
+           when = now;
            /* Delay to consolidate ack packets */
            clock_Add(&when, &rx_hardAckDelay);
            if (!call->delayedAckEvent
@@ -531,7 +533,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
                               RX_CALL_REFCOUNT_DELAY);
                CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
                call->delayedAckEvent =
-                   rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+                   rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
            }
        }
     }