#include <lwp.h> /* temporary hack by klm */
-#define ERROR_EXIT(code) {error=(code); goto error_exit;}
+#define ERROR_EXIT(code) do { \
+ error = (code); \
+ goto error_exit; \
+} while (0)
/*!
* \file
#endif /* AFS_PTHREAD_ENV */
}
+
+/*
+ * Iterate over all servers. Callers pass in *ts which is used to track
+ * the current server.
+ * - Returns 1 if there are no more servers
+ * - Returns 0 with conn set to the connection for the current server if
+ * it's up and current
+ */
+static int
+ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
+ struct rx_connection **conn, afs_int32 *rcode,
+ afs_int32 *okcalls, afs_int32 code)
+{
+ if (!*ts) {
+ /* Initial call - start iterating over servers */
+ *ts = ubik_servers;
+ *conn = NULL;
+ *rcode = 0;
+ *okcalls = 0;
+ } else {
+ if (*conn) {
+ Quorum_EndIO(atrans, *conn);
+ *conn = NULL;
+ if (code) { /* failure */
+ *rcode = code;
+ (*ts)->up = 0; /* mark as down now; beacons will no longer be sent */
+ (*ts)->beaconSinceDown = 0;
+ (*ts)->currentDB = 0;
+ urecovery_LostServer(*ts); /* tell recovery to try to resend dbase later */
+ } else { /* success */
+ if (!(*ts)->isClone)
+ (*okcalls)++; /* count up how many worked */
+ if (aflags & CStampVersion) {
+ (*ts)->version = atrans->dbase->version;
+ }
+ }
+ }
+ *ts = (*ts)->next;
+ }
+ if (!(*ts))
+ return 1;
+ if (!(*ts)->up || !(*ts)->currentDB) {
+ (*ts)->currentDB = 0; /* db is no longer current; we just missed an update */
+ return 0; /* not up-to-date, don't bother. NULL conn will tell caller not to use */
+ }
+ *conn = Quorum_StartIO(atrans, *ts);
+ return 0;
+}
+
+static int
+ContactQuorum_rcode(int okcalls, afs_int32 rcode)
+{
+ /*
+ * return 0 if we successfully contacted a quorum, otherwise return error code.
+ * We don't have to contact ourselves (that was done locally)
+ */
+ if (okcalls + 1 >= ubik_quorum)
+ return 0;
+ else
+ return rcode;
+}
+
/*!
* \brief Perform an operation at a quorum, handling error conditions.
* \return 0 if all worked and a quorum was contacted successfully
ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
struct ubik_trans *atrans, int aflags)
{
- struct ubik_server *ts;
- afs_int32 code;
- afs_int32 rcode, okcalls;
+ struct ubik_server *ts = NULL;
+ afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
+ int done;
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- conn = Quorum_StartIO(atrans, ts);
-
- code = (*proc)(conn, &atrans->tid);
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ while (!done) {
+ if (conn)
+ code = (*proc)(conn, &atrans->tid);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode);
}
+
afs_int32
ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
afs_int32 position, afs_int32 length, afs_int32 type)
{
- struct ubik_server *ts;
- afs_int32 code;
- afs_int32 rcode, okcalls;
+ struct ubik_server *ts = NULL;
+ afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
+ int done;
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- conn = Quorum_StartIO(atrans, ts);
-
- code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ while (!done) {
+ if (conn)
+ code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode);
}
+
afs_int32
ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
afs_int32 file, afs_int32 position, bulkdata *data)
{
- struct ubik_server *ts;
- afs_int32 code;
- afs_int32 rcode, okcalls;
+ struct ubik_server *ts = NULL;
+ afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
+ int done;
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- conn = Quorum_StartIO(atrans, ts);
-
- code = DISK_Write(conn, &atrans->tid, file, position, data);
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ while (!done) {
+ if (conn)
+ code = DISK_Write(conn, &atrans->tid, file, position, data);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode);
}
+
afs_int32
ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
afs_int32 file, afs_int32 length)
{
- struct ubik_server *ts;
- afs_int32 code;
- afs_int32 rcode, okcalls;
+ struct ubik_server *ts = NULL;
+ afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
+ int done;
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- conn = Quorum_StartIO(atrans, ts);
-
- code = DISK_Truncate(conn, &atrans->tid, file, length);
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ while (!done) {
+ if (conn)
+ code = DISK_Truncate(conn, &atrans->tid, file, length);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode);
}
+
afs_int32
ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
iovec_wrt * io_vector, iovec_buf *io_buffer)
{
- struct ubik_server *ts;
- afs_int32 code;
- afs_int32 rcode, okcalls;
+ struct ubik_server *ts = NULL;
+ afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- conn = Quorum_StartIO(atrans, ts);
-
- code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
-
- if ((code <= -450) && (code > -500)) {
- /* An RPC interface mismatch (as defined in comerr/error_msg.c).
- * Un-bulk the entries and do individual DISK_Write calls
- * instead of DISK_WriteV.
- */
- struct ubik_iovec *iovec =
- (struct ubik_iovec *)io_vector->iovec_wrt_val;
- char *iobuf = (char *)io_buffer->iovec_buf_val;
- bulkdata tcbs;
- afs_int32 i, offset;
-
- conn = Quorum_StartIO(atrans, ts);
-
- for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
- /* Sanity check for going off end of buffer */
- if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
- code = UINTERNAL;
- break;
+ int done;
+
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ while (!done) {
+ if (conn) {
+ code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+ if ((code <= -450) && (code > -500)) {
+ /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+ * Un-bulk the entries and do individual DISK_Write calls
+ * instead of DISK_WriteV.
+ */
+ struct ubik_iovec *iovec =
+ (struct ubik_iovec *)io_vector->iovec_wrt_val;
+ char *iobuf = (char *)io_buffer->iovec_buf_val;
+ bulkdata tcbs;
+ afs_int32 i, offset;
+
+ for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
+ /* Sanity check for going off end of buffer */
+ if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
+ code = UINTERNAL;
+ break;
+ }
+ tcbs.bulkdata_len = iovec[i].length;
+ tcbs.bulkdata_val = &iobuf[offset];
+ code = DISK_Write(conn, &atrans->tid, iovec[i].file,
+ iovec[i].position, &tcbs);
+ if (code)
+ break;
+ offset += iovec[i].length;
}
- tcbs.bulkdata_len = iovec[i].length;
- tcbs.bulkdata_val = &iobuf[offset];
-
- code =
- DISK_Write(conn, &atrans->tid, iovec[i].file,
- iovec[i].position, &tcbs);
- if (code)
- break;
-
- offset += iovec[i].length;
- }
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
- }
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
}
}
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode);
}
+
afs_int32
ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
ubik_version *OldVersion,
ubik_version *NewVersion)
{
- struct ubik_server *ts;
- afs_int32 code;
- afs_int32 rcode, okcalls;
+ struct ubik_server *ts = NULL;
+ afs_int32 code = 0, rcode, okcalls;
struct rx_connection *conn;
+ int done;
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- conn = Quorum_StartIO(atrans, ts);
-
- code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
-
- Quorum_EndIO(atrans, conn);
- conn = NULL;
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+ while (!done) {
+ if (conn)
+ code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode);
}
+
/*!
* \brief This routine initializes the ubik system for a set of servers.
* \return 0 for success, or an error code on failure.
if (code < 0)
return code;
+ udisk_Init(ubik_nBuffers);
+ ulock_Init();
+
+ code = uvote_Init();
+ if (code)
+ return code;
+ code = urecovery_Initialize(tdb);
+ if (code)
+ return code;
+ if (info)
+ code = ubeacon_InitServerListByInfo(myHost, info, clones);
+ else
+ code = ubeacon_InitServerList(myHost, serverList);
+ if (code)
+ return code;
+
ubik_callPortal = myPort;
/* try to get an additional security object */
ubik_sc[0] = rxnull_NewServerSecurityObject();
NULL, "rx_ServerProc", &junk);
#endif
- /* do basic initialization */
- code = uvote_Init();
- if (code)
- return code;
- code = urecovery_Initialize(tdb);
- if (code)
- return code;
- if (info)
- code = ubeacon_InitServerListByInfo(myHost, info, clones);
- else
- code = ubeacon_InitServerList(myHost, serverList);
- if (code)
- return code;
-
/* now start up async processes */
#ifdef AFS_PTHREAD_ENV
/* do assert stuff */
struct ubik_trans *jt;
struct ubik_trans *tt;
afs_int32 code;
-#if defined(UBIK_PAUSE)
- int count;
-#endif /* UBIK_PAUSE */
if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
/* it's not safe to use ubik_BeginTransReadAnyWrite without a
if ((transMode != UBIK_READTRANS) && readAny)
return UBADTYPE;
DBHOLD(dbase);
-#if defined(UBIK_PAUSE)
- /* if we're polling the slave sites, wait until the returns
- * are all in. Otherwise, the urecovery_CheckTid call may
- * glitch us.
- */
- if (transMode == UBIK_WRITETRANS)
- for (count = 75; dbase->flags & DBVOTING; --count) {
- DBRELE(dbase);
-#ifdef GRAND_PAUSE_DEBUGGING
- if (count == 75)
- fprintf(stderr,
- "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
- time(0), ntohs(ubik_callPortal));
- else
-#endif
- if (count <= 0) {
-#if 1
- fprintf(stderr,
- "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
- time(0), ntohs(ubik_callPortal));
-#endif
- return UNOQUORUM; /* a white lie */
- }
-#ifdef AFS_PTHREAD_ENV
- sleep(2);
-#else
- IOMGR_Sleep(2);
-#endif
- DBHOLD(dbase);
- }
-#endif /* UBIK_PAUSE */
if (urecovery_AllBetter(dbase, readAny) == 0) {
DBRELE(dbase);
return UNOQUORUM;