ubik: log important messages at default log level
[openafs.git] / src / ubik / ubik.c
index 16584b6..a8bd2e9 100644 (file)
 
 #include <roken.h>
 
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <time.h>
 
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
 #else
-#include <sys/file.h>
-#include <netinet/in.h>
-#include <sys/param.h>
+# include <opr/lockstub.h>
 #endif
 
 #include <lock.h>
-#include <rx/xdr.h>
 #include <rx/rx.h>
 #include <afs/cellconfig.h>
+#include <afs/afsutil.h>
+
 
 #define UBIK_INTERNALS
 #include "ubik.h"
@@ -83,31 +79,52 @@ afs_int32 ubik_quorum = 0;
 struct ubik_dbase *ubik_dbase = 0;
 struct ubik_stats ubik_stats;
 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
-afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
-int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
-void *ubik_SRXSecurityRock;
 int (*ubik_SyncWriterCacheProc) (void);
 struct ubik_server *ubik_servers;
 short ubik_callPortal;
 
+/* These global variables were used to control the server security layers.
+ * They are retained for backwards compatibility with legacy callers.
+ *
+ * The ubik_SetServerSecurityProcs() interface should be used instead.
+ */
+
+int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
+void *ubik_SRXSecurityRock;
+int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
+void *ubik_CheckRXSecurityRock;
+
+
+
 static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
                      struct ubik_trans **transPtr, int readAny);
 
-struct rx_securityClass *ubik_sc[3];
+static struct rx_securityClass **ubik_sc = NULL;
+static void (*buildSecClassesProc)(void *, struct rx_securityClass ***,
+                                  afs_int32 *) = NULL;
+static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
+static void *securityRock = NULL;
+
+struct version_data version_globals;
 
 #define        CStampVersion       1   /* meaning set ts->version */
+#define        CCheckSyncAdvertised        2   /* check if the remote knows we are the sync-site */
 
 static_inline struct rx_connection *
 Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
 {
     struct rx_connection *conn;
 
+    UBIK_ADDR_LOCK;
     conn = as->disk_rxcid;
 
 #ifdef AFS_PTHREAD_ENV
     rx_GetConnection(conn);
+    UBIK_ADDR_UNLOCK;
     DBRELE(atrans->dbase);
+#else
+    UBIK_ADDR_UNLOCK;
 #endif /* AFS_PTHREAD_ENV */
 
     return conn;
@@ -122,6 +139,80 @@ Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
 #endif /* AFS_PTHREAD_ENV */
 }
 
