ubik: Make udisk_Log* functions static
[openafs.git] / src / ubik / disk.c
index 379616b..a091e7f 100644 (file)
 /*
  * Copyright 2000, International Business Machines Corporation and others.
  * All Rights Reserved.
- * 
+ *
  * This software has been released under the terms of the IBM Public
  * License.  For details, see the LICENSE file in the top-level source
  * directory or online at http://www.openafs.org/dl/license10.html
  */
 
+#include <afsconfig.h>
 #include <afs/param.h>
-#include <sys/types.h>
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
-#else
-#include <sys/file.h>
-#include <netinet/in.h>
-#endif
-#include <errno.h>
-#include <lock.h>
-#include <rx/xdr.h>
 
+#include <roken.h>
+#include <afs/opr.h>
 
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
+#else
+# include <opr/lockstub.h>
+#endif
 
 #define UBIK_INTERNALS
 #include "ubik.h"
 #include "ubik_int.h"
 
 #define        PHSIZE  128
-struct buffer  {
-    struct ubik_dbase *dbase;      /* dbase within which the buffer resides */
-    afs_int32 file;                        /* Unique cache key */
-    afs_int32 page;                        /* page number */
+static struct buffer {
+    struct ubik_dbase *dbase;  /*!< dbase within which the buffer resides */
+    afs_int32 file;            /*!< Unique cache key */
+    afs_int32 page;            /*!< page number */
     struct buffer *lru_next;
     struct buffer *lru_prev;
-    struct buffer *hashNext;       /* next dude in hash table */
-    char *data;                            /* ptr to the data */
-    char lockers;                  /* usage ref count */
-    char dirty;                            /* is buffer modified */
-    char hashIndex;                /* back ptr to hash table */
+    struct buffer *hashNext;   /*!< next dude in hash table */
+    char *data;                        /*!< ptr to the data */
+    char lockers;              /*!< usage ref count */
+    char dirty;                        /*!< is buffer modified */
+    char hashIndex;            /*!< back ptr to hash table */
 } *Buffers;
 
 #define pHash(page) ((page) & (PHSIZE-1))
 
 afs_int32 ubik_nBuffers = NBUFFERS;
-static struct buffer *phTable[PHSIZE]; /* page hash table */
+static struct buffer *phTable[PHSIZE]; /*!< page hash table */
 static struct buffer *LruBuffer;
 static int nbuffers;
-static int calls=0, ios=0, lastb=0;
+static int calls = 0, ios = 0, lastb = 0;
 static char *BufferData;
-static struct buffer *newslot();
-static initd = 0;
+static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
+                             afs_int32 apage);
 #define        BADFID      0xffffffff
 
-static DTrunc();
+static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
 
-static struct ubik_trunc *freeTruncList=0;
+static struct ubik_trunc *freeTruncList = 0;
 
-/* remove a transaction from the database's active transaction list.  Don't free it */
-static unthread(atrans)
-    struct ubik_trans *atrans; {
+/*!
+ * \brief Remove a transaction from the database's active transaction list.  Don't free it.
+ */
+static int
+unthread(struct ubik_trans *atrans)
+{
     struct ubik_trans **lt, *tt;
     lt = &atrans->dbase->activeTrans;
-    for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
+    for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
        if (tt == atrans) {
            /* found it */
            *lt = tt->next;
            return 0;
        }
     }
-    return 2;  /* no entry */
+    return 2;                  /* no entry */
 }
 
