#include <roken.h>
+
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
+#else
+# include <opr/lockstub.h>
+#endif
+
#include <lock.h>
-#include <rx/xdr.h>
#include <rx/rx.h>
#include <afs/cellconfig.h>
+#include <afs/afsutil.h>
+
#define UBIK_INTERNALS
#include "ubik.h"
struct ubik_dbase *ubik_dbase = 0;
struct ubik_stats ubik_stats;
afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
-afs_int32 ubik_epochTime = 0;
afs_int32 urecovery_state = 0;
int (*ubik_SyncWriterCacheProc) (void);
struct ubik_server *ubik_servers;
struct version_data version_globals;
#define CStampVersion 1 /* meaning set ts->version */
+#define CCheckSyncAdvertised 2 /* check if the remote knows we are the sync-site */
static_inline struct rx_connection *
Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
static int
ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
struct rx_connection **conn, afs_int32 *rcode,
- afs_int32 *okcalls, afs_int32 code)
+ afs_int32 *okcalls, afs_int32 code, const char *procname)
{
if (!*ts) {
/* Initial call - start iterating over servers */
Quorum_EndIO(atrans, *conn);
*conn = NULL;
if (code) { /* failure */
+ char hoststr[16];
+
*rcode = code;
UBIK_BEACON_LOCK;
(*ts)->up = 0; /* mark as down now; beacons will no longer be sent */
UBIK_BEACON_UNLOCK;
(*ts)->currentDB = 0;
urecovery_LostServer(*ts); /* tell recovery to try to resend dbase later */
+ ViceLog(0, ("Server %s is marked down due to %s code %d\n",
+ afs_inet_ntoa_r((*ts)->addr[0], hoststr), procname, *rcode));
} else { /* success */
if (!(*ts)->isClone)
(*okcalls)++; /* count up how many worked */
if (!(*ts))
return 1;
UBIK_BEACON_LOCK;
- if (!(*ts)->up || !(*ts)->currentDB) {
+ if (!(*ts)->up || !(*ts)->currentDB ||
+ /* do not call DISK_Begin until we know that lastYesState is set on the
+ * remote in question; otherwise, DISK_Begin will fail. */
+ ((aflags & CCheckSyncAdvertised) && !((*ts)->beaconSinceDown && (*ts)->lastVote))) {
UBIK_BEACON_UNLOCK;
(*ts)->currentDB = 0; /* db is no longer current; we just missed an update */
return 0; /* not up-to-date, don't bother. NULL conn will tell caller not to use */
if (okcalls + 1 >= ubik_quorum)
return 0;
else
- return rcode;
+ return (rcode != 0) ? rcode : UNOQUORUM;
}
/*!
* because it is sent the sync count along with the beacon message that
* marks it as \b really up (\p beaconSinceDown).
*/
-afs_int32
+static afs_int32
ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
- struct ubik_trans *atrans, int aflags)
+ struct ubik_trans *atrans, int aflags, const char *procname)
{
struct ubik_server *ts = NULL;
afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
int done;
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
while (!done) {
if (conn)
code = (*proc)(conn, &atrans->tid);
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
return ContactQuorum_rcode(okcalls, rcode);
}
-afs_int32
+static afs_int32
ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
afs_int32 position, afs_int32 length, afs_int32 type)
{
afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
int done;
+ char *procname = "DISK_Lock";
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
while (!done) {
if (conn)
code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
return ContactQuorum_rcode(okcalls, rcode);
}
-
-afs_int32
-ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
- afs_int32 file, afs_int32 position, bulkdata *data)
-{
- struct ubik_server *ts = NULL;
- afs_int32 code = 0, rcode, okcalls;
- struct rx_connection *conn;
- int done;
-
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
- while (!done) {
- if (conn)
- code = DISK_Write(conn, &atrans->tid, file, position, data);
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
- }
- return ContactQuorum_rcode(okcalls, rcode);
-}
-
-
-afs_int32
+static afs_int32
ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
afs_int32 file, afs_int32 length)
{
afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
int done;
+ char *procname = "DISK_Truncate";
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
while (!done) {
if (conn)
code = DISK_Truncate(conn, &atrans->tid, file, length);
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
return ContactQuorum_rcode(okcalls, rcode);
}
-afs_int32
+static afs_int32
ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
iovec_wrt * io_vector, iovec_buf *io_buffer)
{
afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
int done;
+ char *procname = "DISK_WriteV";
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
while (!done) {
if (conn) {
+ procname = "DISK_WriteV"; /* in case previous fallback to DISK_Write */
code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
if ((code <= -450) && (code > -500)) {
/* An RPC interface mismatch (as defined in comerr/error_msg.c).
bulkdata tcbs;
afs_int32 i, offset;
+ procname = "DISK_Write"; /* for accurate error msg, if any */
for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
/* Sanity check for going off end of buffer */
if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
}
}
}
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
return ContactQuorum_rcode(okcalls, rcode);
}
afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
int done;
+ char *procname = "DISK_SetVersion";
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
while (!done) {
if (conn)
code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
- done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
return ContactQuorum_rcode(okcalls, rcode);
}
#if defined(AFS_PTHREAD_ENV)
static int
ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
- osi_Assert(pthread_attr_init(tattr) == 0);
- osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
- osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
+ opr_Verify(pthread_attr_init(tattr) == 0);
+ opr_Verify(pthread_attr_setdetachstate(tattr,
+ PTHREAD_CREATE_DETACHED) == 0);
+ opr_Verify(pthread_create(thread, tattr, proc, NULL) == 0);
return 0;
}
#endif
*
* \see ubik_ServerInit(), ubik_ServerInitByInfo()
*/
-int
+static int
ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
struct afsconf_cell *info, char clones[],
afs_uint32 serverList[], const char *pathName,
initialize_U_error_table();
- tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
- tdb->pathName = (char *)malloc(strlen(pathName) + 1);
- strcpy(tdb->pathName, pathName);
- tdb->activeTrans = (struct ubik_trans *)0;
- memset(&tdb->version, 0, sizeof(struct ubik_version));
- memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+ tdb = calloc(1, sizeof(*tdb));
+ tdb->pathName = strdup(pathName);
#ifdef AFS_PTHREAD_ENV
- MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&beacon_globals.beacon_lock, "beacon lock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&vote_globals.vote_lock, "vote lock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&addr_globals.addr_lock, "address lock", MUTEX_DEFAULT, 0);
+ opr_mutex_init(&tdb->versionLock);
+ opr_mutex_init(&beacon_globals.beacon_lock);
+ opr_mutex_init(&vote_globals.vote_lock);
+ opr_mutex_init(&addr_globals.addr_lock);
+ opr_mutex_init(&version_globals.version_lock);
#else
Lock_Init(&tdb->versionLock);
#endif
Lock_Init(&tdb->cache_lock);
- tdb->flags = 0;
tdb->read = uphys_read;
tdb->write = uphys_write;
tdb->truncate = uphys_truncate;
tdb->getlabel = uphys_getlabel;
tdb->setlabel = uphys_setlabel;
tdb->getnfiles = uphys_getnfiles;
- tdb->readers = 0;
- tdb->tidCounter = tdb->writeTidCounter = 0;
+ tdb->buffered_append = uphys_buf_append;
*dbase = tdb;
ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
#ifdef AFS_PTHREAD_ENV
- CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
- CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+ opr_cv_init(&tdb->flags_cond);
#endif /* AFS_PTHREAD_ENV */
/* initialize RX */
if (code < 0)
return code;
+ ubik_callPortal = myPort;
+
udisk_Init(ubik_nBuffers);
ulock_Init();
if (code)
return code;
- ubik_callPortal = myPort;
/* try to get an additional security object */
if (buildSecClassesProc == NULL) {
numClasses = 3;
rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
VOTE_ExecuteRequest);
if (tservice == (struct rx_service *)0) {
- ubik_dprint("Could not create VOTE rx service!\n");
+ ViceLog(0, ("Could not create VOTE rx service!\n"));
return -1;
}
rx_SetMinProcs(tservice, 2);
rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
DISK_ExecuteRequest);
if (tservice == (struct rx_service *)0) {
- ubik_dprint("Could not create DISK rx service!\n");
+ ViceLog(0, ("Could not create DISK rx service!\n"));
return -1;
}
rx_SetMinProcs(tservice, 2);
NULL, "rx_ServerProc", &junk);
#endif
+ /* send addrs to all other servers */
+ code = ubeacon_updateUbikNetworkAddress(ubik_host);
+ if (code)
+ return code;
+
/* now start up async processes */
#ifdef AFS_PTHREAD_ENV
ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
/* it's not safe to use ubik_BeginTransReadAnyWrite without a
* cache-syncing function; fall back to ubik_BeginTransReadAny,
* which is safe but slower */
- ubik_print("ubik_BeginTransReadAnyWrite called, but "
+ ViceLog(0, ("ubik_BeginTransReadAnyWrite called, but "
"ubik_SyncWriterCacheProc not set; pretending "
- "ubik_BeginTransReadAny was called instead\n");
+ "ubik_BeginTransReadAny was called instead\n"));
readAny = 1;
}
* don't know how to restore one without possibly picking up some data from the other. */
if (transMode == UBIK_WRITETRANS) {
/* if we're writing already, wait */
- while (dbase->flags & DBWRITING) {
+ while (dbase->dbFlags & DBWRITING) {
#ifdef AFS_PTHREAD_ENV
- CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
+ opr_cv_wait(&dbase->flags_cond, &dbase->versionLock);
#else
DBRELE(dbase);
- LWP_WaitProcess(&dbase->flags);
+ LWP_WaitProcess(&dbase->dbFlags);
DBHOLD(dbase);
#endif
}
DBRELE(dbase);
return UNOTSYNC;
}
+ if (!ubeacon_SyncSiteAdvertised()) {
+ /* i am the sync-site but the remotes are not aware yet */
+ DBRELE(dbase);
+ return UNOQUORUM;
+ }
}
/* create the transaction */
code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
tt = jt; /* move to a register */
- if (code || tt == (struct ubik_trans *)NULL) {
+ if (code || tt == NULL) {
DBRELE(dbase);
return code;
}
}
}
/* label trans and dbase with new tid */
- tt->tid.epoch = ubik_epochTime;
+ tt->tid.epoch = version_globals.ubik_epochTime;
/* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
tt->tid.counter = (dbase->tidCounter += 2);
if (transMode == UBIK_WRITETRANS) {
/* for a write trans, we have to keep track of the write tid counter too */
dbase->writeTidCounter = tt->tid.counter;
+ }
+
+ UBIK_VERSION_UNLOCK;
+ if (transMode == UBIK_WRITETRANS) {
/* next try to start transaction on appropriate number of machines */
- code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
+ code = ContactQuorum_NoArguments(DISK_Begin, tt, CCheckSyncAdvertised, "DISK_Begin");
if (code) {
/* we must abort the operation */
udisk_abort(tt);
- ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, tt, 0, "DISK_Abort");
udisk_end(tt);
- UBIK_VERSION_UNLOCK;
DBRELE(dbase);
return code;
}
}
*transPtr = tt;
- UBIK_VERSION_UNLOCK;
DBRELE(dbase);
return 0;
}
}
/* now it is safe to try remote abort */
- code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
+ code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
code2 = udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
ReleaseWriteLock(&dbase->cache_lock);
- code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+ code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion, "DISK_Commit");
} else {
memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
* we lose. If we contact a majority of sites, then we won't be here: contacting
* a majority guarantees commit, since it guarantees that one dude will be a
* member of the next quorum. */
- ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+ ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
udisk_end(transPtr);
DBRELE(dbase);
goto error;
* to us, or timeout. Put safety check in anyway */
if (now - realStart > 10 * BIGTIME) {
ubik_stats.escapes++;
- ubik_print("ubik escaping from commit wait\n");
+ ViceLog(0, ("ubik escaping from commit wait\n"));
break;
}
for (ts = ubik_servers; ts; ts = ts->next) {
* The transaction is committed anyway, since we succeeded in contacting a quorum
* at the start (when invoking the DiskCommit function).
*/
- ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+ ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
success:
udisk_end(transPtr);
&transPtr->iovec_data);
if (code) {
udisk_abort(transPtr);
- ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
transPtr->iovec_info.iovec_wrt_len = 0;
transPtr->iovec_data.iovec_buf_len = 0;
ERROR_EXIT(code);
if (!transPtr->iovec_info.iovec_wrt_val) {
transPtr->iovec_info.iovec_wrt_len = 0;
transPtr->iovec_info.iovec_wrt_val =
- (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
- sizeof(struct ubik_iovec));
+ malloc(IOVEC_MAXWRT * sizeof(struct ubik_iovec));
transPtr->iovec_data.iovec_buf_len = 0;
- transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+ transPtr->iovec_data.iovec_buf_val = malloc(IOVEC_MAXBUF);
if (!transPtr->iovec_info.iovec_wrt_val
|| !transPtr->iovec_data.iovec_buf_val) {
if (transPtr->iovec_info.iovec_wrt_val)
if (code) {
/* we must abort the operation */
udisk_abort(transPtr);
- ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
ERROR_EXIT(code);
}
if (code) {
/* we must abort the operation */
udisk_abort(atrans);
- ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, atrans, 0, "DISK_Abort");
ERROR_EXIT(code);
}
}
}
/*!
- * \brief utility to wait for a version # to change
- */
-int
-ubik_WaitVersion(struct ubik_dbase *adatabase,
- struct ubik_version *aversion)
-{
- DBHOLD(adatabase);
- while (1) {
- /* wait until version # changes, and then return */
- if (vcmp(*aversion, adatabase->version) != 0) {
- DBRELE(adatabase);
- return 0;
- }
-#ifdef AFS_PTHREAD_ENV
- CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
-#else
- DBRELE(adatabase);
- LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
- DBHOLD(adatabase);
-#endif
- }
-}
-
-/*!
- * \brief utility to get the version of the dbase a transaction is dealing with
- */
-int
-ubik_GetVersion(struct ubik_trans *atrans,
- struct ubik_version *avers)
-{
- DBHOLD(atrans->dbase);
- *avers = atrans->dbase->version;
- DBRELE(atrans->dbase);
- return 0;
-}
-
-/*!
* \brief Facility to simplify database caching.
* \return zero if last trans was done on the local server and was successful.
* \return -1 means bad (NULL) argument.
va_list ap;
va_start(ap, format);
- ubik_print("Ubik PANIC: ");
- ubik_vprint(format, ap);
+ ViceLog(0, ("Ubik PANIC:\n"));
+ vViceLog(0, (format, ap));
va_end(ap);
abort();
- ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
- exit(1); /* never know, though */
+ AFS_UNREACHED(ViceLog(0, ("BACK FROM ABORT\n")));
+ AFS_UNREACHED(exit(1));
}
/*!