+
+/*
+ * Iterate over all servers.  Callers pass in *ts which is used to track
+ * the current server.
+ * - Returns 1 if there are no more servers
+ * - Returns 0 with conn set to the connection for the current server if
+ *   it's up and current
+ */
+static int
+ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
+                        struct rx_connection **conn, afs_int32 *rcode,
+                        afs_int32 *okcalls, afs_int32 code, const char *procname)
+{
+    if (!*ts) {
+       /* Initial call - start iterating over servers */
+       *ts = ubik_servers;
+       *conn = NULL;
+       *rcode = 0;
+       *okcalls = 0;
+    } else {
+       if (*conn) {
+           Quorum_EndIO(atrans, *conn);
+           *conn = NULL;
+           if (code) {         /* failure */
+               char hoststr[16];
+
+               *rcode = code;
+               UBIK_BEACON_LOCK;
+               (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
+               (*ts)->beaconSinceDown = 0;
+               UBIK_BEACON_UNLOCK;
+               (*ts)->currentDB = 0;
+               urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
+               ViceLog(0, ("Server %s is marked down due to %s code %d\n",
+                           afs_inet_ntoa_r((*ts)->addr[0], hoststr), procname, *rcode));
+           } else {            /* success */
+               if (!(*ts)->isClone)
+                   (*okcalls)++;       /* count up how many worked */
+               if (aflags & CStampVersion) {
+                   (*ts)->version = atrans->dbase->version;
+               }
+           }
+       }
+       *ts = (*ts)->next;
+    }
+    if (!(*ts))
+       return 1;
+    UBIK_BEACON_LOCK;
+    if (!(*ts)->up || !(*ts)->currentDB ||
+       /* do not call DISK_Begin until we know that lastYesState is set on the
+        * remote in question; otherwise, DISK_Begin will fail. */
+       ((aflags & CCheckSyncAdvertised) && !((*ts)->beaconSinceDown && (*ts)->lastVote))) {
+       UBIK_BEACON_UNLOCK;
+       (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
+       return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
+    }
+    UBIK_BEACON_UNLOCK;
+    *conn = Quorum_StartIO(atrans, *ts);
+    return 0;
+}
+
+static int
+ContactQuorum_rcode(int okcalls, afs_int32 rcode)
+{
+    /*
+     * return 0 if we successfully contacted a quorum, otherwise return error code.
+     * We don't have to contact ourselves (that was done locally)
+     */
+    if (okcalls + 1 >= ubik_quorum)
+       return 0;
+    else
+       return (rcode != 0) ? rcode : UNOQUORUM;
+}
+
 /*!
  * \brief Perform an operation at a quorum, handling error conditions.
  * \return 0 if all worked and a quorum was contacted successfully
@@ -136,318 +227,144 @@ Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
  * because it is sent the sync count along with the beacon message that
  * marks it as \b really up (\p beaconSinceDown).
  */
-afs_int32
+static afs_int32
 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
-                         struct ubik_trans *atrans, int aflags)
+                         struct ubik_trans *atrans, int aflags, const char *procname)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
+    int done;
 
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = (*proc)(conn, &atrans->tid);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+    while (!done) {
+       if (conn)
+           code = (*proc)(conn, &atrans->tid);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
-afs_int32
+
+static afs_int32
 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
                        afs_int32 position, afs_int32 length, afs_int32 type)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
-    struct rx_connection *conn;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
-    }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
-}
-
-afs_int32
-ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
-                        afs_int32 file, afs_int32 position, bulkdata *data)
-{
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_Write(conn, &atrans->tid, file, position, data);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    int done;
+    char *procname = "DISK_Lock";
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+    while (!done) {
+       if (conn)
+           code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
-afs_int32
+static afs_int32
 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
                            afs_int32 file, afs_int32 length)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_Truncate(conn, &atrans->tid, file, length);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    int done;
+    char *procname = "DISK_Truncate";
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+    while (!done) {
+       if (conn)
+           code = DISK_Truncate(conn, &atrans->tid, file, length);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
-afs_int32
+
+static afs_int32
 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                          iovec_wrt * io_vector, iovec_buf *io_buffer)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if ((code <= -450) && (code > -500)) {
-           /* An RPC interface mismatch (as defined in comerr/error_msg.c).
-            * Un-bulk the entries and do individual DISK_Write calls
-            * instead of DISK_WriteV.
-            */
-           struct ubik_iovec *iovec =
-               (struct ubik_iovec *)io_vector->iovec_wrt_val;
-           char *iobuf = (char *)io_buffer->iovec_buf_val;
-           bulkdata tcbs;
-           afs_int32 i, offset;
-
-           conn = Quorum_StartIO(atrans, ts);
-
-           for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
-               /* Sanity check for going off end of buffer */
-               if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
-                   code = UINTERNAL;
-                   break;
+    int done;
+    char *procname = "DISK_WriteV";
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+    while (!done) {
+       if (conn) {
+           procname = "DISK_WriteV";   /* in case previous fallback to DISK_Write */
+           code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+           if ((code <= -450) && (code > -500)) {
+               /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+                * Un-bulk the entries and do individual DISK_Write calls
+                * instead of DISK_WriteV.
+                */
+               struct ubik_iovec *iovec =
+                       (struct ubik_iovec *)io_vector->iovec_wrt_val;
+               char *iobuf = (char *)io_buffer->iovec_buf_val;
+               bulkdata tcbs;
+               afs_int32 i, offset;
+
+               procname = "DISK_Write";        /* for accurate error msg, if any */
+               for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
+                   /* Sanity check for going off end of buffer */
+                   if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
+                       code = UINTERNAL;
+                       break;
+                   }
+                   tcbs.bulkdata_len = iovec[i].length;
+                   tcbs.bulkdata_val = &iobuf[offset];
+                   code = DISK_Write(conn, &atrans->tid, iovec[i].file,
+                          iovec[i].position, &tcbs);
+                   if (code)
+                       break;
+                   offset += iovec[i].length;
                }
-               tcbs.bulkdata_len = iovec[i].length;
-               tcbs.bulkdata_val = &iobuf[offset];
-
-               code =
-                   DISK_Write(conn, &atrans->tid, iovec[i].file,
-                              iovec[i].position, &tcbs);
-               if (code)
-                   break;
-
-               offset += iovec[i].length;
-           }
-
-           Quorum_EndIO(atrans, conn);
-           conn = NULL;
-       }
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
            }
        }
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
 ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
                              ubik_version *OldVersion,
                              ubik_version *NewVersion)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    int done;
