ubik: Rename flags to dbFlags
[openafs.git] / src / ubik / disk.c
index 69a1115..5b74369 100644 (file)
 #include <afs/param.h>
 
 #include <roken.h>
+#include <afs/opr.h>
 
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <errno.h>
-
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
 #else
-#include <sys/file.h>
-#include <netinet/in.h>
+# include <opr/lockstub.h>
 #endif
-
-#include <lock.h>
-#include <rx/xdr.h>
-
-
+#include <afs/afsutil.h>
 
 #define UBIK_INTERNALS
 #include "ubik.h"
@@ -119,22 +110,14 @@ udisk_Debug(struct ubik_debug *aparm)
  * Abort transaction: opcode \n
  * Write data: opcode, file, position, length, <length> data bytes \n
  */
-int
+static int
 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
 {
-    struct ubik_stat ustat;
     afs_int32 code;
 
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code < 0)
-       return code;
-
     /* setup data and do write */
     aopcode = htonl(aopcode);
-    code =
-       (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
-                         sizeof(afs_int32));
+    code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
     if (code != sizeof(afs_int32))
        return UIOERROR;
 
@@ -149,17 +132,11 @@ udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
 /*!
  * \brief Log a commit, never syncing.
  */
-int
+static int
 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 {
     afs_int32 code;
     afs_int32 data[3];
-    struct ubik_stat ustat;
-
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code)
-       return code;
 
     /* setup data */
     data[0] = htonl(LOGEND);
@@ -168,8 +145,7 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 
     /* do write */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
-                         3 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
     if (code != 3 * sizeof(afs_int32))
        return UIOERROR;
 
@@ -181,18 +157,12 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 /*!
  * \brief Log a truncate operation, never syncing.
  */
-int
+static int
 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
                  afs_int32 alength)
 {
     afs_int32 code;
     afs_int32 data[3];
-    struct ubik_stat ustat;
-
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code < 0)
-       return code;
 
     /* setup data */
     data[0] = htonl(LOGTRUNCATE);
@@ -201,8 +171,7 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
 
     /* do write */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
-                         3 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
     if (code != 3 * sizeof(afs_int32))
        return UIOERROR;
     return 0;
@@ -211,20 +180,12 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
 /*!
  * \brief Write some data to the log, never syncing.
  */
-int
+static int
 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
                   afs_int32 apos, afs_int32 alen)
 {
-    struct ubik_stat ustat;
     afs_int32 code;
     afs_int32 data[4];
-    afs_int32 lpos;
-
-    /* find end of log */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    lpos = ustat.size;
-    if (code < 0)
-       return code;
 
     /* setup header */
     data[0] = htonl(LOGDATA);
@@ -234,13 +195,12 @@ udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
 
     /* write header */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
     if (code != 4 * sizeof(afs_int32))
        return UIOERROR;
-    lpos += 4 * sizeof(afs_int32);
 
     /* write data */
-    code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+    code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
     if (code != alen)
        return UIOERROR;
     return 0;
@@ -252,9 +212,8 @@ udisk_Init(int abuffers)
     /* Initialize the venus buffer system. */
     int i;
     struct buffer *tb;
-    Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
-    memset(Buffers, 0, abuffers * sizeof(struct buffer));
-    BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
+    Buffers = calloc(abuffers, sizeof(struct buffer));
+    BufferData = malloc(abuffers * UBIK_PAGESIZE);
     nbuffers = abuffers;
     for (i = 0; i < PHSIZE; i++)
        phTable[i] = 0;
@@ -391,7 +350,7 @@ DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
        tb->file = BADFID;
        Dlru(tb);
        tb->lockers--;
-       ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
+       ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno));
        return 0;
     }
     ios++;
@@ -435,8 +394,7 @@ GetTrunc(void)
 {
     struct ubik_trunc *tt;
     if (!freeTruncList) {
-       freeTruncList =
-           (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
+       freeTruncList = malloc(sizeof(struct ubik_trunc));
        freeTruncList->next = (struct ubik_trunc *)0;
     }
     tt = freeTruncList;
@@ -557,8 +515,7 @@ newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
 
     if (pp == 0) {
        /* There are no unlocked buffers that don't need to be written to the disk. */
-       ubik_print
-           ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
+       ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
        return NULL;
     }
 
@@ -819,7 +776,6 @@ udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
            bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
            if (!bp)
                return UIOERROR;
-           memset(bp, 0, UBIK_PAGESIZE);
        }
        /* otherwise, min of remaining bytes and end of buffer to user mode */
        offset = apos & (UBIK_PAGESIZE - 1);
@@ -845,24 +801,26 @@ udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
     afs_int32 code;
     struct ubik_trans *tt;
 
-    *atrans = (struct ubik_trans *)NULL;
+    *atrans = NULL;
     if (atype == UBIK_WRITETRANS) {
-       if (adbase->flags & DBWRITING)
+       if (adbase->dbFlags & DBWRITING)
            return USYNC;
        code = udisk_LogOpcode(adbase, LOGNEW, 0);
        if (code)
            return code;
     }
-    tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
-    memset(tt, 0, sizeof(struct ubik_trans));
+    tt = calloc(1, sizeof(struct ubik_trans));
     tt->dbase = adbase;
     tt->next = adbase->activeTrans;
     adbase->activeTrans = tt;
     tt->type = atype;
     if (atype == UBIK_READTRANS)
        adbase->readers++;
-    else if (atype == UBIK_WRITETRANS)
-       adbase->flags |= DBWRITING;
+    else if (atype == UBIK_WRITETRANS) {
+       UBIK_VERSION_LOCK;
+       adbase->dbFlags |= DBWRITING;
+       UBIK_VERSION_UNLOCK;
+    }
     *atrans = tt;
     return 0;
 }
