#include <afsconfig.h>
#include <afs/param.h>
+#include <roken.h>
+#include <afs/opr.h>
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <errno.h>
-
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
#else
-#include <sys/file.h>
-#include <netinet/in.h>
+# include <opr/lockstub.h>
#endif
-
-#include <lock.h>
-#include <rx/xdr.h>
-
-
+#include <afs/afsutil.h>
#define UBIK_INTERNALS
#include "ubik.h"
static char *BufferData;
static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
afs_int32 apage);
-static int initd = 0;
#define BADFID 0xffffffff
static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
* Abort transaction: opcode \n
* Write data: opcode, file, position, length, <length> data bytes \n
*/
-int
+static int
udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
{
- struct ubik_stat ustat;
afs_int32 code;
- /* figure out where to write */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- if (code < 0)
- return code;
-
/* setup data and do write */
aopcode = htonl(aopcode);
- code =
- (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
- sizeof(afs_int32));
+ code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
if (code != sizeof(afs_int32))
return UIOERROR;
/*!
* \brief Log a commit, never syncing.
*/
-int
+static int
udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
{
afs_int32 code;
afs_int32 data[3];
- struct ubik_stat ustat;
-
- /* figure out where to write */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- if (code)
- return code;
/* setup data */
data[0] = htonl(LOGEND);
/* do write */
code =
- (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
- 3 * sizeof(afs_int32));
+ (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
if (code != 3 * sizeof(afs_int32))
return UIOERROR;
/*!
* \brief Log a truncate operation, never syncing.
*/
-int
+static int
udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
afs_int32 alength)
{
afs_int32 code;
afs_int32 data[3];
- struct ubik_stat ustat;
-
- /* figure out where to write */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- if (code < 0)
- return code;
/* setup data */
data[0] = htonl(LOGTRUNCATE);
/* do write */
code =
- (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
- 3 * sizeof(afs_int32));
+ (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
if (code != 3 * sizeof(afs_int32))
return UIOERROR;
return 0;
/*!
* \brief Write some data to the log, never syncing.
*/
-int
+static int
udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
afs_int32 apos, afs_int32 alen)
{
- struct ubik_stat ustat;
afs_int32 code;
afs_int32 data[4];
- afs_int32 lpos;
-
- /* find end of log */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- lpos = ustat.size;
- if (code < 0)
- return code;
/* setup header */
data[0] = htonl(LOGDATA);
/* write header */
code =
- (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+ (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
if (code != 4 * sizeof(afs_int32))
return UIOERROR;
- lpos += 4 * sizeof(afs_int32);
/* write data */
- code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+ code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
if (code != alen)
return UIOERROR;
return 0;
}
-static int
-DInit(int abuffers)
+int
+udisk_Init(int abuffers)
{
/* Initialize the venus buffer system. */
int i;
struct buffer *tb;
- Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
- memset(Buffers, 0, abuffers * sizeof(struct buffer));
- BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
+ Buffers = calloc(abuffers, sizeof(struct buffer));
+ BufferData = malloc(abuffers * UBIK_PAGESIZE);
nbuffers = abuffers;
for (i = 0; i < PHSIZE; i++)
phTable[i] = 0;
if (buf->file != fid) {
return 0;
}
+ if (atrans->type == UBIK_READTRANS && buf->dirty) {
+ /* if 'buf' is dirty, it has uncommitted changes; we do not want to
+ * see uncommitted changes if we are a read transaction, so skip over
+ * it. */
+ return 0;
+ }
if (buf->dbase != atrans->dbase) {
return 0;
}
DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
{
/* Read a page from the disk. */
- struct buffer *tb, *lastbuffer;
+ struct buffer *tb, *lastbuffer, *found_tb = NULL;
afs_int32 code;
struct ubik_dbase *dbase = atrans->dbase;
calls++;
lastbuffer = LruBuffer->lru_prev;
- if (MatchBuffer(lastbuffer, page, fid, atrans)) {
+ /* Skip for write transactions for a clean page - this may not be the right page to use */
+ if (MatchBuffer(lastbuffer, page, fid, atrans)
+ && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
tb = lastbuffer;
tb->lockers++;
lastb++;
}
for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
if (MatchBuffer(tb, page, fid, atrans)) {
- Dmru(tb);
- tb->lockers++;
- return tb->data;
+ if (tb->dirty || atrans->type == UBIK_READTRANS) {
+ found_tb = tb;
+ break;
+ }
+ /* Remember this clean page - we might use it */
+ found_tb = tb;
}
}
+ /* For a write transaction, use a matching clean page if no dirty one was found */
+ if (found_tb) {
+ Dmru(found_tb);
+ found_tb->lockers++;
+ return found_tb->data;
+ }
+
/* can't find it */
tb = newslot(dbase, fid, page);
if (!tb)
tb->file = BADFID;
Dlru(tb);
tb->lockers--;
- ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
+ ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno));
return 0;
}
ios++;
{
struct ubik_trunc *tt;
if (!freeTruncList) {
- freeTruncList =
- (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
+ freeTruncList = malloc(sizeof(struct ubik_trunc));
freeTruncList->next = (struct ubik_trunc *)0;
}
tt = freeTruncList;
if (pp == 0) {
/* There are no unlocked buffers that don't need to be written to the disk. */
- ubik_print
- ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
+ ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
return NULL;
}
return 0;
}
+/**
+ * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
+ * can appear if a read transaction reads a page that is dirty, then that
+ * dirty page is synced. The read transaction will skip over the dirty page,
+ * and create a new buffer, and when the dirty page is synced, it will be
+ * identical (except for contents) to the read-transaction buffer.
+ */
+static void
+DedupBuffer(struct buffer *abuf)
+{
+ struct buffer *tb;
+ for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
+ if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
+ && tb->dbase == abuf->dbase) {
+
+ tb->file = BADFID;
+ Dlru(tb);
+ }
+ }
+}
+
/*!
* \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
*/
if (tb->dirty == 1) {
if (file == BADFID)
file = tb->file;
- if (file != BADFID && tb->file == file)
+ if (file != BADFID && tb->file == file) {
tb->dirty = 0;
+ DedupBuffer(tb);
+ }
}
}
if (file == BADFID)
bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
if (!bp)
return UIOERROR;
- memset(bp, 0, UBIK_PAGESIZE);
}
/* otherwise, min of remaining bytes and end of buffer to user mode */
offset = apos & (UBIK_PAGESIZE - 1);
afs_int32 code;
struct ubik_trans *tt;
- *atrans = (struct ubik_trans *)NULL;
- /* Make sure system is initialized before doing anything */
- if (!initd) {
- initd = 1;
- DInit(ubik_nBuffers);
- }
+ *atrans = NULL;
if (atype == UBIK_WRITETRANS) {
- if (adbase->flags & DBWRITING)
+ if (adbase->dbFlags & DBWRITING)
return USYNC;
code = udisk_LogOpcode(adbase, LOGNEW, 0);
if (code)
return code;
}
- tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
- memset(tt, 0, sizeof(struct ubik_trans));
+ tt = calloc(1, sizeof(struct ubik_trans));
tt->dbase = adbase;
tt->next = adbase->activeTrans;
adbase->activeTrans = tt;
tt->type = atype;
if (atype == UBIK_READTRANS)
adbase->readers++;
- else if (atype == UBIK_WRITETRANS)
- adbase->flags |= DBWRITING;
+ else if (atype == UBIK_WRITETRANS) {
+ UBIK_VERSION_LOCK;
+ adbase->dbFlags |= DBWRITING;
+ UBIK_VERSION_UNLOCK;
+ }
*atrans = tt;
return 0;
}
struct ubik_dbase *dbase;
afs_int32 code = 0;
struct ubik_version oldversion, newversion;
+ afs_int32 now = FT_ApproxTime();
if (atrans->flags & TRDONE)
return (UTWOENDS);
/* On the first write to the database. We update the versions */
if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
+ UBIK_VERSION_LOCK;
+ if (version_globals.ubik_epochTime < UBIK_MILESTONE
+ || version_globals.ubik_epochTime > now) {
+ ViceLog(0,
+ ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
+ version_globals.ubik_epochTime, UBIK_MILESTONE, now));
+ panic("Writing Ubik DB label\n");
+ }
oldversion = dbase->version;
- newversion.epoch = FT_ApproxTime();;
+ newversion.epoch = version_globals.ubik_epochTime;
newversion.counter = 1;
code = (*dbase->setlabel) (dbase, 0, &newversion);
- if (code)
- return (code);
- ubik_epochTime = newversion.epoch;
+ if (code) {
+ UBIK_VERSION_UNLOCK;
+ return code;
+ }
+
dbase->version = newversion;
+ UBIK_VERSION_UNLOCK;
+
+ urecovery_state |= UBIK_RECLABELDB;
/* Ignore the error here. If the call fails, the site is
* marked down and when we detect it is up again, we will
*/
ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
&oldversion, &newversion);
- urecovery_state |= UBIK_RECLABELDB;
}
+ UBIK_VERSION_LOCK;
dbase->version.counter++; /* bump commit count */
-#ifdef AFS_PTHREAD_ENV
- assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
-#else
- LWP_NoYieldSignal(&dbase->version);
-#endif
code = udisk_LogEnd(dbase, &dbase->version);
if (code) {
dbase->version.counter--;
- return (code);
+ UBIK_VERSION_UNLOCK;
+ return code;
}
+ UBIK_VERSION_UNLOCK;
/* If we fail anytime after this, then panic and let the
* recovery replay the log.
* will do nothing because the abort is there or no LogEnd opcode.
*/
dbase = atrans->dbase;
- if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
+ if (atrans->type == UBIK_WRITETRANS && dbase->dbFlags & DBWRITING) {
udisk_LogOpcode(dbase, LOGABORT, 1);
code = (*dbase->truncate) (dbase, LOGFILE, 0);
if (code)
{
struct ubik_dbase *dbase;
-#if defined(UBIK_PAUSE)
- /* Another thread is trying to lock this transaction.
- * That can only be an RPC doing SDISK_Lock.
- * Unlock the transaction, 'cause otherwise the other
- * thread will never wake up. Don't free it because
- * the caller will do that already.
- */
- if (atrans->flags & TRSETLOCK) {
- atrans->flags |= TRSTALE;
- ulock_relLock(atrans);
- return UINTERNAL;
- }
-#endif /* UBIK_PAUSE */
if (!(atrans->flags & TRDONE))
udisk_abort(atrans);
dbase = atrans->dbase;
/* check if we are the write trans before unsetting the DBWRITING bit, else
* we could be unsetting someone else's bit.
*/
- if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
- dbase->flags &= ~DBWRITING;
+ if (atrans->type == UBIK_WRITETRANS && dbase->dbFlags & DBWRITING) {
+ UBIK_VERSION_LOCK;
+ dbase->dbFlags &= ~DBWRITING;
+ UBIK_VERSION_UNLOCK;
} else {
dbase->readers--;
}
/* Wakeup any writers waiting in BeginTrans() */
#ifdef AFS_PTHREAD_ENV
- assert(pthread_cond_broadcast(&dbase->flags_cond) == 0);
+ opr_cv_broadcast(&dbase->flags_cond);
#else
- LWP_NoYieldSignal(&dbase->flags);
+ LWP_NoYieldSignal(&dbase->dbFlags);
#endif
return 0;
}