+    char *procname = "DISK_SetVersion";
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+    while (!done) {
+       if (conn)
+           code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+#if defined(AFS_PTHREAD_ENV)
+static int
+ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
+    opr_Verify(pthread_attr_init(tattr) == 0);
+    opr_Verify(pthread_attr_setdetachstate(tattr,
+                                          PTHREAD_CREATE_DETACHED) == 0);
+    opr_Verify(pthread_create(thread, tattr, proc, NULL) == 0);
+    return 0;
+}
+#endif
+
 /*!
  * \brief This routine initializes the ubik system for a set of servers.
  * \return 0 for success, or an error code on failure.
@@ -461,7 +378,7 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
  *
  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
  */
-int
+static int
 ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
                      struct afsconf_cell *info, char clones[],
                      afs_uint32 serverList[], const char *pathName,
@@ -483,19 +400,23 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
 
     afs_int32 secIndex;
     struct rx_securityClass *secClass;
+    int numClasses;
 
     struct rx_service *tservice;
 
     initialize_U_error_table();
 
-    tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
-    tdb->pathName = (char *)malloc(strlen(pathName) + 1);
-    strcpy(tdb->pathName, pathName);
+    tdb = malloc(sizeof(struct ubik_dbase));
+    tdb->pathName = strdup(pathName);
     tdb->activeTrans = (struct ubik_trans *)0;
     memset(&tdb->version, 0, sizeof(struct ubik_version));
     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
 #ifdef AFS_PTHREAD_ENV
-    MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
+    opr_mutex_init(&tdb->versionLock);
+    opr_mutex_init(&beacon_globals.beacon_lock);
+    opr_mutex_init(&vote_globals.vote_lock);
+    opr_mutex_init(&addr_globals.addr_lock);
+    opr_mutex_init(&version_globals.version_lock);
 #else
     Lock_Init(&tdb->versionLock);
 #endif
@@ -516,8 +437,8 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     ubik_dbase = tdb;          /* for now, only one db per server; can fix later when we have names for the other dbases */
 
 #ifdef AFS_PTHREAD_ENV
-    CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
-    CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+    opr_cv_init(&tdb->version_cond);
+    opr_cv_init(&tdb->flags_cond);
 #endif /* AFS_PTHREAD_ENV */
 
     /* initialize RX */
@@ -529,47 +450,57 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
        return code;
 
     ubik_callPortal = myPort;
+
+    udisk_Init(ubik_nBuffers);
+    ulock_Init();
+
+    code = uvote_Init();
+    if (code)
+       return code;
+    code = urecovery_Initialize(tdb);
+    if (code)
+       return code;
+    if (info)
+       code = ubeacon_InitServerListByInfo(myHost, info, clones);
+    else
+       code = ubeacon_InitServerList(myHost, serverList);
+    if (code)
+       return code;
+
     /* try to get an additional security object */
