rx: Use a red black tree for the event stack
authorSimon Wilkinson <sxw@your-file-system.com>
Sun, 23 Oct 2011 20:21:39 +0000 (21:21 +0100)
committerJeffrey Altman <jaltman@secure-endpoints.com>
Tue, 29 Nov 2011 20:29:41 +0000 (12:29 -0800)
Instead of the current event stack, which uses a sorted linked
list, use a red/black tree to maintain the timer stack. This
dramatically improves event insertion times, at the expense of
some additional implementation complexity.

This change also adds reference counting to the rxevent
structure. We've always had a race between an event being
fired, and that event being simultaneously cancelled by
the user thread. Reference counting avoids that race resulting
in the structure appearing twice in the free list.

Change-Id: Icbef6e04e01f3eef5b888bc3cb77b7a3d1be26ae
Reviewed-on: http://gerrit.openafs.org/5841
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Tested-by: Jeffrey Altman <jaltman@secure-endpoints.com>
Reviewed-by: Jeffrey Altman <jaltman@secure-endpoints.com>

23 files changed:
Makefile.in
configure.ac
src/afsmonitor/Makefile.in
src/aklog/Makefile.in
src/libadmin/samples/Makefile.in
src/libadmin/test/Makefile.in
src/libuafs/Makefile.common.in
src/rx/rx.c
src/rx/rx.h
src/rx/rx_clock.h
src/rx/rx_event.c
src/rx/rx_event.h
src/rx/rx_packet.h
src/rxdebug/Makefile.in
src/shlibafsrpc/Makefile.in
src/tbutc/Makefile.in
src/xstat/Makefile.in
tests/Makefile.in
tests/TESTS
tests/auth/Makefile.in
tests/rx/Makefile.in [new file with mode: 0644]
tests/rx/event-t.c [new file with mode: 0644]
tests/volser/Makefile.in

index 626666b..535baf2 100644 (file)
@@ -941,6 +941,7 @@ distclean: clean
        tests/auth/Makefile \
        tests/cmd/Makefile \
        tests/common/Makefile \
+       tests/rx/Makefile \
        tests/opr/Makefile \
        tests/util/Makefile \
        tests/volser/Makefile \
index 5c53906..588b50a 100644 (file)
@@ -257,6 +257,7 @@ tests/cmd/Makefile \
 tests/common/Makefile \
 tests/opr/Makefile \
 tests/rpctestlib/Makefile \
+tests/rx/Makefile \
 tests/tap/Makefile \
 tests/util/Makefile \
 tests/volser/Makefile,
index 129b5de..b8cd117 100644 (file)
@@ -34,6 +34,7 @@ LIBS=${TOP_LIBDIR}/libxstat_fs.a \
        ${TOP_LIBDIR}/librx.a \
        ${TOP_LIBDIR}/liblwp.a \
        ${TOP_LIBDIR}/libsys.a \
+       $(TOP_LIBDIR)/libopr.a \
        ${TOP_LIBDIR}/util.a
 
 EXTRA_LIBS=${LIB_curses} ${XLIBS}
index a856670..01eb1df 100644 (file)
@@ -14,6 +14,7 @@ AFSLIBS= ${TOP_LIBDIR}/libafsauthent.a \
          ${TOP_LIBDIR}/libafsrpc.a \
         ${TOP_LIBDIR}/libafshcrypto.a \
         ${TOP_LIBDIR}/libcmd.a \
+        ${TOP_LIBDIR}/libopr.a \
          ${TOP_LIBDIR}/util.a
 
 SRCS=  aklog.c krb_util.c linked_list.c
index b44b3e0..a31ece0 100644 (file)
@@ -41,6 +41,7 @@ SAMPLELIBS =\
        ${TOP_LIBDIR}/libafsauthent.a \
        ${TOP_LIBDIR}/libafsrpc.a \
        $(TOP_LIBDIR)/libafsutil.a \
+       $(TOP_LIBDIR)/libopr.a \
        ${TOP_LIBDIR}/libafshcrypto_lwp.a
 
 all test tests: $(SAMPLEPROGS)
index 234a2e5..241d952 100644 (file)
@@ -23,6 +23,7 @@ AFSCPLIBS =\
        $(TOP_LIBDIR)/libafsauthent.a \
        $(TOP_LIBDIR)/libafsrpc.a \
        $(TOP_LIBDIR)/libcmd.a \
+       $(TOP_LIBDIR)/libopr.a \
        $(TOP_LIBDIR)/libafsutil.a \
        ${TOP_LIBDIR}/libafshcrypto_lwp.a
 
index d2e1118..4de08f4 100644 (file)
@@ -101,7 +101,7 @@ linktest: UAFS/$(LIBUAFS)
                $(LDFLAGS_roken) $(LDFLAGS_hcrypto) -o linktest \
                ${srcdir}/linktest.c $(COMMON_INCLUDE) -DUKERNEL \
                UAFS/$(LIBUAFS) ${TOP_LIBDIR}/libcmd.a \
-               ${TOP_LIBDIR}/libafsutil.a \
+               ${TOP_LIBDIR}/libafsutil.a $(TOP_LIBDIR)/libopr.a \
                $(LIB_hcrypto) $(LIB_roken) $(LIB_crypt) $(TEST_LIBS) $(XLIBS)
 
 CRULE1=        $(CC) $(COMMON_INCLUDE) $(CPPFLAGS_roken) $(OPTF) -DKERNEL $(LIBJUAFS_FLAGS) $(CFLAGS) -c $?
@@ -2033,8 +2033,10 @@ $(PERLUAFS)/ukernel.so: $(PERLUAFS)/ukernel_swig_perl.o UAFS.pic/libuafs_pic.a
                $(SWIG_PERL_LDFLAGS) $(LDFLAGS) \
                $(PERLUAFS)/ukernel_swig_perl.o \
                UAFS.pic/libuafs_pic.a ${TOP_LIBDIR}/libcmd_pic.a \
-               ${TOP_LIBDIR}/libafsutil_pic.a $(LDFLAGS_roken) \
-               $(LDFLAGS_hcrypto) $(LIB_hcrypto) $(LIB_roken) $(LIB_crypt) \
+               ${TOP_LIBDIR}/libafsutil_pic.a \
+               $(TOP_LIBDIR)/libopr.a \
+               $(LDFLAGS_roken) $(LDFLAGS_hcrypto) $(LIB_hcrypto) \
+               $(LIB_roken) $(LIB_crypt) \
                $(XLIBS)
 
 clean:
index cd1aa75..1c5622e 100644 (file)
@@ -78,6 +78,7 @@ extern afs_int32 afs_termState;
 #include "rx_trace.h"
 #include "rx_internal.h"
 #include "rx_stats.h"
+#include "rx_event.h"
 
 #include <afs/rxgen_consts.h>
 
@@ -196,7 +197,6 @@ extern afs_kmutex_t des_init_mutex;
 extern afs_kmutex_t des_random_mutex;
 extern afs_kmutex_t rx_clock_mutex;
 extern afs_kmutex_t rxi_connCacheMutex;
-extern afs_kmutex_t rx_event_mutex;
 extern afs_kmutex_t event_handler_mutex;
 extern afs_kmutex_t listener_mutex;
 extern afs_kmutex_t rx_if_init_mutex;
@@ -222,7 +222,6 @@ rxi_InitPthread(void)
     MUTEX_INIT(&rx_refcnt_mutex, "refcnts", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&epoch_mutex, "epoch", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_init_mutex, "init", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&rx_event_mutex, "event", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&event_handler_mutex, "event handler", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rxi_connCacheMutex, "conn cache", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&listener_mutex, "listener", MUTEX_DEFAULT, 0);
