#include <afsconfig.h>
#include <afs/param.h>
-
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <errno.h>
-#include <assert.h>
-
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
-#include <time.h>
-#include <fcntl.h>
-#else
-#include <sys/file.h>
-#include <netinet/in.h>
-#include <sys/time.h>
-#endif
+#include <roken.h>
#include <lock.h>
#include <rx/xdr.h>
#include <rx/rx.h>
#include <afs/afsutil.h>
+#include <afs/cellconfig.h>
#define UBIK_INTERNALS
#include "ubik.h"
* \brief sync site
*
* routine called when a non-sync site server goes down; restarts recovery
- * process to send missing server the new db when it comes back up.
+ * process to send the missing server the new db when it comes back up
+ * (for non-sync site servers).
*
* \note This routine should not do anything with variables used by non-sync site servers.
*/
int
-urecovery_LostServer(void)
+urecovery_LostServer(struct ubik_server *ts)
{
+ ubeacon_ReinitServer(ts);
#if !defined(AFS_PTHREAD_ENV)
/* No corresponding LWP_WaitProcess found anywhere for this -- klm */
LWP_NoYieldSignal(&urecovery_state);
* that the sync site is still the sync site, 'cause it won't talk
* to us until a timeout period has gone by. When we recover, we
* leave this clear until we get a new dbase */
- else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) { /* && order is important */
+ else if ((uvote_GetSyncSite() && uvote_eq_dbVersion(ubik_dbase->version))) { /* && order is important */
rcode = 1;
}
|| atid->counter > ubik_currentTrans->tid.counter || abortalways) {
/* don't match, abort it */
/* If the thread is not waiting for lock - ok to end it */
-#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
-#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
-#if !defined(UBIK_PAUSE)
}
-#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *)0;
}
}
} else if (opcode == LOGABORT)
panic("log abort\n");
else if (opcode == LOGEND) {
+ struct ubik_version version;
tpos += 4;
code =
(*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
2 * sizeof(afs_int32));
if (code != 2 * sizeof(afs_int32))
return UBADLOG;
- code = (*adbase->setlabel) (adbase, 0, (ubik_version *)buffer);
+ version.epoch = ntohl(buffer[0]);
+ version.counter = ntohl(buffer[1]);
+ code = (*adbase->setlabel) (adbase, 0, &version);
if (code)
return code;
+ ubik_print("Successfully replayed log for interrupted "
+ "transaction; db version is now %ld.%ld\n",
+ (long) version.epoch, (long) version.counter);
logIsGood = 1;
break; /* all done now */
} else if (opcode == LOGTRUNCATE) {
code = (*adbase->getlabel) (adbase, 0, &adbase->version);
if (code) {
/* try setting the label to a new value */
+ UBIK_VERSION_LOCK;
adbase->version.epoch = 1; /* value for newly-initialized db */
adbase->version.counter = 1;
code = (*adbase->setlabel) (adbase, 0, &adbase->version);
(*adbase->setlabel) (adbase, 0, &adbase->version);
}
#ifdef AFS_PTHREAD_ENV
- assert(pthread_cond_broadcast(&adbase->version_cond) == 0);
+ CV_BROADCAST(&adbase->version_cond);
#else
LWP_NoYieldSignal(&adbase->version);
#endif
+ UBIK_VERSION_UNLOCK;
}
return 0;
}
struct ubik_server *bestServer = NULL;
struct ubik_server *ts;
int dbok, doingRPC, now;
- afs_int32 lastProbeTime, lastDBVCheck;
+ afs_int32 lastProbeTime;
/* if we're the sync site, the best db version we've found yet */
static struct ubik_version bestDBVersion;
struct ubik_version tversion;
struct ubik_stat ubikstat;
struct in_addr inAddr;
char hoststr[16];
-#ifndef OLD_URECOVERY
char pbuffer[1028];
- int flen, fd = -1;
+ int fd = -1;
afs_int32 pass;
-#endif
/* otherwise, begin interaction */
urecovery_state = 0;
lastProbeTime = 0;
- lastDBVCheck = 0;
while (1) {
/* Run through this loop every 4 seconds */
tv.tv_sec = 4;
#endif
for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
+ UBIK_BEACON_LOCK;
if (!ts->up) {
+ UBIK_BEACON_UNLOCK;
doingRPC = 1;
code = DoProbe(ts);
if (code == 0) {
+ UBIK_BEACON_LOCK;
ts->up = 1;
urecovery_state &= ~UBIK_RECFOUNDDB;
}
} else if (!ts->currentDB) {
urecovery_state &= ~UBIK_RECFOUNDDB;
}
+ UBIK_BEACON_UNLOCK;
}
#ifdef AFS_PTHREAD_ENV
/* Mark whether we are the sync site */
if (!ubeacon_AmSyncSite()) {
urecovery_state &= ~UBIK_RECSYNCSITE;
+ DBRELE(ubik_dbase);
continue; /* nothing to do */
}
urecovery_state |= UBIK_RECSYNCSITE;
bestDBVersion.epoch = 0;
bestDBVersion.counter = 0;
for (ts = ubik_servers; ts; ts = ts->next) {
- if (!ts->up)
+ UBIK_BEACON_LOCK;
+ if (!ts->up) {
+ UBIK_BEACON_UNLOCK;
continue; /* don't bother with these guys */
+ }
+ UBIK_BEACON_UNLOCK;
if (ts->isClone)
continue;
+ UBIK_ADDR_LOCK;
code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
+ UBIK_ADDR_UNLOCK;
if (code == 0) {
/* perhaps this is the best version */
if (vcmp(ts->version, bestDBVersion) > 0) {
*/
urecovery_state &= ~UBIK_RECHAVEDB;
}
- lastDBVCheck = FT_ApproxTime();
urecovery_state |= UBIK_RECFOUNDDB;
urecovery_state &= ~UBIK_RECSENTDB;
}
-#if defined(UBIK_PAUSE)
- /* it's not possible for UBIK_RECFOUNDDB not to be set here.
- * However, we might have lost UBIK_RECSYNCSITE, and that
- * IS important.
- */
- if (!(urecovery_state & UBIK_RECSYNCSITE))
- continue; /* lost sync */
-#else
- if (!(urecovery_state & UBIK_RECFOUNDDB))
+ if (!(urecovery_state & UBIK_RECFOUNDDB)) {
+ DBRELE(ubik_dbase);
continue; /* not ready */
-#endif /* UBIK_PAUSE */
+ }
/* If we, the sync site, do not have the best db version, then
* go and get it from the server that does.
urecovery_state |= UBIK_RECHAVEDB;
} else {
/* we don't have the best version; we should fetch it. */
- DBHOLD(ubik_dbase);
urecovery_AbortAll(ubik_dbase);
/* Rx code to do the Bulk fetch */
file = 0;
offset = 0;
+ UBIK_ADDR_LOCK;
rxcall = rx_NewCall(bestServer->disk_rxcid);
ubik_print("Ubik: Synchronize database with server %s\n",
afs_inet_ntoa_r(bestServer->addr[0], hoststr));
+ UBIK_ADDR_UNLOCK;
code = StartDISK_GetFile(rxcall, file);
if (code) {
goto FetchEndCall;
}
-#ifdef OLD_URECOVERY
- /* Truncate the file first */
- code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
- if (code) {
- ubik_dprint("truncate io error=%d\n", code);
- goto FetchEndCall;
- }
- tversion.counter = 0;
-#endif
/* give invalid label during file transit */
+ UBIK_VERSION_LOCK;
tversion.epoch = 0;
code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
+ UBIK_VERSION_UNLOCK;
if (code) {
ubik_dprint("setlabel io error=%d\n", code);
goto FetchEndCall;
}
-#ifndef OLD_URECOVERY
- flen = length;
- afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
+ snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
+ ubik_dbase->pathName, (file<0)?"SYS":"",
+ (file<0)?-file:file);
fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
if (fd < 0) {
code = errno;
close(fd);
goto FetchEndCall;
}
-#endif
pass = 0;
while (length > 0) {
close(fd);
goto FetchEndCall;
}
-#ifdef OLD_URECOVERY
- nbytes =
- (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
- tlen);
-#else
nbytes = write(fd, tbuffer, tlen);
pass++;
-#endif
if (nbytes != tlen) {
code = UIOERROR;
close(fd);
offset += tlen;
length -= tlen;
}
-#ifndef OLD_URECOVERY
code = close(fd);
if (code)
goto FetchEndCall;
-#endif
code = EndDISK_GetFile(rxcall, &tversion);
FetchEndCall:
tcode = rx_EndCall(rxcall, code);
if (!code) {
/* we got a new file, set up its header */
urecovery_state |= UBIK_RECHAVEDB;
+ UBIK_VERSION_LOCK;
memcpy(&ubik_dbase->version, &tversion,
sizeof(struct ubik_version));
-#ifdef OLD_URECOVERY
- (*ubik_dbase->sync) (ubik_dbase, 0); /* get data out first */
-#else
- afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
+ snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
+ ubik_dbase->pathName, (file<0)?"SYS":"",
+ (file<0)?-file:file);
#ifdef AFS_NT40_ENV
- afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
+ snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
+ ubik_dbase->pathName, (file<0)?"SYS":"",
+ (file<0)?-file:file);
code = unlink(pbuffer);
if (!code)
code = rename(tbuffer, pbuffer);
- afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
+ snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
+ ubik_dbase->pathName, (file<0)?"SYS":"",
+ (file<0)?-file:file);
#endif
if (!code)
code = rename(pbuffer, tbuffer);
if (!code) {
(*ubik_dbase->open) (ubik_dbase, file);
-#endif
/* after data is good, sync disk with correct label */
code =
(*ubik_dbase->setlabel) (ubik_dbase, 0,
&ubik_dbase->version);
-#ifndef OLD_URECOVERY
}
+ UBIK_VERSION_UNLOCK;
#ifdef AFS_NT40_ENV
- afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
+ snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
+ ubik_dbase->pathName, (file<0)?"SYS":"",
+ (file<0)?-file:file);
unlink(pbuffer);
#endif
-#endif
}
if (code) {
-#ifndef OLD_URECOVERY
unlink(pbuffer);
/*
* We will effectively invalidate the old data forever now.
* Unclear if we *should* but we do.
*/
-#endif
+ UBIK_VERSION_LOCK;
ubik_dbase->version.epoch = 0;
ubik_dbase->version.counter = 0;
+ UBIK_VERSION_UNLOCK;
ubik_print("Ubik: Synchronize database failed (error = %d)\n",
code);
} else {
}
udisk_Invalidate(ubik_dbase, 0); /* data has changed */
#ifdef AFS_PTHREAD_ENV
- assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
+ CV_BROADCAST(&ubik_dbase->version_cond);
#else
LWP_NoYieldSignal(&ubik_dbase->version);
#endif
DBRELE(ubik_dbase);
}
-#if defined(UBIK_PAUSE)
- if (!(urecovery_state & UBIK_RECSYNCSITE))
- continue; /* lost sync */
-#endif /* UBIK_PAUSE */
- if (!(urecovery_state & UBIK_RECHAVEDB))
+ if (!(urecovery_state & UBIK_RECHAVEDB)) {
+ DBRELE(ubik_dbase);
continue; /* not ready */
+ }
/* If the database was newly initialized, then when we establish quorum, write
* a new label. This allows urecovery_AllBetter() to allow access for reads.
* database and overwrite this one.
*/
if (ubik_dbase->version.epoch == 1) {
- DBHOLD(ubik_dbase);
urecovery_AbortAll(ubik_dbase);
- ubik_epochTime = 2;
- ubik_dbase->version.epoch = ubik_epochTime;
+ UBIK_VERSION_LOCK;
+ version_globals.ubik_epochTime = 2;
+ ubik_dbase->version.epoch = version_globals.ubik_epochTime;
ubik_dbase->version.counter = 1;
code =
(*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
+ UBIK_VERSION_UNLOCK;
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
#ifdef AFS_PTHREAD_ENV
- assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
+ CV_BROADCAST(&ubik_dbase->version_cond);
#else
LWP_NoYieldSignal(&ubik_dbase->version);
#endif
- DBRELE(ubik_dbase);
}
/* Check the other sites and send the database to them if they
/* now propagate out new version to everyone else */
dbok = 1; /* start off assuming they all worked */
- DBHOLD(ubik_dbase);
/*
* Check if a write transaction is in progress. We can't send the
* db when a write is in progress here because the db would be
}
for (ts = ubik_servers; ts; ts = ts->next) {
+ UBIK_ADDR_LOCK;
inAddr.s_addr = ts->addr[0];
+ UBIK_ADDR_UNLOCK;
+ UBIK_BEACON_LOCK;
if (!ts->up) {
+ UBIK_BEACON_UNLOCK;
ubik_dprint("recovery cannot send version to %s\n",
afs_inet_ntoa_r(inAddr.s_addr, hoststr));
dbok = 0;
continue;
}
+ UBIK_BEACON_UNLOCK;
ubik_dprint("recovery sending version to %s\n",
afs_inet_ntoa_r(inAddr.s_addr, hoststr));
if (vcmp(ts->version, ubik_dbase->version) != 0) {
if (!code) {
length = ubikstat.size;
file = offset = 0;
+ UBIK_ADDR_LOCK;
rxcall = rx_NewCall(ts->disk_rxcid);
+ UBIK_ADDR_UNLOCK;
code =
StartDISK_SendFile(rxcall, file, length,
&ubik_dbase->version);
ts->currentDB = 1;
}
}
- DBRELE(ubik_dbase);
if (dbok)
urecovery_state |= UBIK_RECSENTDB;
}
+ DBRELE(ubik_dbase);
}
return NULL;
}
afs_uint32 addr;
char buffer[32];
char hoststr[16];
- extern afs_int32 ubikSecIndex;
- extern struct rx_securityClass *ubikSecClass;
+ UBIK_ADDR_LOCK;
for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
i++) {
conns[i] =
rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
- ubikSecClass, ubikSecIndex);
+ addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
/* user requirement to use only the primary interface */
if (ubikPrimaryAddrOnly) {
break;
}
}
- assert(i); /* at least one interface address for this server */
+ UBIK_ADDR_UNLOCK;
+ osi_Assert(i); /* at least one interface address for this server */
#ifdef AFS_PTHREAD_ENV
DBRELE(ubik_dbase);
#endif
if (success_i >= 0) {
+ UBIK_ADDR_LOCK;
addr = server->addr[success_i]; /* successful interface addr */
if (server->disk_rxcid) /* destroy existing conn */
/* make new connections */
server->disk_rxcid = conns[success_i];
server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
- VOTE_SERVICE_ID, ubikSecClass,
- ubikSecIndex);
+ VOTE_SERVICE_ID, addr_globals.ubikSecClass,
+ addr_globals.ubikSecIndex);
connSuccess = conns[success_i];
strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
ubik_print("ubik:server %s is back up: will be contacted through %s\n",
buffer, afs_inet_ntoa_r(addr, hoststr));
+ UBIK_ADDR_UNLOCK;
}
/* Destroy all connections except the one on which we succeeded */