ubik: Rename flags to dbFlags
[openafs.git] / src / ubik / ubik.c
index 6f81eee..6adf9dd 100644 (file)
 
 #include <roken.h>
 
+
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
+#else
+# include <opr/lockstub.h>
+#endif
+
 #include <lock.h>
-#include <rx/xdr.h>
 #include <rx/rx.h>
 #include <afs/cellconfig.h>
+#include <afs/afsutil.h>
+
 
 #define UBIK_INTERNALS
 #include "ubik.h"
@@ -70,7 +79,6 @@ afs_int32 ubik_quorum = 0;
 struct ubik_dbase *ubik_dbase = 0;
 struct ubik_stats ubik_stats;
 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
-afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
 int (*ubik_SyncWriterCacheProc) (void);
 struct ubik_server *ubik_servers;
@@ -101,6 +109,7 @@ static void *securityRock = NULL;
 struct version_data version_globals;
 
 #define        CStampVersion       1   /* meaning set ts->version */
+#define        CCheckSyncAdvertised        2   /* check if the remote knows we are the sync-site */
 
 static_inline struct rx_connection *
 Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
@@ -141,7 +150,7 @@ Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
 static int
 ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
                         struct rx_connection **conn, afs_int32 *rcode,
-                        afs_int32 *okcalls, afs_int32 code)
+                        afs_int32 *okcalls, afs_int32 code, const char *procname)
 {
     if (!*ts) {
        /* Initial call - start iterating over servers */
@@ -154,6 +163,8 @@ ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server
            Quorum_EndIO(atrans, *conn);
            *conn = NULL;
            if (code) {         /* failure */
+               char hoststr[16];
+
                *rcode = code;
                UBIK_BEACON_LOCK;
                (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
@@ -161,6 +172,8 @@ ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server
                UBIK_BEACON_UNLOCK;
                (*ts)->currentDB = 0;
                urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
+               ViceLog(0, ("Server %s is marked down due to %s code %d\n",
+                           afs_inet_ntoa_r((*ts)->addr[0], hoststr), procname, *rcode));
            } else {            /* success */
                if (!(*ts)->isClone)
                    (*okcalls)++;       /* count up how many worked */
@@ -174,7 +187,10 @@ ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server
     if (!(*ts))
        return 1;
     UBIK_BEACON_LOCK;
-    if (!(*ts)->up || !(*ts)->currentDB) {
+    if (!(*ts)->up || !(*ts)->currentDB ||
+       /* do not call DISK_Begin until we know that lastYesState is set on the
+        * remote in question; otherwise, DISK_Begin will fail. */
+       ((aflags & CCheckSyncAdvertised) && !((*ts)->beaconSinceDown && (*ts)->lastVote))) {
        UBIK_BEACON_UNLOCK;
        (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
        return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
@@ -194,7 +210,7 @@ ContactQuorum_rcode(int okcalls, afs_int32 rcode)
     if (okcalls + 1 >= ubik_quorum)
        return 0;
     else
-       return rcode;
+       return (rcode != 0) ? rcode : UNOQUORUM;
 }
 
 /*!
@@ -211,26 +227,26 @@ ContactQuorum_rcode(int okcalls, afs_int32 rcode)
  * because it is sent the sync count along with the beacon message that
  * marks it as \b really up (\p beaconSinceDown).
  */
-afs_int32
+static afs_int32
 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
-                         struct ubik_trans *atrans, int aflags)
+                         struct ubik_trans *atrans, int aflags, const char *procname)
 {
     struct ubik_server *ts = NULL;
     afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
     int done;
 
-    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     while (!done) {
        if (conn)
            code = (*proc)(conn, &atrans->tid);
-       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
     return ContactQuorum_rcode(okcalls, rcode);
 }
 
 
-afs_int32
+static afs_int32
 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
                        afs_int32 position, afs_int32 length, afs_int32 type)
 {
@@ -238,37 +254,18 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
     afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
     int done;
+    char *procname = "DISK_Lock";
 
-    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     while (!done) {
        if (conn)
            code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
-       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
-    }
-    return ContactQuorum_rcode(okcalls, rcode);
-}
-
-
-afs_int32
-ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
-                        afs_int32 file, afs_int32 position, bulkdata *data)
-{
-    struct ubik_server *ts = NULL;
-    afs_int32 code = 0, rcode, okcalls;
-    struct rx_connection *conn;
-    int done;
-
-    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
-    while (!done) {
-       if (conn)
-           code = DISK_Write(conn, &atrans->tid, file, position, data);
-       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
     return ContactQuorum_rcode(okcalls, rcode);
 }
 
-
-afs_int32
+static afs_int32
 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
                            afs_int32 file, afs_int32 length)
 {
@@ -276,18 +273,19 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
     afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
     int done;
+    char *procname = "DISK_Truncate";
 
-    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     while (!done) {
        if (conn)
            code = DISK_Truncate(conn, &atrans->tid, file, length);
-       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
     return ContactQuorum_rcode(okcalls, rcode);
 }
 
 
-afs_int32
+static afs_int32
 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                          iovec_wrt * io_vector, iovec_buf *io_buffer)
 {
@@ -295,10 +293,12 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
     afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
     int done;
+    char *procname = "DISK_WriteV";
 
-    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     while (!done) {
        if (conn) {
+           procname = "DISK_WriteV";   /* in case previous fallback to DISK_Write */
            code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
            if ((code <= -450) && (code > -500)) {
                /* An RPC interface mismatch (as defined in comerr/error_msg.c).
@@ -311,6 +311,7 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                bulkdata tcbs;
                afs_int32 i, offset;
 
+               procname = "DISK_Write";        /* for accurate error msg, if any */
                for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
                    /* Sanity check for going off end of buffer */
                    if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
@@ -327,7 +328,7 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                }
            }
        }
-       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
     return ContactQuorum_rcode(okcalls, rcode);
 }
@@ -342,12 +343,13 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
     afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
     int done;
+    char *procname = "DISK_SetVersion";
 
-    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     while (!done) {
        if (conn)
            code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
-       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
     }
     return ContactQuorum_rcode(okcalls, rcode);
 }
@@ -355,9 +357,10 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
 #if defined(AFS_PTHREAD_ENV)
 static int
 ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
-    osi_Assert(pthread_attr_init(tattr) == 0);
-    osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
-    osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
+    opr_Verify(pthread_attr_init(tattr) == 0);
+    opr_Verify(pthread_attr_setdetachstate(tattr,
+                                          PTHREAD_CREATE_DETACHED) == 0);
+    opr_Verify(pthread_create(thread, tattr, proc, NULL) == 0);
     return 0;
 }
 #endif
@@ -375,7 +378,7 @@ ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
  *
  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
  */
-int
+static int
 ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
                      struct afsconf_cell *info, char clones[],
                      afs_uint32 serverList[], const char *pathName,
@@ -403,22 +406,18 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
 
     initialize_U_error_table();
 
-    tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
-    tdb->pathName = (char *)malloc(strlen(pathName) + 1);
-    strcpy(tdb->pathName, pathName);
-    tdb->activeTrans = (struct ubik_trans *)0;
-    memset(&tdb->version, 0, sizeof(struct ubik_version));
-    memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+    tdb = calloc(1, sizeof(*tdb));
+    tdb->pathName = strdup(pathName);
 #ifdef AFS_PTHREAD_ENV
-    MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&beacon_globals.beacon_lock, "beacon lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&vote_globals.vote_lock, "vote lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&addr_globals.addr_lock, "address lock", MUTEX_DEFAULT, 0);
+    opr_mutex_init(&tdb->versionLock);
+    opr_mutex_init(&beacon_globals.beacon_lock);
+    opr_mutex_init(&vote_globals.vote_lock);
+    opr_mutex_init(&addr_globals.addr_lock);
+    opr_mutex_init(&version_globals.version_lock);
 #else
     Lock_Init(&tdb->versionLock);
 #endif
     Lock_Init(&tdb->cache_lock);