-    ubik_sc[0] = rxnull_NewServerSecurityObject();
-    ubik_sc[1] = 0;
-    ubik_sc[2] = 0;
-    if (ubik_SRXSecurityProc) {
-       code =
-           (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
-                                    &secIndex);
-       if (code == 0) {
-           ubik_sc[secIndex] = secClass;
+    if (buildSecClassesProc == NULL) {
+       numClasses = 3;
+       ubik_sc = calloc(numClasses, sizeof(struct rx_securityClass *));
+       ubik_sc[0] = rxnull_NewServerSecurityObject();
+       if (ubik_SRXSecurityProc) {
+           code = (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock,
+                                           &secClass,
+                                           &secIndex);
+           if (code == 0) {
+                ubik_sc[secIndex] = secClass;
+           }
        }
+    } else {
+        (*buildSecClassesProc) (securityRock, &ubik_sc, &numClasses);
     }
     /* for backwards compat this should keep working as it does now
        and not host bind */
-#if 0
-    /* This really needs to be up above, where I have put it.  It works
-     * here when we're non-pthreaded, but the code above, when using
-     * pthreads may (and almost certainly does) end up calling on a
-     * pthread resource which gets initialized by rx_Init.  The end
-     * result is that an assert fails and the program dies. -- klm
-     */
-    code = rx_Init(myPort);
-    if (code < 0)
-       return code;
-#endif
 
     tservice =
-       rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
+       rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
                      VOTE_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
-       ubik_dprint("Could not create VOTE rx service!\n");
+       ViceLog(0, ("Could not create VOTE rx service!\n"));
        return -1;
     }
     rx_SetMinProcs(tservice, 2);
     rx_SetMaxProcs(tservice, 3);
 
     tservice =
-       rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
+       rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
                      DISK_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
-       ubik_dprint("Could not create DISK rx service!\n");
+       ViceLog(0, ("Could not create DISK rx service!\n"));
        return -1;
     }
     rx_SetMinProcs(tservice, 2);
@@ -580,41 +511,21 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
      * the "steplock" problem in ubik initialization. Defect 11037.
      */
 #ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
-    osi_Assert(pthread_attr_init(&rxServer_tattr) == 0);
-    osi_Assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    osi_Assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
-
-    osi_Assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
+    ubik_thread_create(&rxServer_tattr, &rxServerThread, (void *)rx_ServerProc);
 #else
     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
               NULL, "rx_ServerProc", &junk);
 #endif
 
-    /* do basic initialization */
-    code = uvote_Init();
-    if (code)
-       return code;
-    code = urecovery_Initialize(tdb);
-    if (code)
-       return code;
-    if (info)
-       code = ubeacon_InitServerListByInfo(myHost, info, clones);
-    else
-       code = ubeacon_InitServerList(myHost, serverList);
+    /* send addrs to all other servers */
+    code = ubeacon_updateUbikNetworkAddress(ubik_host);
     if (code)
        return code;
 
     /* now start up async processes */
 #ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
-    osi_Assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
-    osi_Assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    osi_Assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
-    /*  need another attr set here for priority???  - klm */
-
-    osi_Assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
-           (void *)ubeacon_Interact, NULL) == 0);
+    ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
+               (void *)ubeacon_Interact);
 #else
     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
                             LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
@@ -624,15 +535,8 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
 #endif
 
 #ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
-    osi_Assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
-    osi_Assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/*    osi_Assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
-    /*  need another attr set here for priority???  - klm */
-
-    osi_Assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
-           (void *)urecovery_Interact, NULL) == 0);
-
+    ubik_thread_create(&urecovery_Interact_tattr, &urecovery_InteractThread,
+               (void *)urecovery_Interact);
     return 0;  /* is this correct?  - klm */
 #else
     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
