#include <roken.h>
+
+#include <afs/opr.h>
+#ifdef AFS_PTHREAD_ENV
+# include <opr/lock.h>
+#else
+# include <opr/lockstub.h>
+#endif
+
#include <lock.h>
-#include <rx/xdr.h>
#include <rx/rx.h>
#include <afs/cellconfig.h>
+
#define UBIK_INTERNALS
#include "ubik.h"
#include "ubik_int.h"
struct ubik_dbase *ubik_dbase = 0;
struct ubik_stats ubik_stats;
afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
-afs_int32 ubik_epochTime = 0;
afs_int32 urecovery_state = 0;
int (*ubik_SyncWriterCacheProc) (void);
struct ubik_server *ubik_servers;
static int (*checkSecurityProc)(void *, struct rx_call *) = NULL;
static void *securityRock = NULL;
+struct version_data version_globals;
+
#define CStampVersion 1 /* meaning set ts->version */
static_inline struct rx_connection *
{
struct rx_connection *conn;
+ UBIK_ADDR_LOCK;
conn = as->disk_rxcid;
#ifdef AFS_PTHREAD_ENV
rx_GetConnection(conn);
+ UBIK_ADDR_UNLOCK;
DBRELE(atrans->dbase);
+#else
+ UBIK_ADDR_UNLOCK;
#endif /* AFS_PTHREAD_ENV */
return conn;
if (okcalls + 1 >= ubik_quorum)
return 0;
else
- return rcode;
+ return (rcode != 0) ? rcode : UNOQUORUM;
}
/*!
#if defined(AFS_PTHREAD_ENV)
static int
ubik_thread_create(pthread_attr_t *tattr, pthread_t *thread, void *proc) {
- osi_Assert(pthread_attr_init(tattr) == 0);
- osi_Assert(pthread_attr_setdetachstate(tattr, PTHREAD_CREATE_DETACHED) == 0);
- osi_Assert(pthread_create(thread, tattr, proc, NULL) == 0);
+ opr_Verify(pthread_attr_init(tattr) == 0);
+ opr_Verify(pthread_attr_setdetachstate(tattr,
+ PTHREAD_CREATE_DETACHED) == 0);
+ opr_Verify(pthread_create(thread, tattr, proc, NULL) == 0);
return 0;
}
#endif
*
* \see ubik_ServerInit(), ubik_ServerInitByInfo()
*/
-int
+static int
ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
struct afsconf_cell *info, char clones[],
afs_uint32 serverList[], const char *pathName,
initialize_U_error_table();
- tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
- tdb->pathName = (char *)malloc(strlen(pathName) + 1);
- strcpy(tdb->pathName, pathName);
+ tdb = malloc(sizeof(struct ubik_dbase));
+ tdb->pathName = strdup(pathName);
tdb->activeTrans = (struct ubik_trans *)0;
memset(&tdb->version, 0, sizeof(struct ubik_version));
memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
#ifdef AFS_PTHREAD_ENV
- MUTEX_INIT(&tdb->versionLock, "version lock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&beacon_globals.beacon_lock, "beacon lock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&vote_globals.vote_lock, "vote lock", MUTEX_DEFAULT, 0);
+ opr_mutex_init(&tdb->versionLock);
+ opr_mutex_init(&beacon_globals.beacon_lock);
+ opr_mutex_init(&vote_globals.vote_lock);
+ opr_mutex_init(&addr_globals.addr_lock);
+ opr_mutex_init(&version_globals.version_lock);
#else
Lock_Init(&tdb->versionLock);
#endif
ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
#ifdef AFS_PTHREAD_ENV
- CV_INIT(&tdb->version_cond, "version", CV_DEFAULT, 0);
- CV_INIT(&tdb->flags_cond, "flags", CV_DEFAULT, 0);
+ opr_cv_init(&tdb->version_cond);
+ opr_cv_init(&tdb->flags_cond);
#endif /* AFS_PTHREAD_ENV */
/* initialize RX */
if (code < 0)
return code;
+ ubik_callPortal = myPort;
+
udisk_Init(ubik_nBuffers);
ulock_Init();
if (code)
return code;
- ubik_callPortal = myPort;
/* try to get an additional security object */
if (buildSecClassesProc == NULL) {
numClasses = 3;
NULL, "rx_ServerProc", &junk);
#endif
+ /* send addrs to all other servers */
+ code = ubeacon_updateUbikNetworkAddress(ubik_host);
+ if (code)
+ return code;
+
/* now start up async processes */
#ifdef AFS_PTHREAD_ENV
ubik_thread_create(&ubeacon_Interact_tattr, &ubeacon_InteractThread,
/* if we're writing already, wait */
while (dbase->flags & DBWRITING) {
#ifdef AFS_PTHREAD_ENV
- CV_WAIT(&dbase->flags_cond, &dbase->versionLock);
+ opr_cv_wait(&dbase->flags_cond, &dbase->versionLock);
#else
DBRELE(dbase);
LWP_WaitProcess(&dbase->flags);
/* create the transaction */
code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
tt = jt; /* move to a register */
- if (code || tt == (struct ubik_trans *)NULL) {
+ if (code || tt == NULL) {
DBRELE(dbase);
return code;
}
+ UBIK_VERSION_LOCK;
if (readAny) {
tt->flags |= TRREADANY;
if (readAny > 1) {
}
}
/* label trans and dbase with new tid */
- tt->tid.epoch = ubik_epochTime;
+ tt->tid.epoch = version_globals.ubik_epochTime;
/* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
tt->tid.counter = (dbase->tidCounter += 2);
if (transMode == UBIK_WRITETRANS) {
/* for a write trans, we have to keep track of the write tid counter too */
dbase->writeTidCounter = tt->tid.counter;
+ }
+ UBIK_VERSION_UNLOCK;
+
+ if (transMode == UBIK_WRITETRANS) {
/* next try to start transaction on appropriate number of machines */
code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
if (code) {
if (transPtr->type != UBIK_WRITETRANS)
return UBADTYPE;
+
+ DBHOLD(transPtr->dbase);
if (!transPtr->iovec_info.iovec_wrt_len
- || !transPtr->iovec_info.iovec_wrt_val)
+ || !transPtr->iovec_info.iovec_wrt_val) {
+ DBRELE(transPtr->dbase);
return 0;
+ }
- DBHOLD(transPtr->dbase);
if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
ERROR_EXIT(UNOQUORUM);
if (!ubeacon_AmSyncSite()) /* only sync site can write */
return 0;
}
+ DBHOLD(transPtr->dbase);
if (!transPtr->iovec_info.iovec_wrt_val) {
transPtr->iovec_info.iovec_wrt_len = 0;
transPtr->iovec_info.iovec_wrt_val =
- (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
- sizeof(struct ubik_iovec));
+ malloc(IOVEC_MAXWRT * sizeof(struct ubik_iovec));
transPtr->iovec_data.iovec_buf_len = 0;
- transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+ transPtr->iovec_data.iovec_buf_val = malloc(IOVEC_MAXBUF);
if (!transPtr->iovec_info.iovec_wrt_val
|| !transPtr->iovec_data.iovec_buf_val) {
if (transPtr->iovec_info.iovec_wrt_val)
if (transPtr->iovec_data.iovec_buf_val)
free(transPtr->iovec_data.iovec_buf_val);
transPtr->iovec_data.iovec_buf_val = 0;
+ DBRELE(transPtr->dbase);
return UNOMEM;
}
}
/* If this write won't fit in the structure, then flush it out and start anew */
if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
|| ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
+ /* Can't hold the DB lock over ubik_Flush */
+ DBRELE(transPtr->dbase);
code = ubik_Flush(transPtr);
if (code)
return (code);
+ DBHOLD(transPtr->dbase);
}
- DBHOLD(transPtr->dbase);
if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
ERROR_EXIT(UNOQUORUM);
if (!ubeacon_AmSyncSite()) /* only sync site can write */
return 0;
}
#ifdef AFS_PTHREAD_ENV
- CV_WAIT(&adatabase->version_cond, &adatabase->versionLock);
+ opr_cv_wait(&adatabase->version_cond, &adatabase->versionLock);
#else
DBRELE(adatabase);
LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
ubik_GetVersion(struct ubik_trans *atrans,
struct ubik_version *avers)
{
+ DBHOLD(atrans->dbase);
*avers = atrans->dbase->version;
+ DBRELE(atrans->dbase);
return 0;
}
va_list ap;
va_start(ap, format);
- ubik_print("Ubik PANIC: ");
+ ubik_print("Ubik PANIC:\n");
ubik_vprint(format, ap);
va_end(ap);
struct ubik_server *ts;
int j;
+ UBIK_ADDR_LOCK;
for (ts = ubik_servers; ts; ts = ts->next)
for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
- if (ts->addr[j] == addr)
+ if (ts->addr[j] == addr) {
+ UBIK_ADDR_UNLOCK;
return ts->addr[0]; /* net byte order */
+ }
+ UBIK_ADDR_UNLOCK;
return 0; /* if not in server database, return error */
}