/*
* Copyright 2000, International Business Machines Corporation and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
+#include <afsconfig.h>
#include <afs/param.h>
-#include <sys/types.h>
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
-#else
-#include <sys/file.h>
-#include <netinet/in.h>
-#endif
-#include <errno.h>
-#include <lock.h>
-#include <rx/xdr.h>
+#include <roken.h>
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
+#else
+# include <opr/lockstub.h>
+#endif
#define UBIK_INTERNALS
#include "ubik.h"
#include "ubik_int.h"
#define PHSIZE 128
-struct buffer {
- struct ubik_dbase *dbase; /* dbase within which the buffer resides */
- afs_int32 file; /* Unique cache key */
- afs_int32 page; /* page number */
+static struct buffer {
+ struct ubik_dbase *dbase; /*!< dbase within which the buffer resides */
+ afs_int32 file; /*!< Unique cache key */
+ afs_int32 page; /*!< page number */
struct buffer *lru_next;
struct buffer *lru_prev;
- struct buffer *hashNext; /* next dude in hash table */
- char *data; /* ptr to the data */
- char lockers; /* usage ref count */
- char dirty; /* is buffer modified */
- char hashIndex; /* back ptr to hash table */
+ struct buffer *hashNext; /*!< next dude in hash table */
+ char *data; /*!< ptr to the data */
+ char lockers; /*!< usage ref count */
+ char dirty; /*!< is buffer modified */
+ char hashIndex; /*!< back ptr to hash table */
} *Buffers;
#define pHash(page) ((page) & (PHSIZE-1))
afs_int32 ubik_nBuffers = NBUFFERS;
-static struct buffer *phTable[PHSIZE]; /* page hash table */
+static struct buffer *phTable[PHSIZE]; /*!< page hash table */
static struct buffer *LruBuffer;
static int nbuffers;
-static int calls=0, ios=0, lastb=0;
+static int calls = 0, ios = 0, lastb = 0;
static char *BufferData;
-static struct buffer *newslot();
-static initd = 0;
+static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
+ afs_int32 apage);
#define BADFID 0xffffffff
-static DTrunc();
+static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
-static struct ubik_trunc *freeTruncList=0;
+static struct ubik_trunc *freeTruncList = 0;
-/* remove a transaction from the database's active transaction list. Don't free it */
-static unthread(atrans)
- struct ubik_trans *atrans; {
+/*!
+ * \brief Remove a transaction from the database's active transaction list. Don't free it.
+ */
+static int
+unthread(struct ubik_trans *atrans)
+{
struct ubik_trans **lt, *tt;
lt = &atrans->dbase->activeTrans;
- for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
+ for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
if (tt == atrans) {
/* found it */
*lt = tt->next;
return 0;
}
}
- return 2; /* no entry */
+ return 2; /* no entry */
}
-/* some debugging assistance */
-udisk_Debug(aparm)
- struct ubik_debug *aparm; {
+/*!
+ * \brief Debugging aid: copy the local database version into \p aparm and count the locked and locked-and-dirty buffer pages.
+ */
+void
+udisk_Debug(struct ubik_debug *aparm)
+{
struct buffer *tb;
int i;
- bcopy(&ubik_dbase->version, &aparm->localVersion, sizeof(struct ubik_version));
+ memcpy(&aparm->localVersion, &ubik_dbase->version,
+ sizeof(struct ubik_version));
aparm->lockedPages = 0;
aparm->writeLockedPages = 0;
tb = Buffers;
- for(i=0;i<nbuffers;i++, tb++) {
+ for (i = 0; i < nbuffers; i++, tb++) {
if (tb->lockers) {
aparm->lockedPages++;
- if (tb->dirty) aparm->writeLockedPages++;
+ if (tb->dirty)
+ aparm->writeLockedPages++;
}
}
}
-/* log format is defined here, and implicitly in recovery.c
+/*!
+ * \brief Write an opcode to the log.
+ *
+ * log format is defined here, and implicitly in recovery.c
*
* 4 byte opcode, followed by parameters, each 4 bytes long. All integers
* are logged in network standard byte order, in case we want to move logs
* from machine-to-machine someday.
*
- * Begin transaction: opcode
- * Commit transaction: opcode, version (8 bytes)
- * Truncate file: opcode, file number, length
- * Abort transaction: opcode
- * Write data: opcode, file, position, length, <length> data bytes
+ * Begin transaction: opcode \n
+ * Commit transaction: opcode, version (8 bytes) \n
+ * Truncate file: opcode, file number, length \n
+ * Abort transaction: opcode \n
+ * Write data: opcode, file, position, length, <length> data bytes \n
*/
-
-/* write an opcode to the log */
-udisk_LogOpcode(adbase, aopcode, async)
- struct ubik_dbase *adbase;
- afs_int32 aopcode;
- int async; {
+static int
+udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
+{
struct ubik_stat ustat;
afs_int32 code;
-
+
/* figure out where to write */
- code = (*adbase->stat)(adbase, LOGFILE, &ustat);
- if (code < 0) return code;
+ code = (*adbase->stat) (adbase, LOGFILE, &ustat);
+ if (code < 0)
+ return code;
/* setup data and do write */
aopcode = htonl(aopcode);
- code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
- if (code != sizeof(afs_int32)) return UIOERROR;
+ code =
+ (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
+ sizeof(afs_int32));
+ if (code != sizeof(afs_int32))
+ return UIOERROR;
/* optionally sync data */
- if (async) code = (*adbase->sync)(adbase, LOGFILE);
- else code = 0;
+ if (async)
+ code = (*adbase->sync) (adbase, LOGFILE);
+ else
+ code = 0;
return code;
}
-/* log a commit, never syncing */
-udisk_LogEnd(adbase, aversion)
- struct ubik_dbase *adbase;
- struct ubik_version *aversion; {
+/*!
+ * \brief Log a commit, never syncing.
+ */
+static int
+udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
+{
afs_int32 code;
afs_int32 data[3];
struct ubik_stat ustat;
-
+
/* figure out where to write */
- code = (*adbase->stat)(adbase, LOGFILE, &ustat);
- if (code) return code;
+ code = (*adbase->stat) (adbase, LOGFILE, &ustat);
+ if (code)
+ return code;
/* setup data */
data[0] = htonl(LOGEND);
data[1] = htonl(aversion->epoch);
data[2] = htonl(aversion->counter);
-
+
/* do write */
- code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
- if (code != 3*sizeof(afs_int32)) return UIOERROR;
+ code =
+ (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
+ 3 * sizeof(afs_int32));
+ if (code != 3 * sizeof(afs_int32))
+ return UIOERROR;
/* finally sync the log */
- code = (*adbase->sync)(adbase, LOGFILE);
+ code = (*adbase->sync) (adbase, LOGFILE);
return code;
}
-
-/* log a truncate operation, never syncing */
-udisk_LogTruncate(adbase, afile, alength)
- struct ubik_dbase *adbase;
- afs_int32 afile, alength; {
+
+/*!
+ * \brief Log a truncate operation, never syncing.
+ */
+static int
+udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
+ afs_int32 alength)
+{
afs_int32 code;
afs_int32 data[3];
struct ubik_stat ustat;
-
+
/* figure out where to write */
- code = (*adbase->stat)(adbase, LOGFILE, &ustat);
- if (code < 0) return code;
+ code = (*adbase->stat) (adbase, LOGFILE, &ustat);
+ if (code < 0)
+ return code;
/* setup data */
data[0] = htonl(LOGTRUNCATE);
data[1] = htonl(afile);
data[2] = htonl(alength);
-
+
/* do write */
- code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
- if (code != 3*sizeof(afs_int32)) return UIOERROR;
+ code =
+ (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
+ 3 * sizeof(afs_int32));
+ if (code != 3 * sizeof(afs_int32))
+ return UIOERROR;
return 0;
}
-
-/* write some data to the log, never syncing */
-udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
- struct ubik_dbase *adbase;
- char *abuffer;
- afs_int32 afile;
- afs_int32 apos;
- afs_int32 alen; {
+
+/*!
+ * \brief Write some data to the log, never syncing.
+ */
+static int
+udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
+ afs_int32 apos, afs_int32 alen)
+{
struct ubik_stat ustat;
afs_int32 code;
afs_int32 data[4];
afs_int32 lpos;
-
+
/* find end of log */
- code = (*adbase->stat)(adbase, LOGFILE, &ustat);
+ code = (*adbase->stat) (adbase, LOGFILE, &ustat);
lpos = ustat.size;
- if (code < 0) return code;
+ if (code < 0)
+ return code;
/* setup header */
data[0] = htonl(LOGDATA);
data[3] = htonl(alen);
/* write header */
- code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
- if (code != 4*sizeof(afs_int32)) return UIOERROR;
- lpos += 4*sizeof(afs_int32);
-
+ code =
+ (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+ if (code != 4 * sizeof(afs_int32))
+ return UIOERROR;
+ lpos += 4 * sizeof(afs_int32);
+
/* write data */
- code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
- if (code != alen) return UIOERROR;
+ code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+ if (code != alen)
+ return UIOERROR;
return 0;
}
-static int DInit (abuffers)
- int abuffers; {
+int
+udisk_Init(int abuffers)
+{
/* Initialize the venus buffer system. */
int i;
struct buffer *tb;
- Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
- bzero(Buffers, abuffers * sizeof(struct buffer));
- BufferData = (char *) malloc(abuffers * PAGESIZE);
+ Buffers = calloc(abuffers, sizeof(struct buffer));
+ BufferData = malloc(abuffers * UBIK_PAGESIZE);
nbuffers = abuffers;
- for(i=0;i<PHSIZE;i++) phTable[i] = 0;
- for (i=0;i<abuffers;i++) {
- /* Fill in each buffer with an empty indication. */
+ for (i = 0; i < PHSIZE; i++)
+ phTable[i] = 0;
+ for (i = 0; i < abuffers; i++) {
+ /* Fill in each buffer with an empty indication. */
tb = &Buffers[i];
- tb->lru_next = &(Buffers[i+1]);
- tb->lru_prev = &(Buffers[i-1]);
- tb->data = &BufferData[PAGESIZE*i];
+ tb->lru_next = &(Buffers[i + 1]);
+ tb->lru_prev = &(Buffers[i - 1]);
+ tb->data = &BufferData[UBIK_PAGESIZE * i];
tb->file = BADFID;
}
- Buffers[0].lru_prev = &(Buffers[abuffers-1]);
- Buffers[abuffers-1].lru_next = &(Buffers[0]);
- LruBuffer = &(Buffers[0]);
+ Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
+ Buffers[abuffers - 1].lru_next = &(Buffers[0]);
+ LruBuffer = &(Buffers[0]);
return 0;
}
-/* Take a buffer and mark it as the least recently used buffer */
-static int Dlru(abuf)
- struct buffer *abuf;
+/*!
+ * \brief Take a buffer and mark it as the least recently used buffer.
+ */
+static void
+Dlru(struct buffer *abuf)
{
- if (LruBuffer == abuf)
- return 0;
+ if (LruBuffer == abuf)
+ return;
- /* Unthread from where it is in the list */
- abuf->lru_next->lru_prev = abuf->lru_prev;
- abuf->lru_prev->lru_next = abuf->lru_next;
+ /* Unthread from where it is in the list */
+ abuf->lru_next->lru_prev = abuf->lru_prev;
+ abuf->lru_prev->lru_next = abuf->lru_next;
- /* Thread onto beginning of LRU list */
- abuf->lru_next = LruBuffer;
- abuf->lru_prev = LruBuffer->lru_prev;
+ /* Thread onto beginning of LRU list */
+ abuf->lru_next = LruBuffer;
+ abuf->lru_prev = LruBuffer->lru_prev;
- LruBuffer->lru_prev->lru_next = abuf;
- LruBuffer->lru_prev = abuf;
- LruBuffer = abuf;
+ LruBuffer->lru_prev->lru_next = abuf;
+ LruBuffer->lru_prev = abuf;
+ LruBuffer = abuf;
}
-/* Take a buffer and mark it as the most recently used buffer */
-static int Dmru(abuf)
- struct buffer *abuf;
+/*!
+ * \brief Take a buffer and mark it as the most recently used buffer.
+ */
+static void
+Dmru(struct buffer *abuf)
{
- if (LruBuffer == abuf) {
- LruBuffer = LruBuffer->lru_next;
- return 0;
- }
+ if (LruBuffer == abuf) {
+ LruBuffer = LruBuffer->lru_next;
+ return;
+ }
- /* Unthread from where it is in the list */
- abuf->lru_next->lru_prev = abuf->lru_prev;
- abuf->lru_prev->lru_next = abuf->lru_next;
+ /* Unthread from where it is in the list */
+ abuf->lru_next->lru_prev = abuf->lru_prev;
+ abuf->lru_prev->lru_next = abuf->lru_next;
- /* Thread onto end of LRU list - making it the MRU buffer */
- abuf->lru_next = LruBuffer;
- abuf->lru_prev = LruBuffer->lru_prev;
- LruBuffer->lru_prev->lru_next = abuf;
- LruBuffer->lru_prev = abuf;
+ /* Thread onto end of LRU list - making it the MRU buffer */
+ abuf->lru_next = LruBuffer;
+ abuf->lru_prev = LruBuffer->lru_prev;
+ LruBuffer->lru_prev->lru_next = abuf;
+ LruBuffer->lru_prev = abuf;
+}
+static_inline int
+MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
+ struct ubik_trans *atrans)
+{
+ if (buf->page != page) {
+ return 0;
+ }
+ if (buf->file != fid) {
+ return 0;
+ }
+ if (atrans->type == UBIK_READTRANS && buf->dirty) {
+ /* if 'buf' is dirty, it has uncommitted changes; we do not want to
+ * see uncommitted changes if we are a read transaction, so skip over
+ * it. */
+ return 0;
+ }
+ if (buf->dbase != atrans->dbase) {
+ return 0;
+ }
+ return 1;
}
-/* get a pointer to a particular buffer */
-static char *DRead(dbase, fid, page)
- struct ubik_dbase *dbase;
- afs_int32 fid;
- int page; {
+/*!
+ * \brief Get a pointer to a particular buffer.
+ */
+static char *
+DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
+{
/* Read a page from the disk. */
- struct buffer *tb, *lastbuffer;
- afs_int32 trys, code;
+ struct buffer *tb, *lastbuffer, *found_tb = NULL;
+ afs_int32 code;
+ struct ubik_dbase *dbase = atrans->dbase;
calls++;
lastbuffer = LruBuffer->lru_prev;
- if ((lastbuffer->page == page ) &&
- (lastbuffer->file == fid ) &&
- (lastbuffer->dbase == dbase)) {
+ /* A write transaction skips this shortcut when the page is clean - it may not be the right page to use */
+ if (MatchBuffer(lastbuffer, page, fid, atrans)
+ && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
tb = lastbuffer;
tb->lockers++;
lastb++;
return tb->data;
}
- for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
- if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
- Dmru(tb);
- tb->lockers++;
- return tb->data;
+ for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
+ if (MatchBuffer(tb, page, fid, atrans)) {
+ if (tb->dirty || atrans->type == UBIK_READTRANS) {
+ found_tb = tb;
+ break;
+ }
+ /* Remember this clean page - we might use it */
+ found_tb = tb;
}
}
+ /* For a write transaction, use a matching clean page if no dirty one was found */
+ if (found_tb) {
+ Dmru(found_tb);
+ found_tb->lockers++;
+ return found_tb->data;
+ }
+
/* can't find it */
tb = newslot(dbase, fid, page);
- if (!tb) return 0;
- bzero(tb->data, PAGESIZE);
+ if (!tb)
+ return 0;
+ memset(tb->data, 0, UBIK_PAGESIZE);
tb->lockers++;
- code = (*dbase->read)(dbase, fid, tb->data, page*PAGESIZE, PAGESIZE);
+ code =
+ (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
+ UBIK_PAGESIZE);
if (code < 0) {
- tb->file = BADFID;
- Dlru(tb);
- tb->lockers--;
- ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
- return 0;
+ tb->file = BADFID;
+ Dlru(tb);
+ tb->lockers--;
+ ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
+ return 0;
}
ios++;
return tb->data;
}
-/* zap truncated pages */
-static DTrunc(dbase, fid, length)
- struct ubik_dbase *dbase;
- afs_int32 fid;
- afs_int32 length; {
+/*!
+ * \brief Zap truncated pages.
+ */
+static int
+DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
+{
afs_int32 maxPage;
struct buffer *tb;
- int i;
-
- maxPage = (length+PAGESIZE-1)>>LOGPAGESIZE; /* first invalid page now in file */
- for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
+ int i;
+ struct ubik_dbase *dbase = atrans->dbase;
+
+ maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
+ for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
tb->file = BADFID;
Dlru(tb);
return 0;
}
-/* allocate a truncation entry. We allocate special entries representing truncations, rather than
- performing them immediately, so that we can abort a transaction easily by simply purging
- the in-core memory buffers and discarding these truncation entries.
-*/
-static struct ubik_trunc *GetTrunc() {
+/*!
+ * \brief Allocate a truncation entry.
+ *
+ * We allocate special entries representing truncations, rather than
+ * performing them immediately, so that we can abort a transaction easily by simply purging
+ * the in-core memory buffers and discarding these truncation entries.
+ */
+static struct ubik_trunc *
+GetTrunc(void)
+{
struct ubik_trunc *tt;
if (!freeTruncList) {
- freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
- freeTruncList->next = (struct ubik_trunc *) 0;
+ freeTruncList = malloc(sizeof(struct ubik_trunc));
+ freeTruncList->next = (struct ubik_trunc *)0;
}
tt = freeTruncList;
freeTruncList = tt->next;
return tt;
}
-/* free a truncation entry */
-static PutTrunc(at)
- struct ubik_trunc *at; {
+/*!
+ * \brief Free a truncation entry.
+ */
+static int
+PutTrunc(struct ubik_trunc *at)
+{
at->next = freeTruncList;
freeTruncList = at;
return 0;
}
-/* find a truncation entry for a file, if any */
-static struct ubik_trunc *FindTrunc(atrans, afile)
- struct ubik_trans *atrans;
- afs_int32 afile; {
+/*!
+ * \brief Find a truncation entry for a file, if any.
+ */
+static struct ubik_trunc *
+FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
+{
struct ubik_trunc *tt;
- for(tt=atrans->activeTruncs; tt; tt=tt->next) {
- if (tt->file == afile) return tt;
+ for (tt = atrans->activeTruncs; tt; tt = tt->next) {
+ if (tt->file == afile)
+ return tt;
}
- return (struct ubik_trunc *) 0;
+ return (struct ubik_trunc *)0;
}
-/* do truncates associated with trans, and free them */
-static DoTruncs(atrans)
- struct ubik_trans *atrans; {
+/*!
+ * \brief Do truncates associated with \p atrans, and free them.
+ */
+static int
+DoTruncs(struct ubik_trans *atrans)
+{
struct ubik_trunc *tt, *nt;
- int (*tproc)();
- afs_int32 rcode=0, code;
+ int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
+ afs_int32 rcode = 0, code;
tproc = atrans->dbase->truncate;
- for(tt = atrans->activeTruncs; tt; tt=nt) {
+ for (tt = atrans->activeTruncs; tt; tt = nt) {
nt = tt->next;
- DTrunc(atrans->dbase, tt->file, tt->length); /* zap pages from buffer cache */
- code = (*tproc)(atrans->dbase, tt->file, tt->length);
- if (code) rcode = code;
+ DTrunc(atrans, tt->file, tt->length); /* zap pages from buffer cache */
+ code = (*tproc) (atrans->dbase, tt->file, tt->length);
+ if (code)
+ rcode = code;
PutTrunc(tt);
}
/* don't unthread, because we do the entire list's worth here */
- atrans->activeTruncs = (struct ubik_trunc *) 0;
- return(rcode);
+ atrans->activeTruncs = (struct ubik_trunc *)0;
+ return (rcode);
}
-/* mark a fid as invalid */
-udisk_Invalidate(adbase, afid)
-struct ubik_dbase *adbase;
-afs_int32 afid; {
+/*!
+ * \brief Mark every buffer belonging to file \p afid as invalid.
+ */
+int
+udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
+{
struct buffer *tb;
- int i;
+ int i;
- for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
+ for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
if (tb->file == afid) {
tb->file = BADFID;
Dlru(tb);
return 0;
}
-/* move this page into the correct hash bucket */
-static FixupBucket(ap)
- struct buffer *ap; {
+/*!
+ * \brief Move this page into the correct hash bucket.
+ */
+static int
+FixupBucket(struct buffer *ap)
+{
struct buffer **lp, *tp;
int i;
/* first try to get it out of its current hash bucket, in which it might not be */
i = ap->hashIndex;
lp = &phTable[i];
- for(tp = *lp; tp; tp=tp->hashNext) {
+ for (tp = *lp; tp; tp = tp->hashNext) {
if (tp == ap) {
*lp = tp->hashNext;
break;
ap->hashIndex = i; /* remember where we are for deletion */
ap->hashNext = phTable[i]; /* add us to the list */
phTable[i] = ap;
+ return 0;
}
-/* create a new slot for a particular dbase page */
-static struct buffer *newslot (adbase, afid, apage)
- struct ubik_dbase *adbase;
- afs_int32 afid, apage; {
+/*!
+ * \brief Create a new slot for a particular dbase page.
+ */
+static struct buffer *
+newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
+{
/* Find a usable buffer slot */
afs_int32 i;
struct buffer *pp, *tp;
- pp = 0; /* last pure */
- for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
- if (!tp->lockers && !tp->dirty) {
- pp = tp;
- break;
- }
+ pp = 0; /* last pure */
+ for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
+ if (!tp->lockers && !tp->dirty) {
+ pp = tp;
+ break;
+ }
}
if (pp == 0) {
- /* There are no unlocked buffers that don't need to be written to the disk. */
- ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
- return (struct buffer *) 0;
+ /* There are no unlocked buffers that don't need to be written to the disk. */
+ ubik_print
+ ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
+ return NULL;
}
/* Now fill in the header. */
return pp;
}
-/* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
-static DRelease (ap,flag)
- char *ap;
- int flag; {
+/*!
+ * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
+ */
+static void
+DRelease(char *ap, int flag)
+{
int index;
struct buffer *bp;
- if (!ap) return;
- index = (ap - (char *)BufferData) >> LOGPAGESIZE;
+ if (!ap)
+ return;
+ index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
bp = &(Buffers[index]);
bp->lockers--;
- if (flag) bp->dirty=1;
- return 0;
+ if (flag)
+ bp->dirty = 1;
+ return;
}
-/* flush all modified buffers, leaves dirty bits set (they're cleared
- * by DSync). Note interaction with DSync: you call this thing first,
- * writing the buffers to the disk. Then you call DSync to sync all the
+/*!
+ * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
+ * by DSync()).
+ *
+ * \note Interaction with DSync(): you call this function first,
+ * writing the buffers to the disk. Then you call DSync() to sync all the
* files that were written, and to clear the dirty bits. You should
* always call DFlush/DSync as a pair.
*/
-static DFlush (adbase)
- struct ubik_dbase *adbase; {
+static int
+DFlush(struct ubik_trans *atrans)
+{
int i;
afs_int32 code;
struct buffer *tb;
+ struct ubik_dbase *adbase = atrans->dbase;
tb = Buffers;
- for(i=0;i<nbuffers;i++,tb++) {
- if (tb->dirty) {
- code = tb->page * PAGESIZE; /* offset within file */
- code = (*adbase->write)(adbase, tb->file, tb->data, code, PAGESIZE);
- if (code != PAGESIZE) return UIOERROR;
+ for (i = 0; i < nbuffers; i++, tb++) {
+ if (tb->dirty) {
+ code = tb->page * UBIK_PAGESIZE; /* offset within file */
+ code =
+ (*adbase->write) (adbase, tb->file, tb->data, code,
+ UBIK_PAGESIZE);
+ if (code != UBIK_PAGESIZE)
+ return UIOERROR;
}
}
return 0;
}
-/* flush all modified buffers */
-static DAbort (adbase)
- struct ubik_dbase *adbase; {
+/*!
+ * \brief Discard all modified buffers: clear their dirty bits and invalidate them without writing them out.
+ */
+static int
+DAbort(struct ubik_trans *atrans)
+{
int i;
struct buffer *tb;
tb = Buffers;
- for(i=0;i<nbuffers;i++,tb++) {
- if (tb->dirty) {
+ for (i = 0; i < nbuffers; i++, tb++) {
+ if (tb->dirty) {
tb->dirty = 0;
tb->file = BADFID;
Dlru(tb);
return 0;
}
-/* must only be called after DFlush, due to its interpretation of dirty flag */
-static DSync(adbase)
- struct ubik_dbase *adbase; {
+/**
+ * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
+ * can appear if a read transaction reads a page that is dirty, then that
+ * dirty page is synced. The read transaction will skip over the dirty page,
+ * and create a new buffer, and when the dirty page is synced, it will be
+ * identical (except for contents) to the read-transaction buffer.
+ */
+static void
+DedupBuffer(struct buffer *abuf)
+{
+ struct buffer *tb;
+ for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
+ if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
+ && tb->dbase == abuf->dbase) {
+
+ tb->file = BADFID;
+ Dlru(tb);
+ }
+ }
+}
+
+/*!
+ * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
+ */
+static int
+DSync(struct ubik_trans *atrans)
+{
int i;
afs_int32 code;
struct buffer *tb;
afs_int32 file;
afs_int32 rCode;
+ struct ubik_dbase *adbase = atrans->dbase;
rCode = 0;
while (1) {
file = BADFID;
- for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
+ for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
if (tb->dirty == 1) {
- if (file == BADFID) file = tb->file;
- if (file != BADFID && tb->file == file) tb->dirty = 0;
+ if (file == BADFID)
+ file = tb->file;
+ if (file != BADFID && tb->file == file) {
+ tb->dirty = 0;
+ DedupBuffer(tb);
+ }
}
}
- if (file == BADFID) break;
+ if (file == BADFID)
+ break;
/* otherwise we have a file to sync */
- code = (*adbase->sync)(adbase, file);
- if (code) rCode = code;
+ code = (*adbase->sync) (adbase, file);
+ if (code)
+ rCode = code;
}
return rCode;
}
-/* Same as read, only do not even try to read the page */
-static char *DNew (dbase, fid, page)
- struct ubik_dbase *dbase;
- int page;
- afs_int32 fid; {
+/*!
+ * \brief Same as DRead(), only do not even try to read the page.
+ */
+static char *
+DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
+{
struct buffer *tb;
+ struct ubik_dbase *dbase = atrans->dbase;
- if ((tb = newslot(dbase, fid, page)) == 0) return (char *) 0;
+ if ((tb = newslot(dbase, fid, page)) == 0)
+ return NULL;
tb->lockers++;
- bzero(tb->data, PAGESIZE);
+ memset(tb->data, 0, UBIK_PAGESIZE);
return tb->data;
}
-/* read data from database */
-udisk_read(atrans, afile, abuffer, apos, alen)
- afs_int32 afile;
- char *abuffer;
- afs_int32 apos, alen;
- struct ubik_trans *atrans; {
+/*!
+ * \brief Read data from database.
+ */
+int
+udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
+ afs_int32 apos, afs_int32 alen)
+{
char *bp;
afs_int32 offset, len, totalLen;
- struct ubik_dbase *dbase;
- if (atrans->flags & TRDONE) return UDONE;
+ if (atrans->flags & TRDONE)
+ return UDONE;
totalLen = 0;
- dbase = atrans->dbase;
while (alen > 0) {
- bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
- if (!bp) return UEOF;
+ bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
+ if (!bp)
+ return UEOF;
/* otherwise, min of remaining bytes and end of buffer to user mode */
- offset = apos & (PAGESIZE-1);
- len = PAGESIZE - offset;
- if (len > alen) len = alen;
- bcopy(bp+offset, abuffer, len);
- abuffer += len;
+ offset = apos & (UBIK_PAGESIZE - 1);
+ len = UBIK_PAGESIZE - offset;
+ if (len > alen)
+ len = alen;
+ memcpy(abuffer, bp + offset, len);
+ abuffer = (char *)abuffer + len;
apos += len;
alen -= len;
totalLen += len;
return 0;
}
-/* truncate file */
-udisk_truncate(atrans, afile, alength)
- struct ubik_trans *atrans;
- afs_int32 afile;
- afs_int32 alength; {
+/*!
+ * \brief Truncate file.
+ */
+int
+udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
+{
afs_int32 code;
struct ubik_trunc *tt;
- if (atrans->flags & TRDONE) return UDONE;
- if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
+ if (atrans->flags & TRDONE)
+ return UDONE;
+ if (atrans->type != UBIK_WRITETRANS)
+ return UBADTYPE;
/* write a truncate log record */
code = udisk_LogTruncate(atrans->dbase, afile, alength);
tt = FindTrunc(atrans, afile);
if (!tt) {
/* this file not truncated yet */
- tt=GetTrunc();
+ tt = GetTrunc();
tt->next = atrans->activeTruncs;
atrans->activeTruncs = tt;
tt->file = afile;
tt->length = alength;
- }
- else {
+ } else {
/* already truncated to a certain length */
- if (tt->length > alength) tt->length = alength;
+ if (tt->length > alength)
+ tt->length = alength;
}
return code;
}
-/* write data to database, using logs */
-udisk_write(atrans, afile, abuffer, apos, alen)
- afs_int32 afile;
- char *abuffer;
- afs_int32 apos, alen;
- struct ubik_trans *atrans; {
+/*!
+ * \brief Write data to database, using logs.
+ */
+int
+udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
+ afs_int32 apos, afs_int32 alen)
+{
char *bp;
afs_int32 offset, len, totalLen;
- struct ubik_dbase *dbase;
struct ubik_trunc *tt;
afs_int32 code;
- if (atrans->flags & TRDONE) return UDONE;
- if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
+ if (atrans->flags & TRDONE)
+ return UDONE;
+ if (atrans->type != UBIK_WRITETRANS)
+ return UBADTYPE;
- dbase = atrans->dbase;
/* first write the data to the log */
- code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
- if (code) return code;
+ code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
+ if (code)
+ return code;
/* expand any truncations of this file */
tt = FindTrunc(atrans, afile);
/* now update vm */
totalLen = 0;
while (alen > 0) {
- bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
+ bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
if (!bp) {
- bp = DNew(dbase, afile, apos>>LOGPAGESIZE);
- if (!bp) return UIOERROR;
- bzero(bp, PAGESIZE);
+ bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
+ if (!bp)
+ return UIOERROR;
+ memset(bp, 0, UBIK_PAGESIZE);
}
/* otherwise, min of remaining bytes and end of buffer to user mode */
- offset = apos & (PAGESIZE-1);
- len = PAGESIZE-offset;
- if (len > alen) len = alen;
- bcopy(abuffer, bp+offset, len);
- abuffer += len;
+ offset = apos & (UBIK_PAGESIZE - 1);
+ len = UBIK_PAGESIZE - offset;
+ if (len > alen)
+ len = alen;
+ memcpy(bp + offset, abuffer, len);
+ abuffer = (char *)abuffer + len;
apos += len;
alen -= len;
totalLen += len;
- DRelease(bp, 1); /* buffer modified */
+ DRelease(bp, 1); /* buffer modified */
}
return 0;
}
-/* begin a new local transaction */
-udisk_begin(adbase, atype, atrans)
- struct ubik_trans **atrans;
- int atype;
- struct ubik_dbase *adbase; {
+/*!
+ * \brief Begin a new local transaction.
+ */
+int
+udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
+{
afs_int32 code;
struct ubik_trans *tt;
- *atrans = (struct ubik_trans *)NULL;
- /* Make sure system is initialized before doing anything */
- if (!initd) {
- initd = 1;
- DInit(ubik_nBuffers);
- }
+ *atrans = NULL;
if (atype == UBIK_WRITETRANS) {
- if (adbase->flags & DBWRITING) return USYNC;
+ if (adbase->flags & DBWRITING)
+ return USYNC;
code = udisk_LogOpcode(adbase, LOGNEW, 0);
- if (code) return code;
+ if (code)
+ return code;
}
- tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
- bzero(tt, sizeof(struct ubik_trans));
+ tt = calloc(1, sizeof(struct ubik_trans));
tt->dbase = adbase;
tt->next = adbase->activeTrans;
adbase->activeTrans = tt;
tt->type = atype;
- if (atype == UBIK_READTRANS) adbase->readers++;
- else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
+ if (atype == UBIK_READTRANS)
+ adbase->readers++;
+ else if (atype == UBIK_WRITETRANS) {
+ UBIK_VERSION_LOCK;
+ adbase->flags |= DBWRITING;
+ UBIK_VERSION_UNLOCK;
+ }
*atrans = tt;
return 0;
}
-/* commit transaction */
-udisk_commit(atrans)
- struct ubik_trans *atrans; {
+/*!
+ * \brief Commit transaction.
+ *
+ * \param[in] atrans transaction to commit
+ *
+ * \return UTWOENDS if the transaction is already flagged TRDONE; the
+ * error from the initial setlabel or from udisk_LogEnd() if either
+ * fails; 0 otherwise. Nothing is written for a transaction whose type
+ * is not UBIK_WRITETRANS.
+ *
+ * For a write transaction:
+ * - If this is the sync site and UBIK_RECLABELDB is not yet set, the
+ * database is relabelled with {version_globals.ubik_epochTime, 1}
+ * and the new version is sent to the quorum (send errors are
+ * ignored). An epoch outside [UBIK_MILESTONE, now] panics.
+ * - The commit counter is bumped, version waiters are woken, and the
+ * end-of-transaction record is logged. If logging fails, the
+ * counter is rolled back and the error is returned.
+ * - Past that point every failure panics so that recovery replays the
+ * log: flush, sync, truncations, final label, log truncation.
+ */
+int
+udisk_commit(struct ubik_trans *atrans)
+{
struct ubik_dbase *dbase;
- afs_int32 code=0;
+ afs_int32 code = 0;
struct ubik_version oldversion, newversion;
+ afs_int32 now = FT_ApproxTime();
if (atrans->flags & TRDONE)
- return(UTWOENDS);
+ return (UTWOENDS);
if (atrans->type == UBIK_WRITETRANS) {
- dbase = atrans->dbase;
+ dbase = atrans->dbase;
/* On the first write to the database. We update the versions */
- if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
- oldversion = dbase->version;
- newversion.epoch = FT_ApproxTime();;
- newversion.counter = 1;
-
- code = (*dbase->setlabel)(dbase, 0, &newversion);
- if (code) return(code);
- ubik_epochTime = newversion.epoch;
- dbase->version = newversion;
-
- /* Ignore the error here. If the call fails, the site is
- * marked down and when we detect it is up again, we will
- * send the entire database to it.
- */
- ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
- &oldversion, &newversion);
- urecovery_state |= UBIK_RECLABELDB;
+ if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
+ UBIK_VERSION_LOCK;
+ if (version_globals.ubik_epochTime < UBIK_MILESTONE
+ || version_globals.ubik_epochTime > now) {
+ ubik_print
+ ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
+ version_globals.ubik_epochTime, UBIK_MILESTONE, now);
+ panic("Writing Ubik DB label\n");
+ }
+ oldversion = dbase->version;
+ newversion.epoch = version_globals.ubik_epochTime;
+ newversion.counter = 1;
+
+ code = (*dbase->setlabel) (dbase, 0, &newversion);
+ if (code) {
+ UBIK_VERSION_UNLOCK;
+ return code;
+ }
+
+ dbase->version = newversion;
+ UBIK_VERSION_UNLOCK;
+
+ urecovery_state |= UBIK_RECLABELDB;
+
+ /* Ignore the error here. If the call fails, the site is
+ * marked down and when we detect it is up again, we will
+ * send the entire database to it.
+ */
+ ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
+ &oldversion, &newversion);
}
+ UBIK_VERSION_LOCK;
dbase->version.counter++; /* bump commit count */
+#ifdef AFS_PTHREAD_ENV
+ opr_cv_broadcast(&dbase->version_cond);
+#else
LWP_NoYieldSignal(&dbase->version);
-
+#endif
code = udisk_LogEnd(dbase, &dbase->version);
if (code) {
- dbase->version.counter--;
- return(code);
+ dbase->version.counter--;
+ UBIK_VERSION_UNLOCK;
+ return code;
}
+ UBIK_VERSION_UNLOCK;
/* If we fail anytime after this, then panic and let the
- * recovery replay the log.
+ * recovery replay the log.
*/
- code = DFlush(dbase); /* write dirty pages to respective files */
- if (code) panic("Writing Ubik DB modifications\n");
- code = DSync(dbase); /* sync the files and mark pages not dirty */
- if (code) panic("Synchronizing Ubik DB modifications\n");
+ code = DFlush(atrans); /* write dirty pages to respective files */
+ if (code)
+ panic("Writing Ubik DB modifications\n");
+ code = DSync(atrans); /* sync the files and mark pages not dirty */
+ if (code)
+ panic("Synchronizing Ubik DB modifications\n");
- code = DoTruncs(atrans); /* Perform requested truncations */
- if (code) panic("Truncating Ubik DB\n");
+ code = DoTruncs(atrans); /* Perform requested truncations */
+ if (code)
+ panic("Truncating Ubik DB\n");
/* label the committed dbase */
- code = (*dbase->setlabel)(dbase, 0, &dbase->version);
- if (code) panic("Truncating Ubik DB\n");
+ code = (*dbase->setlabel) (dbase, 0, &dbase->version);
+ if (code)
+ panic("Labeling Ubik DB\n");
- code = (*dbase->truncate)(dbase, LOGFILE, 0); /* discard log (optional) */
- if (code) panic("Truncating Ubik logfile\n");
+ code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
+ if (code)
+ panic("Truncating Ubik logfile\n");
}
return code;
}
-/* abort transaction */
-udisk_abort(atrans)
- struct ubik_trans *atrans;
+/*!
+ * \brief Abort transaction.
+ *
+ * \param[in] atrans transaction to abort
+ *
+ * \return UTWOENDS if the transaction is already flagged TRDONE,
+ * otherwise 0.
+ *
+ * Work is done only when this is a write transaction and DBWRITING is
+ * set on its database. In that case a LOGABORT opcode is logged (its
+ * result is ignored), the log file is truncated to zero length, and
+ * DAbort() discards the dirty pages. A failed truncate panics.
+ *
+ * NOTE(review): part of this function's tail is not shown in this
+ * hunk; only the visible statements are described here.
+ */
+int
+udisk_abort(struct ubik_trans *atrans)
{
struct ubik_dbase *dbase;
afs_int32 code;
-
+
if (atrans->flags & TRDONE)
- return UTWOENDS;
+ return UTWOENDS;
/* Check if we are the write trans before logging abort, lest we
- * abort a good write trans in progress.
- * We don't really care if the LOGABORT gets to the log because we
- * truncate the log next. If the truncate fails, we panic; for
+ * abort a good write trans in progress.
+ * We don't really care if the LOGABORT gets to the log because we
+ * truncate the log next. If the truncate fails, we panic; for
* otherwise, the log entries remain. On restart, replay of the log
* will do nothing because the abort is there or no LogEnd opcode.
*/
dbase = atrans->dbase;
if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
udisk_LogOpcode(dbase, LOGABORT, 1);
- code = (*dbase->truncate)(dbase, LOGFILE, 0);
- if (code) panic("Truncating Ubik logfile during an abort\n");
- DAbort(dbase); /* remove all dirty pages */
+ code = (*dbase->truncate) (dbase, LOGFILE, 0);
+ if (code)
+ panic("Truncating Ubik logfile during an abort\n");
+ DAbort(atrans); /* remove all dirty pages */
}
/* When the transaction is marked done, it also means the logfile
return 0;
}
-/* destroy a transaction after it has been committed or aborted. if
- * it hasn't committed before you call this routine, we'll abort the
+/*!
+ * \brief Destroy a transaction after it has been committed or aborted.
+ *
+ * If it hasn't committed before you call this routine, we'll abort the
* transaction for you.
+ *
+ * \param[in] atrans transaction to destroy; it is freed here and must
+ * not be used by the caller afterwards
+ *
+ * \return 0 in every path visible in this hunk.
+ *
+ * The visible steps are: call udisk_abort() if TRDONE is not set, call
+ * ulock_relLock(), then either clear DBWRITING under the version lock
+ * (write transaction with DBWRITING set) or decrement dbase->readers
+ * (every other case). The iovec buffers and the transaction itself
+ * are then freed, and waiters on dbase->flags are woken.
+ *
+ * NOTE(review): context between ulock_relLock() and the DBWRITING
+ * check is not shown in this hunk, so further steps may exist there.
*/
-udisk_end(atrans)
- struct ubik_trans *atrans; {
+int
+udisk_end(struct ubik_trans *atrans)
+{
struct ubik_dbase *dbase;
- if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
+ if (!(atrans->flags & TRDONE))
+ udisk_abort(atrans);
dbase = atrans->dbase;
ulock_relLock(atrans);
* we could be unsetting someone else's bit.
*/
if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
- dbase->flags &= ~DBWRITING;
+ UBIK_VERSION_LOCK;
+ dbase->flags &= ~DBWRITING;
+ UBIK_VERSION_UNLOCK;
} else {
- dbase->readers--;
+ dbase->readers--;
}
- if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
- if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
+ if (atrans->iovec_info.iovec_wrt_val)
+ free(atrans->iovec_info.iovec_wrt_val);
+ if (atrans->iovec_data.iovec_buf_val)
+ free(atrans->iovec_data.iovec_buf_val);
free(atrans);
/* Wakeup any writers waiting in BeginTrans() */
+#ifdef AFS_PTHREAD_ENV
+ opr_cv_broadcast(&dbase->flags_cond);
+#else
LWP_NoYieldSignal(&dbase->flags);
+#endif
return 0;
}