@@ -694,54 +598,20 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
     struct ubik_trans *jt;
     struct ubik_trans *tt;
     afs_int32 code;
-#if defined(UBIK_PAUSE)
-    int count;
-#endif /* UBIK_PAUSE */
 
     if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
        /* it's not safe to use ubik_BeginTransReadAnyWrite without a
         * cache-syncing function; fall back to ubik_BeginTransReadAny,
         * which is safe but slower */
-       ubik_print("ubik_BeginTransReadAnyWrite called, but "
+       ViceLog(0, ("ubik_BeginTransReadAnyWrite called, but "
                   "ubik_SyncWriterCacheProc not set; pretending "
-                  "ubik_BeginTransReadAny was called instead\n");
+                  "ubik_BeginTransReadAny was called instead\n"));
        readAny = 1;
     }
 
     if ((transMode != UBIK_READTRANS) && readAny)
        return UBADTYPE;
     DBHOLD(dbase);
-#if defined(UBIK_PAUSE)
-    /* if we're polling the slave sites, wait until the returns
-     *  are all in.  Otherwise, the urecovery_CheckTid call may
-     *  glitch us.
-     */
-    if (transMode == UBIK_WRITETRANS)
-       for (count = 75; dbase->flags & DBVOTING; --count) {
-           DBRELE(dbase);
-#ifdef GRAND_PAUSE_DEBUGGING
-           if (count == 75)
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-           else
-#endif
-           if (count <= 0) {
-#if 1
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-#endif
-               return UNOQUORUM;       /* a white lie */
-           }
-#ifdef AFS_PTHREAD_ENV
-           sleep(2);
-#else
-           IOMGR_Sleep(2);
-#endif
-           DBHOLD(dbase);
-       }
-#endif /* UBIK_PAUSE */
     if (urecovery_AllBetter(dbase, readAny) == 0) {
        DBRELE(dbase);
        return UNOQUORUM;
@@ -756,7 +626,7 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        /* if we're writing already, wait */
        while (dbase->flags & DBWRITING) {
 #ifdef AFS_PTHREAD_ENV
-           CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
+           opr_cv_wait(&dbase->flags_cond, &dbase->versionLock);
 #else
            DBRELE(dbase);
            LWP_WaitProcess(&dbase->flags);
@@ -768,15 +638,21 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
            DBRELE(dbase);
            return UNOTSYNC;
        }
+       if (!ubeacon_SyncSiteAdvertised()) {
+           /* i am the sync-site but the remotes are not aware yet */
+           DBRELE(dbase);
+           return UNOQUORUM;
+       }
     }
 
     /* create the transaction */
     code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
     tt = jt;                   /* move to a register */
