/*
* Copyright 2000, International Business Machines Corporation and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
#include <afsconfig.h>
#include <afs/param.h>
+#include <roken.h>
-#include <sys/types.h>
-#include <string.h>
-#include <stdarg.h>
-#include <time.h>
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
#else
-#include <sys/file.h>
-#include <netinet/in.h>
-#include <sys/param.h>
+# include <opr/lockstub.h>
#endif
#include <lock.h>
-#include <rx/xdr.h>
#include <rx/rx.h>
#include <afs/cellconfig.h>
+#include <afs/afsutil.h>
+
#define UBIK_INTERNALS
#include "ubik.h"
#include <lwp.h> /* temporary hack by klm */
-#define ERROR_EXIT(code) {error=(code); goto error_exit;}
+#define ERROR_EXIT(code) do { \
+ error = (code); \
+ goto error_exit; \
+} while (0)
/*!
* \file
* This system is organized in a hierarchical set of related modules. Modules
* at one level can only call modules at the same level or below.
- *
+ *
* At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
* operating system primitives.
*
* \li DISK--The module responsible for representing atomic transactions
* on the local disk. It maintains a new-value only log.
* \li LOCK--The module responsible for locking byte ranges in the database file.
- *
+ *
* At the next level (2) we have
- *
+ *
* \li RECOVERY--The module responsible for ensuring that all members of a quorum
* have the same up-to-date database after a new synchronization site is
* elected. This module runs only on the synchronization site.
- *
+ *
* At the next level (3) we have
*
* \li REMOTE--The module responsible for interpreting requests from the sync
struct ubik_dbase *ubik_dbase = 0;
struct ubik_stats ubik_stats;
afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
-afs_int32 ubik_epochTime = 0;
afs_int32 urecovery_state = 0;
-int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
-void *ubik_SRXSecurityRock;
+int (*ubik_SyncWriterCacheProc) (void);
struct ubik_server *ubik_servers;
short ubik_callPortal;
-static int BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+/* These global variables were used to control the server security layers.
+ * They are retained for backwards compatibility with legacy callers.
+ *
+ * The ubik_SetServerSecurityProcs() interface should be used instead.
+ */
+
+int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
+void *ubik_SRXSecurityRock;
+int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
+void *ubik_CheckRXSecurityRock;
+
+
+
+static int BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
struct ubik_trans **transPtr, int readAny);
-struct rx_securityClass *ubik_sc[3];
+static struct rx_securityClass **ubik_sc = NULL;
+static void (*buildSecClassesProc)(void *, struct rx_securityClass ***,
+ afs_int32 *) = NULL;
+static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
+static void *securityRock = NULL;
+
+struct version_data version_globals;
#define CStampVersion 1 /* meaning set ts->version */
+#define CCheckSyncAdvertised 2 /* check if the remote knows we are the sync-site */
-/*!
- * \brief Perform an operation at a quorum, handling error conditions.
- * \return 0 if all worked and a quorum was contacted successfully
- * \return otherwise mark failing server as down and return #UERROR
- *
- * \note If any server misses an update, we must wait #BIGTIME seconds before
- * allowing the transaction to commit, to ensure that the missing and
- * possibly still functioning server times out and stops handing out old
- * data. This is done in the commit code, where we wait for a server marked
- * down to have stayed down for #BIGTIME seconds before we allow a transaction
- * to commit. A server that fails but comes back up won't give out old data
- * because it is sent the sync count along with the beacon message that
- * marks it as \b really up (\p beaconSinceDown).
+static_inline struct rx_connection *
+Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
+{ /* Fetch the DISK connection of server 'as' for a quorum call; pair every call with Quorum_EndIO(). */
+ struct rx_connection *conn;
+
+ UBIK_ADDR_LOCK; /* as->disk_rxcid is read with the address lock held */
+ conn = as->disk_rxcid;
+
+#ifdef AFS_PTHREAD_ENV
+ rx_GetConnection(conn); /* presumably takes a reference so conn outlives the unlock -- confirm in rx; undone by rx_PutConnection() in Quorum_EndIO() */
+ UBIK_ADDR_UNLOCK;
+ DBRELE(atrans->dbase); /* drop the dbase hold until Quorum_EndIO() re-takes it with DBHOLD */
+#else
+ UBIK_ADDR_UNLOCK;
+#endif /* AFS_PTHREAD_ENV */
+
+ return conn;
+}
+
+static_inline void
+Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
+{ /* Undo Quorum_StartIO(); the body is empty unless AFS_PTHREAD_ENV is defined. */
+#ifdef AFS_PTHREAD_ENV
+ DBHOLD(atrans->dbase); /* re-take the dbase hold released by DBRELE in Quorum_StartIO() */
+ rx_PutConnection(aconn); /* counterpart of rx_GetConnection() in Quorum_StartIO() */
+#endif /* AFS_PTHREAD_ENV */
+}
+
+
+/*
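+ * Iterate over all servers. Callers pass in *ts which is used to track
+ * the current server; *ts must be NULL on the first call, which also
+ * resets *conn, *rcode and *okcalls.
+ * - Returns 1 if there are no more servers
+ * - Returns 0 with *conn set to the connection for the current server if
+ * it's up and current, or with *conn set to NULL if that server is
+ * being skipped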
*/
-afs_int32
-ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
- register struct ubik_trans *atrans, int aflags)
+static int
+ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
+ struct rx_connection **conn, afs_int32 *rcode,
+ afs_int32 *okcalls, afs_int32 code, const char *procname) /* code: result of the call just made on *conn; procname: call name used in the log message */
{
- register struct ubik_server *ts;
- register afs_int32 code;
- afs_int32 rcode, okcalls;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
- code = (*proc)(ts->disk_rxcid, &atrans->tid);
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
+ if (!*ts) {
+ /* Initial call - start iterating over servers */
+ *ts = ubik_servers;
+ *conn = NULL;
+ *rcode = 0;
+ *okcalls = 0;
+ } else {
+ if (*conn) { /* the previous server was contacted: settle the outcome passed in 'code' */
+ Quorum_EndIO(atrans, *conn);
+ *conn = NULL;
+ if (code) { /* failure */
+ char hoststr[16]; /* buffer handed to afs_inet_ntoa_r() for the log message */
+
+ *rcode = code; /* overwritten on every failure, so the caller sees the last one */
+ UBIK_BEACON_LOCK;
+ (*ts)->up = 0; /* mark as down now; beacons will no longer be sent */
+ (*ts)->beaconSinceDown = 0;
+ UBIK_BEACON_UNLOCK;
+ (*ts)->currentDB = 0;
+ urecovery_LostServer(*ts); /* tell recovery to try to resend dbase later */
+ ViceLog(0, ("Server %s is marked down due to %s code %d\n",
+ afs_inet_ntoa_r((*ts)->addr[0], hoststr), procname, *rcode));
+ } else { /* success */
+ if (!(*ts)->isClone)
+ (*okcalls)++; /* count up how many worked */
+ if (aflags & CStampVersion) {
+ (*ts)->version = atrans->dbase->version;
+ }
}
}
+ *ts = (*ts)->next; /* step to the next server, whether or not the previous one was contacted */
+ }
+ if (!(*ts))
+ return 1; /* list exhausted */
+ UBIK_BEACON_LOCK; /* the checks below run with the beacon lock held */
+ if (!(*ts)->up || !(*ts)->currentDB ||
+ /* do not call DISK_Begin until we know that lastYesState is set on the
+ * remote in question; otherwise, DISK_Begin will fail. */
+ ((aflags & CCheckSyncAdvertised) && !((*ts)->beaconSinceDown && (*ts)->lastVote))) {
+ UBIK_BEACON_UNLOCK;
+ (*ts)->currentDB = 0; /* db is no longer current; we just missed an update */
+ return 0; /* not up-to-date, don't bother. NULL conn will tell caller not to use */
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
+ UBIK_BEACON_UNLOCK;
+ *conn = Quorum_StartIO(atrans, *ts); /* server is usable: hand the caller its connection */
+ return 0;
+}
+
+static int
+ContactQuorum_rcode(int okcalls, afs_int32 rcode)
+{
+ /*
+ * return 0 if we successfully contacted a quorum, otherwise return error code.
+ * We don't have to contact ourselves (that was done locally)
+ * okcalls is the number of successful calls as counted by the caller; the
+ * "+ 1" below stands for the local server.
+ */
if (okcalls + 1 >= ubik_quorum)
return 0;
else
- return rcode;
+ return (rcode != 0) ? rcode : UNOQUORUM; /* quorum missed without any failed call: still report an error */
}
-afs_int32
-ContactQuorum_DISK_Lock(register struct ubik_trans *atrans, int aflags,afs_int32 file,
- afs_int32 position, afs_int32 length, afs_int32 type)
+/*!
+ * \brief Perform an operation at a quorum, handling error conditions.
+ * \return 0 if all worked and a quorum was contacted successfully
+ * \return otherwise the code of the last failing call, or #UNOQUORUM if no call
+ * failed but too few servers were reached; every failing server is marked down
+ *
+ * \note If any server misses an update, we must wait #BIGTIME seconds before
+ * allowing the transaction to commit, to ensure that the missing and
+ * possibly still functioning server times out and stops handing out old
+ * data. This is done in the commit code, where we wait for a server marked
+ * down to have stayed down for #BIGTIME seconds before we allow a transaction
+ * to commit. A server that fails but comes back up won't give out old data
+ * because it is sent the sync count along with the beacon message that
+ * marks it as \b really up (\p beaconSinceDown).
+ */
+static afs_int32
+ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
+ struct ubik_trans *atrans, int aflags, const char *procname) /* procname: name of 'proc', forwarded to ContactQuorum_iterate() for its log message */
{
- register struct ubik_server *ts;
- register afs_int32 code;
- afs_int32 rcode, okcalls;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
- code = DISK_Lock(ts->disk_rxcid, &atrans->tid, file, position, length,
- type);
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ struct ubik_server *ts = NULL; /* NULL marks the first ContactQuorum_iterate() call */
+ afs_int32 code = 0, rcode, okcalls; /* rcode and okcalls are zeroed by that first call */
+ struct rx_connection *conn;
+ int done;
+
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+ while (!done) {
+ if (conn) /* conn is NULL when the current server is being skipped */
+ code = (*proc)(conn, &atrans->tid);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode); /* 0 when enough servers answered, otherwise an error code */
}
-afs_int32
-ContactQuorum_DISK_Write(register struct ubik_trans *atrans, int aflags,
- afs_int32 file, afs_int32 position, bulkdata *data)
+
+static afs_int32
+ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
+ afs_int32 position, afs_int32 length, afs_int32 type) /* issues DISK_Lock with these arguments on every server ContactQuorum_iterate() hands back */
{
- register struct ubik_server *ts;
- register afs_int32 code;
- afs_int32 rcode, okcalls;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
- code = DISK_Write(ts->disk_rxcid, &atrans->tid, file, position, data);
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ struct ubik_server *ts = NULL; /* NULL marks the first ContactQuorum_iterate() call */
+ afs_int32 code = 0, rcode, okcalls; /* rcode and okcalls are zeroed by that first call */
+ struct rx_connection *conn;
+ int done;
+ char *procname = "DISK_Lock"; /* call name for the "marked down" log message */
+
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+ while (!done) {
+ if (conn) /* conn is NULL when the current server is being skipped */
+ code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode); /* 0 when enough servers answered, otherwise an error code */
}
-afs_int32
-ContactQuorum_DISK_Truncate(register struct ubik_trans *atrans, int aflags,
+static afs_int32
+ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags, /* issues DISK_Truncate(file, length) on every server ContactQuorum_iterate() hands back */
afs_int32 file, afs_int32 length)
{
- register struct ubik_server *ts;
- register afs_int32 code;
- afs_int32 rcode, okcalls;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
- code = DISK_Truncate(ts->disk_rxcid, &atrans->tid, file, length);
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ struct ubik_server *ts = NULL; /* NULL marks the first ContactQuorum_iterate() call */
+ afs_int32 code = 0, rcode, okcalls; /* rcode and okcalls are zeroed by that first call */
+ struct rx_connection *conn;
+ int done;
+ char *procname = "DISK_Truncate"; /* call name for the "marked down" log message */
+
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+ while (!done) {
+ if (conn) /* conn is NULL when the current server is being skipped */
+ code = DISK_Truncate(conn, &atrans->tid, file, length);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode); /* 0 when enough servers answered, otherwise an error code */
}
-afs_int32
-ContactQuorum_DISK_WriteV(register struct ubik_trans *atrans, int aflags,
+
+static afs_int32
+ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags, /* issues DISK_WriteV on every usable server, falling back to per-entry DISK_Write on an interface mismatch */
iovec_wrt * io_vector, iovec_buf *io_buffer)
{
- register struct ubik_server *ts;
- register afs_int32 code;
- afs_int32 rcode, okcalls;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
-
- code = DISK_WriteV(ts->disk_rxcid, &atrans->tid, io_vector, io_buffer);
-
- if ((code <= -450) && (code > -500)) {
- /* An RPC interface mismatch (as defined in comerr/error_msg.c).
- * Un-bulk the entries and do individual DISK_Write calls
- * instead of DISK_WriteV.
- */
- struct ubik_iovec *iovec =
- (struct ubik_iovec *)io_vector->iovec_wrt_val;
- char *iobuf = (char *)io_buffer->iovec_buf_val;
- bulkdata tcbs;
- afs_int32 i, offset;
-
- for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
- /* Sanity check for going off end of buffer */
- if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
- code = UINTERNAL;
- break;
+ struct ubik_server *ts = NULL; /* NULL marks the first ContactQuorum_iterate() call */
+ afs_int32 code = 0, rcode, okcalls; /* rcode and okcalls are zeroed by that first call */
+ struct rx_connection *conn;
+ int done;
+ char *procname = "DISK_WriteV"; /* call name for the "marked down" log message; switched below when falling back */
+
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+ while (!done) {
+ if (conn) { /* conn is NULL when the current server is being skipped */
+ procname = "DISK_WriteV"; /* in case previous fallback to DISK_Write */
+ code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+ if ((code <= -450) && (code > -500)) {
+ /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+ * Un-bulk the entries and do individual DISK_Write calls
+ * instead of DISK_WriteV.
+ */
+ struct ubik_iovec *iovec =
+ (struct ubik_iovec *)io_vector->iovec_wrt_val;
+ char *iobuf = (char *)io_buffer->iovec_buf_val;
+ bulkdata tcbs;
+ afs_int32 i, offset;
+
+ procname = "DISK_Write"; /* for accurate error msg, if any */
+ for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) { /* offset is where entry i's data starts within iobuf */
+ /* Sanity check for going off end of buffer */
+ if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
+ code = UINTERNAL;
+ break;
+ }
+ tcbs.bulkdata_len = iovec[i].length;
+ tcbs.bulkdata_val = &iobuf[offset];
+ code = DISK_Write(conn, &atrans->tid, iovec[i].file,
+ iovec[i].position, &tcbs);
+ if (code) /* stop at the first failing entry; code is reported to ContactQuorum_iterate() */
+ break;
+ offset += iovec[i].length;
}
- tcbs.bulkdata_len = iovec[i].length;
- tcbs.bulkdata_val = &iobuf[offset];
- code =
- DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
- iovec[i].position, &tcbs);
- if (code)
- break;
-
- offset += iovec[i].length;
- }
- }
-
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
}
}
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode); /* 0 when enough servers answered, otherwise an error code */
}
+
afs_int32
-ContactQuorum_DISK_SetVersion(register struct ubik_trans *atrans, int aflags,
+ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags, /* issues DISK_SetVersion(OldVersion, NewVersion) on every server ContactQuorum_iterate() hands back */
ubik_version *OldVersion,
ubik_version *NewVersion)
{
- register struct ubik_server *ts;
- register afs_int32 code;
- afs_int32 rcode, okcalls;
-
- rcode = 0;
- okcalls = 0;
- for (ts = ubik_servers; ts; ts = ts->next) {
- /* for each server */
- if (!ts->up || !ts->currentDB) {
- ts->currentDB = 0; /* db is no longer current; we just missed an update */
- continue; /* not up-to-date, don't bother */
- }
- code = DISK_SetVersion(ts->disk_rxcid, &atrans->tid, OldVersion,
- NewVersion);
- if (code) { /* failure */
- rcode = code;
- ts->up = 0; /* mark as down now; beacons will no longer be sent */
- ts->currentDB = 0;
- ts->beaconSinceDown = 0;
- urecovery_LostServer(); /* tell recovery to try to resend dbase later */
- } else { /* success */
- if (!ts->isClone)
- okcalls++; /* count up how many worked */
- if (aflags & CStampVersion) {
- ts->version = atrans->dbase->version;
- }
- }
+ struct ubik_server *ts = NULL; /* NULL marks the first ContactQuorum_iterate() call */
+ afs_int32 code = 0, rcode, okcalls; /* rcode and okcalls are zeroed by that first call */
+ struct rx_connection *conn;
+ int done;
+ char *procname = "DISK_SetVersion"; /* call name for the "marked down" log message */
+
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
+ while (!done) {
+ if (conn) /* conn is NULL when the current server is being skipped */
+ code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+ done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code, procname);
}
- /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
- if (okcalls + 1 >= ubik_quorum)
- return 0;
- else
- return rcode;
+ return ContactQuorum_rcode(okcalls, rcode); /* 0 when enough servers answered, otherwise an error code */
}
-/*!
+#if defined(AFS_PTHREAD_ENV)
+static int
+ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) { /* start 'proc' with a NULL argument as a detached pthread, initialising *tattr on the way */
+ opr_Verify(pthread_attr_init(tattr) == 0);
+ opr_Verify(pthread_attr_setdetachstate(tattr,
+ PTHREAD_CREATE_DETACHED) == 0);
+ opr_Verify(pthread_create(thread, tattr, proc, NULL) == 0); /* proc arrives as void * and is used as the thread start routine */
+ return 0; /* always 0; a failing step is left to opr_Verify() (presumably fatal -- confirm in opr) */
+}
+#endif
+
+/*!
* \brief This routine initializes the ubik system for a set of servers.
* \return 0 for success, or an error code on failure.
* \param serverList set of servers specified; nServers gives the number of entries in this array.
- * \param pathName provides an initial prefix used for naming storage files used by this system.
- * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
+ * \param pathName provides an initial prefix used for naming storage files used by this system.
+ * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.
*
* \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
*
*
* \see ubik_ServerInit(), ubik_ServerInitByInfo()
*/
-int
-ubik_ServerInitCommon(afs_int32 myHost, short myPort,
+static int
+ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
struct afsconf_cell *info, char clones[],
- afs_int32 serverList[], const char *pathName,
+ afs_uint32 serverList[], const char *pathName,
struct ubik_dbase **dbase)
{
- register struct ubik_dbase *tdb;
- register afs_int32 code;
+ struct ubik_dbase *tdb;
+ afs_int32 code;
#ifdef AFS_PTHREAD_ENV
pthread_t rxServerThread; /* pthread variables */
pthread_t ubeacon_InteractThread;
afs_int32 secIndex;
struct rx_securityClass *secClass;
+ int numClasses;
struct rx_service *tservice;
initialize_U_error_table();
- tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
- tdb->pathName = (char *)malloc(strlen(pathName) + 1);
- strcpy(tdb->pathName, pathName);
+ tdb = malloc(sizeof(struct ubik_dbase));
+ tdb->pathName = strdup(pathName);
tdb->activeTrans = (struct ubik_trans *)0;
memset(&tdb->version, 0, sizeof(struct ubik_version));
memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
+#ifdef AFS_PTHREAD_ENV
+ opr_mutex_init(&tdb->versionLock);
+ opr_mutex_init(&beacon_globals.beacon_lock);
+ opr_mutex_init(&vote_globals.vote_lock);
+ opr_mutex_init(&addr_globals.addr_lock);
+ opr_mutex_init(&version_globals.version_lock);
+#else
Lock_Init(&tdb->versionLock);
+#endif
+ Lock_Init(&tdb->cache_lock);
tdb->flags = 0;
tdb->read = uphys_read;
tdb->write = uphys_write;
tdb->getlabel = uphys_getlabel;
tdb->setlabel = uphys_setlabel;
tdb->getnfiles = uphys_getnfiles;
+ tdb->buffered_append = uphys_buf_append;
tdb->readers = 0;
tdb->tidCounter = tdb->writeTidCounter = 0;
*dbase = tdb;
ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
#ifdef AFS_PTHREAD_ENV
- assert(pthread_cond_init(&tdb->version_cond, NULL) == 0);
- assert(pthread_cond_init(&tdb->flags_cond, NULL) == 0);
- assert(pthread_mutex_init(&tdb->version_mutex, NULL) == 0);
- assert(pthread_mutex_init(&tdb->flags_mutex, NULL) == 0);
+ opr_cv_init(&tdb->version_cond);
+ opr_cv_init(&tdb->flags_cond);
#endif /* AFS_PTHREAD_ENV */
/* initialize RX */
return code;
ubik_callPortal = myPort;
+
+ udisk_Init(ubik_nBuffers);
+ ulock_Init();
+
+ code = uvote_Init();
+ if (code)
+ return code;
+ code = urecovery_Initialize(tdb);
+ if (code)
+ return code;
+ if (info)
+ code = ubeacon_InitServerListByInfo(myHost, info, clones);
+ else
+ code = ubeacon_InitServerList(myHost, serverList);
+ if (code)
+ return code;
+
/* try to get an additional security object */
- ubik_sc[0] = rxnull_NewServerSecurityObject();
- ubik_sc[1] = 0;
- ubik_sc[2] = 0;
- if (ubik_SRXSecurityProc) {
- code =
- (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
- &secIndex);
- if (code == 0) {
- ubik_sc[secIndex] = secClass;
+ if (buildSecClassesProc == NULL) {
+ numClasses = 3;
+ ubik_sc = calloc(numClasses, sizeof(struct rx_securityClass *));
+ ubik_sc[0] = rxnull_NewServerSecurityObject();
+ if (ubik_SRXSecurityProc) {
+ code = (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock,
+ &secClass,
+ &secIndex);
+ if (code == 0) {
+ ubik_sc[secIndex] = secClass;
+ }
}
+ } else {
+ (*buildSecClassesProc) (securityRock, &ubik_sc, &numClasses);
}
- /* for backwards compat this should keep working as it does now
+ /* for backwards compat this should keep working as it does now
and not host bind */
-#if 0
- /* This really needs to be up above, where I have put it. It works
- * here when we're non-pthreaded, but the code above, when using
- * pthreads may (and almost certainly does) end up calling on a
- * pthread resource which gets initialized by rx_Init. The end
- * result is that an assert fails and the program dies. -- klm
- */
- code = rx_Init(myPort);
- if (code < 0)
- return code;
-#endif
tservice =
- rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
+ rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, numClasses,
VOTE_ExecuteRequest);
if (tservice == (struct rx_service *)0) {
- ubik_dprint("Could not create VOTE rx service!\n");
+ ViceLog(0, ("Could not create VOTE rx service!\n"));
return -1;
}
rx_SetMinProcs(tservice, 2);
rx_SetMaxProcs(tservice, 3);
tservice =
- rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
+ rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, numClasses,
DISK_ExecuteRequest);
if (tservice == (struct rx_service *)0) {
- ubik_dprint("Could not create DISK rx service!\n");
+ ViceLog(0, ("Could not create DISK rx service!\n"));
return -1;
}
rx_SetMinProcs(tservice, 2);
rx_SetMaxProcs(tservice, 3);
- /* start an rx_ServerProc to handle incoming RPC's in particular the
+ /* start an rx_ServerProc to handle incoming RPC's in particular the
* UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
* the "steplock" problem in ubik initialization. Defect 11037.
*/
#ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
- assert(pthread_attr_init(&rxServer_tattr) == 0);
- assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/* assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
-
- assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
+ ubik_thread_create(&rxServer_tattr, &rxServerThread, (void *)rx_ServerProc);
#else
LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
NULL, "rx_ServerProc", &junk);
#endif
- /* do basic initialization */
- code = uvote_Init();
- if (code)
- return code;
- code = urecovery_Initialize(tdb);
- if (code)
- return code;
- if (info)
- code = ubeacon_InitServerListByInfo(myHost, info, clones);
- else
- code = ubeacon_InitServerList(myHost, serverList);
+ /* send addrs to all other servers */
+ code = ubeacon_updateUbikNetworkAddress(ubik_host);
if (code)
return code;
/* now start up async processes */
#ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
- assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
- assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/* assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
- /* need another attr set here for priority??? - klm */
-
- assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
- (void *)ubeacon_Interact, NULL) == 0);
+ ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
+ (void *)ubeacon_Interact);
#else
code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
#endif
#ifdef AFS_PTHREAD_ENV
-/* do assert stuff */
- assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
- assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
-/* assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
- /* need another attr set here for priority??? - klm */
-
- assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
- (void *)urecovery_Interact, NULL) == 0);
-
+ ubik_thread_create(&urecovery_Interact_tattr, &urecovery_InteractThread,
+ (void *)urecovery_Interact);
return 0; /* is this correct? - klm */
-#else
+#else
code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
&junk);
* \see ubik_ServerInitCommon()
*/
int
-ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
+ubik_ServerInitByInfo(afs_uint32 myHost, short myPort,
struct afsconf_cell *info, char clones[],
const char *pathName, struct ubik_dbase **dbase)
{
* \see ubik_ServerInitCommon()
*/
int
-ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
+ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
const char *pathName, struct ubik_dbase **dbase)
{
afs_int32 code;
* \brief This routine begins a read or write transaction on the transaction
* identified by transPtr, in the dbase named by dbase.
*
- * An open mode of ubik_READTRANS identifies this as a read transaction,
+ * An open mode of ubik_READTRANS identifies this as a read transaction,
* while a mode of ubik_WRITETRANS identifies this as a write transaction.
- * transPtr is set to the returned transaction control block.
- * The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
- * ubik_BeginTransReadAny() below.
+ * transPtr is set to the returned transaction control block.
+ * The readAny flag is set to 0 or 1 or 2 by the wrapper functions
+ * ubik_BeginTrans() or ubik_BeginTransReadAny() or
+ * ubik_BeginTransReadAnyWrite() below.
*
* \note We can only begin transaction when we have an up-to-date database.
*/
static int
-BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
struct ubik_trans **transPtr, int readAny)
{
struct ubik_trans *jt;
- register struct ubik_trans *tt;
- register afs_int32 code;
-#if defined(UBIK_PAUSE)
- int count;
-#endif /* UBIK_PAUSE */
+ struct ubik_trans *tt;
+ afs_int32 code;
+
+ if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
+ /* it's not safe to use ubik_BeginTransReadAnyWrite without a
+ * cache-syncing function; fall back to ubik_BeginTransReadAny,
+ * which is safe but slower */
+ ViceLog(0, ("ubik_BeginTransReadAnyWrite called, but "
+ "ubik_SyncWriterCacheProc not set; pretending "
+ "ubik_BeginTransReadAny was called instead\n"));
+ readAny = 1;
+ }
if ((transMode != UBIK_READTRANS) && readAny)
return UBADTYPE;
DBHOLD(dbase);
-#if defined(UBIK_PAUSE)
- /* if we're polling the slave sites, wait until the returns
- * are all in. Otherwise, the urecovery_CheckTid call may
- * glitch us.
- */
- if (transMode == UBIK_WRITETRANS)
- for (count = 75; dbase->flags & DBVOTING; --count) {
- DBRELE(dbase);
-#ifdef GRAND_PAUSE_DEBUGGING
- if (count == 75)
- fprintf(stderr,
- "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
- time(0), ntohs(ubik_callPortal));
- else
-#endif
- if (count <= 0) {
-#if 1
- fprintf(stderr,
- "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
- time(0), ntohs(ubik_callPortal));
-#endif
- return UNOQUORUM; /* a white lie */
- }
-#ifdef AFS_PTHREAD_ENV
- sleep(2);
-#else
- IOMGR_Sleep(2);
-#endif
- DBHOLD(dbase);
- }
-#endif /* UBIK_PAUSE */
if (urecovery_AllBetter(dbase, readAny) == 0) {
DBRELE(dbase);
return UNOQUORUM;
if (transMode == UBIK_WRITETRANS) {
/* if we're writing already, wait */
while (dbase->flags & DBWRITING) {
- DBRELE(dbase);
#ifdef AFS_PTHREAD_ENV
- assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
- assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
- assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
+ opr_cv_wait(&dbase->flags_cond, &dbase->versionLock);
#else
+ DBRELE(dbase);
LWP_WaitProcess(&dbase->flags);
-#endif
DBHOLD(dbase);
+#endif
}
+
if (!ubeacon_AmSyncSite()) {
DBRELE(dbase);
return UNOTSYNC;
}
+ if (!ubeacon_SyncSiteAdvertised()) {
+ /* i am the sync-site but the remotes are not aware yet */
+ DBRELE(dbase);
+ return UNOQUORUM;
+ }
}
/* create the transaction */
code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
tt = jt; /* move to a register */
- if (code || tt == (struct ubik_trans *)NULL) {
+ if (code || tt == NULL) {
DBRELE(dbase);
return code;
}
- if (readAny)
+ UBIK_VERSION_LOCK;
+ if (readAny) {
tt->flags |= TRREADANY;
+ if (readAny > 1) {
+ tt->flags |= TRREADWRITE;
+ }
+ }
/* label trans and dbase with new tid */
- tt->tid.epoch = ubik_epochTime;
+ tt->tid.epoch = version_globals.ubik_epochTime;
/* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
tt->tid.counter = (dbase->tidCounter += 2);
if (transMode == UBIK_WRITETRANS) {
/* for a write trans, we have to keep track of the write tid counter too */
-#if defined(UBIK_PAUSE)
dbase->writeTidCounter = tt->tid.counter;
-#else
- dbase->writeTidCounter += 2;
-#endif /* UBIK_PAUSE */
+ }
+
+ UBIK_VERSION_UNLOCK;
+ if (transMode == UBIK_WRITETRANS) {
/* next try to start transaction on appropriate number of machines */
- code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
+ code = ContactQuorum_NoArguments(DISK_Begin, tt, CCheckSyncAdvertised, "DISK_Begin");
if (code) {
/* we must abort the operation */
udisk_abort(tt);
- ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, tt, 0, "DISK_Abort");
udisk_end(tt);
DBRELE(dbase);
return code;
* \see BeginTrans()
*/
int
-ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+ubik_BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
struct ubik_trans **transPtr)
{
return BeginTrans(dbase, transMode, transPtr, 0);
* \see BeginTrans()
*/
int
-ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
+ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
struct ubik_trans **transPtr)
{
+ /* Last argument 1 is presumably BeginTrans()'s readAny level: a non-zero
+ * level sets TRREADANY, and only levels above 1 add TRREADWRITE. */
return BeginTrans(dbase, transMode, transPtr, 1);
}
/*!
+ * \see BeginTrans()
+ */
+int
+ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
+                            struct ubik_trans **transPtr)
+{
+    /* Level 2 is forwarded to BeginTrans(); presumably this is its readAny
+     * argument, where any level above 1 adds TRREADWRITE to TRREADANY. */
+    int status = BeginTrans(dbase, transMode, transPtr, 2);
+
+    return status;
+}
+
+/*!
* \brief This routine ends a read or write transaction by aborting it.
*/
int
-ubik_AbortTrans(register struct ubik_trans *transPtr)
+ubik_AbortTrans(struct ubik_trans *transPtr)
{
- register afs_int32 code;
+ afs_int32 code;
afs_int32 code2;
- register struct ubik_dbase *dbase;
+ struct ubik_dbase *dbase;
dbase = transPtr->dbase;
+
+ if (transPtr->flags & TRCACHELOCKED) {
+ ReleaseReadLock(&dbase->cache_lock);
+ transPtr->flags &= ~TRCACHELOCKED;
+ }
+
+ ObtainWriteLock(&dbase->cache_lock);
+
DBHOLD(dbase);
memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+
+ ReleaseWriteLock(&dbase->cache_lock);
+
/* see if we're still up-to-date */
if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
udisk_abort(transPtr);
}
/* now it is safe to try remote abort */
- code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
+ code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
code2 = udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
return (code ? code : code2);
}
+/*!
+ * \brief Push the application cache through the registered writer hook and
+ * record whether the cached data now matches the database version.
+ *
+ * If ubik_SyncWriterCacheProc is set and reports failure, cachedVersion is
+ * zeroed; otherwise (no hook, or the hook succeeded) cachedVersion is set
+ * to a copy of dbase->version.
+ */
+static void
+WritebackApplicationCache(struct ubik_dbase *dbase)
+{
+    if (ubik_SyncWriterCacheProc != NULL && ubik_SyncWriterCacheProc() != 0) {
+        /* we failed to sync the local cache, so just invalidate the cache;
+         * we'll try to read the cache in again on the next read */
+        memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
+        return;
+    }
+
+    memcpy(&dbase->cachedVersion, &dbase->version,
+           sizeof(dbase->cachedVersion));
+}
+
/*!
* \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
* \return an error code.
*/
int
-ubik_EndTrans(register struct ubik_trans *transPtr)
+ubik_EndTrans(struct ubik_trans *transPtr)
{
- register afs_int32 code;
+ afs_int32 code;
struct timeval tv;
afs_int32 realStart;
- register struct ubik_server *ts;
+ struct ubik_server *ts;
afs_int32 now;
- register struct ubik_dbase *dbase;
+ int cachelocked = 0;
+ struct ubik_dbase *dbase;
if (transPtr->type == UBIK_WRITETRANS) {
code = ubik_Flush(transPtr);
}
dbase = transPtr->dbase;
+
+ if (transPtr->flags & TRCACHELOCKED) {
+ ReleaseReadLock(&dbase->cache_lock);
+ transPtr->flags &= ~TRCACHELOCKED;
+ }
+
+ if (transPtr->type != UBIK_READTRANS) {
+ /* must hold cache_lock before DBHOLD'ing */
+ ObtainWriteLock(&dbase->cache_lock);
+ cachelocked = 1;
+ }
+
DBHOLD(dbase);
- memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
/* give up if no longer current */
if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
- return UNOQUORUM;
+ code = UNOQUORUM;
+ goto error;
}
if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
goto success; /* update cachedVersion correctly */
udisk_end(transPtr);
DBRELE(dbase);
- return code;
+ goto error;
}
if (!ubeacon_AmSyncSite()) { /* no longer sync site */
udisk_abort(transPtr);
udisk_end(transPtr);
DBRELE(dbase);
- return UNOTSYNC;
+ code = UNOTSYNC;
+ goto error;
}
/* now it is safe to do commit */
code = udisk_commit(transPtr);
- if (code == 0)
- code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
+ if (code == 0) {
+ /* db data has been committed locally; update the local cache so
+ * readers can get at it */
+ WritebackApplicationCache(dbase);
+
+ ReleaseWriteLock(&dbase->cache_lock);
+
+ code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion, "DISK_Commit");
+
+ } else {
+ memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+ ReleaseWriteLock(&dbase->cache_lock);
+ }
+ cachelocked = 0;
if (code) {
/* failed to commit, so must return failure. Try to clear locks first, just for fun
* Note that we don't know if this transaction will eventually commit at this point.
* we lose. If we contact a majority of sites, then we won't be here: contacting
* a majority guarantees commit, since it guarantees that one dude will be a
* member of the next quorum. */
- ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+ ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
udisk_end(transPtr);
DBRELE(dbase);
- return code;
+ goto error;
}
/* before we can start sending unlock messages, we must wait until all servers
* that are possibly still functioning on the other side of a network partition
* to us, or timeout. Put safety check in anyway */
if (now - realStart > 10 * BIGTIME) {
ubik_stats.escapes++;
- ubik_print("ubik escaping from commit wait\n");
+ ViceLog(0, ("ubik escaping from commit wait\n"));
break;
}
for (ts = ubik_servers; ts; ts = ts->next) {
+ UBIK_BEACON_LOCK;
if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
+ UBIK_BEACON_UNLOCK;
+
/* this guy could have some damaged data, wait for him */
code = 1;
tv.tv_sec = 1; /* try again after a while (ha ha) */
tv.tv_usec = 0;
+
#ifdef AFS_PTHREAD_ENV
+ /* we could release the dbase outside of the loop, but we do
+ * it here, in the loop, to avoid an unnecessary RELE/HOLD
+ * if all sites are up */
+ DBRELE(dbase);
select(0, 0, 0, 0, &tv);
+ DBHOLD(dbase);
#else
IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */
#endif
+
break;
}
+ UBIK_BEACON_UNLOCK;
}
if (code == 0)
break; /* no down ones still pseudo-active */
* The transaction is committed anyway, since we succeeded in contacting a quorum
* at the start (when invoking the DiskCommit function).
*/
- ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
+ ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0, "DISK_ReleaseLocks");
success:
udisk_end(transPtr);
- /* update version on successful EndTrans */
- memcpy(&dbase->cachedVersion, &dbase->version,
- sizeof(struct ubik_version));
-
+ /* don't update cachedVersion here; it should have been updated way back
+ * in ubik_CheckCache, and earlier in this function for writes */
DBRELE(dbase);
+ if (cachelocked) {
+ ReleaseWriteLock(&dbase->cache_lock);
+ }
return 0;
+
+ error:
+ if (!cachelocked) {
+ ObtainWriteLock(&dbase->cache_lock);
+ }
+ memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
+ ReleaseWriteLock(&dbase->cache_lock);
+ return code;
}
/*!
* \brief This routine reads length bytes into buffer from the current position in the database.
- *
+ *
* The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length. A short read returns zero for an error code.
*
* \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
*/
int
-ubik_Read(register struct ubik_trans *transPtr, void *buffer,
+ubik_Read(struct ubik_trans *transPtr, void *buffer,
afs_int32 length)
{
- register afs_int32 code;
+ afs_int32 code;
/* reads are easy to do: handle locally */
DBHOLD(transPtr->dbase);
}
/*!
- * \brief This routine will flush the io data in the iovec structures.
+ * \brief This routine will flush the io data in the iovec structures.
*
* It first flushes to the local disk and then uses ContactQuorum to write it
* to the other servers.
if (transPtr->type != UBIK_WRITETRANS)
return UBADTYPE;
+
+ DBHOLD(transPtr->dbase);
if (!transPtr->iovec_info.iovec_wrt_len
- || !transPtr->iovec_info.iovec_wrt_val)
+ || !transPtr->iovec_info.iovec_wrt_val) {
+ DBRELE(transPtr->dbase);
return 0;
+ }
- DBHOLD(transPtr->dbase);
if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
ERROR_EXIT(UNOQUORUM);
if (!ubeacon_AmSyncSite()) /* only sync site can write */
&transPtr->iovec_data);
if (code) {
udisk_abort(transPtr);
- ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
transPtr->iovec_info.iovec_wrt_len = 0;
transPtr->iovec_data.iovec_buf_len = 0;
ERROR_EXIT(code);
}
int
-ubik_Write(register struct ubik_trans *transPtr, void *vbuffer,
+ubik_Write(struct ubik_trans *transPtr, void *vbuffer,
afs_int32 length)
{
struct ubik_iovec *iovec;
return 0;
}
+ DBHOLD(transPtr->dbase);
if (!transPtr->iovec_info.iovec_wrt_val) {
transPtr->iovec_info.iovec_wrt_len = 0;
transPtr->iovec_info.iovec_wrt_val =
- (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
- sizeof(struct ubik_iovec));
+ malloc(IOVEC_MAXWRT * sizeof(struct ubik_iovec));
transPtr->iovec_data.iovec_buf_len = 0;
- transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+ transPtr->iovec_data.iovec_buf_val = malloc(IOVEC_MAXBUF);
if (!transPtr->iovec_info.iovec_wrt_val
|| !transPtr->iovec_data.iovec_buf_val) {
if (transPtr->iovec_info.iovec_wrt_val)
if (transPtr->iovec_data.iovec_buf_val)
free(transPtr->iovec_data.iovec_buf_val);
transPtr->iovec_data.iovec_buf_val = 0;
+ DBRELE(transPtr->dbase);
return UNOMEM;
}
}
/* If this write won't fit in the structure, then flush it out and start anew */
if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
|| ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
+ /* Can't hold the DB lock over ubik_Flush */
+ DBRELE(transPtr->dbase);
code = ubik_Flush(transPtr);
if (code)
return (code);
+ DBHOLD(transPtr->dbase);
}
- DBHOLD(transPtr->dbase);
if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
ERROR_EXIT(UNOQUORUM);
if (!ubeacon_AmSyncSite()) /* only sync site can write */
* and a byte position relative to the specified file \p position.
*/
int
-ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
+ubik_Seek(struct ubik_trans *transPtr, afs_int32 fileid,
afs_int32 position)
{
- register afs_int32 code;
+ afs_int32 code;
DBHOLD(transPtr->dbase);
if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
* transaction in \p fileid and \p position.
*/
int
-ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
+ubik_Tell(struct ubik_trans *transPtr, afs_int32 * fileid,
afs_int32 * position)
{
DBHOLD(transPtr->dbase);
* bytes, if length is less than the file's current size.
*/
int
-ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
+ubik_Truncate(struct ubik_trans *transPtr, afs_int32 length)
{
afs_int32 code, error = 0;
if (code) {
/* we must abort the operation */
udisk_abort(transPtr);
- ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, transPtr, 0, "DISK_Abort");
ERROR_EXIT(code);
}
if (code) {
/* we must abort the operation */
udisk_abort(atrans);
- ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
+ /* force aborts to the others */
+ ContactQuorum_NoArguments(DISK_Abort, atrans, 0, "DISK_Abort");
ERROR_EXIT(code);
}
}
}
/*!
- * \brief utility to wait for a version # to change
+ * \brief Facility to simplify database caching.
+ * \return zero if last trans was done on the local server and was successful.
+ * \return -1 means bad (NULL) argument.
+ *
+ * If return value is non-zero and the caller is a server caching part of the
+ * Ubik database, it should invalidate that cache.
*/
-int
-ubik_WaitVersion(register struct ubik_dbase *adatabase,
- register struct ubik_version *aversion)
+static int
+ubik_CacheUpdate(struct ubik_trans *atrans)
{
- while (1) {
- /* wait until version # changes, and then return */
- if (vcmp(*aversion, adatabase->version) != 0)
- return 0;
-#ifdef AFS_PTHREAD_ENV
- assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
- assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
- assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
-#else
- LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
-#endif
- }
+ /* a NULL transaction, or one with no database attached, cannot be checked */
+ if (!(atrans && atrans->dbase))
+ return -1;
+ /* 1 when cachedVersion and version compare unequal under vcmp(), else 0 */
+ return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
}
-/*!
- * \brief utility to get the version of the dbase a transaction is dealing with
+/**
+ * check and possibly update cache of ubik db.
+ *
+ * If the version of the cached db data is out of date, this calls (*cbf) to
+ * update the cache. If (*cbf) returns success, we update the version of the
+ * cached db data.
+ *
+ * Checking the version of the cached db data is done under a read lock;
+ * updating the cache (and thus calling (*cbf)) is done under a write lock
+ * so is guaranteed not to interfere with another thread's (*cbf). On
+ * successful return, a read lock on the cached db data is obtained, which
+ * will be released by ubik_EndTrans or ubik_AbortTrans.
+ *
+ * @param[in] atrans ubik transaction
+ * @param[in] cbf function to call to check/update cache
+ * @param[in] rock rock to pass to *cbf
+ *
+ * @return operation status
+ * @retval 0 success
+ * @retval nonzero error; cachedVersion not updated
+ *
+ * @post On success, application cache is read-locked, and cache data is
+ * up-to-date
*/
int
-ubik_GetVersion(register struct ubik_trans *atrans,
- register struct ubik_version *avers)
+ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func cbf, void *rock)
{
- *avers = atrans->dbase->version;
- return 0;
-}
+ int ret = 0;
-/*!
- * \brief Facility to simplify database caching.
- * \return zero if last trans was done on the local server and was successful.
- * \return -1 means bad (NULL) argument.
- *
- * If return value is non-zero and the caller is a server caching part of the
- * Ubik database, it should invalidate that cache.
- */
-int
-ubik_CacheUpdate(register struct ubik_trans *atrans)
-{
if (!(atrans && atrans->dbase))
return -1;
- return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
+
+ /* common case: test the cached version while holding only a read lock */
+ ObtainReadLock(&atrans->dbase->cache_lock);
+
+ while (ubik_CacheUpdate(atrans) != 0) {
+
+ /* cache looks stale: swap the read lock for a shared lock, then test
+ * again because no lock is held between the release and the obtain */
+ ReleaseReadLock(&atrans->dbase->cache_lock);
+ ObtainSharedLock(&atrans->dbase->cache_lock);
+
+ if (ubik_CacheUpdate(atrans) != 0) {
+
+ /* still stale: boost the shared lock before letting cbf refresh
+ * the application cache */
+ BoostSharedLock(&atrans->dbase->cache_lock);
+
+ ret = (*cbf) (atrans, rock);
+ if (ret == 0) {
+ /* refresh succeeded: stamp the cache with the current version */
+ memcpy(&atrans->dbase->cachedVersion, &atrans->dbase->version,
+ sizeof(atrans->dbase->cachedVersion));
+ }
+ }
+
+ /* It would be nice if we could convert from a shared lock to a read
+ * lock... instead, just release the shared and acquire the read */
+ ReleaseSharedLock(&atrans->dbase->cache_lock);
+
+ if (ret) {
+ /* if we have an error, don't retry, and don't hold any locks */
+ return ret;
+ }
+
+ /* re-take the read lock; the loop condition re-tests the version */
+ ObtainReadLock(&atrans->dbase->cache_lock);
+ }
+
+ /* mark the transaction as holding the cache read lock; ubik_EndTrans and
+ * ubik_AbortTrans test this flag and release the lock */
+ atrans->flags |= TRCACHELOCKED;
+
+ return 0;
}
/*!
- * "Who said anything about panicking?" snapped Arthur.
+ * "Who said anything about panicking?" snapped Arthur.
* "This is still just the culture shock. You wait till I've settled down
* into the situation and found my bearings. \em Then I'll start panicking!"
* --Arthur Dent
va_list ap;
va_start(ap, format);
- ubik_print("Ubik PANIC: ");
- ubik_vprint(format, ap);
+ ViceLog(0, ("Ubik PANIC:\n"));
+ vViceLog(0, (format, ap));
va_end(ap);
abort();
- ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
- exit(1); /* never know, though */
+ AFS_UNREACHED(ViceLog(0, ("BACK FROM ABORT\n")));
+ AFS_UNREACHED(exit(1));
}
/*!
struct ubik_server *ts;
int j;
+ UBIK_ADDR_LOCK;
for (ts = ubik_servers; ts; ts = ts->next)
for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
- if (ts->addr[j] == addr)
+ if (ts->addr[j] == addr) {
+ UBIK_ADDR_UNLOCK;
return ts->addr[0]; /* net byte order */
+ }
+ UBIK_ADDR_UNLOCK;
return 0; /* if not in server database, return error */
}
+
+/*!
+ * \brief Run the installed security check against an incoming Rx call.
+ *
+ * checkSecurityProc takes precedence when set; otherwise
+ * ubik_CheckRXSecurityProc is used when set.
+ *
+ * \return the chosen procedure's result, or 0 when neither is installed.
+ */
+int
+ubik_CheckAuth(struct rx_call *acall)
+{
+    if (checkSecurityProc) {
+        return (*checkSecurityProc) (securityRock, acall);
+    }
+
+    if (ubik_CheckRXSecurityProc) {
+        return (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
+    }
+
+    /* no check procedure has been registered */
+    return 0;
+}
+
+/*!
+ * \brief Install the server-side security callbacks and their rock.
+ *
+ * \param buildproc saved in buildSecClassesProc
+ * \param checkproc saved in checkSecurityProc
+ * \param rock      saved in securityRock
+ */
+void
+ubik_SetServerSecurityProcs(void (*buildproc) (void *,
+                                               struct rx_securityClass ***,
+                                               afs_int32 *),
+                            int (*checkproc) (void *, struct rx_call *),
+                            void *rock)
+{
+    securityRock = rock;
+    checkSecurityProc = checkproc;
+    buildSecClassesProc = buildproc;
+}