-    tdb->flags = 0;
     tdb->read = uphys_read;
     tdb->write = uphys_write;
     tdb->truncate = uphys_truncate;
@@ -428,14 +427,12 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     tdb->getlabel = uphys_getlabel;
     tdb->setlabel = uphys_setlabel;
     tdb->getnfiles = uphys_getnfiles;
-    tdb->readers = 0;
-    tdb->tidCounter = tdb->writeTidCounter = 0;
+    tdb->buffered_append = uphys_buf_append;
     *dbase = tdb;
     ubik_dbase = tdb;          /* for now, only one db per server; can fix later when we have names for the other dbases */
 
 #ifdef AFS_PTHREAD_ENV
-    CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
-    CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+    opr_cv_init(&tdb->flags_cond);
 #endif /* AFS_PTHREAD_ENV */
 
     /* initialize RX */
@@ -446,6 +443,8 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     if (code < 0)
        return code;
 
+    ubik_callPortal = myPort;
+
     udisk_Init(ubik_nBuffers);
     ulock_Init();
 
@@ -462,7 +461,6 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     if (code)
        return code;
 
-    ubik_callPortal = myPort;
     /* try to get an additional security object */
     if (buildSecClassesProc == NULL) {
        numClasses = 3;
@@ -486,7 +484,7 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
        rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
                      VOTE_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
-       ubik_dprint("Could not create VOTE rx service!\n");
+       ViceLog(0, ("Could not create VOTE rx service!\n"));
        return -1;
     }
     rx_SetMinProcs(tservice, 2);