-    if (code || tt == (struct ubik_trans *)NULL) {
+    if (code || tt == NULL) {
        DBRELE(dbase);
        return code;
     }
+    UBIK_VERSION_LOCK;
     if (readAny) {
        tt->flags |= TRREADANY;
        if (readAny > 1) {
@@ -784,20 +660,25 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        }
     }
     /* label trans and dbase with new tid */
-    tt->tid.epoch = ubik_epochTime;
+    tt->tid.epoch = version_globals.ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
     tt->tid.counter = (dbase->tidCounter += 2);
 
     if (transMode == UBIK_WRITETRANS) {
        /* for a write trans, we have to keep track of the write tid counter too */
        dbase->writeTidCounter = tt->tid.counter;
+    }
 
+    UBIK_VERSION_UNLOCK;
+
+    if (transMode == UBIK_WRITETRANS) {
        /* next try to start transaction on appropriate number of machines */
-       code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
+       code = ContactQuorum_NoArguments(DISK_Begin, tt, CCheckSyncAdvertised, "DISK_Begin");
        if (code) {
            /* we must abort the operation */
            udisk_abort(tt);
-           ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
+           /* force aborts to the others */
+           ContactQuorum_NoArguments(DISK_Abort, tt, 0, "DISK_Abort");
            udisk_end(tt);
            DBRELE(dbase);
            return code;
@@ -887,7 +768,7 @@ ubik_AbortTrans(struct ubik_trans *transPtr)
     }
 
     /* now it is safe to try remote abort */
-    code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
+    code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
     code2 = udisk_abort(transPtr);
     udisk_end(transPtr);
     DBRELE(dbase);
@@ -984,7 +865,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
 
        ReleaseWriteLock(&dbase->cache_lock);
 
-       code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+       code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion, "DISK_Commit");
 
     } else {
        memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
@@ -998,7 +879,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
         * we lose.  If we contact a majority of sites, then we won't be here: contacting
         * a majority guarantees commit, since it guarantees that one dude will be a
         * member of the next quorum. */
-       ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+       ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
        udisk_end(transPtr);
        DBRELE(dbase);
        goto error;
@@ -1016,11 +897,13 @@ ubik_EndTrans(struct ubik_trans *transPtr)
         * to us, or timeout.  Put safety check in anyway */
        if (now - realStart > 10 * BIGTIME) {
            ubik_stats.escapes++;
-           ubik_print("ubik escaping from commit wait\n");
+           ViceLog(0, ("ubik escaping from commit wait\n"));
            break;
        }
        for (ts = ubik_servers; ts; ts = ts->next) {
+           UBIK_BEACON_LOCK;
            if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
+               UBIK_BEACON_UNLOCK;
 
                /* this guy could have some damaged data, wait for him */
                code = 1;
@@ -1040,6 +923,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
 
                break;
            }
+           UBIK_BEACON_UNLOCK;
        }
        if (code == 0)
            break;              /* no down ones still pseudo-active */
@@ -1050,7 +934,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
      * The transaction is committed anyway, since we succeeded in contacting a quorum
      * at the start (when invoking the DiskCommit function).
      */
-    ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+    ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
 
   success:
     udisk_end(transPtr);
@@ -1114,11 +998,14 @@ ubik_Flush(struct ubik_trans *transPtr)
 
     if (transPtr->type != UBIK_WRITETRANS)
        return UBADTYPE;
+
+    DBHOLD(transPtr->dbase);
     if (!transPtr->iovec_info.iovec_wrt_len
-       || !transPtr->iovec_info.iovec_wrt_val)
+       || !transPtr->iovec_info.iovec_wrt_val) {
+       DBRELE(transPtr->dbase);
        return 0;
+    }
 
-    DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
        ERROR_EXIT(UNOQUORUM);
     if (!ubeacon_AmSyncSite()) /* only sync site can write */
@@ -1130,7 +1017,8 @@ ubik_Flush(struct ubik_trans *transPtr)
                                  &transPtr->iovec_data);
     if (code) {
        udisk_abort(transPtr);
-       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       /* force aborts to the others */
+       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
        transPtr->iovec_info.iovec_wrt_len = 0;
        transPtr->iovec_data.iovec_buf_len = 0;
        ERROR_EXIT(code);
@@ -1169,13 +1057,13 @@ ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
        return 0;
     }
 
+    DBHOLD(transPtr->dbase);
     if (!transPtr->iovec_info.iovec_wrt_val) {
        transPtr->iovec_info.iovec_wrt_len = 0;
        transPtr->iovec_info.iovec_wrt_val =
-           (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
-                                       sizeof(struct ubik_iovec));
+           malloc(IOVEC_MAXWRT * sizeof(struct ubik_iovec));
        transPtr->iovec_data.iovec_buf_len = 0;
-       transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+       transPtr->iovec_data.iovec_buf_val = malloc(IOVEC_MAXBUF);
        if (!transPtr->iovec_info.iovec_wrt_val
            || !transPtr->iovec_data.iovec_buf_val) {
            if (transPtr->iovec_info.iovec_wrt_val)
@@ -1184,6 +1072,7 @@ ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
            if (transPtr->iovec_data.iovec_buf_val)
                free(transPtr->iovec_data.iovec_buf_val);
            transPtr->iovec_data.iovec_buf_val = 0;
+           DBRELE(transPtr->dbase);
            return UNOMEM;
        }
     }