@@ -776,7 +775,7 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset)
     clock_Add(&when, offset);
 
     if (!call->delayedAckEvent
-       || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
+       || clock_Gt(&call->delayedAckTime, &when)) {
 
         rxevent_Cancel(&call->delayedAckEvent, call,
                       RX_CALL_REFCOUNT_DELAY);
@@ -787,11 +786,10 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset)
        call->delayedAckEvent = rxevent_Post(&when, &now,
                                             rxi_SendDelayedAck,
                                             call, NULL, 0);
+       call->delayedAckTime = when;
     }
 }
 
-
-
 /* called with unincremented nRequestsRunning to see if it is OK to start
  * a new thread in this service.  Could be "no" for two reasons: over the
  * max quota, or would prevent others from reaching their min quota.
@@ -3698,7 +3696,12 @@ rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2, int dummy)
     int i, waiting;
 
     MUTEX_ENTER(&conn->conn_data_lock);
-    conn->checkReachEvent = NULL;
+
+    if (event) {
+       rxevent_Put(conn->checkReachEvent);
+       conn->checkReachEvent = NULL;
+    }
+
     waiting = conn->flags & RX_CONN_ATTACHWAIT;
     if (event) {
         MUTEX_ENTER(&rx_refcnt_mutex);
@@ -4904,6 +4907,7 @@ rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy)
 #ifdef RX_ENABLE_LOCKS
     if (event) {
        MUTEX_ENTER(&call->lock);
+       rxevent_Put(call->delayedAckEvent);
        call->delayedAckEvent = NULL;
         MUTEX_ENTER(&rx_refcnt_mutex);
        CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
@@ -4915,8 +4919,10 @@ rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy)
     if (event)
        MUTEX_EXIT(&call->lock);
 #else /* RX_ENABLE_LOCKS */
-    if (event)
+    if (event) {
+       rxevent_Put(call->delayedAckEvent);
        call->delayedAckEvent = NULL;
+    }
     rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
                    RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
     call->flags |= RX_CALL_ACKALL_SENT;
@@ -4931,8 +4937,10 @@ rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1,
 #ifdef RX_ENABLE_LOCKS
     if (event) {
        MUTEX_ENTER(&call->lock);
-       if (event == call->delayedAckEvent)
+       if (event == call->delayedAckEvent) {
+           rxevent_Put(call->delayedAckEvent);
            call->delayedAckEvent = NULL;
+       }
         MUTEX_ENTER(&rx_refcnt_mutex);
        CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
         MUTEX_EXIT(&rx_refcnt_mutex);
@@ -4941,8 +4949,10 @@ rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1,
     if (event)
        MUTEX_EXIT(&call->lock);
 #else /* RX_ENABLE_LOCKS */
-    if (event)
+    if (event) {
+       rxevent_Put(call->delayedAckEvent);
        call->delayedAckEvent = NULL;
+    }
     (void)rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
 #endif /* RX_ENABLE_LOCKS */
 }
@@ -5895,6 +5905,7 @@ rxi_Resend(struct rxevent *event, void *arg0, void *arg1, int istack)
         MUTEX_ENTER(&rx_refcnt_mutex);
        CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
         MUTEX_EXIT(&rx_refcnt_mutex);
+       rxevent_Put(call->resendEvent);
        call->resendEvent = NULL;
     }
 
@@ -6369,6 +6380,7 @@ rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1,
     MUTEX_ENTER(&rx_refcnt_mutex);
     /* Only reschedule ourselves if the connection would not be destroyed */
     if (conn->refCount <= 1) {
+       rxevent_Put(conn->natKeepAliveEvent);
        conn->natKeepAliveEvent = NULL;
         MUTEX_EXIT(&rx_refcnt_mutex);
        MUTEX_EXIT(&conn->conn_data_lock);
@@ -6376,6 +6388,7 @@ rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1,
     } else {
        conn->refCount--; /* drop the reference for this */
         MUTEX_EXIT(&rx_refcnt_mutex);
+       rxevent_Put(conn->natKeepAliveEvent);
        conn->natKeepAliveEvent = NULL;
        rxi_ScheduleNatKeepAliveEvent(conn);
        MUTEX_EXIT(&conn->conn_data_lock);
@@ -6442,8 +6455,12 @@ rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy,
     CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
     MUTEX_EXIT(&rx_refcnt_mutex);
     MUTEX_ENTER(&call->lock);
-    if (event == call->keepAliveEvent)
+
+    if (event == call->keepAliveEvent) {
+       rxevent_Put(call->keepAliveEvent);
        call->keepAliveEvent = NULL;
+    }
+
     now = clock_Sec();
 
 #ifdef RX_ENABLE_LOCKS
@@ -6485,8 +6502,10 @@ rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy, int dummy2)
     MUTEX_EXIT(&rx_refcnt_mutex);
     MUTEX_ENTER(&call->lock);
 
