#include <roken.h>
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <errno.h>
-
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
-#include <time.h>
-#include <fcntl.h>
-#else
-#include <sys/file.h>
-#include <netinet/in.h>
-#include <sys/time.h>
-#endif
-
#include <lock.h>
#include <rx/xdr.h>
#include <rx/rx.h>
* that the sync site is still the sync site, 'cause it won't talk
* to us until a timeout period has gone by. When we recover, we
* leave this clear until we get a new dbase */
- else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) { /* && order is important */
+ else if (uvote_HaveSyncAndVersion(ubik_dbase->version)) {
rcode = 1;
}
code = (*adbase->getlabel) (adbase, 0, &adbase->version);
if (code) {
/* try setting the label to a new value */
+ UBIK_VERSION_LOCK;
adbase->version.epoch = 1; /* value for newly-initialized db */
adbase->version.counter = 1;
code = (*adbase->setlabel) (adbase, 0, &adbase->version);
#else
LWP_NoYieldSignal(&adbase->version);
#endif
+ UBIK_VERSION_UNLOCK;
}
return 0;
}
{
afs_int32 code;
+ DBHOLD(adbase);
code = ReplayLog(adbase);
if (code)
- return code;
+ goto done;
code = InitializeDB(adbase);
+done:
+ DBRELE(adbase);
return code;
}
*/
if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
-#ifdef AFS_PTHREAD_ENV
- DBHOLD(ubik_dbase);
-#endif
-
for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
+ UBIK_BEACON_LOCK;
if (!ts->up) {
+ UBIK_BEACON_UNLOCK;
doingRPC = 1;
code = DoProbe(ts);
if (code == 0) {
+ UBIK_BEACON_LOCK;
ts->up = 1;
+ UBIK_BEACON_UNLOCK;
+ DBHOLD(ubik_dbase);
urecovery_state &= ~UBIK_RECFOUNDDB;
+ DBRELE(ubik_dbase);
}
- } else if (!ts->currentDB) {
- urecovery_state &= ~UBIK_RECFOUNDDB;
+ } else {
+ UBIK_BEACON_UNLOCK;
+ DBHOLD(ubik_dbase);
+ if (!ts->currentDB)
+ urecovery_state &= ~UBIK_RECFOUNDDB;
+ DBRELE(ubik_dbase);
}
}
-#ifdef AFS_PTHREAD_ENV
- DBRELE(ubik_dbase);
-#endif
-
if (doingRPC)
now = FT_ApproxTime();
lastProbeTime = now;
}
/* Mark whether we are the sync site */
+ DBHOLD(ubik_dbase);
if (!ubeacon_AmSyncSite()) {
urecovery_state &= ~UBIK_RECSYNCSITE;
+ DBRELE(ubik_dbase);
continue; /* nothing to do */
}
urecovery_state |= UBIK_RECSYNCSITE;
* most current database, then go find the most current db.
*/
if (!(urecovery_state & UBIK_RECFOUNDDB)) {
+ DBRELE(ubik_dbase);
bestServer = (struct ubik_server *)0;
bestDBVersion.epoch = 0;
bestDBVersion.counter = 0;
for (ts = ubik_servers; ts; ts = ts->next) {
- if (!ts->up)
+ UBIK_BEACON_LOCK;
+ if (!ts->up) {
+ UBIK_BEACON_UNLOCK;
continue; /* don't bother with these guys */
+ }
+ UBIK_BEACON_UNLOCK;
if (ts->isClone)
continue;
+ UBIK_ADDR_LOCK;
code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
+ UBIK_ADDR_UNLOCK;
if (code == 0) {
/* perhaps this is the best version */
if (vcmp(ts->version, bestDBVersion) > 0) {
* the sync site, have the best version. Also note that
* we may need to send the best version out.
*/
+ DBHOLD(ubik_dbase);
if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
bestDBVersion = ubik_dbase->version;
bestServer = (struct ubik_server *)0;
urecovery_state |= UBIK_RECFOUNDDB;
urecovery_state &= ~UBIK_RECSENTDB;
}
- if (!(urecovery_state & UBIK_RECFOUNDDB))
+ if (!(urecovery_state & UBIK_RECFOUNDDB)) {
+ DBRELE(ubik_dbase);
continue; /* not ready */
+ }
/* If we, the sync site, do not have the best db version, then
* go and get it from the server that does.
urecovery_state |= UBIK_RECHAVEDB;
} else {
/* we don't have the best version; we should fetch it. */
- DBHOLD(ubik_dbase);
urecovery_AbortAll(ubik_dbase);
/* Rx code to do the Bulk fetch */
file = 0;
offset = 0;
+ UBIK_ADDR_LOCK;
rxcall = rx_NewCall(bestServer->disk_rxcid);
ubik_print("Ubik: Synchronize database with server %s\n",
afs_inet_ntoa_r(bestServer->addr[0], hoststr));
+ UBIK_ADDR_UNLOCK;
code = StartDISK_GetFile(rxcall, file);
if (code) {
}
/* give invalid label during file transit */
+ UBIK_VERSION_LOCK;
tversion.epoch = 0;
code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
+ UBIK_VERSION_UNLOCK;
if (code) {
ubik_dprint("setlabel io error=%d\n", code);
goto FetchEndCall;
if (!code) {
/* we got a new file, set up its header */
urecovery_state |= UBIK_RECHAVEDB;
+ UBIK_VERSION_LOCK;
memcpy(&ubik_dbase->version, &tversion,
sizeof(struct ubik_version));
snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
(*ubik_dbase->setlabel) (ubik_dbase, 0,
&ubik_dbase->version);
}
+ UBIK_VERSION_UNLOCK;
#ifdef AFS_NT40_ENV
snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
ubik_dbase->pathName, (file<0)?"SYS":"",
* We will effectively invalidate the old data forever now.
* Unclear if we *should* but we do.
*/
+ UBIK_VERSION_LOCK;
ubik_dbase->version.epoch = 0;
ubik_dbase->version.counter = 0;
+ UBIK_VERSION_UNLOCK;
ubik_print("Ubik: Synchronize database failed (error = %d)\n",
code);
} else {
#else
LWP_NoYieldSignal(&ubik_dbase->version);
#endif
- DBRELE(ubik_dbase);
}
- if (!(urecovery_state & UBIK_RECHAVEDB))
+ if (!(urecovery_state & UBIK_RECHAVEDB)) {
+ DBRELE(ubik_dbase);
continue; /* not ready */
+ }
/* If the database was newly initialized, then when we establish quorum, write
* a new label. This allows urecovery_AllBetter() to allow access for reads.
* database and overwrite this one.
*/
if (ubik_dbase->version.epoch == 1) {
- DBHOLD(ubik_dbase);
urecovery_AbortAll(ubik_dbase);
- ubik_epochTime = 2;
- ubik_dbase->version.epoch = ubik_epochTime;
+ UBIK_VERSION_LOCK;
+ version_globals.ubik_epochTime = 2;
+ ubik_dbase->version.epoch = version_globals.ubik_epochTime;
ubik_dbase->version.counter = 1;
code =
(*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
+ UBIK_VERSION_UNLOCK;
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
#ifdef AFS_PTHREAD_ENV
CV_BROADCAST(&ubik_dbase->version_cond);
#else
LWP_NoYieldSignal(&ubik_dbase->version);
#endif
- DBRELE(ubik_dbase);
}
/* Check the other sites and send the database to them if they
/* now propagate out new version to everyone else */
dbok = 1; /* start off assuming they all worked */
- DBHOLD(ubik_dbase);
/*
* Check if a write transaction is in progress. We can't send the
* db when a write is in progress here because the db would be
if (ubik_dbase->flags & DBWRITING) {
struct timeval tv;
int safety = 0;
- tv.tv_sec = 0;
- tv.tv_usec = 50000;
+ long cur_usec = 50000;
while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
DBRELE(ubik_dbase);
/* sleep for a little while */
+ tv.tv_sec = 0;
+ tv.tv_usec = cur_usec;
#ifdef AFS_PTHREAD_ENV
select(0, 0, 0, 0, &tv);
#else
IOMGR_Select(0, 0, 0, 0, &tv);
#endif
- tv.tv_usec += 10000;
+ cur_usec += 10000;
safety++;
DBHOLD(ubik_dbase);
}
}
for (ts = ubik_servers; ts; ts = ts->next) {
+ UBIK_ADDR_LOCK;
inAddr.s_addr = ts->addr[0];
+ UBIK_ADDR_UNLOCK;
+ UBIK_BEACON_LOCK;
if (!ts->up) {
+ UBIK_BEACON_UNLOCK;
ubik_dprint("recovery cannot send version to %s\n",
afs_inet_ntoa_r(inAddr.s_addr, hoststr));
dbok = 0;
continue;
}
+ UBIK_BEACON_UNLOCK;
ubik_dprint("recovery sending version to %s\n",
afs_inet_ntoa_r(inAddr.s_addr, hoststr));
if (vcmp(ts->version, ubik_dbase->version) != 0) {
if (!code) {
length = ubikstat.size;
file = offset = 0;
+ UBIK_ADDR_LOCK;
rxcall = rx_NewCall(ts->disk_rxcid);
+ UBIK_ADDR_UNLOCK;
code =
StartDISK_SendFile(rxcall, file, length,
&ubik_dbase->version);
ts->currentDB = 1;
}
}
- DBRELE(ubik_dbase);
if (dbok)
urecovery_state |= UBIK_RECSENTDB;
}
+ DBRELE(ubik_dbase);
}
return NULL;
}
afs_uint32 addr;
char buffer[32];
char hoststr[16];
- extern afs_int32 ubikSecIndex;
- extern struct rx_securityClass *ubikSecClass;
+ UBIK_ADDR_LOCK;
for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
i++) {
conns[i] =
rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
- ubikSecClass, ubikSecIndex);
+ addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
/* user requirement to use only the primary interface */
if (ubikPrimaryAddrOnly) {
break;
}
}
+ UBIK_ADDR_UNLOCK;
osi_Assert(i); /* at least one interface address for this server */
-#ifdef AFS_PTHREAD_ENV
- DBRELE(ubik_dbase);
-#endif
-
multi_Rx(conns, i) {
multi_DISK_Probe();
if (!multi_error) { /* first success */
}
} multi_End_Ignore;
-#ifdef AFS_PTHREAD_ENV
- DBHOLD(ubik_dbase);
-#endif
-
if (success_i >= 0) {
+ UBIK_ADDR_LOCK;
addr = server->addr[success_i]; /* successful interface addr */
if (server->disk_rxcid) /* destroy existing conn */
/* make new connections */
server->disk_rxcid = conns[success_i];
server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
- VOTE_SERVICE_ID, ubikSecClass,
- ubikSecIndex);
+ VOTE_SERVICE_ID, addr_globals.ubikSecClass,
+ addr_globals.ubikSecIndex);
connSuccess = conns[success_i];
strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
ubik_print("ubik:server %s is back up: will be contacted through %s\n",
buffer, afs_inet_ntoa_r(addr, hoststr));
+ UBIK_ADDR_UNLOCK;
}
/* Destroy all connections except the one on which we succeeded */