@@ -1191,12 +1080,14 @@ ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
     /* If this write won't fit in the structure, then flush it out and start anew */
     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
        || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
+       /* Can't hold the DB lock over ubik_Flush */
+       DBRELE(transPtr->dbase);
        code = ubik_Flush(transPtr);
        if (code)
            return (code);
+       DBHOLD(transPtr->dbase);
     }
 
-    DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
        ERROR_EXIT(UNOQUORUM);
     if (!ubeacon_AmSyncSite()) /* only sync site can write */
@@ -1303,7 +1194,8 @@ ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
     if (code) {
        /* we must abort the operation */
        udisk_abort(transPtr);
-       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       /* force aborts to the others */
+       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
        ERROR_EXIT(code);
     }
 
@@ -1350,7 +1242,8 @@ ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
        if (code) {
            /* we must abort the operation */
            udisk_abort(atrans);
-           ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
+           /* force aborts to the others */
+           ContactQuorum_NoArguments(DISK_Abort, atrans, 0, "DISK_Abort");
            ERROR_EXIT(code);
        }
     }
@@ -1375,7 +1268,7 @@ ubik_WaitVersion(struct ubik_dbase *adatabase,
            return 0;
        }
 #ifdef AFS_PTHREAD_ENV
-       CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
+       opr_cv_wait(&adatabase->version_cond, &adatabase->versionLock);
 #else
        DBRELE(adatabase);
        LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
@@ -1391,7 +1284,9 @@ int
 ubik_GetVersion(struct ubik_trans *atrans,
                struct ubik_version *avers)
 {
+    DBHOLD(atrans->dbase);
     *avers = atrans->dbase->version;
+    DBRELE(atrans->dbase);
     return 0;
 }
 
@@ -1492,12 +1387,12 @@ panic(char *format, ...)
     va_list ap;
 
     va_start(ap, format);
-    ubik_print("Ubik PANIC: ");
-    ubik_vprint(format, ap);
+    ViceLog(0, ("Ubik PANIC:\n"));
+    vViceLog(0, (format, ap));
     va_end(ap);
 
     abort();
-    ubik_print("BACK FROM ABORT\n");   /* shouldn't come back */
+    ViceLog(0, ("BACK FROM ABORT\n")); /* shouldn't come back */
     exit(1);                   /* never know, though  */
 }
 
@@ -1511,9 +1406,36 @@ ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
     struct ubik_server *ts;
     int j;
 
+    UBIK_ADDR_LOCK;
     for (ts = ubik_servers; ts; ts = ts->next)
        for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
-           if (ts->addr[j] == addr)
+           if (ts->addr[j] == addr) {
+               UBIK_ADDR_UNLOCK;
                return ts->addr[0];     /* net byte order */
+           }
+    UBIK_ADDR_UNLOCK;
     return 0;                  /* if not in server database, return error */
 }
+
+int
+ubik_CheckAuth(struct rx_call *acall)
+{
+    if (checkSecurityProc)
+       return (*checkSecurityProc) (securityRock, acall);
+    else if (ubik_CheckRXSecurityProc) {
+       return (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
+    } else
+       return 0;
+}
+
+void
+ubik_SetServerSecurityProcs(void (*buildproc) (void *,
+                                              struct rx_securityClass ***,
+                                              afs_int32 *),
+                           int (*checkproc) (void *, struct rx_call *),
+                           void *rock)
+{
+    buildSecClassesProc = buildproc;
+    checkSecurityProc = checkproc;
+    securityRock = rock;
+}