struct ubik_server *ts;
afs_int32 temp, yesVotes, lastWakeupTime, oldestYesVote, syncsite;
struct ubik_tid ttid;
+ struct ubik_version tversion;
afs_int32 startTime;
/* loop forever getting votes */
* assume we'll be fine until SMALLTIME seconds after we start collecting votes */
/* this next is essentially an expansion of rgen's ServBeacon routine */
- ttid.epoch = ubik_epochTime;
+ UBIK_VERSION_LOCK;
+ ttid.epoch = version_globals.ubik_epochTime;
if (ubik_dbase->flags & DBWRITING) {
/*
* if a write is in progress, we have to send the writeTidCounter
ttid.counter = ubik_dbase->writeTidCounter;
} else
ttid.counter = ubik_dbase->tidCounter + 1;
+ tversion.epoch = ubik_dbase->version.epoch;
+ tversion.counter = ubik_dbase->version.counter;
/* now analyze return codes, counting up our votes */
yesVotes = 0; /* count how many to ensure we have quorum */
/*
* Don't waste time using mult Rx calls if there are no connections out there
*/
+ UBIK_VERSION_UNLOCK;
if (i > 0) {
char hoststr[16];
multi_Rx(connections, i) {
- multi_VOTE_Beacon(syncsite, startTime, &ubik_dbase->version,
+ multi_VOTE_Beacon(syncsite, startTime, &tversion,
&ttid);
temp = FT_ApproxTime(); /* now, more or less */
ts = servers[multi_i];
* the same restrictions apply for our voting for ourself as for our voting
* for anyone else. */
i = SVOTE_Beacon((struct rx_call *)0, ubeacon_AmSyncSite(), startTime,
- &ubik_dbase->version, &ttid);
+ &tversion, &ttid);
if (i) {
yesVotes += 2;
if (amIMagic)
tt->type = atype;
if (atype == UBIK_READTRANS)
adbase->readers++;
- else if (atype == UBIK_WRITETRANS)
+ else if (atype == UBIK_WRITETRANS) {
+ UBIK_VERSION_LOCK;
adbase->flags |= DBWRITING;
+ UBIK_VERSION_UNLOCK;
+ }
*atrans = tt;
return 0;
}
/* On the first write to the database. We update the versions */
if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
+ UBIK_VERSION_LOCK;
oldversion = dbase->version;
newversion.epoch = FT_ApproxTime();;
newversion.counter = 1;
code = (*dbase->setlabel) (dbase, 0, &newversion);
if (code)
return (code);
- ubik_epochTime = newversion.epoch;
+ version_globals.ubik_epochTime = newversion.epoch;
dbase->version = newversion;
+ UBIK_VERSION_UNLOCK;
/* Ignore the error here. If the call fails, the site is
* marked down and when we detect it is up again, we will
urecovery_state |= UBIK_RECLABELDB;
}
+ UBIK_VERSION_LOCK;
dbase->version.counter++; /* bump commit count */
#ifdef AFS_PTHREAD_ENV
CV_BROADCAST(&dbase->version_cond);
dbase->version.counter--;
return (code);
}
+ UBIK_VERSION_UNLOCK;
/* If we fail anytime after this, then panic and let the
* recovery replay the log.
* we could be unsetting someone else's bit.
*/
if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
+ UBIK_VERSION_LOCK;
dbase->flags &= ~DBWRITING;
+ UBIK_VERSION_UNLOCK;
} else {
dbase->readers--;
}
code = (*adbase->getlabel) (adbase, 0, &adbase->version);
if (code) {
/* try setting the label to a new value */
+ UBIK_VERSION_LOCK;
adbase->version.epoch = 1; /* value for newly-initialized db */
adbase->version.counter = 1;
code = (*adbase->setlabel) (adbase, 0, &adbase->version);
#else
LWP_NoYieldSignal(&adbase->version);
#endif
+ UBIK_VERSION_UNLOCK;
}
return 0;
}
/* Mark whether we are the sync site */
if (!ubeacon_AmSyncSite()) {
urecovery_state &= ~UBIK_RECSYNCSITE;
+ DBRELE(ubik_dbase);
continue; /* nothing to do */
}
urecovery_state |= UBIK_RECSYNCSITE;
urecovery_state |= UBIK_RECFOUNDDB;
urecovery_state &= ~UBIK_RECSENTDB;
}
- if (!(urecovery_state & UBIK_RECFOUNDDB))
+ if (!(urecovery_state & UBIK_RECFOUNDDB)) {
+ DBRELE(ubik_dbase);
continue; /* not ready */
+ }
/* If we, the sync site, do not have the best db version, then
* go and get it from the server that does.
urecovery_state |= UBIK_RECHAVEDB;
} else {
/* we don't have the best version; we should fetch it. */
- DBHOLD(ubik_dbase);
urecovery_AbortAll(ubik_dbase);
/* Rx code to do the Bulk fetch */
}
/* give invalid label during file transit */
+ UBIK_VERSION_LOCK;
tversion.epoch = 0;
code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
+ UBIK_VERSION_UNLOCK;
if (code) {
ubik_dprint("setlabel io error=%d\n", code);
goto FetchEndCall;
if (!code) {
/* we got a new file, set up its header */
urecovery_state |= UBIK_RECHAVEDB;
+ UBIK_VERSION_LOCK;
memcpy(&ubik_dbase->version, &tversion,
sizeof(struct ubik_version));
snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
(*ubik_dbase->setlabel) (ubik_dbase, 0,
&ubik_dbase->version);
}
+ UBIK_VERSION_UNLOCK;
#ifdef AFS_NT40_ENV
snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
ubik_dbase->pathName, (file<0)?"SYS":"",
* We will effectively invalidate the old data forever now.
* Unclear if we *should* but we do.
*/
+ UBIK_VERSION_LOCK;
ubik_dbase->version.epoch = 0;
ubik_dbase->version.counter = 0;
+ UBIK_VERSION_UNLOCK;
ubik_print("Ubik: Synchronize database failed (error = %d)\n",
code);
} else {
#endif
DBRELE(ubik_dbase);
}
- if (!(urecovery_state & UBIK_RECHAVEDB))
+ if (!(urecovery_state & UBIK_RECHAVEDB)) {
+ DBRELE(ubik_dbase);
continue; /* not ready */
+ }
/* If the database was newly initialized, then when we establish quorum, write
* a new label. This allows urecovery_AllBetter() to allow access for reads.
* database and overwrite this one.
*/
if (ubik_dbase->version.epoch == 1) {
- DBHOLD(ubik_dbase);
urecovery_AbortAll(ubik_dbase);
- ubik_epochTime = 2;
- ubik_dbase->version.epoch = ubik_epochTime;
+ UBIK_VERSION_LOCK;
+ version_globals.ubik_epochTime = 2;
+ ubik_dbase->version.epoch = version_globals.ubik_epochTime;
ubik_dbase->version.counter = 1;
code =
(*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
+ UBIK_VERSION_UNLOCK;
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
#ifdef AFS_PTHREAD_ENV
CV_BROADCAST(&ubik_dbase->version_cond);
#else
LWP_NoYieldSignal(&ubik_dbase->version);
#endif
- DBRELE(ubik_dbase);
}
/* Check the other sites and send the database to them if they
/* now propagate out new version to everyone else */
dbok = 1; /* start off assuming they all worked */
- DBHOLD(ubik_dbase);
/*
* Check if a write transaction is in progress. We can't send the
* db when a write is in progress here because the db would be
ts->currentDB = 1;
}
}
- DBRELE(ubik_dbase);
if (dbok)
urecovery_state |= UBIK_RECSENTDB;
}
+ DBRELE(ubik_dbase);
}
return NULL;
}
afs_inet_ntoa_r(otherHost, hoststr));
offset = 0;
+ UBIK_VERSION_LOCK;
epoch = tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
(*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
if (fd < 0) {
code = errno;
- goto failed;
+ goto failed_locked;
}
code = lseek(fd, HDRSIZE, 0);
if (code != HDRSIZE) {
close(fd);
- goto failed;
+ goto failed_locked;
}
pass = 0;
memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
+ UBIK_VERSION_UNLOCK;
while (length > 0) {
tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
#if !defined(AFS_PTHREAD_ENV)
#endif
if (!code)
code = rename(pbuffer, tbuffer);
+ UBIK_VERSION_LOCK;
if (!code) {
(*ubik_dbase->open) (ubik_dbase, file);
code = (*ubik_dbase->setlabel) (dbase, file, avers);
LWP_NoYieldSignal(&dbase->version);
#endif
+failed_locked:
+ UBIK_VERSION_UNLOCK;
+
failed:
if (code) {
unlink(pbuffer);
/* Failed to sync. Allow reads again for now. */
if (dbase != NULL) {
+ UBIK_VERSION_LOCK;
tversion.epoch = epoch;
(*dbase->setlabel) (dbase, file, &tversion);
+ UBIK_VERSION_UNLOCK;
}
ubik_print
("Ubik: Synchronize database with server %s failed (error = %d)\n",
/* Set the label if its version matches the sync-site's */
if (uvote_eq_dbVersion(*oldversionp)) {
+ UBIK_VERSION_LOCK;
code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
if (!code) {
ubik_dbase->version = *newversionp;
uvote_set_dbVersion(*newversionp);
}
+ UBIK_VERSION_UNLOCK;
} else {
code = USYNC;
}
static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
static void *securityRock = NULL;
+struct version_data version_globals;
+
#define CStampVersion 1 /* meaning set ts->version */
static_inline struct rx_connection *
DBRELE(dbase);
return code;
}
+ UBIK_VERSION_LOCK;
if (readAny) {
tt->flags |= TRREADANY;
if (readAny > 1) {
udisk_abort(tt);
ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
udisk_end(tt);
+ UBIK_VERSION_UNLOCK;
DBRELE(dbase);
return code;
}
}
*transPtr = tt;
+ UBIK_VERSION_UNLOCK;
DBRELE(dbase);
return 0;
}
extern struct ubik_stats { /* random stats */
afs_int32 escapes;
} ubik_stats;
-extern afs_int32 ubik_epochTime; /* time when this site started */
extern afs_int32 urecovery_state; /* sync site recovery process state */
extern struct ubik_trans *ubik_currentTrans; /* current trans */
extern afs_int32 ubik_debugFlag; /* ubik debug flag */
#define UBIK_ADDR_LOCK MUTEX_ENTER(&addr_globals.addr_lock)
#define UBIK_ADDR_UNLOCK MUTEX_EXIT(&addr_globals.addr_lock)
+/*!
+ * \brief The version lock protects the structure member, as well as
+ * the database version, flags, tidCounter, writeTidCounter
+ */
+struct version_data {
+#ifdef AFS_PTHREAD_ENV
+ pthread_mutex_t version_lock;
+#endif
+ afs_int32 ubik_epochTime; /* time when this site started */
+};
+
+#define UBIK_VERSION_LOCK MUTEX_ENTER(&version_globals.version_lock)
+#define UBIK_VERSION_UNLOCK MUTEX_EXIT(&version_globals.version_lock)
+
/* phys.c */
extern int uphys_stat(struct ubik_dbase *adbase, afs_int32 afid,
struct ubik_stat *astat);
extern int ubik_CheckCache(struct ubik_trans *atrans,
ubik_updatecache_func check,
void *rock);
+extern struct version_data version_globals;
/*\}*/
/*! \name ubikclient.c */
aparm->currentTrans = 0;
}
- aparm->epochTime = ubik_epochTime;
+ aparm->epochTime = version_globals.ubik_epochTime;
return 0;
}
aparm->currentTrans = 0;
}
- aparm->epochTime = ubik_epochTime;
+ aparm->epochTime = version_globals.ubik_epochTime;
return 0;
}