ubik: Buffer log writes with stdio
[openafs.git] / src / ubik / disk.c
index 9a9da8f..ed2b55b 100644 (file)
 #include <afsconfig.h>
 #include <afs/param.h>
 
+#include <roken.h>
+#include <afs/opr.h>
 
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <errno.h>
-
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
 #else
-#include <sys/file.h>
-#include <netinet/in.h>
+# include <opr/lockstub.h>
 #endif
-
-#include <lock.h>
-#include <rx/xdr.h>
-
-
+#include <afs/afsutil.h>
 
 #define UBIK_INTERNALS
 #include "ubik.h"
@@ -56,10 +48,9 @@ static int calls = 0, ios = 0, lastb = 0;
 static char *BufferData;
 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
                              afs_int32 apage);
-static int initd = 0;
 #define        BADFID      0xffffffff
 
-static int DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
+static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
 
 static struct ubik_trunc *freeTruncList = 0;
 
@@ -119,22 +110,14 @@ udisk_Debug(struct ubik_debug *aparm)
  * Abort transaction: opcode \n
  * Write data: opcode, file, position, length, <length> data bytes \n
  */
-int
+static int
 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
 {
-    struct ubik_stat ustat;
     afs_int32 code;
 
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code < 0)
-       return code;
-
     /* setup data and do write */
     aopcode = htonl(aopcode);
-    code =
-       (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
-                         sizeof(afs_int32));
+    code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
     if (code != sizeof(afs_int32))
        return UIOERROR;
 
@@ -149,17 +132,11 @@ udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
 /*!
  * \brief Log a commit, never syncing.
  */
-int
+static int
 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 {
     afs_int32 code;
     afs_int32 data[3];
-    struct ubik_stat ustat;
-
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code)
-       return code;
 
     /* setup data */
     data[0] = htonl(LOGEND);
@@ -168,8 +145,7 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 
     /* do write */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
-                         3 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
     if (code != 3 * sizeof(afs_int32))
        return UIOERROR;
 
@@ -181,18 +157,12 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 /*!
  * \brief Log a truncate operation, never syncing.
  */
-int
+static int
 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
                  afs_int32 alength)
 {
     afs_int32 code;
     afs_int32 data[3];
-    struct ubik_stat ustat;
-
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code < 0)
-       return code;
 
     /* setup data */
     data[0] = htonl(LOGTRUNCATE);
@@ -201,8 +171,7 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
 
     /* do write */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
-                         3 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
     if (code != 3 * sizeof(afs_int32))
        return UIOERROR;
     return 0;
@@ -211,20 +180,12 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
 /*!
  * \brief Write some data to the log, never syncing.
  */
-int
+static int
 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
                   afs_int32 apos, afs_int32 alen)
 {
-    struct ubik_stat ustat;
     afs_int32 code;
     afs_int32 data[4];
-    afs_int32 lpos;
-
-    /* find end of log */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    lpos = ustat.size;
-    if (code < 0)
-       return code;
 
     /* setup header */
     data[0] = htonl(LOGDATA);
@@ -234,27 +195,25 @@ udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
 
     /* write header */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
     if (code != 4 * sizeof(afs_int32))
        return UIOERROR;
-    lpos += 4 * sizeof(afs_int32);
 
     /* write data */
-    code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+    code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
     if (code != alen)
        return UIOERROR;
     return 0;
 }
 
-static int
-DInit(int abuffers)
+int
+udisk_Init(int abuffers)
 {
     /* Initialize the venus buffer system. */
     int i;
     struct buffer *tb;
-    Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
-    memset(Buffers, 0, abuffers * sizeof(struct buffer));
-    BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
+    Buffers = calloc(abuffers, sizeof(struct buffer));
+    BufferData = malloc(abuffers * UBIK_PAGESIZE);
     nbuffers = abuffers;
     for (i = 0; i < PHSIZE; i++)
        phTable[i] = 0;
@@ -316,33 +275,67 @@ Dmru(struct buffer *abuf)
     LruBuffer->lru_prev = abuf;
 }
 
+static_inline int
+MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
+            struct ubik_trans *atrans)
+{
+    if (buf->page != page) {
+       return 0;
+    }
+    if (buf->file != fid) {
+       return 0;
+    }
+    if (atrans->type == UBIK_READTRANS && buf->dirty) {
+       /* if 'buf' is dirty, it has uncommitted changes; we do not want to
+        * see uncommitted changes if we are a read transaction, so skip over
+        * it. */
+       return 0;
+    }
+    if (buf->dbase != atrans->dbase) {
+       return 0;
+    }
+    return 1;
+}
+
 /*!
  * \brief Get a pointer to a particular buffer.
  */
 static char *
-DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
+DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
 {
     /* Read a page from the disk. */
-    struct buffer *tb, *lastbuffer;
+    struct buffer *tb, *lastbuffer, *found_tb = NULL;
     afs_int32 code;
+    struct ubik_dbase *dbase = atrans->dbase;
 
     calls++;
     lastbuffer = LruBuffer->lru_prev;
 
-    if ((lastbuffer->page == page) && (lastbuffer->file == fid)
-       && (lastbuffer->dbase == dbase)) {
+    /* Skip for write transactions for a clean page - this may not be the right page to use */
+    if (MatchBuffer(lastbuffer, page, fid, atrans)
+               && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
        tb = lastbuffer;
        tb->lockers++;
        lastb++;
        return tb->data;
     }
     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
-       if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
-           Dmru(tb);
-           tb->lockers++;
-           return tb->data;
+       if (MatchBuffer(tb, page, fid, atrans)) {
+           if (tb->dirty || atrans->type == UBIK_READTRANS) {
+               found_tb = tb;
+               break;
+           }
+           /* Remember this clean page - we might use it */
+           found_tb = tb;
        }
     }
+    /* For a write transaction, use a matching clean page if no dirty one was found */
+    if (found_tb) {
+       Dmru(found_tb);
+       found_tb->lockers++;
+       return found_tb->data;
+    }
+
     /* can't find it */
     tb = newslot(dbase, fid, page);
     if (!tb)
@@ -357,7 +350,7 @@ DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
        tb->file = BADFID;
        Dlru(tb);
        tb->lockers--;
-       ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
+       ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno));
        return 0;
     }
     ios++;
@@ -372,11 +365,12 @@ DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
  * \brief Zap truncated pages.
  */
 static int
-DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
+DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
 {
     afs_int32 maxPage;
     struct buffer *tb;
     int i;
+    struct ubik_dbase *dbase = atrans->dbase;
 
     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE;        /* first invalid page now in file */
     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
@@ -400,8 +394,7 @@ GetTrunc(void)
 {
     struct ubik_trunc *tt;
     if (!freeTruncList) {
-       freeTruncList =
-           (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
+       freeTruncList = malloc(sizeof(struct ubik_trunc));
        freeTruncList->next = (struct ubik_trunc *)0;
     }
     tt = freeTruncList;
@@ -447,7 +440,7 @@ DoTruncs(struct ubik_trans *atrans)
     tproc = atrans->dbase->truncate;
     for (tt = atrans->activeTruncs; tt; tt = nt) {
        nt = tt->next;
-       DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
+       DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
        code = (*tproc) (atrans->dbase, tt->file, tt->length);
        if (code)
            rcode = code;
@@ -522,8 +515,7 @@ newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
 
     if (pp == 0) {
        /* There are no unlocked buffers that don't need to be written to the disk. */
-       ubik_print
-           ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
+       ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
        return NULL;
     }
 
@@ -566,11 +558,12 @@ DRelease(char *ap, int flag)
  * always call DFlush/DSync as a pair.
  */
 static int
-DFlush(struct ubik_dbase *adbase)
+DFlush(struct ubik_trans *atrans)
 {
     int i;
     afs_int32 code;
     struct buffer *tb;
+    struct ubik_dbase *adbase = atrans->dbase;
 
     tb = Buffers;
     for (i = 0; i < nbuffers; i++, tb++) {
@@ -590,7 +583,7 @@ DFlush(struct ubik_dbase *adbase)
  * \brief Flush all modified buffers.
  */
 static int
-DAbort(struct ubik_dbase *adbase)
+DAbort(struct ubik_trans *atrans)
 {
     int i;
     struct buffer *tb;
@@ -606,17 +599,39 @@ DAbort(struct ubik_dbase *adbase)
     return 0;
 }
 
+/**
+ * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
+ * can appear if a read transaction reads a page that is dirty, then that
+ * dirty page is synced. The read transaction will skip over the dirty page,
+ * and create a new buffer, and when the dirty page is synced, it will be
+ * identical (except for contents) to the read-transaction buffer.
+ */
+static void
+DedupBuffer(struct buffer *abuf)
+{
+    struct buffer *tb;
+    for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
+       if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
+           && tb->dbase == abuf->dbase) {
+
+           tb->file = BADFID;
+           Dlru(tb);
+       }
+    }
+}
+
 /*!
  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
  */
 static int
-DSync(struct ubik_dbase *adbase)
+DSync(struct ubik_trans *atrans)
 {
     int i;
     afs_int32 code;
     struct buffer *tb;
     afs_int32 file;
     afs_int32 rCode;
+    struct ubik_dbase *adbase = atrans->dbase;
 
     rCode = 0;
     while (1) {
@@ -625,8 +640,10 @@ DSync(struct ubik_dbase *adbase)
            if (tb->dirty == 1) {
                if (file == BADFID)
                    file = tb->file;
-               if (file != BADFID && tb->file == file)
+               if (file != BADFID && tb->file == file) {
                    tb->dirty = 0;
+                   DedupBuffer(tb);
+               }
            }
        }
        if (file == BADFID)
@@ -643,9 +660,10 @@ DSync(struct ubik_dbase *adbase)
  * \brief Same as DRead(), only do not even try to read the page.
  */
 static char *
-DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
+DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
 {
     struct buffer *tb;
+    struct ubik_dbase *dbase = atrans->dbase;
 
     if ((tb = newslot(dbase, fid, page)) == 0)
        return NULL;
@@ -663,14 +681,12 @@ udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
 {
     char *bp;
     afs_int32 offset, len, totalLen;
-    struct ubik_dbase *dbase;
 
     if (atrans->flags & TRDONE)
        return UDONE;
     totalLen = 0;
-    dbase = atrans->dbase;
     while (alen > 0) {
-       bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
+       bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
        if (!bp)
            return UEOF;
        /* otherwise, min of remaining bytes and end of buffer to user mode */
@@ -731,7 +747,6 @@ udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
 {
     char *bp;
     afs_int32 offset, len, totalLen;
-    struct ubik_dbase *dbase;
     struct ubik_trunc *tt;
     afs_int32 code;
 
@@ -740,9 +755,8 @@ udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
     if (atrans->type != UBIK_WRITETRANS)
        return UBADTYPE;
 
-    dbase = atrans->dbase;
     /* first write the data to the log */
-    code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
+    code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
     if (code)
        return code;
 
@@ -757,12 +771,11 @@ udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
     /* now update vm */
     totalLen = 0;
     while (alen > 0) {
-       bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
+       bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
        if (!bp) {
-           bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
+           bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
            if (!bp)
                return UIOERROR;
-           memset(bp, 0, UBIK_PAGESIZE);
        }
        /* otherwise, min of remaining bytes and end of buffer to user mode */
        offset = apos & (UBIK_PAGESIZE - 1);
@@ -788,12 +801,7 @@ udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
     afs_int32 code;
     struct ubik_trans *tt;
 
-    *atrans = (struct ubik_trans *)NULL;
-    /* Make sure system is initialized before doing anything */
-    if (!initd) {
-       initd = 1;
-       DInit(ubik_nBuffers);
-    }
+    *atrans = NULL;
     if (atype == UBIK_WRITETRANS) {
        if (adbase->flags & DBWRITING)
            return USYNC;
@@ -801,16 +809,18 @@ udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
        if (code)
            return code;
     }
-    tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
-    memset(tt, 0, sizeof(struct ubik_trans));
+    tt = calloc(1, sizeof(struct ubik_trans));
     tt->dbase = adbase;
     tt->next = adbase->activeTrans;
     adbase->activeTrans = tt;
     tt->type = atype;
     if (atype == UBIK_READTRANS)
        adbase->readers++;
-    else if (atype == UBIK_WRITETRANS)
+    else if (atype == UBIK_WRITETRANS) {
+       UBIK_VERSION_LOCK;
        adbase->flags |= DBWRITING;
+       UBIK_VERSION_UNLOCK;
+    }
     *atrans = tt;
     return 0;
 }
@@ -824,6 +834,7 @@ udisk_commit(struct ubik_trans *atrans)
     struct ubik_dbase *dbase;
     afs_int32 code = 0;
     struct ubik_version oldversion, newversion;
+    afs_int32 now = FT_ApproxTime();
 
     if (atrans->flags & TRDONE)
        return (UTWOENDS);
@@ -833,15 +844,28 @@ udisk_commit(struct ubik_trans *atrans)
 
        /* On the first write to the database. We update the versions */
        if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
+           UBIK_VERSION_LOCK;
+           if (version_globals.ubik_epochTime < UBIK_MILESTONE
+               || version_globals.ubik_epochTime > now) {
+               ViceLog(0,
+                   ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
+                    version_globals.ubik_epochTime, UBIK_MILESTONE, now));
+               panic("Writing Ubik DB label\n");
+           }
            oldversion = dbase->version;
-           newversion.epoch = FT_ApproxTime();;
+           newversion.epoch = version_globals.ubik_epochTime;
            newversion.counter = 1;
 
            code = (*dbase->setlabel) (dbase, 0, &newversion);
-           if (code)
-               return (code);
-           ubik_epochTime = newversion.epoch;
+           if (code) {
+               UBIK_VERSION_UNLOCK;
+               return code;
+           }
+
            dbase->version = newversion;
+           UBIK_VERSION_UNLOCK;
+
+           urecovery_state |= UBIK_RECLABELDB;
 
            /* Ignore the error here. If the call fails, the site is
             * marked down and when we detect it is up again, we will
@@ -849,28 +873,30 @@ udisk_commit(struct ubik_trans *atrans)
             */
            ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
                                           &oldversion, &newversion);
-           urecovery_state |= UBIK_RECLABELDB;
        }
 
+       UBIK_VERSION_LOCK;
        dbase->version.counter++;       /* bump commit count */
 #ifdef AFS_PTHREAD_ENV
-       assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
+       opr_cv_broadcast(&dbase->version_cond);
 #else
        LWP_NoYieldSignal(&dbase->version);
 #endif
        code = udisk_LogEnd(dbase, &dbase->version);
        if (code) {
            dbase->version.counter--;
-           return (code);
+           UBIK_VERSION_UNLOCK;
+           return code;
        }
+       UBIK_VERSION_UNLOCK;
 
        /* If we fail anytime after this, then panic and let the
         * recovery replay the log.
         */
-       code = DFlush(dbase);   /* write dirty pages to respective files */
+       code = DFlush(atrans);  /* write dirty pages to respective files */
        if (code)
            panic("Writing Ubik DB modifications\n");
-       code = DSync(dbase);    /* sync the files and mark pages not dirty */
+       code = DSync(atrans);   /* sync the files and mark pages not dirty */
        if (code)
            panic("Synchronizing Ubik DB modifications\n");
 
@@ -921,7 +947,7 @@ udisk_abort(struct ubik_trans *atrans)
        code = (*dbase->truncate) (dbase, LOGFILE, 0);
        if (code)
            panic("Truncating Ubik logfile during an abort\n");
-       DAbort(dbase);          /* remove all dirty pages */
+       DAbort(atrans);         /* remove all dirty pages */
     }
 
     /* When the transaction is marked done, it also means the logfile
@@ -942,19 +968,6 @@ udisk_end(struct ubik_trans *atrans)
 {
     struct ubik_dbase *dbase;
 
-#if defined(UBIK_PAUSE)
-    /* Another thread is trying to lock this transaction.
-     * That can only be an RPC doing SDISK_Lock.
-     * Unlock the transaction, 'cause otherwise the other
-     * thread will never wake up.  Don't free it because
-     * the caller will do that already.
-     */
-    if (atrans->flags & TRSETLOCK) {
-       atrans->flags |= TRSTALE;
-       ulock_relLock(atrans);
-       return UINTERNAL;
-    }
-#endif /* UBIK_PAUSE */
     if (!(atrans->flags & TRDONE))
        udisk_abort(atrans);
     dbase = atrans->dbase;
@@ -966,7 +979,9 @@ udisk_end(struct ubik_trans *atrans)
      * we could be unsetting someone else's bit.
      */
     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
+       UBIK_VERSION_LOCK;
        dbase->flags &= ~DBWRITING;
+       UBIK_VERSION_UNLOCK;
     } else {
        dbase->readers--;
     }
@@ -978,7 +993,7 @@ udisk_end(struct ubik_trans *atrans)
 
     /* Wakeup any writers waiting in BeginTrans() */
 #ifdef AFS_PTHREAD_ENV
-       assert(pthread_cond_broadcast(&dbase->flags_cond) == 0);
+    opr_cv_broadcast(&dbase->flags_cond);
 #else
     LWP_NoYieldSignal(&dbase->flags);
 #endif