ubik: Return an error from ContactQuorum when inquorate
[openafs.git] / src / ubik / ubik.c
index a0c7afd..229c596 100644 (file)
 
 #include <roken.h>
 
+
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
+#else
+# include <opr/lockstub.h>
+#endif
+
 #include <lock.h>
-#include <rx/xdr.h>
 #include <rx/rx.h>
 #include <afs/cellconfig.h>
 
+
 #define UBIK_INTERNALS
 #include "ubik.h"
 #include "ubik_int.h"
@@ -70,7 +78,6 @@ afs_int32 ubik_quorum = 0;
 struct ubik_dbase *ubik_dbase = 0;
 struct ubik_stats ubik_stats;
 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
-afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
 int (*ubik_SyncWriterCacheProc) (void);
 struct ubik_server *ubik_servers;
@@ -194,7 +201,7 @@ ContactQuorum_rcode(int okcalls, afs_int32 rcode)
     if (okcalls + 1 >= ubik_quorum)
        return 0;
     else
-       return rcode;
+       return (rcode != 0) ? rcode : UNOQUORUM;
 }
 
 /*!
@@ -355,9 +362,10 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
 #if defined(AFS_PTHREAD_ENV)
 static int
 ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
-    osi_Assert(pthread_attr_init(tattr) == 0);
-    osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
-    osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
+    opr_Verify(pthread_attr_init(tattr) == 0);
+    opr_Verify(pthread_attr_setdetachstate(tattr,
+                                          PTHREAD_CREATE_DETACHED) == 0);
+    opr_Verify(pthread_create(thread, tattr, proc, NULL) == 0);
     return 0;
 }
 #endif
@@ -403,18 +411,17 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
 
     initialize_U_error_table();
 
-    tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
-    tdb->pathName = (char *)malloc(strlen(pathName) + 1);
-    strcpy(tdb->pathName, pathName);
+    tdb = malloc(sizeof(struct ubik_dbase));
+    tdb->pathName = strdup(pathName);
     tdb->activeTrans = (struct ubik_trans *)0;
     memset(&tdb->version, 0, sizeof(struct ubik_version));
     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
 #ifdef AFS_PTHREAD_ENV
-    MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&beacon_globals.beacon_lock, "beacon lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&vote_globals.vote_lock, "vote lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&addr_globals.addr_lock, "address lock", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&version_globals.version_lock, "version lock", MUTEX_DEFAULT, 0);
+    opr_mutex_init(&tdb->versionLock);
+    opr_mutex_init(&beacon_globals.beacon_lock);
+    opr_mutex_init(&vote_globals.vote_lock);
+    opr_mutex_init(&addr_globals.addr_lock);
+    opr_mutex_init(&version_globals.version_lock);
 #else
     Lock_Init(&tdb->versionLock);
 #endif
@@ -435,8 +442,8 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     ubik_dbase = tdb;          /* for now, only one db per server; can fix later when we have names for the other dbases */
 
 #ifdef AFS_PTHREAD_ENV
-    CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
-    CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+    opr_cv_init(&tdb->version_cond);
+    opr_cv_init(&tdb->flags_cond);
 #endif /* AFS_PTHREAD_ENV */
 
     /* initialize RX */
@@ -447,6 +454,8 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     if (code < 0)
        return code;
 
+    ubik_callPortal = myPort;
+
     udisk_Init(ubik_nBuffers);
     ulock_Init();
 
@@ -463,7 +472,6 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     if (code)
        return code;
 
-    ubik_callPortal = myPort;
     /* try to get an additional security object */
     if (buildSecClassesProc == NULL) {
        numClasses = 3;
@@ -623,7 +631,7 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        /* if we're writing already, wait */
        while (dbase->flags & DBWRITING) {
 #ifdef AFS_PTHREAD_ENV
-           CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
+           opr_cv_wait(&dbase->flags_cond, &dbase->versionLock);
 #else
            DBRELE(dbase);
            LWP_WaitProcess(&dbase->flags);
@@ -640,7 +648,7 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
     /* create the transaction */
     code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
     tt = jt;                   /* move to a register */
-    if (code || tt == (struct ubik_trans *)NULL) {
+    if (code || tt == NULL) {
        DBRELE(dbase);
        return code;
     }
@@ -652,14 +660,18 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
        }
     }
     /* label trans and dbase with new tid */
-    tt->tid.epoch = ubik_epochTime;
+    tt->tid.epoch = version_globals.ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
     tt->tid.counter = (dbase->tidCounter += 2);
 
     if (transMode == UBIK_WRITETRANS) {
        /* for a write trans, we have to keep track of the write tid counter too */
        dbase->writeTidCounter = tt->tid.counter;
+    }
 
+    UBIK_VERSION_UNLOCK;
+
+    if (transMode == UBIK_WRITETRANS) {
        /* next try to start transaction on appropriate number of machines */
        code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
        if (code) {
@@ -667,14 +679,12 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
            udisk_abort(tt);
            ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
            udisk_end(tt);
-           UBIK_VERSION_UNLOCK;
            DBRELE(dbase);
            return code;
        }
     }
 
     *transPtr = tt;
-    UBIK_VERSION_UNLOCK;
     DBRELE(dbase);
     return 0;
 }
@@ -1049,10 +1059,9 @@ ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
     if (!transPtr->iovec_info.iovec_wrt_val) {
        transPtr->iovec_info.iovec_wrt_len = 0;
        transPtr->iovec_info.iovec_wrt_val =
-           (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
-                                       sizeof(struct ubik_iovec));
+           malloc(IOVEC_MAXWRT * sizeof(struct ubik_iovec));
        transPtr->iovec_data.iovec_buf_len = 0;
-       transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+       transPtr->iovec_data.iovec_buf_val = malloc(IOVEC_MAXBUF);
        if (!transPtr->iovec_info.iovec_wrt_val
            || !transPtr->iovec_data.iovec_buf_val) {
            if (transPtr->iovec_info.iovec_wrt_val)
@@ -1255,7 +1264,7 @@ ubik_WaitVersion(struct ubik_dbase *adatabase,
            return 0;
        }
 #ifdef AFS_PTHREAD_ENV
-       CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
+       opr_cv_wait(&adatabase->version_cond, &adatabase->versionLock);
 #else
        DBRELE(adatabase);
        LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
@@ -1374,7 +1383,7 @@ panic(char *format, ...)
     va_list ap;
 
     va_start(ap, format);
-    ubik_print("Ubik PANIC: ");
+    ubik_print("Ubik PANIC:\n");
     ubik_vprint(format, ap);
     va_end(ap);