@@ -876,6 +834,7 @@ udisk_commit(struct ubik_trans *atrans)
     struct ubik_dbase *dbase;
     afs_int32 code = 0;
     struct ubik_version oldversion, newversion;
+    afs_int32 now = FT_ApproxTime();
 
     if (atrans->flags & TRDONE)
        return (UTWOENDS);
@@ -885,15 +844,28 @@ udisk_commit(struct ubik_trans *atrans)
 
        /* On the first write to the database. We update the versions */
        if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
+           UBIK_VERSION_LOCK;
+           if (version_globals.ubik_epochTime < UBIK_MILESTONE
+               || version_globals.ubik_epochTime > now) {
+               ViceLog(0,
+                   ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
+                    version_globals.ubik_epochTime, UBIK_MILESTONE, now));
+               panic("Writing Ubik DB label\n");
+           }
            oldversion = dbase->version;
-           newversion.epoch = FT_ApproxTime();;
+           newversion.epoch = version_globals.ubik_epochTime;
            newversion.counter = 1;
 
            code = (*dbase->setlabel) (dbase, 0, &newversion);
-           if (code)
-               return (code);
-           ubik_epochTime = newversion.epoch;
+           if (code) {
+               UBIK_VERSION_UNLOCK;
+               return code;
+           }
+
            dbase->version = newversion;
+           UBIK_VERSION_UNLOCK;
+
+           urecovery_state |= UBIK_RECLABELDB;
 
            /* Ignore the error here. If the call fails, the site is
             * marked down and when we detect it is up again, we will
@@ -901,20 +873,17 @@ udisk_commit(struct ubik_trans *atrans)
             */
            ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
                                           &oldversion, &newversion);
-           urecovery_state |= UBIK_RECLABELDB;
        }
 
+       UBIK_VERSION_LOCK;
        dbase->version.counter++;       /* bump commit count */
-#ifdef AFS_PTHREAD_ENV
-       CV_BROADCAST(&dbase->version_cond);
-#else
-       LWP_NoYieldSignal(&dbase->version);
-#endif
        code = udisk_LogEnd(dbase, &dbase->version);
        if (code) {
            dbase->version.counter--;
-           return (code);
+           UBIK_VERSION_UNLOCK;
+           return code;
        }
+       UBIK_VERSION_UNLOCK;
 
        /* If we fail anytime after this, then panic and let the
         * recovery replay the log.
@@ -968,7 +937,7 @@ udisk_abort(struct ubik_trans *atrans)
      * will do nothing because the abort is there or no LogEnd opcode.
      */
     dbase = atrans->dbase;
-    if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
+    if (atrans->type == UBIK_WRITETRANS && dbase->dbFlags & DBWRITING) {
        udisk_LogOpcode(dbase, LOGABORT, 1);
        code = (*dbase->truncate) (dbase, LOGFILE, 0);
        if (code)
@@ -1004,8 +973,10 @@ udisk_end(struct ubik_trans *atrans)
     /* check if we are the write trans before unsetting the DBWRITING bit, else
      * we could be unsetting someone else's bit.
      */
-    if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
-       dbase->flags &= ~DBWRITING;
+    if (atrans->type == UBIK_WRITETRANS && dbase->dbFlags & DBWRITING) {
+       UBIK_VERSION_LOCK;
+       dbase->dbFlags &= ~DBWRITING;
+       UBIK_VERSION_UNLOCK;
     } else {
        dbase->readers--;
     }
@@ -1017,9 +988,9 @@ udisk_end(struct ubik_trans *atrans)
 
     /* Wakeup any writers waiting in BeginTrans() */
 #ifdef AFS_PTHREAD_ENV
-    CV_BROADCAST(&dbase->flags_cond);
+    opr_cv_broadcast(&dbase->flags_cond);
 #else
-    LWP_NoYieldSignal(&dbase->flags);
+    LWP_NoYieldSignal(&dbase->dbFlags);
 #endif
     return 0;
 }