-/* some debugging assistance */
-udisk_Debug(aparm)
-    struct ubik_debug *aparm; {
+/*!
+ * \brief some debugging assistance
+ */
+void
+udisk_Debug(struct ubik_debug *aparm)
+{
     struct buffer *tb;
     int i;
 
-    bcopy(&ubik_dbase->version, &aparm->localVersion, sizeof(struct ubik_version));
+    memcpy(&aparm->localVersion, &ubik_dbase->version,
+          sizeof(struct ubik_version));
     aparm->lockedPages = 0;
     aparm->writeLockedPages = 0;
     tb = Buffers;
-    for(i=0;i<nbuffers;i++, tb++) {
+    for (i = 0; i < nbuffers; i++, tb++) {
        if (tb->lockers) {
            aparm->lockedPages++;
-           if (tb->dirty) aparm->writeLockedPages++;
+           if (tb->dirty)
+               aparm->writeLockedPages++;
        }
     }
 }
 
-/* log format is defined here, and implicitly in recovery.c
+/*!
+ * \brief Write an opcode to the log.
+ *
+ * log format is defined here, and implicitly in recovery.c
  *
  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
  * are in logged in network standard byte order, in case we want to move logs
  * from machine-to-machine someday.
  *
- * Begin transaction: opcode
- * Commit transaction: opcode, version (8 bytes)
- * Truncate file: opcode, file number, length
- * Abort transaction: opcode
- * Write data: opcode, file, position, length, <length> data bytes
+ * Begin transaction: opcode \n
+ * Commit transaction: opcode, version (8 bytes) \n
+ * Truncate file: opcode, file number, length \n
+ * Abort transaction: opcode \n
+ * Write data: opcode, file, position, length, <length> data bytes \n
  */
-
-/* write an opcode to the log */
-udisk_LogOpcode(adbase, aopcode, async)
-    struct ubik_dbase *adbase;
-    afs_int32 aopcode;
-    int async; {
+static int
+udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
+{
     struct ubik_stat ustat;
     afs_int32 code;
-    
+
     /* figure out where to write */
-    code = (*adbase->stat)(adbase, LOGFILE, &ustat);
-    if (code < 0) return code;
+    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
+    if (code < 0)
+       return code;
 
     /* setup data and do write */
     aopcode = htonl(aopcode);
-    code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
-    if (code != sizeof(afs_int32)) return UIOERROR;
+    code =
+       (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
+                         sizeof(afs_int32));
+    if (code != sizeof(afs_int32))
+       return UIOERROR;
 
     /* optionally sync data */
-    if (async) code = (*adbase->sync)(adbase, LOGFILE);
-    else code = 0;
+    if (async)
+       code = (*adbase->sync) (adbase, LOGFILE);
+    else
+       code = 0;
     return code;
 }
 
-/* log a commit, never syncing */
-udisk_LogEnd(adbase, aversion)
-    struct ubik_dbase *adbase;
-    struct ubik_version *aversion; {
+/*!
+ * \brief Log a commit, never syncing.
+ */
+static int
+udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
+{
     afs_int32 code;
     afs_int32 data[3];
     struct ubik_stat ustat;
-    
+
     /* figure out where to write */
-    code = (*adbase->stat)(adbase, LOGFILE, &ustat);
-    if (code) return code;
+    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
+    if (code)
+       return code;
 
     /* setup data */
     data[0] = htonl(LOGEND);
     data[1] = htonl(aversion->epoch);
     data[2] = htonl(aversion->counter);
-    
+
     /* do write */
-    code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
-    if (code != 3*sizeof(afs_int32)) return UIOERROR;
+    code =
+       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
+                         3 * sizeof(afs_int32));
+    if (code != 3 * sizeof(afs_int32))
+       return UIOERROR;
 
     /* finally sync the log */
-    code = (*adbase->sync)(adbase, LOGFILE);
+    code = (*adbase->sync) (adbase, LOGFILE);
     return code;
 }
-    
-/* log a truncate operation, never syncing */
-udisk_LogTruncate(adbase, afile, alength)
-    struct ubik_dbase *adbase;
-    afs_int32 afile, alength; {
+
+/*!
+ * \brief Log a truncate operation, never syncing.
+ */
+static int
+udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
+                 afs_int32 alength)
+{
     afs_int32 code;
     afs_int32 data[3];
     struct ubik_stat ustat;
-    
+
     /* figure out where to write */
-    code = (*adbase->stat)(adbase, LOGFILE, &ustat);
-    if (code < 0) return code;
+    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
+    if (code < 0)
+       return code;
 
     /* setup data */
     data[0] = htonl(LOGTRUNCATE);
     data[1] = htonl(afile);
     data[2] = htonl(alength);
-    
+
     /* do write */
-    code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
-    if (code != 3*sizeof(afs_int32)) return UIOERROR;
+    code =
+       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
+                         3 * sizeof(afs_int32));
+    if (code != 3 * sizeof(afs_int32))
+       return UIOERROR;
     return 0;
 }
-    
-/* write some data to the log, never syncing */
-udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
-    struct ubik_dbase *adbase;
-    char *abuffer;
-    afs_int32 afile;
-    afs_int32 apos;
-    afs_int32 alen; {
+
+/*!
+ * \brief Write some data to the log, never syncing.
+ */
+static int
+udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
+                  afs_int32 apos, afs_int32 alen)
+{
     struct ubik_stat ustat;
     afs_int32 code;
     afs_int32 data[4];
     afs_int32 lpos;
-    
+
     /* find end of log */
-    code = (*adbase->stat)(adbase, LOGFILE, &ustat);
+    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
     lpos = ustat.size;
-    if (code < 0) return code;
+    if (code < 0)
+       return code;
 
     /* setup header */
     data[0] = htonl(LOGDATA);
@@ -197,121 +223,165 @@ udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
     data[3] = htonl(alen);
 
     /* write header */
-    code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
-    if (code != 4*sizeof(afs_int32)) return UIOERROR;
-    lpos += 4*sizeof(afs_int32);
-    
+    code =
+       (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+    if (code != 4 * sizeof(afs_int32))
+       return UIOERROR;
+    lpos += 4 * sizeof(afs_int32);
+
     /* write data */
-    code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
-    if (code != alen) return UIOERROR;
+    code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+    if (code != alen)
+       return UIOERROR;
     return 0;
 }
 
-static int DInit (abuffers)
-    int abuffers; {
+int
+udisk_Init(int abuffers)
+{
     /* Initialize the venus buffer system. */
     int i;
     struct buffer *tb;
-    Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
-    bzero(Buffers, abuffers * sizeof(struct buffer));
-    BufferData = (char *) malloc(abuffers * PAGESIZE);
+    Buffers = calloc(abuffers, sizeof(struct buffer));
+    BufferData = malloc(abuffers * UBIK_PAGESIZE);
     nbuffers = abuffers;
-    for(i=0;i<PHSIZE;i++) phTable[i] = 0;
-    for (i=0;i<abuffers;i++) {
-        /* Fill in each buffer with an empty indication. */
+    for (i = 0; i < PHSIZE; i++)
+       phTable[i] = 0;
+    for (i = 0; i < abuffers; i++) {
+       /* Fill in each buffer with an empty indication. */
        tb = &Buffers[i];
-        tb->lru_next = &(Buffers[i+1]);
-       tb->lru_prev = &(Buffers[i-1]);
-        tb->data = &BufferData[PAGESIZE*i];
+       tb->lru_next = &(Buffers[i + 1]);
+       tb->lru_prev = &(Buffers[i - 1]);
+       tb->data = &BufferData[UBIK_PAGESIZE * i];
        tb->file = BADFID;
     }
-    Buffers[0].lru_prev = &(Buffers[abuffers-1]);
-    Buffers[abuffers-1].lru_next = &(Buffers[0]);
-    LruBuffer  = &(Buffers[0]);
+    Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
+    Buffers[abuffers - 1].lru_next = &(Buffers[0]);
+    LruBuffer = &(Buffers[0]);
     return 0;
 }
 
-/* Take a buffer and mark it as the least recently used buffer */
-static int Dlru(abuf)
-  struct buffer *abuf;
+/*!
+ * \brief Take a buffer and mark it as the least recently used buffer.
+ */
+static void
+Dlru(struct buffer *abuf)
 {
-  if (LruBuffer == abuf)
-     return 0;
+    if (LruBuffer == abuf)
+       return;
 
-  /* Unthread from where it is in the list */
-  abuf->lru_next->lru_prev = abuf->lru_prev;
-  abuf->lru_prev->lru_next = abuf->lru_next;
+    /* Unthread from where it is in the list */
+    abuf->lru_next->lru_prev = abuf->lru_prev;
+    abuf->lru_prev->lru_next = abuf->lru_next;
 
-  /* Thread onto beginning of LRU list */
-  abuf->lru_next = LruBuffer;
-  abuf->lru_prev = LruBuffer->lru_prev;
+    /* Thread onto beginning of LRU list */
+    abuf->lru_next = LruBuffer;
+    abuf->lru_prev = LruBuffer->lru_prev;
 
-  LruBuffer->lru_prev->lru_next = abuf;
-  LruBuffer->lru_prev = abuf;
-  LruBuffer = abuf;
+    LruBuffer->lru_prev->lru_next = abuf;
+    LruBuffer->lru_prev = abuf;
+    LruBuffer = abuf;
 }
 
-/* Take a buffer and mark it as the most recently used buffer */
-static int Dmru(abuf)
-     struct buffer *abuf;
+/*!
+ * \brief Take a buffer and mark it as the most recently used buffer.
+ */
+static void
+Dmru(struct buffer *abuf)
 {
-  if (LruBuffer == abuf) {
-     LruBuffer = LruBuffer->lru_next;
-     return 0;
-  }
+    if (LruBuffer == abuf) {
+       LruBuffer = LruBuffer->lru_next;
+       return;
+    }
 
-  /* Unthread from where it is in the list */
-  abuf->lru_next->lru_prev = abuf->lru_prev;
-  abuf->lru_prev->lru_next = abuf->lru_next;
+    /* Unthread from where it is in the list */
+    abuf->lru_next->lru_prev = abuf->lru_prev;
+    abuf->lru_prev->lru_next = abuf->lru_next;
 
-  /* Thread onto end of LRU list - making it the MRU buffer */
-  abuf->lru_next = LruBuffer;
-  abuf->lru_prev = LruBuffer->lru_prev;
-  LruBuffer->lru_prev->lru_next = abuf;
-  LruBuffer->lru_prev = abuf;
+    /* Thread onto end of LRU list - making it the MRU buffer */
+    abuf->lru_next = LruBuffer;
+    abuf->lru_prev = LruBuffer->lru_prev;
+    LruBuffer->lru_prev->lru_next = abuf;
+    LruBuffer->lru_prev = abuf;
+}
 
+static_inline int
+MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
+            struct ubik_trans *atrans)
+{
+    if (buf->page != page) {
+       return 0;
+    }
+    if (buf->file != fid) {
+       return 0;
+    }
+    if (atrans->type == UBIK_READTRANS && buf->dirty) {
+       /* if 'buf' is dirty, it has uncommitted changes; we do not want to
+        * see uncommitted changes if we are a read transaction, so skip over
+        * it. */
+       return 0;
+    }
+    if (buf->dbase != atrans->dbase) {
+       return 0;
+    }
+    return 1;
 }
 
-/* get a pointer to a particular buffer */
-static char *DRead(dbase, fid, page)
-    struct ubik_dbase *dbase;
-    afs_int32 fid;
-    int page; {
+/*!
+ * \brief Get a pointer to a particular buffer.
+ */
+static char *
+DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
+{
     /* Read a page from the disk. */
-    struct buffer *tb, *lastbuffer;
-    afs_int32 trys, code;
+    struct buffer *tb, *lastbuffer, *found_tb = NULL;
+    afs_int32 code;
+    struct ubik_dbase *dbase = atrans->dbase;
 
     calls++;
     lastbuffer = LruBuffer->lru_prev;
 
-    if ((lastbuffer->page  == page ) && 
-       (lastbuffer->file  == fid  ) && 
-       (lastbuffer->dbase == dbase)) {
+    /* Skip for write transactions for a clean page - this may not be the right page to use */
+    if (MatchBuffer(lastbuffer, page, fid, atrans)
+               && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
        tb = lastbuffer;
        tb->lockers++;
        lastb++;
        return tb->data;
     }
-    for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
-       if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
-           Dmru(tb);
-           tb->lockers++;
-           return tb->data;
+    for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
+       if (MatchBuffer(tb, page, fid, atrans)) {
+           if (tb->dirty || atrans->type == UBIK_READTRANS) {
+               found_tb = tb;
+               break;
+           }
+           /* Remember this clean page - we might use it */
+           found_tb = tb;
        }
     }
+    /* For a write transaction, use a matching clean page if no dirty one was found */
+    if (found_tb) {
+       Dmru(found_tb);
+       found_tb->lockers++;
+       return found_tb->data;
+    }
+
     /* can't find it */
     tb = newslot(dbase, fid, page);
-    if (!tb) return 0;
-    bzero(tb->data, PAGESIZE);
+    if (!tb)
+       return 0;
+    memset(tb->data, 0, UBIK_PAGESIZE);
 
     tb->lockers++;
-    code = (*dbase->read)(dbase, fid, tb->data, page*PAGESIZE, PAGESIZE);
+    code =
+       (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
+                       UBIK_PAGESIZE);
     if (code < 0) {
-       tb->file = BADFID;
-       Dlru(tb);
-       tb->lockers--;
-       ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
-       return 0;
+       tb->file = BADFID;
+       Dlru(tb);
+       tb->lockers--;
+       ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
+       return 0;
     }
     ios++;
 
@@ -321,17 +391,19 @@ static char *DRead(dbase, fid, page)
     return tb->data;
 }
 
-/* zap truncated pages */
-static DTrunc(dbase, fid, length)
-    struct ubik_dbase *dbase;
-    afs_int32 fid;
-    afs_int32 length; {
+/*!
+ * \brief Zap truncated pages.
+ */
+static int
+DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
+{
     afs_int32 maxPage;
     struct buffer *tb;
-    int   i;
-    
-    maxPage = (length+PAGESIZE-1)>>LOGPAGESIZE;        /* first invalid page now in file */
-    for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
+    int i;
+    struct ubik_dbase *dbase = atrans->dbase;
+
+    maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE;        /* first invalid page now in file */
+    for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
        if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
            tb->file = BADFID;
            Dlru(tb);
@@ -340,68 +412,85 @@ static DTrunc(dbase, fid, length)
     return 0;
 }
 
-/* allocate a truncation entry.  We allocate special entries representing truncations, rather than
-    performing them immediately, so that we can abort a transaction easily by simply purging
-    the in-core memory buffers and discarding these truncation entries.
-*/
-static struct ubik_trunc *GetTrunc() {
+/*!
+ * \brief Allocate a truncation entry.
+ *
+ * We allocate special entries representing truncations, rather than
+ * performing them immediately, so that we can abort a transaction easily by simply purging
+ * the in-core memory buffers and discarding these truncation entries.
+ */
+static struct ubik_trunc *
+GetTrunc(void)
+{
     struct ubik_trunc *tt;
     if (!freeTruncList) {
-       freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
-       freeTruncList->next = (struct ubik_trunc *) 0;
+       freeTruncList = malloc(sizeof(struct ubik_trunc));
+       freeTruncList->next = (struct ubik_trunc *)0;
     }
     tt = freeTruncList;
     freeTruncList = tt->next;
     return tt;
 }
 
-/* free a truncation entry */
-static PutTrunc(at)
-    struct ubik_trunc *at; {
+/*!
+ * \brief Free a truncation entry.
+ */
+static int
+PutTrunc(struct ubik_trunc *at)
+{
     at->next = freeTruncList;
     freeTruncList = at;
     return 0;
 }
 
-/* find a truncation entry for a file, if any */
-static struct ubik_trunc *FindTrunc(atrans, afile)
-    struct ubik_trans *atrans;
-    afs_int32 afile; {
+/*!
+ * \brief Find a truncation entry for a file, if any.
+ */
+static struct ubik_trunc *
+FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
+{
     struct ubik_trunc *tt;
-    for(tt=atrans->activeTruncs; tt; tt=tt->next) {
-       if (tt->file == afile) return tt;
+    for (tt = atrans->activeTruncs; tt; tt = tt->next) {
+       if (tt->file == afile)
+           return tt;
     }
-    return (struct ubik_trunc *) 0;
+    return (struct ubik_trunc *)0;
 }
 
-/* do truncates associated with trans, and free them */
-static DoTruncs(atrans)
-    struct ubik_trans *atrans; {
+/*!
+ * \brief Do truncates associated with \p atrans, and free them.
+ */
+static int
+DoTruncs(struct ubik_trans *atrans)
+{
     struct ubik_trunc *tt, *nt;
-    int (*tproc)();
-    afs_int32 rcode=0, code;
+    int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
+    afs_int32 rcode = 0, code;
 
     tproc = atrans->dbase->truncate;
-    for(tt = atrans->activeTruncs; tt; tt=nt) {
+    for (tt = atrans->activeTruncs; tt; tt = nt) {
        nt = tt->next;
-       DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
-       code = (*tproc)(atrans->dbase, tt->file, tt->length);
-       if (code) rcode = code;
+       DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
+       code = (*tproc) (atrans->dbase, tt->file, tt->length);
+       if (code)
+           rcode = code;
        PutTrunc(tt);
     }
     /* don't unthread, because we do the entire list's worth here */
-    atrans->activeTruncs = (struct ubik_trunc *) 0;
-    return(rcode);
+    atrans->activeTruncs = (struct ubik_trunc *)0;
+    return (rcode);
 }
 
-/* mark a fid as invalid */
-udisk_Invalidate(adbase, afid)
-struct ubik_dbase *adbase;
-afs_int32 afid; {
+/*!
+ * \brief Mark an \p fid as invalid.
+ */
+int
+udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
+{
     struct buffer *tb;
-    int    i;
+    int i;
 
-    for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
+    for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
        if (tb->file == afid) {
            tb->file = BADFID;
            Dlru(tb);
@@ -410,15 +499,18 @@ afs_int32 afid; {
     return 0;
 }
 
-/* move this page into the correct hash bucket */
-static FixupBucket(ap)
-    struct buffer *ap; {
+/*!
+ * \brief Move this page into the correct hash bucket.
+ */
+static int
+FixupBucket(struct buffer *ap)
+{
     struct buffer **lp, *tp;
     int i;
     /* first try to get it out of its current hash bucket, in which it might not be */
     i = ap->hashIndex;
     lp = &phTable[i];
-    for(tp = *lp; tp; tp=tp->hashNext) {
+    for (tp = *lp; tp; tp = tp->hashNext) {
        if (tp == ap) {
            *lp = tp->hashNext;
            break;
@@ -430,28 +522,32 @@ static FixupBucket(ap)
     ap->hashIndex = i;         /* remember where we are for deletion */
     ap->hashNext = phTable[i]; /* add us to the list */
     phTable[i] = ap;
+    return 0;
 }
 
-/* create a new slot for a particular dbase page */
-static struct buffer *newslot (adbase, afid, apage)
-    struct ubik_dbase *adbase;
-    afs_int32 afid, apage; {
+/*!
+ * \brief Create a new slot for a particular dbase page.
+ */
+static struct buffer *
+newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
+{
     /* Find a usable buffer slot */
     afs_int32 i;
     struct buffer *pp, *tp;
 
-    pp = 0;            /* last pure */
-    for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
-       if (!tp->lockers && !tp->dirty) {
-         pp = tp;
-         break;
-       }
+    pp = 0;                    /* last pure */
+    for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
+       if (!tp->lockers && !tp->dirty) {
+           pp = tp;
+           break;
+       }
     }
 
     if (pp == 0) {
-        /* There are no unlocked buffers that don't need to be written to the disk. */
-        ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
-       return (struct buffer *) 0;
+       /* There are no unlocked buffers that don't need to be written to the disk. */
+       ubik_print
+           ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
+       return NULL;
     }
 
     /* Now fill in the header. */
@@ -464,53 +560,68 @@ static struct buffer *newslot (adbase, afid, apage)
     return pp;
 }
 
-/* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
-static DRelease (ap,flag)
-    char *ap;
-    int flag; {
+/*!
+ * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
+ */
+static void
+DRelease(char *ap, int flag)
+{
     int index;
     struct buffer *bp;
 
-    if (!ap) return;
-    index = (ap - (char *)BufferData) >> LOGPAGESIZE;
+    if (!ap)
+       return;
+    index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
     bp = &(Buffers[index]);
     bp->lockers--;
-    if (flag) bp->dirty=1;
-    return 0;
+    if (flag)
+       bp->dirty = 1;
+    return;
 }
 
-/* flush all modified buffers, leaves dirty bits set (they're cleared
- * by DSync).  Note interaction with DSync: you call this thing first,
- * writing the buffers to the disk.  Then you call DSync to sync all the
+/*!
+ * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
+ * by DSync()).
+ *
+ * \note Note interaction with DSync(): you call this thing first,
+ * writing the buffers to the disk.  Then you call DSync() to sync all the
  * files that were written, and to clear the dirty bits.  You should
  * always call DFlush/DSync as a pair.
  */
-static DFlush (adbase)
-    struct ubik_dbase *adbase; {
+static int
+DFlush(struct ubik_trans *atrans)
+{
     int i;
     afs_int32 code;
     struct buffer *tb;
+    struct ubik_dbase *adbase = atrans->dbase;
 
     tb = Buffers;
-    for(i=0;i<nbuffers;i++,tb++) {
-        if (tb->dirty) {
-           code = tb->page * PAGESIZE; /* offset within file */
-           code = (*adbase->write)(adbase, tb->file, tb->data, code, PAGESIZE);
-           if (code != PAGESIZE) return UIOERROR;
+    for (i = 0; i < nbuffers; i++, tb++) {
+       if (tb->dirty) {
+           code = tb->page * UBIK_PAGESIZE;    /* offset within file */
+           code =
+               (*adbase->write) (adbase, tb->file, tb->data, code,
+                                 UBIK_PAGESIZE);
+           if (code != UBIK_PAGESIZE)
+               return UIOERROR;
        }
     }
     return 0;
 }
 
-/* flush all modified buffers */
-static DAbort (adbase)
-    struct ubik_dbase *adbase; {
+/*!
+ * \brief Flush all modified buffers.
+ */
+static int
+DAbort(struct ubik_trans *atrans)
+{
     int i;
     struct buffer *tb;
 
     tb = Buffers;
-    for(i=0;i<nbuffers;i++,tb++) {
-        if (tb->dirty) {
+    for (i = 0; i < nbuffers; i++, tb++) {
+       if (tb->dirty) {
            tb->dirty = 0;
            tb->file = BADFID;
            Dlru(tb);
@@ -519,67 +630,103 @@ static DAbort (adbase)
     return 0;
 }
 
-/* must only be called after DFlush, due to its interpretation of dirty flag */
-static DSync(adbase)
-    struct ubik_dbase *adbase; {
+/**
+ * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
+ * can appear if a read transaction reads a page that is dirty, then that
+ * dirty page is synced. The read transaction will skip over the dirty page,
+ * and create a new buffer, and when the dirty page is synced, it will be
+ * identical (except for contents) to the read-transaction buffer.
+ */
+static void
+DedupBuffer(struct buffer *abuf)
+{
+    struct buffer *tb;
+    for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
+       if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
+           && tb->dbase == abuf->dbase) {
+
+           tb->file = BADFID;
+           Dlru(tb);
+       }
+    }
+}
+
+/*!
+ * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
+ */
+static int
+DSync(struct ubik_trans *atrans)
+{
     int i;
     afs_int32 code;
     struct buffer *tb;
     afs_int32 file;
     afs_int32 rCode;
+    struct ubik_dbase *adbase = atrans->dbase;
 
     rCode = 0;
     while (1) {
        file = BADFID;
-       for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
+       for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
            if (tb->dirty == 1) {
-               if (file == BADFID) file = tb->file;
-               if (file != BADFID && tb->file == file) tb->dirty = 0;
+               if (file == BADFID)
+                   file = tb->file;
+               if (file != BADFID && tb->file == file) {
+                   tb->dirty = 0;
+                   DedupBuffer(tb);
+               }
            }
        }
-       if (file == BADFID) break;
+       if (file == BADFID)
+           break;
        /* otherwise we have a file to sync */
-       code = (*adbase->sync)(adbase, file);
-       if (code) rCode = code;
+       code = (*adbase->sync) (adbase, file);
+       if (code)
+           rCode = code;
     }
     return rCode;
 }
 
-/* Same as read, only do not even try to read the page */
-static char *DNew (dbase, fid, page)
-    struct ubik_dbase *dbase;
-    int page;
-    afs_int32 fid; {
+/*!
+ * \brief Same as DRead(), only do not even try to read the page.
+ */
+static char *
+DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
+{
     struct buffer *tb;
+    struct ubik_dbase *dbase = atrans->dbase;
 
-    if ((tb = newslot(dbase, fid, page)) == 0) return (char *) 0;
+    if ((tb = newslot(dbase, fid, page)) == 0)
+       return NULL;
     tb->lockers++;
-    bzero(tb->data, PAGESIZE);
+    memset(tb->data, 0, UBIK_PAGESIZE);
     return tb->data;
 }
 
-/* read data from database */
-udisk_read(atrans, afile, abuffer, apos, alen)
-    afs_int32 afile;
-    char *abuffer;
-    afs_int32 apos, alen;
-    struct ubik_trans *atrans; {
+/*!
+ * \brief Read data from database.
+ */
+int
+udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
+          afs_int32 apos, afs_int32 alen)
+{
     char *bp;
     afs_int32 offset, len, totalLen;
-    struct ubik_dbase *dbase;
 
-    if (atrans->flags & TRDONE) return UDONE;
+    if (atrans->flags & TRDONE)
+       return UDONE;
     totalLen = 0;
-    dbase = atrans->dbase;
     while (alen > 0) {
-       bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
-       if (!bp) return UEOF;
+       bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
+       if (!bp)
+           return UEOF;
        /* otherwise, min of remaining bytes and end of buffer to user mode */
-       offset = apos & (PAGESIZE-1);
-       len = PAGESIZE - offset;
-       if (len > alen) len = alen;
-       bcopy(bp+offset, abuffer, len);
-       abuffer += len;
+       offset = apos & (UBIK_PAGESIZE - 1);
+       len = UBIK_PAGESIZE - offset;
+       if (len > alen)
+           len = alen;
+       memcpy(abuffer, bp + offset, len);
+       abuffer = (char *)abuffer + len;
        apos += len;
        alen -= len;
        totalLen += len;
@@ -588,16 +735,19 @@ udisk_read(atrans, afile, abuffer, apos, alen)
     return 0;
 }
 
-/* truncate file */
-udisk_truncate(atrans, afile, alength)
-    struct ubik_trans *atrans;
-    afs_int32 afile;
-    afs_int32 alength; {
+/*!
+ * \brief Truncate file.
+ */
+int
+udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
+{
     afs_int32 code;
     struct ubik_trunc *tt;
 
-    if (atrans->flags & TRDONE) return UDONE;
-    if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
+    if (atrans->flags & TRDONE)
+       return UDONE;
+    if (atrans->type != UBIK_WRITETRANS)
+       return UBADTYPE;
 
     /* write a truncate log record */
     code = udisk_LogTruncate(atrans->dbase, afile, alength);
@@ -606,38 +756,40 @@ udisk_truncate(atrans, afile, alength)
     tt = FindTrunc(atrans, afile);
     if (!tt) {
        /* this file not truncated yet */
-       tt=GetTrunc();
+       tt = GetTrunc();
        tt->next = atrans->activeTruncs;
        atrans->activeTruncs = tt;
        tt->file = afile;
        tt->length = alength;
-    }
-    else {
+    } else {
        /* already truncated to a certain length */
-       if (tt->length > alength) tt->length = alength;
+       if (tt->length > alength)
+           tt->length = alength;
     }
     return code;
 }
 
-/* write data to database, using logs */
-udisk_write(atrans, afile, abuffer, apos, alen)
-    afs_int32 afile;
-    char *abuffer;
-    afs_int32 apos, alen;
-    struct ubik_trans *atrans; {
+/*!
+ * \brief Write data to database, using logs.
+ */
+int
+udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
+           afs_int32 apos, afs_int32 alen)
+{
     char *bp;
     afs_int32 offset, len, totalLen;
-    struct ubik_dbase *dbase;
     struct ubik_trunc *tt;
     afs_int32 code;
 
-    if (atrans->flags & TRDONE) return UDONE;
-    if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
+    if (atrans->flags & TRDONE)
+       return UDONE;
+    if (atrans->type != UBIK_WRITETRANS)
+       return UBADTYPE;
 
-    dbase = atrans->dbase;
     /* first write the data to the log */
-    code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
-    if (code) return code;
+    code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
+    if (code)
+       return code;
 
     /* expand any truncations of this file */
     tt = FindTrunc(atrans, afile);
@@ -650,116 +802,148 @@ udisk_write(atrans, afile, abuffer, apos, alen)
     /* now update vm */
     totalLen = 0;
     while (alen > 0) {
-       bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
+       bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
        if (!bp) {
-           bp = DNew(dbase, afile, apos>>LOGPAGESIZE);
-           if (!bp) return UIOERROR;
-           bzero(bp, PAGESIZE);
+           bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
+           if (!bp)
+               return UIOERROR;
+           memset(bp, 0, UBIK_PAGESIZE);
        }
        /* otherwise, min of remaining bytes and end of buffer to user mode */
-       offset = apos & (PAGESIZE-1);
-       len = PAGESIZE-offset;
-       if (len > alen) len = alen;
-       bcopy(abuffer, bp+offset, len);
-       abuffer += len;
+       offset = apos & (UBIK_PAGESIZE - 1);
+       len = UBIK_PAGESIZE - offset;
+       if (len > alen)
+           len = alen;
+       memcpy(bp + offset, abuffer, len);
+       abuffer = (char *)abuffer + len;
        apos += len;
        alen -= len;
        totalLen += len;
-       DRelease(bp, 1);    /* buffer modified */
+       DRelease(bp, 1);        /* buffer modified */
     }
     return 0;
 }
 
-/* begin a new local transaction */
-udisk_begin(adbase, atype, atrans)
-    struct ubik_trans **atrans;
-    int atype;
-    struct ubik_dbase *adbase; {
+/*!
+ * \brief Begin a new local transaction.
+ */
+int
+udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
+{
     afs_int32 code;
     struct ubik_trans *tt;
 
-    *atrans = (struct ubik_trans *)NULL;
-    /* Make sure system is initialized before doing anything */
-    if (!initd) {
-       initd = 1;
-       DInit(ubik_nBuffers);
-    }
+    *atrans = NULL;
     if (atype == UBIK_WRITETRANS) {
-       if (adbase->flags & DBWRITING) return USYNC;
+       if (adbase->flags & DBWRITING)
+           return USYNC;
        code = udisk_LogOpcode(adbase, LOGNEW, 0);
-       if (code) return code;
+       if (code)
+           return code;
     }
-    tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
-    bzero(tt, sizeof(struct ubik_trans));
+    tt = calloc(1, sizeof(struct ubik_trans));
     tt->dbase = adbase;
     tt->next = adbase->activeTrans;
     adbase->activeTrans = tt;
     tt->type = atype;
-    if (atype == UBIK_READTRANS) adbase->readers++;
-    else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
+    if (atype == UBIK_READTRANS)
+       adbase->readers++;
+    else if (atype == UBIK_WRITETRANS) {
+       UBIK_VERSION_LOCK;
+       adbase->flags |= DBWRITING;
+       UBIK_VERSION_UNLOCK;
+    }
     *atrans = tt;
     return 0;
 }
 
-/* commit transaction */
-udisk_commit(atrans)
-    struct ubik_trans *atrans; {
+/*!
+ * \brief Commit transaction.
+ */
+int
+udisk_commit(struct ubik_trans *atrans)
+{
     struct ubik_dbase *dbase;
-    afs_int32 code=0;
+    afs_int32 code = 0;
     struct ubik_version oldversion, newversion;
+    afs_int32 now = FT_ApproxTime();
 
     if (atrans->flags & TRDONE)
-       return(UTWOENDS);
+       return (UTWOENDS);
 
     if (atrans->type == UBIK_WRITETRANS) {
-        dbase = atrans->dbase;
+       dbase = atrans->dbase;
 
        /* On the first write to the database. We update the versions */
-        if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
-          oldversion = dbase->version;
-          newversion.epoch   = FT_ApproxTime();;
-          newversion.counter = 1;
-         
-          code = (*dbase->setlabel)(dbase, 0, &newversion);
-          if (code) return(code);
-          ubik_epochTime = newversion.epoch;
-          dbase->version = newversion;
-
-          /* Ignore the error here. If the call fails, the site is
-           * marked down and when we detect it is up again, we will 
-           * send the entire database to it.
-           */
-          ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
-                        &oldversion, &newversion);
-          urecovery_state |= UBIK_RECLABELDB;
+       if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
+           UBIK_VERSION_LOCK;
+           if (version_globals.ubik_epochTime < UBIK_MILESTONE
+               || version_globals.ubik_epochTime > now) {
+               ubik_print
+                   ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
+                    version_globals.ubik_epochTime, UBIK_MILESTONE, now);
+               panic("Writing Ubik DB label\n");
+           }
+           oldversion = dbase->version;
+           newversion.epoch = version_globals.ubik_epochTime;
+           newversion.counter = 1;
+
+           code = (*dbase->setlabel) (dbase, 0, &newversion);
+           if (code) {
+               UBIK_VERSION_UNLOCK;
+               return code;
+           }
+
+           dbase->version = newversion;
+           UBIK_VERSION_UNLOCK;
+
+           urecovery_state |= UBIK_RECLABELDB;
+
+           /* Ignore the error here. If the call fails, the site is
+            * marked down and when we detect it is up again, we will
+            * send the entire database to it.
+            */
+           ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
+                                          &oldversion, &newversion);
        }
 
+       UBIK_VERSION_LOCK;
        dbase->version.counter++;       /* bump commit count */
+#ifdef AFS_PTHREAD_ENV
+       opr_cv_broadcast(&dbase->version_cond);
+#else
        LWP_NoYieldSignal(&dbase->version);
-
+#endif
        code = udisk_LogEnd(dbase, &dbase->version);
        if (code) {
-          dbase->version.counter--;
-          return(code);
+           dbase->version.counter--;
+           UBIK_VERSION_UNLOCK;
+           return code;
        }
+       UBIK_VERSION_UNLOCK;
 
        /* If we fail anytime after this, then panic and let the
-        * recovery replay the log. 
+        * recovery replay the log.
         */
-       code = DFlush(dbase);   /* write dirty pages to respective files */
-       if (code) panic("Writing Ubik DB modifications\n");
-       code = DSync(dbase);    /* sync the files and mark pages not dirty */
-       if (code) panic("Synchronizing Ubik DB modifications\n");
+       code = DFlush(atrans);  /* write dirty pages to respective files */
+       if (code)
+           panic("Writing Ubik DB modifications\n");
+       code = DSync(atrans);   /* sync the files and mark pages not dirty */
+       if (code)
+           panic("Synchronizing Ubik DB modifications\n");
 
-       code = DoTruncs(atrans); /* Perform requested truncations */
-       if (code) panic("Truncating Ubik DB\n");
+       code = DoTruncs(atrans);        /* Perform requested truncations */
+       if (code)
+           panic("Truncating Ubik DB\n");
 
        /* label the committed dbase */
-       code = (*dbase->setlabel)(dbase, 0, &dbase->version);
-       if (code) panic("Truncating Ubik DB\n");
+       code = (*dbase->setlabel) (dbase, 0, &dbase->version);
+       if (code)
+           panic("Truncating Ubik DB\n");
 
-       code = (*dbase->truncate)(dbase, LOGFILE, 0);   /* discard log (optional) */
-       if (code) panic("Truncating Ubik logfile\n");
+       code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
+       if (code)
+           panic("Truncating Ubik logfile\n");
 
     }
 
@@ -770,29 +954,32 @@ udisk_commit(atrans)
     return code;
 }
 
-/* abort transaction */
-udisk_abort(atrans)
-    struct ubik_trans *atrans;
+/*!
+ * \brief Abort transaction.
+ */
+int
+udisk_abort(struct ubik_trans *atrans)
 {
     struct ubik_dbase *dbase;
     afs_int32 code;
-    
+
     if (atrans->flags & TRDONE)
-       return UTWOENDS;
+       return UTWOENDS;
 
     /* Check if we are the write trans before logging abort, lest we
-     * abort a good write trans in progress. 
-     * We don't really care if the LOGABORT gets to the log because we 
-     * truncate the log next. If the truncate fails, we panic; for 
+     * abort a good write trans in progress.
+     * We don't really care if the LOGABORT gets to the log because we
+     * truncate the log next. If the truncate fails, we panic; for
      * otherwise, the log entries remain. On restart, replay of the log
      * will do nothing because the abort is there or no LogEnd opcode.
      */
     dbase = atrans->dbase;
     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
        udisk_LogOpcode(dbase, LOGABORT, 1);
-       code = (*dbase->truncate)(dbase, LOGFILE, 0);
-       if (code) panic("Truncating Ubik logfile during an abort\n");
-       DAbort(dbase);         /* remove all dirty pages */
+       code = (*dbase->truncate) (dbase, LOGFILE, 0);
+       if (code)
+           panic("Truncating Ubik logfile during an abort\n");
+       DAbort(atrans);         /* remove all dirty pages */
     }
 
     /* When the transaction is marked done, it also means the logfile
@@ -802,15 +989,19 @@ udisk_abort(atrans)
     return 0;
 }
 
-/* destroy a transaction after it has been committed or aborted.  if
- * it hasn't committed before you call this routine, we'll abort the
+/*!
+ * \brief Destroy a transaction after it has been committed or aborted.
+ *
+ * If it hasn't committed before you call this routine, we'll abort the
  * transaction for you.
  */
-udisk_end(atrans)
-    struct ubik_trans *atrans; {
+int
+udisk_end(struct ubik_trans *atrans)
+{
     struct ubik_dbase *dbase;
 
-    if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
+    if (!(atrans->flags & TRDONE))
+       udisk_abort(atrans);
     dbase = atrans->dbase;
 
     ulock_relLock(atrans);
@@ -820,15 +1011,23 @@ udisk_end(atrans)
      * we could be unsetting someone else's bit.
      */
     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
-       dbase->flags &= ~DBWRITING;
+       UBIK_VERSION_LOCK;
+       dbase->flags &= ~DBWRITING;
+       UBIK_VERSION_UNLOCK;
     } else {
-       dbase->readers--;
+       dbase->readers--;
     }
-    if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
-    if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
+    if (atrans->iovec_info.iovec_wrt_val)
+       free(atrans->iovec_info.iovec_wrt_val);
+    if (atrans->iovec_data.iovec_buf_val)
+       free(atrans->iovec_data.iovec_buf_val);
     free(atrans);
 
     /* Wakeup any writers waiting in BeginTrans() */
+#ifdef AFS_PTHREAD_ENV
+    opr_cv_broadcast(&dbase->flags_cond);
+#else
     LWP_NoYieldSignal(&dbase->flags);
+#endif
     return 0;
 }