-    if (event == call->growMTUEvent)
+    if (event == call->growMTUEvent) {
+       rxevent_Put(call->growMTUEvent);
        call->growMTUEvent = NULL;
+    }
 
 #ifdef RX_ENABLE_LOCKS
     if (rxi_CheckCall(call, 0)) {
@@ -6594,6 +6613,7 @@ rxi_SendDelayedConnAbort(struct rxevent *event, void *arg1, void *unused,
     struct rx_packet *packet;
 
     MUTEX_ENTER(&conn->conn_data_lock);
+    rxevent_Put(conn->delayedAbortEvent);
     conn->delayedAbortEvent = NULL;
     error = htonl(conn->error);
     conn->abortCount++;
@@ -6620,6 +6640,7 @@ rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1, void *dummy,
     struct rx_packet *packet;
 
     MUTEX_ENTER(&call->lock);
+    rxevent_Put(call->delayedAbortEvent);
     call->delayedAbortEvent = NULL;
     error = htonl(call->error);
     call->abortCount++;
@@ -6646,7 +6667,11 @@ rxi_ChallengeEvent(struct rxevent *event,
 {
     struct rx_connection *conn = arg0;
 
-    conn->challengeEvent = NULL;
+    if (event) {
+       rxevent_Put(conn->challengeEvent);
+       conn->challengeEvent = NULL;
+    }
+
     if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
        struct rx_packet *packet;
        struct clock when, now;
@@ -7074,7 +7099,7 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2,
 
     when = now;
     when.sec += RX_REAP_TIME;  /* Check every RX_REAP_TIME seconds */
-    rxevent_Post(&when, &now, rxi_ReapConnections, 0, NULL, 0);
+    rxevent_Put(rxevent_Post(&when, &now, rxi_ReapConnections, 0, NULL, 0));
 }
 
 
index 42fb3bd..50d2bf2 100644 (file)
@@ -529,6 +529,7 @@ struct rx_call {
     struct rxevent *keepAliveEvent;    /* Scheduled periodically in active calls to keep call alive */
     struct rxevent *growMTUEvent;      /* Scheduled periodically in active calls to discover true maximum MTU */
     struct rxevent *delayedAckEvent;   /* Scheduled after all packets are received to send an ack if a reply or new call is not generated soon */
+    struct clock delayedAckTime;        /* Time that next delayed ack was scheduled  for */
     struct rxevent *delayedAbortEvent; /* Scheduled to throttle looping client */
     int abortCode;             /* error code from last RPC */
     int abortCount;            /* number of times last error was sent */
index 2bdc0b8..648d571 100644 (file)
@@ -86,6 +86,11 @@ extern int clock_nUpdates;
 
 /* Current clock time, truncated to seconds */
 #define        clock_Sec() ((!clock_haveCurrentTime)? clock_UpdateTime(), clock_now.sec:clock_now.sec)
+
+extern void clock_Init(void);
+extern int clock_UnInit(void);
+extern void clock_UpdateTime(void);
+
 #endif /* AFS_USE_GETTIMEOFDAY || AFS_PTHREAD_ENV */
 #else /* KERNEL */
 #define clock_Init()
index 39798cf..6baf8a8 100644 (file)
 /*
- * Copyright 2000, International Business Machines Corporation and others.
- * All Rights Reserved.
+ * Copyright (c) 2011 Your File System Inc. All rights reserved.
  *
- * This software has been released under the terms of the IBM Public
- * License.  For details, see the LICENSE file in the top-level source
- * directory or online at http://www.openafs.org/dl/license10.html
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR `AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* A reimplementation of the rx_event handler using red/black trees
+ *
+ * The first rx_event implementation used a simple sorted queue of all
+ * events, which lead to O(n^2) performance, where n is the number of
+ * outstanding events. This was found to scale poorly, so was replaced.
+ *
+ * The second implementation used a set of per-second buckets to store
+ * events. Each bucket (referred to as an epoch in the code) stored all
+ * of the events which expired in that second. However, on modern networks
+ * where RTT times are in the millisecond, most connections will have events
+ * expiring within the next second, so the problem reoccurs.
+ *
+ * This new implementation uses Red-Black trees to store a sorted list of
+ * events. Red Black trees are guaranteed to have no worse than O(log N)
+ * insertion, and are commonly used in timer applications
  */
 
 #include <afsconfig.h>
 #include <afs/param.h>
 
-#ifdef AFS_SUN59_ENV
-# include <sys/time_impl.h>
-#endif
-
 #ifdef KERNEL
-# ifndef UKERNEL
-#  include "afs/afs_osi.h"
-# else /* !UKERNEL */
-#  include "afs/sysincludes.h"
-#  include "afsincludes.h"
-# endif /* !UKERNEL */
-# include "rx_kernel.h"
-# include "rx_kmutex.h"
-# if defined(AFS_SGI_ENV)
-#  include "sys/debug.h"
-/* These are necessary to get curproc (used by GLOCK asserts) to work. */
-#  include "h/proc.h"
-#  if !defined(AFS_SGI64_ENV) && !defined(UKERNEL)
-#   include "h/user.h"
-#  endif
-extern void *osi_Alloc();
-# endif
-# if defined(AFS_OBSD_ENV)
-#  if defined(AFS_OBSD48_ENV)
-#   include "h/systm.h"
-#  else
-#   include "h/proc.h"
-#  endif
-# endif
-#else /* KERNEL */
+# include "afs/sysincludes.h"
+# include "afsincludes.h"
+#else
 # include <roken.h>
-# include "rx_user.h"
-# ifdef AFS_PTHREAD_ENV
-#  include "rx_pthread.h"
-# else
-#  include "rx_lwp.h"
-# endif
-#endif /* KERNEL */
+#endif
+
+#include <afs/opr.h>
+#include <opr/queue.h>
+#include <opr/rbtree.h>
 
 #include "rx.h"
-#include "rx_clock.h"
-#include "rx_queue.h"
-#include "rx_event.h"
-#include "rx_globals.h"
-
-/* All event processing is relative to the apparent current time given by clock_GetTime */
-
-/* This should be static, but event_test wants to look at the free list... */
-struct rx_queue rxevent_free;  /* It's somewhat bogus to use a doubly-linked queue for the free list */
-struct rx_queue rxepoch_free;  /* It's somewhat bogus to use a doubly-linked queue for the free list */
-static struct rx_queue rxepoch_queue;  /* list of waiting epochs */
-static int rxevent_allocUnit = 10;     /* Allocation unit (number of event records to allocate at one time) */
-static int rxepoch_allocUnit = 10;     /* Allocation unit (number of epoch records to allocate at one time) */
-int rxevent_nFree;             /* Number of free event records */
-int rxevent_nPosted;           /* Current number of posted events */
-int rxepoch_nFree;             /* Number of free epoch records */
-static void (*rxevent_ScheduledEarlierEvent) (void);   /* Proc to call when an event is scheduled that is earlier than all other events */
-struct xfreelist {
+#include "rx_atomic.h"
+
+struct rxevent {
+    struct opr_queue q;
+    struct opr_rbtree_node node;
+    struct clock eventTime;
+    struct rxevent *next;
+    rx_atomic_t refcnt;
+    int handled;
+    void (*func)(struct rxevent *, void *, void *, int);
+    void *arg;
+    void *arg1;
+    int arg2;
+};
+
+struct malloclist {
     void *mem;
     int size;
-    struct xfreelist *next;
+    struct malloclist *next;
 };
-static struct xfreelist *xfreemallocs = 0, *xsp = 0;
-
-struct clock rxevent_nextRaiseEvents;  /* Time of next call to raise events */
-struct clock rxevent_lastEvent;        /* backwards time detection */
-int rxevent_raiseScheduled;    /* true if raise events is scheduled */
-
-#ifdef RX_ENABLE_LOCKS
-#ifdef RX_LOCKS_DB
-/* rxdb_fileID is used to identify the lock location, along with line#. */
-static int rxdb_fileID = RXDB_FILE_RX_EVENT;
-#endif /* RX_LOCKS_DB */
-#define RX_ENABLE_LOCKS  1
-afs_kmutex_t rxevent_lock;
-#endif /* RX_ENABLE_LOCKS */
-
-#ifdef AFS_PTHREAD_ENV
-/*
- * This mutex protects the following global variables:
- * rxevent_initialized
- */
 
-afs_kmutex_t rx_event_mutex;
-#define LOCK_EV_INIT MUTEX_ENTER(&rx_event_mutex)
-#define UNLOCK_EV_INIT MUTEX_EXIT(&rx_event_mutex)
+static struct {
+    afs_kmutex_t lock;
+    struct opr_queue list;
+    struct malloclist *mallocs;
+} freeEvents;
+
+static struct {
+    afs_kmutex_t lock;
+    struct opr_rbtree head;
+    struct rxevent *first;
+} eventTree;
+
+static struct {
+    afs_kmutex_t lock;
+    struct clock last;
+    struct clock next;
+    void (*func)(void);
+    int raised;
+} eventSchedule;
+
+static int allocUnit = 10;
+
+static struct rxevent *
+rxevent_alloc(void) {
+     struct rxevent *evlist;
+     struct rxevent *ev;
+     struct malloclist *mrec;
+     int i;
+
+     MUTEX_ENTER(&freeEvents.lock);
+     if (opr_queue_IsEmpty(&freeEvents.list)) {
+       MUTEX_EXIT(&freeEvents.lock);
+
+#if    defined(AFS_AIX32_ENV) && defined(KERNEL)
+       ev = rxi_Alloc(sizeof(struct rxevent));
 #else
-#define LOCK_EV_INIT
-#define UNLOCK_EV_INIT
-#endif /* AFS_PTHREAD_ENV */
+       evlist = osi_Alloc(sizeof(struct rxevent) * allocUnit);
+       mrec = osi_Alloc(sizeof(struct malloclist));
 
+       mrec->mem = evlist;
+       mrec->size = sizeof(struct rxevent) * allocUnit;
 
-int
-rxevent_adjTimes(struct clock *adjTime)
-{
-    /* backwards clock correction */
-    int nAdjusted = 0;
-    struct rxepoch *qep, *nqep;
-    struct rxevent *qev, *nqev;
-
-    for (queue_Scan(&rxepoch_queue, qep, nqep, rxepoch)) {
-       for (queue_Scan(&qep->events, qev, nqev, rxevent)) {
-           if (clock_Gt(&qev->eventTime, adjTime)) {
-               clock_Sub(&qev->eventTime, adjTime);
-               nAdjusted++;
-           }
-       }
-       if (qep->epochSec > adjTime->sec) {
-           qep->epochSec -= adjTime->sec;
+       MUTEX_ENTER(&freeEvents.lock);
+       for (i = 1; i < allocUnit; i++) {
+           opr_queue_Append(&freeEvents.list, &evlist[i].q);
        }
+       mrec->next = freeEvents.mallocs;
+       freeEvents.mallocs = mrec;
+       MUTEX_EXIT(&freeEvents.lock);
+#endif
+       ev = &evlist[0];
+    } else {
+       ev = opr_queue_First(&freeEvents.list, struct rxevent, q);
+       opr_queue_Remove(&ev->q);
+       MUTEX_EXIT(&freeEvents.lock);
     }
-    return nAdjusted;
+
+    memset(ev, 0, sizeof(struct rxevent));
+    rx_atomic_set(&ev->refcnt, 1);
+
+    return ev;
 }
 
-/* Pass in the number of events to allocate at a time */
-int rxevent_initialized = 0;
-void
-rxevent_Init(int nEvents, void (*scheduler) (void))
-{
-    LOCK_EV_INIT;
-    if (rxevent_initialized) {
-       UNLOCK_EV_INIT;
-       return;
-    }
-    MUTEX_INIT(&rxevent_lock, "rxevent_lock", MUTEX_DEFAULT, 0);
-    clock_Init();
-    if (nEvents)
-       rxevent_allocUnit = nEvents;
-    queue_Init(&rxevent_free);
-    queue_Init(&rxepoch_free);
-    queue_Init(&rxepoch_queue);
-    rxevent_nFree = rxevent_nPosted = 0;
-    rxepoch_nFree = 0;
-    rxevent_ScheduledEarlierEvent = scheduler;
-    rxevent_initialized = 1;
-    clock_Zero(&rxevent_nextRaiseEvents);
-    clock_Zero(&rxevent_lastEvent);
-    rxevent_raiseScheduled = 0;
-    UNLOCK_EV_INIT;
+static void
+rxevent_free(struct rxevent *ev) {
+    MUTEX_ENTER(&freeEvents.lock);
+    opr_queue_Prepend(&freeEvents.list, &ev->q);
+    MUTEX_EXIT(&freeEvents.lock);
 }
 
-/* Create and initialize new epoch structure */
-struct rxepoch *
-rxepoch_Allocate(struct clock *when)
-{
-    struct rxepoch *ep;
-    int i;
-
-    /* If we are short on free epoch entries, create a block of new oned
-     * and add them to the free queue */
-    if (queue_IsEmpty(&rxepoch_free)) {
-#if    defined(AFS_AIX32_ENV) && defined(KERNEL)
-       ep = rxi_Alloc(sizeof(struct rxepoch));
-       queue_Append(&rxepoch_free, &ep[0]), rxepoch_nFree++;
-#else
-#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV)
-       ep = (struct rxepoch *)
-           afs_osi_Alloc_NoSleep(sizeof(struct rxepoch) * rxepoch_allocUnit);
-       xsp = xfreemallocs;
-       xfreemallocs =
-           (struct xfreelist *)afs_osi_Alloc_NoSleep(sizeof(struct xfreelist));
-#else
-       ep = (struct rxepoch *)
-           osi_Alloc(sizeof(struct rxepoch) * rxepoch_allocUnit);
-       xsp = xfreemallocs;
-       xfreemallocs =
-           (struct xfreelist *)osi_Alloc(sizeof(struct xfreelist));
-#endif
-       xfreemallocs->mem = (void *)ep;
-       xfreemallocs->size = sizeof(struct rxepoch) * rxepoch_allocUnit;
-       xfreemallocs->next = xsp;
-       for (i = 0; i < rxepoch_allocUnit; i++)
-           queue_Append(&rxepoch_free, &ep[i]), rxepoch_nFree++;
-#endif
+static_inline void
+rxevent_put(struct rxevent *ev) {
+    if (rx_atomic_dec_and_read(&ev->refcnt) == 0) {
+        rxevent_free(ev);
     }
-    ep = queue_First(&rxepoch_free, rxepoch);
-    queue_Remove(ep);
-    rxepoch_nFree--;
-    ep->epochSec = when->sec;
-    queue_Init(&ep->events);
-    return ep;
 }
 
-/* Add the indicated event (function, arg) at the specified clock time.  The
- * "when" argument specifies when "func" should be called, in clock (clock.h)
- * units. */
+void
+rxevent_Put(struct rxevent *ev) {
+    rxevent_put(ev);
+}
+
+static_inline struct rxevent *
+rxevent_get(struct rxevent *ev) {
+    rx_atomic_inc(&ev->refcnt);
+    return ev;
+}
 
 struct rxevent *
-rxevent_Post(struct clock *when, struct clock *now,
-             void (*func) (struct rxevent *, void *, void *, int),
-             void *arg, void *arg1, int arg2)
+rxevent_Get(struct rxevent *ev) {
+    return rxevent_get(ev);
+}
+
+/* Called if the time now is older than the last time we recorded running an
+ * event. This test catches machines where the system time has been set
+ * backwards, and avoids RX completely stalling when timers fail to fire.
+ *
+ * Take the different between now and the last event time, and subtract that
+ * from the timing of every event on the system. This does a relatively slow
+ * walk of the completely eventTree, but time-travel will hopefully be a pretty
+ * rare occurrence.
+ *
+ * This can only safely be called from the event thread, as it plays with the
+ * schedule directly.
+ *
+ */
+static void
+adjustTimes(void)
 {
-    struct rxevent *ev, *evqe, *evqpr;
-    struct rxepoch *ep, *epqe, *epqpr;
-    int isEarliest = 0;
-
-    MUTEX_ENTER(&rxevent_lock);
-#ifdef RXDEBUG
-    if (rx_Log_event) {
-       struct clock now1;
-       clock_GetTime(&now1);
-       fprintf(rx_Log_event, "%ld.%ld: rxevent_Post(%ld.%ld, "
-                             "%"AFS_PTR_FMT", %"AFS_PTR_FMT", "
-                             "%"AFS_PTR_FMT", %d)\n",
-               afs_printable_int32_ld(now1.sec),
-               afs_printable_int32_ld(now1.usec),
-               afs_printable_int32_ld(when->sec),
-               afs_printable_int32_ld(when->usec),
-               func, arg,
-               arg1, arg2);
-    }
-#endif
-    /* If a time was provided, check for consistency */
-    if (now->sec) {
-       if (clock_Gt(&rxevent_lastEvent, now)) {
-           struct clock adjTime = rxevent_lastEvent;
-           clock_Sub(&adjTime, now);
-           rxevent_adjTimes(&adjTime);
-       }
-       rxevent_lastEvent = *now;
-    }
-    /* Get a pointer to the epoch for this event, if none is found then
-     * create a new epoch and insert it into the sorted list */
-    for (ep = NULL, queue_ScanBackwards(&rxepoch_queue, epqe, epqpr, rxepoch)) {
-       if (when->sec == epqe->epochSec) {
-           /* already have an structure for this epoch */
-           ep = epqe;
-           if (ep == queue_First(&rxepoch_queue, rxepoch))
-               isEarliest = 1;
-           break;
-       } else if (when->sec > epqe->epochSec) {
-           /* Create a new epoch and insert after qe */
-           ep = rxepoch_Allocate(when);
-           queue_InsertAfter(epqe, ep);
-           break;
-       }
-    }
-    if (ep == NULL) {
-       /* Create a new epoch and place it at the head of the list */
-       ep = rxepoch_Allocate(when);
-       queue_Prepend(&rxepoch_queue, ep);
-       isEarliest = 1;
-    }
+    struct opr_rbtree_node *node;
+    struct clock adjTime, now;
 
-    /* If we're short on free event entries, create a block of new ones and add
-     * them to the free queue */
-    if (queue_IsEmpty(&rxevent_free)) {
-       int i;
-#if    defined(AFS_AIX32_ENV) && defined(KERNEL)
-       ev = rxi_Alloc(sizeof(struct rxevent));
-       queue_Append(&rxevent_free, &ev[0]), rxevent_nFree++;
-#else
+    MUTEX_ENTER(&eventTree.lock);
+    /* Time adjustment is expensive, make absolutely certain that we have
+     * to do it, by getting an up to date time to base our decision on
+     * once we've acquired the relevant locks.
+     */
+    clock_GetTime(&now);
+    if (!clock_Lt(&now, &eventSchedule.last))
+       goto out;
 
-#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV)
-       ev = (struct rxevent *)afs_osi_Alloc_NoSleep(sizeof(struct rxevent) *
-                                        rxevent_allocUnit);
-       xsp = xfreemallocs;
-       xfreemallocs =
-           (struct xfreelist *)afs_osi_Alloc_NoSleep(sizeof(struct xfreelist));
-#else
-       ev = (struct rxevent *)osi_Alloc(sizeof(struct rxevent) *
-                                        rxevent_allocUnit);
-       xsp = xfreemallocs;
-       xfreemallocs =
-           (struct xfreelist *)osi_Alloc(sizeof(struct xfreelist));
-#endif
-       xfreemallocs->mem = (void *)ev;
-       xfreemallocs->size = sizeof(struct rxevent) * rxevent_allocUnit;
-       xfreemallocs->next = xsp;
-       for (i = 0; i < rxevent_allocUnit; i++)
-           queue_Append(&rxevent_free, &ev[i]), rxevent_nFree++;
-#endif
+    adjTime = eventSchedule.last;
+    clock_Zero(&eventSchedule.last);
+
+    clock_Sub(&adjTime, &now);
+
+    node = opr_rbtree_first(&eventTree.head);
+    while(node) {
+       struct rxevent *event = opr_containerof(node, struct rxevent, node);
+
+       clock_Sub(&event->eventTime, &adjTime);
+       node = opr_rbtree_next(node);
     }
+    eventSchedule.next = eventTree.first->eventTime;
+
+out:
+    MUTEX_EXIT(&eventTree.lock);
+}
+
+static int initialised = 0;
+void
+rxevent_Init(int nEvents, void (*scheduler)(void))
+{
+    if (initialised)
+       return;
+
+    initialised = 1;
 
-    /* Grab and initialize a new rxevent structure */
-    ev = queue_First(&rxevent_free, rxevent);
-    queue_Remove(ev);
-    rxevent_nFree--;
+    clock_Init();
+    MUTEX_INIT(&eventTree.lock, "event tree lock", MUTEX_DEFAULT, 0);
+    opr_rbtree_init(&eventTree.head);
+
+    MUTEX_INIT(&freeEvents.lock, "free events lock", MUTEX_DEFAULT, 0);
+    opr_queue_Init(&freeEvents.list);
+    freeEvents.mallocs = NULL;
+
+    if (nEvents)
+       allocUnit = nEvents;
 
-    /* Record user defined event state */
+    clock_Zero(&eventSchedule.next);
+    clock_Zero(&eventSchedule.last);
+    eventSchedule.raised = 0;
+    eventSchedule.func = scheduler;
+}
+
+struct rxevent *
+rxevent_Post(struct clock *when, struct clock *now,
+            void (*func) (struct rxevent *, void *, void *, int),
+            void *arg, void *arg1, int arg2)
+{
+    struct rxevent *ev, *event;
+    struct opr_rbtree_node **childptr, *parent = NULL;
+
+    ev = rxevent_alloc();
     ev->eventTime = *when;
     ev->func = func;
     ev->arg = arg;
     ev->arg1 = arg1;
     ev->arg2 = arg2;
-    rxevent_nPosted += 1;      /* Rather than ++, to shut high-C up
-                                *  regarding never-set variables
-                                */
-
-    /* Insert the event into the sorted list of events for this epoch */
-    for (queue_ScanBackwards(&ep->events, evqe, evqpr, rxevent)) {
-       if (when->usec >= evqe->eventTime.usec) {
-           /* Insert event after evqe */
-           queue_InsertAfter(evqe, ev);
-           MUTEX_EXIT(&rxevent_lock);
-           return ev;
+
+    if (clock_Lt(now, &eventSchedule.last))
+       adjustTimes();
+
+    MUTEX_ENTER(&eventTree.lock);
+
+    /* Work out where in the tree we'll be storing this */
+    childptr = &eventTree.head.root;
+
+    while(*childptr) {
+       event = opr_containerof((*childptr), struct rxevent, node);
+
+       parent = *childptr;
+       if (clock_Lt(when, &event->eventTime))
+           childptr = &(*childptr)->left;
+       else if (clock_Gt(when, &event->eventTime))
+           childptr = &(*childptr)->right;
+       else {
+           opr_queue_Append(&event->q, &ev->q);
+           goto out;
        }
     }
-    /* Insert event at head of current epoch */
-    queue_Prepend(&ep->events, ev);
-    if (isEarliest && rxevent_ScheduledEarlierEvent
-       && (!rxevent_raiseScheduled
-           || clock_Lt(&ev->eventTime, &rxevent_nextRaiseEvents))) {
-       rxevent_raiseScheduled = 1;
-       clock_Zero(&rxevent_nextRaiseEvents);
-       MUTEX_EXIT(&rxevent_lock);
-       /* Notify our external scheduler */
-       (*rxevent_ScheduledEarlierEvent) ();
-       MUTEX_ENTER(&rxevent_lock);
+    opr_queue_Init(&ev->q);
+    opr_rbtree_insert(&eventTree.head, parent, childptr, &ev->node);
+
+    if (eventTree.first == NULL ||
+       clock_Lt(when, &(eventTree.first->eventTime))) {
+       eventTree.first = ev;
+       eventSchedule.raised = 1;
+       clock_Zero(&eventSchedule.next);
+       MUTEX_EXIT(&eventTree.lock);
+       (*eventSchedule.func)();
+       return rxevent_get(ev);
     }
-    MUTEX_EXIT(&rxevent_lock);
-    return ev;
+
+out:
+    MUTEX_EXIT(&eventTree.lock);
+    return rxevent_get(ev);
 }
 
-/* Cancel an event by moving it from the event queue to the free list.
- * Warning, the event must be on the event queue!  If not, this should core
- * dump (reference through 0).  This routine should be called using the macro
- * event_Cancel, which checks for a null event and also nulls the caller's
- * event pointer after cancelling the event.
- */
-#ifdef RX_ENABLE_LOCKS
-#ifdef RX_REFCOUNT_CHECK
-int rxevent_Cancel_type = 0;
-#endif
-#endif
+/* We're going to remove ev from the tree, so set the first pointer to the
+ * next event after it */
+static_inline void
+resetFirst(struct rxevent *ev)
+{
+    struct opr_rbtree_node *next = opr_rbtree_next(&ev->node);
+    if (next)
+       eventTree.first = opr_containerof(next, struct rxevent, node);
+    else
+       eventTree.first = NULL;
+}
 
 void
 rxevent_Cancel(struct rxevent **evp, struct rx_call *call, int type)
 {
-    struct rxevent *ev = *evp;
-
-#ifdef RXDEBUG
-    if (rx_Log_event) {
-       struct clock now;
-       clock_GetTime(&now);
-       fprintf(rx_Log_event, "%d.%d: rxevent_Cancel_1(%d.%d, %"
-               AFS_PTR_FMT ", %p" AFS_PTR_FMT ")\n",
-               (int)now.sec, (int)now.usec, (int)ev->eventTime.sec,
-               (int)ev->eventTime.usec, ev->func,
-               ev->arg);
-    }
-#endif
-    /* Append it to the free list (rather than prepending) to keep the free
-     * list hot so nothing pages out
-     */
-    MUTEX_ENTER(&rxevent_lock);
-    if (!ev) {
-       MUTEX_EXIT(&rxevent_lock);
+    struct rxevent *event;
+
+    if (!evp || !*evp)
        return;
-    }
 
-    *evp = NULL;
+    event = *evp;
+
+    MUTEX_ENTER(&eventTree.lock);
+
+    if (!event->handled) {
+       /* We're a node on the red/black tree. If our list is non-empty,
+        * then swap the first element in the list in in our place,
+        * promoting it to the list head */
+       if (event->node.parent == NULL
+           && eventTree.head.root != &event->node) {
+           /* Not in the rbtree, therefore must be a list element */
+           opr_queue_Remove(&event->q);
+       } else {
+           if (!opr_queue_IsEmpty(&event->q)) {
+               struct rxevent *next;
+
+               next = opr_queue_First(&event->q, struct rxevent, q);
+               opr_queue_Remove(&next->q); /* Remove ourselves from list */
+               if (event->q.prev == &event->q) {
+                   next->q.prev = next->q.next = &next->q;
+               } else {
+                   next->q = event->q;
+                   next->q.prev->next = &next->q;
+                   next->q.next->prev = &next->q;
+               }
+
+               opr_rbtree_replace(&eventTree.head, &event->node,
+                                  &next->node);
+
+               if (eventTree.first == event)
+                   eventTree.first = next;
 
-#ifdef RX_ENABLE_LOCKS
-    /* It's possible we're currently processing this event. */
-    if (queue_IsOnQueue(ev)) {
-       queue_MoveAppend(&rxevent_free, ev);
-       rxevent_nPosted--;
-       rxevent_nFree++;
-       if (call) {
-           call->refCount--;
-#ifdef RX_REFCOUNT_CHECK
-           call->refCDebug[type]--;
-           if (call->refCDebug[type] < 0) {
-               rxevent_Cancel_type = type;
-               osi_Panic("rxevent_Cancel: call refCount < 0");
+           } else {
+               if (eventTree.first == event)
+                   resetFirst(event);
+
+               opr_rbtree_remove(&eventTree.head, &event->node);
            }
-#endif /* RX_REFCOUNT_CHECK */
        }
+       event->handled = 1;
+       rxevent_put(event); /* Dispose of eventTree reference */
     }
-#else /* RX_ENABLE_LOCKS */
-    queue_MoveAppend(&rxevent_free, ev);
-    rxevent_nPosted--;
-    rxevent_nFree++;
-#endif /* RX_ENABLE_LOCKS */
-    MUTEX_EXIT(&rxevent_lock);
+
+    MUTEX_EXIT(&eventTree.lock);
+
+    *evp = NULL;
+    rxevent_put(event); /* Dispose of caller's reference */
+
+    if (call)
+       CALL_RELE(call, type);
 }
 
-/* Process all epochs that have expired relative to the current clock time
- * (which is not re-evaluated unless clock_NewTime has been called).  The
- * relative time to the next epoch is returned in the output parameter next
- * and the function returns 1.  If there are is no next epoch, the function
- * returns 0.
+/* Process all events which have expired. If events remain, then the relative
+ * time until the next event is returned in the parameter 'wait', and the
+ * function returns 1. If no events currently remain, the function returns 0
+ *
+ * If the current time is older than that of the last event processed, then we
+ * assume that time has gone backwards (for example, due to a system time reset)
+ * When this happens, all events in the current queue are rescheduled, using
+ * the difference between the current time and the last event time as a delta
  */
+
 int
-rxevent_RaiseEvents(struct clock *next)
+rxevent_RaiseEvents(struct clock *wait)
 {
-    struct rxepoch *ep;
-    struct rxevent *ev;
-    volatile struct clock now;
-    MUTEX_ENTER(&rxevent_lock);
-
-    /* Events are sorted by time, so only scan until an event is found that has
-     * not yet timed out */
-
-    clock_Zero(&now);
-    while (queue_IsNotEmpty(&rxepoch_queue)) {
-       ep = queue_First(&rxepoch_queue, rxepoch);
-       if (queue_IsEmpty(&ep->events)) {
-           queue_Remove(ep);
-           queue_Append(&rxepoch_free, ep);
-           rxepoch_nFree++;
-           continue;
+    struct clock now;
+    struct rxevent *event;
+    int ret;
+
+    clock_GetTime(&now);
+
+    /* Check for time going backwards */
+    if (clock_Lt(&now, &eventSchedule.last))
+         adjustTimes();
+    eventSchedule.last = now;
+
+    MUTEX_ENTER(&eventTree.lock);
+    /* Lock our event tree */
+    while (eventTree.first != NULL
+          && clock_Lt(&eventTree.first->eventTime, &now)) {
+
+       /* Grab the next node, either in the event's list, or in the tree node
+        * itself, and remove it from the event tree */
+       event = eventTree.first;
+       if (!opr_queue_IsEmpty(&event->q)) {
+           event = opr_queue_Last(&event->q, struct rxevent, q);
+           opr_queue_Remove(&event->q);
+       } else {
+           resetFirst(event);
+           opr_rbtree_remove(&eventTree.head, &event->node);
        }
-       do {
-       reraise:
-           ev = queue_First(&ep->events, rxevent);
-           if (clock_Lt(&now, &ev->eventTime)) {
-               clock_GetTime(&now);
-               if (clock_Gt(&rxevent_lastEvent, &now)) {
-                   struct clock adjTime = rxevent_lastEvent;
-                   int adjusted;
-                   clock_Sub(&adjTime, &now);
-                   adjusted = rxevent_adjTimes(&adjTime);
-                   rxevent_lastEvent = now;
-                   if (adjusted > 0)
-                       goto reraise;
-               }
-               if (clock_Lt(&now, &ev->eventTime)) {
-                    *next = rxevent_nextRaiseEvents = ev->eventTime;
-                    rxevent_raiseScheduled = 1;
-                    clock_Sub(next, &now);
-                    MUTEX_EXIT(&rxevent_lock);
-                    return 1;
-                }
-            }
-           queue_Remove(ev);
-           rxevent_nPosted--;
-           MUTEX_EXIT(&rxevent_lock);
-           ev->func(ev, ev->arg, ev->arg1, ev->arg2);
-           MUTEX_ENTER(&rxevent_lock);
-           queue_Append(&rxevent_free, ev);
-           rxevent_nFree++;
-       } while (queue_IsNotEmpty(&ep->events));
+       event->handled = 1;
+        MUTEX_EXIT(&eventTree.lock);
+
+        /* Fire the event, then free the structure */
+       event->func(event, event->arg, event->arg1, event->arg2);
+       rxevent_put(event);
+
+       MUTEX_ENTER(&eventTree.lock);
     }
-#ifdef RXDEBUG
-    if (rx_Log_event)
-       fprintf(rx_Log_event, "rxevent_RaiseEvents(%d.%d)\n", (int)now.sec,
-               (int)now.usec);
-#endif
-    rxevent_raiseScheduled = 0;
-    MUTEX_EXIT(&rxevent_lock);
-    return 0;
+
+    /* Figure out when we next need to be scheduled */
+    if (eventTree.first != NULL) {
+       *wait = eventSchedule.next = eventTree.first->eventTime;
+       ret = eventSchedule.raised = 1;
+       clock_Sub(wait, &now);
+    } else {
+       ret = eventSchedule.raised = 0;
+    }
+
+    MUTEX_EXIT(&eventTree.lock);
+
+    return ret;
 }
 
 void
 shutdown_rxevent(void)
 {
-    struct xfreelist *xp, *nxp;
+    struct malloclist *mrec, *nmrec;
 
-    LOCK_EV_INIT;
-    if (!rxevent_initialized) {
-       UNLOCK_EV_INIT;
+    if (!initialised) {
        return;
     }
-    rxevent_initialized = 0;
-    UNLOCK_EV_INIT;
-    MUTEX_DESTROY(&rxevent_lock);
-#if    defined(AFS_AIX32_ENV) && defined(KERNEL)
-    /* Everything is freed in afs_osinet.c */
-#else
-    xp = xfreemallocs;
-    while (xp) {
-       nxp = xp->next;
-       osi_Free((char *)xp->mem, xp->size);
-       osi_Free((char *)xp, sizeof(struct xfreelist));
-       xp = nxp;
+    MUTEX_DESTROY(&eventTree.lock);
+
+#if !defined(AFS_AIX32_ENV) || !defined(KERNEL)
+    MUTEX_DESTROY(&freeEvents.lock);
+    mrec = freeEvents.mallocs;
+    while (mrec) {
+       nmrec = mrec->next;
+       osi_Free(mrec->mem, mrec->size);
+       osi_Free(mrec, sizeof(struct malloclist));
+       mrec = nmrec;
     }
-    xfreemallocs = NULL;
+    mrec = NULL;
 #endif
-
 }
index e1921a2..df9b426 100644 (file)
@@ -9,91 +9,44 @@
 
 /* Event package */
 
-#ifndef _EVENT_
-#define _EVENT_
-
-#ifdef KERNEL
-#include "rx/rx_queue.h"
-#include "rx/rx_clock.h"
-#else /* KERNEL */
-#include "rx_queue.h"
-#include "rx_clock.h"
-#endif /* KERNEL */
-
-/* An event is something that will happen at (or after) a specified clock
- * time, unless cancelled prematurely.  The user routine (*func)() is called
- * with arguments (event, arg, arg1) when the event occurs.
- * Warnings:
- *   (1) The user supplied routine should NOT cause process preemption.
- *   (2) The event passed to the user is still on the event queue at that
- *       time.  The user must not remove (event_Cancel) it explicitly, but
- *       the user may remove or schedule any OTHER event at this time.
- */
-
-struct rxevent {
-    struct rx_queue junk;      /* Events are queued */
-    struct clock eventTime;    /* When this event times out (in clock.c units) */
-    void (*func) (struct rxevent *, void *, void *, int);
-    void *arg;                 /* Argument to the function */
-    void *arg1;                        /* Another argument */
-    int arg2;                  /* An integer argument */
-};
-
-/* We used to maintain a sorted list of events, but the amount of CPU
- * required to maintain the list grew with the square of the number of
- * connections. Now we keep a list of epochs, each epoch contains the
- * events scheduled for a particular second. Each epoch contains a sorted
- * list of the events scheduled for that epoch. */
-struct rxepoch {
-    struct rx_queue junk;      /* Epochs are queued */
-    int epochSec;              /* each epoch spans one second */
-    struct rx_queue events;    /* list of events for this epoch */
-};
-
-/* Some macros to make macros more reasonable (this allows a block to be
- * used within a macro which does not cause if statements to screw up).
- * That is, you can use "if (...) macro_name(); else ...;" without
- * having things blow up on the semi-colon. */
-
-#ifndef BEGIN
-#define BEGIN do {
-#define END } while(0)
-#endif
+#ifndef OPENAFS_RX_EVENT_H
+#define OPENAFS_RX_EVENT_H
 
 /* This routine must be called to initialize the event package.
  * nEvents is the number of events to allocate in a batch whenever
  * more are needed.  If this is 0, a default number (10) will be
  * allocated. */
-#if 0
-extern void rxevent_Init( /* nEvents, scheduler */ );
-#endif
-
-/* Get the expiration time for the next event */
-#if 0
-extern void exevent_NextEvent( /* when */ );
-#endif
+extern void rxevent_Init( int nEvents, void (*scheduler)(void) );
 
-/* Arrange for the indicated event at the appointed time.  When is a
+/* Arrange for the indicated event at the appointed time.  when is a
  * "struct clock", in the clock.c time base */
-#if 0
-extern struct rxevent *rxevent_Post( /* when, func, arg, arg1 */ );
-#endif
+struct clock;
+struct rxevent;
+extern struct rxevent *rxevent_Post(struct clock *when, struct clock *now,
+                                   void (*func) (struct rxevent *, void *,
+                                                 void *, int),
+                                   void *arg, void *arg1, int arg2);
 
 /* Remove the indicated event from the event queue.  The event must be
- * pending.  Also see the warning, above.  The event pointer supplied
- * is zeroed.
+ * pending.  Note that a currently executing event may not cancel itself.
  */
-#if 0
-extern struct rxevent *rxevent_Cancel(struct rxevent *, struct rx_call *, int)
-#endif
+struct rx_call;
+extern void rxevent_Cancel(struct rxevent **, struct rx_call *, int type);
 
 /* The actions specified for each event that has reached the current clock
  * time will be taken.  The current time returned by GetTime is used
  * (warning:  this may be an old time if the user has not called
  * clock_NewTime)
  */
-#if 0
-extern int rxevent_RaiseEvents();
-#endif
+extern int rxevent_RaiseEvents(struct clock *wait);
+
+/* Acquire a reference to an event */
+extern struct rxevent *rxevent_Get(struct rxevent *event);
+
+/* Release a reference to an event */
+extern void rxevent_Put(struct rxevent *event);
+
+/* Shutdown the event package */
+extern void shutdown_rxevent(void);
 
 #endif /* _EVENT_ */
index c60b27d..b7cbd0a 100644 (file)
@@ -9,6 +9,9 @@
 
 #ifndef _RX_PACKET_
 #define _RX_PACKET_
+
+#include "rx_queue.h"
+
 #if defined(AFS_NT40_ENV)
 #include "rx_xmit_nt.h"
 #endif
index a9e470c..df83359 100644 (file)
@@ -10,8 +10,12 @@ include @TOP_OBJDIR@/src/config/Makefile.config
 include @TOP_OBJDIR@/src/config/Makefile.lwp
 
 
-LIBS=${TOP_LIBDIR}/librx.a ${TOP_LIBDIR}/liblwp.a ${TOP_LIBDIR}/libcmd.a \
-              ${TOP_LIBDIR}/libsys.a ${TOP_LIBDIR}/libafsutil.a
+LIBS=${TOP_LIBDIR}/librx.a \
+     ${TOP_LIBDIR}/liblwp.a \
+     ${TOP_LIBDIR}/libcmd.a \
+     ${TOP_LIBDIR}/libsys.a \
+     ${TOP_LIBDIR}/libopr.a \
+     ${TOP_LIBDIR}/libafsutil.a
 
 all: rxdebug rxdumptrace
 
index e55b679..30ff565 100644 (file)
@@ -39,7 +39,8 @@ SYSOBJS =\
 UTILOBJS =\
        assert.o \
        casestrcpy.o \
-       base64.o
+       base64.o \
+       rbtree.o
 
 COMERROBJS =\
        error_msg.o \
@@ -277,6 +278,9 @@ casestrcpy.o: ${OPR}/casestrcpy.c
 assert.o: ${OPR}/assert.c
        $(AFS_CCRULE) $(OPR)/assert.c
 
+rbtree.o: ${OPR}/rbtree.c
+       $(AFS_CCRULE) $(OPR)/rbtree.c
+
 base64.o: ${UTIL}/base64.c
        $(AFS_CCRULE) $(UTIL)/base64.c
 
index 14e829b..de86dd1 100644 (file)
@@ -57,6 +57,7 @@ BUTCLIBS=${TOP_LIBDIR}/libbudb.a \
            ${TOP_LIBDIR}/libafsrpc.a \
             ${TOP_LIBDIR}/libcmd.a \
            ${TOP_LIBDIR}/util.a \
+           $(TOP_LIBDIR)/libopr.a \
            ${TOP_LIBDIR}/libusd.a \
            ${TOP_LIBDIR}/libprocmgmt.a
 
index a27ba89..f91b67e 100644 (file)
@@ -20,6 +20,7 @@ LIBS=${TOP_LIBDIR}/libafsint.a \
        ${TOP_LIBDIR}/librx.a \
        ${TOP_LIBDIR}/liblwp.a \
        ${TOP_LIBDIR}/libsys.a \
+       ${TOP_LIBDIR}/libopr.a \
        ${TOP_LIBDIR}/util.a
 
 all: \
index cfbef1b..f62f248 100644 (file)
@@ -9,7 +9,7 @@ include @TOP_OBJDIR@/src/config/Makefile.pthread
 MODULE_CFLAGS = -DSOURCE='"$(abs_top_srcdir)/tests"' \
        -DBUILD='"$(abs_top_builddir)/tests"'
 
-SUBDIRS = tap common auth util cmd volser opr
+SUBDIRS = tap common auth util cmd volser opr rx
 
 all: runtests
        @for A in $(SUBDIRS); do cd $$A && $(MAKE) $@ && cd .. || exit 1; done
index 217546b..f9cb17e 100644 (file)
@@ -9,6 +9,7 @@ opr/queues
 opr/rbtree
 ptserver/pt_util
 ptserver/pts-man
+rx/event
 volser/vos-man
 bucoord/backup-man
 kauth/kas-man
index b6c84be..d373687 100644 (file)
@@ -14,6 +14,7 @@ MODULE_LIBS =         ../tap/libtap.a \
                $(abs_top_builddir)/lib/libafsauthent.a \
                $(abs_top_builddir)/lib/libafsrpc.a \
                $(abs_top_builddir)/lib/libafshcrypto.a \
+               $(abs_top_builddir)/lib/libopr.a \
                $(LIB_rfc3961) $(LIB_roken) -lafsutil\
                $(XLIBS)
 
diff --git a/tests/rx/Makefile.in b/tests/rx/Makefile.in
new file mode 100644 (file)
index 0000000..2ff8a76
--- /dev/null
@@ -0,0 +1,24 @@
+# Build rules for the OpenAFS RX test suite.
+
+srcdir=@srcdir@
+abs_top_builddir=@abs_top_builddir@
+include @TOP_OBJDIR@/src/config/Makefile.config
+include @TOP_OBJDIR@/src/config/Makefile.pthread
+
+MODULE_CFLAGS = -I$(srcdir)/..
+
+LIBS = ../tap/libtap.a \
+       $(abs_top_builddir)/lib/libafsrpc.a \
+       $(abs_top_builddir)/lib/libopr.a
+
+tests = event-t
+
+all check test tests: $(tests)
+
+event-t: event-t.o $(LIBS)
+       $(AFS_LDRULE) event-t.o $(LIBS) $(XLIBS)
+
+install:
+
+clean distclean:
+       $(RM) -f $(tests) *.o core
diff --git a/tests/rx/event-t.c b/tests/rx/event-t.c
new file mode 100644 (file)
index 0000000..c66ef71
--- /dev/null
@@ -0,0 +1,178 @@
+/* A simple test of the rx event layer */
+
+#include <afsconfig.h>
+#include <afs/param.h>
+
+#include <roken.h>
+#include <pthread.h>
+
+#include <tap/basic.h>
+
+#include "rx/rx_event.h"
+#include "rx/rx_clock.h"
+
+#define NUMEVENTS 10000
+
+/* Mutexes and condvars for the scheduler */
+static int rescheduled = 0;
+static pthread_mutex_t eventMutex;
+static pthread_cond_t eventCond;
+
+/* Mutexes and condvars for the event list */
+
+static pthread_mutex_t eventListMutex;
+struct testEvent {
+    struct rxevent *event;
+    int fired;
+    int cancelled;
+};
+
+static struct testEvent events[NUMEVENTS];
+
+static void
+reschedule(void)
+{
+    pthread_mutex_lock(&eventMutex);
+    pthread_cond_signal(&eventCond);
+    rescheduled = 1;
+    pthread_mutex_unlock(&eventMutex);
+    return;
+}
+
+static void
+eventSub(struct rxevent *event, void *arg, void *arg1, int arg2)
+{
+    struct testEvent *evrecord = arg;
+
+    pthread_mutex_lock(&eventListMutex);
+    rxevent_Put(evrecord->event);
+    evrecord->event = NULL;
+    evrecord->fired = 1;
+    pthread_mutex_unlock(&eventListMutex);
+    return;
+}
+
+static void
+reportSub(struct rxevent *event, void *arg, void *arg1, int arg2)
+{
+    printf("Event fired\n");
+}
+
+static void *
+eventHandler(void *dummy) {
+    struct timespec nextEvent;
+    struct clock cv;
+    struct clock next;
+
+    pthread_mutex_lock(&eventMutex);
+    while (1) {
+       pthread_mutex_unlock(&eventMutex);
+
+       next.sec = 30;
+       next.usec = 0;
+       clock_GetTime(&cv);
+       rxevent_RaiseEvents(&next);
+
+       pthread_mutex_lock(&eventMutex);
+
+       /* If we were rescheduled whilst running the event queue,
+        * process the queue again */
+       if (rescheduled) {
+           rescheduled = 0;
+           continue;
+       }
+
+       clock_Add(&cv, &next);
+       nextEvent.tv_sec = cv.sec;
+       nextEvent.tv_nsec = cv.usec * 1000;
+       pthread_cond_timedwait(&eventCond, &eventMutex, &nextEvent);
+    }
+    pthread_mutex_unlock(&eventMutex);
+
+    return NULL;
+}
+
+int
+main(void)
+{
+    int when, counter, fail, fired, cancelled;
+    struct clock now, eventTime;
+    struct rxevent *event;
+    pthread_t handler;
+
+    plan(8);
+
+    pthread_mutex_init(&eventMutex, NULL);
+    pthread_cond_init(&eventCond, NULL);
+
+    memset(events, sizeof(events), 0);
+    pthread_mutex_init(&eventListMutex, NULL);
+
+    /* Start up the event system */
+    rxevent_Init(20, reschedule);
+    ok(1, "Started event subsystem");
+
+    clock_GetTime(&now);
+    /* Test for a problem when there is only a single event in the tree */
+    event = rxevent_Post(&now, &now, reportSub, NULL, NULL, 0);
+    ok(event != NULL, "Created a single event");
+    rxevent_Cancel(&event, NULL, 0);
+    ok(1, "Cancelled a single event");
+    rxevent_RaiseEvents(&now);
+    ok(1, "RaiseEvents happened without error");
+
+    ok(pthread_create(&handler, NULL, eventHandler, NULL) == 0,
+       "Created handler thread");
+
+    /* Add 1000 random events to fire over the next 3 seconds */
+
+    for (counter = 0; counter < NUMEVENTS; counter++) {
+        when = random() % 3000;
+       clock_GetTime(&now);
+       eventTime = now;
+       clock_Addmsec(&eventTime, when);
+       pthread_mutex_lock(&eventListMutex);
+       events[counter].event
+           = rxevent_Post(&eventTime, &now, eventSub, &events[counter], NULL, 0);
+
+       /* A 10% chance that we will schedule another event at the same time */
+       if (counter!=999 && random() % 10 == 0) {
+            counter++;
+            events[counter].event
+                = rxevent_Post(&eventTime, &now, eventSub, &events[counter],
+                               NULL, 0);
+       }
+
+       /* A 25% chance that we will cancel a random event */
+       if (random() % 4 == 0) {
+           int victim = random() % counter;
+
+           if (events[victim].event != NULL) {
+               rxevent_Cancel(&events[victim].event, NULL, 0);
+               events[victim].cancelled = 1;
+           }
+       }
+       pthread_mutex_unlock(&eventListMutex);
+    }
+
+    ok(1, "Added %d events", NUMEVENTS);
+
+    sleep(3);
+
+    fired = 0;
+    cancelled = 0;
+    fail = 0;
+    for (counter = 0; counter < NUMEVENTS; counter++) {
+       if (events[counter].fired)
+           fired++;
+       if (events[counter].cancelled)
+           cancelled++;
+       if (events[counter].cancelled && events[counter].fired)
+           fail = 1;
+    }
+    ok(!fail, "Didn't fire any cancelled events");
+    ok(fired+cancelled == NUMEVENTS,
+       "Number of fired and cancelled events sum to correct total");
+
+    return 0;
+}
index db64cbd..c859f39 100644 (file)
@@ -20,6 +20,7 @@ MODULE_LIBS =         ../tap/libtap.a \
                $(abs_top_builddir)/lib/libafsauthent.a \
                $(abs_top_builddir)/lib/libafsrpc.a \
                $(abs_top_builddir)/lib/libafshcrypto.a \
+               $(abs_top_builddir)/lib/libopr.a \
                $(LIB_rfc3961) $(LIB_roken) -lafsutil\
                $(XLIBS)