@@ -496,7 +494,7 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
        rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
                      DISK_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
-       ubik_dprint("Could not create DISK rx service!\n");
+       ViceLog(0, ("Could not create DISK rx service!\n"));
        return -1;
     }
     rx_SetMinProcs(tservice, 2);
@@ -513,6 +511,11 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
               NULL, "rx_ServerProc", &junk);
 #endif
 
+    /* send addrs to all other servers */
+    code = ubeacon_updateUbikNetworkAddress(ubik_host);
+    if (code)
+       return code;
+
     /* now start up async processes */
 #ifdef AFS_PTHREAD_ENV
     ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
@@ -594,9 +597,9 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        /* it's not safe to use ubik_BeginTransReadAnyWrite without a
         * cache-syncing function; fall back to ubik_BeginTransReadAny,
         * which is safe but slower */
-       ubik_print("ubik_BeginTransReadAnyWrite called, but "
+       ViceLog(0, ("ubik_BeginTransReadAnyWrite called, but "
                   "ubik_SyncWriterCacheProc not set; pretending "
-                  "ubik_BeginTransReadAny was called instead\n");
+                  "ubik_BeginTransReadAny was called instead\n"));
        readAny = 1;
     }
 
@@ -615,12 +618,12 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
      * don't know how to restore one without possibly picking up some data from the other. */
     if (transMode == UBIK_WRITETRANS) {
        /* if we're writing already, wait */
-       while (dbase->flags & DBWRITING) {
+       while (dbase->dbFlags & DBWRITING) {
 #ifdef AFS_PTHREAD_ENV
-           CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
+           opr_cv_wait(&dbase->flags_cond, &dbase->versionLock);
 #else
            DBRELE(dbase);
-           LWP_WaitProcess(&dbase->flags);
+           LWP_WaitProcess(&dbase->dbFlags);
            DBHOLD(dbase);
 #endif
        }
@@ -629,12 +632,17 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
            DBRELE(dbase);
            return UNOTSYNC;
        }
+       if (!ubeacon_SyncSiteAdvertised()) {
+           /* i am the sync-site but the remotes are not aware yet */
+           DBRELE(dbase);
+           return UNOQUORUM;
+       }
     }
 
     /* create the transaction */
     code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
     tt = jt;                   /* move to a register */
-    if (code || tt == (struct ubik_trans *)NULL) {
+    if (code || tt == NULL) {
        DBRELE(dbase);
        return code;
     }
@@ -646,29 +654,32 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        }
     }
     /* label trans and dbase with new tid */
-    tt->tid.epoch = ubik_epochTime;
+    tt->tid.epoch = version_globals.ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
     tt->tid.counter = (dbase->tidCounter += 2);
 
     if (transMode == UBIK_WRITETRANS) {
        /* for a write trans, we have to keep track of the write tid counter too */
        dbase->writeTidCounter = tt->tid.counter;
+    }
 
+    UBIK_VERSION_UNLOCK;
+
+    if (transMode == UBIK_WRITETRANS) {
        /* next try to start transaction on appropriate number of machines */
-       code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
+       code = ContactQuorum_NoArguments(DISK_Begin, tt, CCheckSyncAdvertised, "DISK_Begin");
        if (code) {
            /* we must abort the operation */
            udisk_abort(tt);
-           ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
+           /* force aborts to the others */
+           ContactQuorum_NoArguments(DISK_Abort, tt, 0, "DISK_Abort");
            udisk_end(tt);
-           UBIK_VERSION_UNLOCK;
            DBRELE(dbase);
            return code;
        }
     }
 
     *transPtr = tt;
-    UBIK_VERSION_UNLOCK;
     DBRELE(dbase);
     return 0;
 }
@@ -751,7 +762,7 @@ ubik_AbortTrans(struct ubik_trans *transPtr)
     }
 
     /* now it is safe to try remote abort */
-    code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
+    code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
     code2 = udisk_abort(transPtr);
     udisk_end(transPtr);
     DBRELE(dbase);
@@ -848,7 +859,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
 
        ReleaseWriteLock(&dbase->cache_lock);
 
-       code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+       code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion, "DISK_Commit");
 
     } else {
        memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
@@ -862,7 +873,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
         * we lose.  If we contact a majority of sites, then we won't be here: contacting
         * a majority guarantees commit, since it guarantees that one dude will be a
         * member of the next quorum. */
-       ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+       ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
        udisk_end(transPtr);
        DBRELE(dbase);
        goto error;
@@ -880,7 +891,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
         * to us, or timeout.  Put safety check in anyway */
        if (now - realStart > 10 * BIGTIME) {
            ubik_stats.escapes++;
-           ubik_print("ubik escaping from commit wait\n");
+           ViceLog(0, ("ubik escaping from commit wait\n"));
            break;
        }
        for (ts = ubik_servers; ts; ts = ts->next) {
@@ -917,7 +928,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
      * The transaction is committed anyway, since we succeeded in contacting a quorum
      * at the start (when invoking the DiskCommit function).
      */
-    ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+    ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
 
   success:
     udisk_end(transPtr);
@@ -1000,7 +1011,8 @@ ubik_Flush(struct ubik_trans *transPtr)
                                  &transPtr->iovec_data);
     if (code) {
        udisk_abort(transPtr);
-       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       /* force aborts to the others */
+       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
        transPtr->iovec_info.iovec_wrt_len = 0;
        transPtr->iovec_data.iovec_buf_len = 0;
        ERROR_EXIT(code);
@@ -1043,10 +1055,9 @@ ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
     if (!transPtr->iovec_info.iovec_wrt_val) {
        transPtr->iovec_info.iovec_wrt_len = 0;
        transPtr->iovec_info.iovec_wrt_val =
-           (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
-                                       sizeof(struct ubik_iovec));
+           malloc(IOVEC_MAXWRT * sizeof(struct ubik_iovec));
        transPtr->iovec_data.iovec_buf_len = 0;
-       transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+       transPtr->iovec_data.iovec_buf_val = malloc(IOVEC_MAXBUF);
        if (!transPtr->iovec_info.iovec_wrt_val
            || !transPtr->iovec_data.iovec_buf_val) {
            if (transPtr->iovec_info.iovec_wrt_val)
@@ -1177,7 +1188,8 @@ ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
     if (code) {
        /* we must abort the operation */
        udisk_abort(transPtr);
-       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       /* force aborts to the others */
+       ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
        ERROR_EXIT(code);
     }
 
@@ -1224,7 +1236,8 @@ ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
        if (code) {
            /* we must abort the operation */
            udisk_abort(atrans);
-           ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
+           /* force aborts to the others */
+           ContactQuorum_NoArguments(DISK_Abort, atrans, 0, "DISK_Abort");
            ERROR_EXIT(code);
        }
     }
@@ -1235,41 +1248,6 @@ ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
 }
 
 /*!
- * \brief utility to wait for a version # to change
- */
-int
-ubik_WaitVersion(struct ubik_dbase *adatabase,
-                struct ubik_version *aversion)
-{
-    DBHOLD(adatabase);
-    while (1) {
-       /* wait until version # changes, and then return */
-       if (vcmp(*aversion, adatabase->version) != 0) {
-           DBRELE(adatabase);
-           return 0;
-       }
-#ifdef AFS_PTHREAD_ENV
-       CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
-#else
-       DBRELE(adatabase);
-       LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
-       DBHOLD(adatabase);
-#endif
-    }
-}
-
-/*!
- * \brief utility to get the version of the dbase a transaction is dealing with
- */
-int
-ubik_GetVersion(struct ubik_trans *atrans,
-               struct ubik_version *avers)
-{
-    *avers = atrans->dbase->version;
-    return 0;
-}
-
-/*!
  * \brief Facility to simplify database caching.
  * \return zero if last trans was done on the local server and was successful.
  * \return -1 means bad (NULL) argument.
@@ -1366,13 +1344,13 @@ panic(char *format, ...)
     va_list ap;
 
     va_start(ap, format);
-    ubik_print("Ubik PANIC: ");
-    ubik_vprint(format, ap);
+    ViceLog(0, ("Ubik PANIC:\n"));
+    vViceLog(0, (format, ap));
     va_end(ap);
 
     abort();
-    ubik_print("BACK FROM ABORT\n");   /* shouldn't come back */
-    exit(1);                   /* never know, though  */
+    AFS_UNREACHED(ViceLog(0, ("BACK FROM ABORT\n")));
+    AFS_UNREACHED(exit(1));
 }
 
 /*!