}
else
ttid.counter = ubik_dbase->tidCounter+1;
+#if defined(UBIK_PAUSE)
+ ubik_dbase->flags |= DBVOTING;
+#endif /* UBIK_PAUSE */
/* now analyze return codes, counting up our votes */
yesVotes = 0; /* count how many to ensure we have quorum */
if (amIMagic) yesVotes++; /* extra epsilon */
if (i < oldestYesVote) oldestYesVote = i;
}
+#if defined(UBIK_PAUSE)
+ ubik_dbase->flags &= ~DBVOTING;
+#endif /* UBIK_PAUSE */
/* now decide if we have enough votes to become sync site.
Note that we can still get enough votes even if we didn't for ourself. */
struct ubik_trans *atrans; {
struct ubik_dbase *dbase;
+#if defined(UBIK_PAUSE)
+ /* Another thread is trying to lock this transaction.
+ * That can only be an RPC doing SDISK_Lock.
+ * Unlock the transaction, 'cause otherwise the other
+ * thread will never wake up. Don't free it because
+ * the caller will do that already.
+ */
+ if (atrans->flags & TRSETLOCK) {
+ atrans->flags |= TRSTALE;
+ ulock_relLock(atrans);
+ return;
+ }
+#endif /* UBIK_PAUSE */
if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
dbase = atrans->dbase;
}
/* Create new lock record and add to spec'd transaction:
+#if defined(UBIK_PAUSE)
+ * locktype. Before doing that, set TRSETLOCK,
+ * to tell udisk_end that another thread (us) is waiting.
+#else
* locktype. This field also tells us if the thread is
* waiting for a lock: It will be equal to LOCKWAIT.
+#endif
*/
+#if defined(UBIK_PAUSE)
+ /* TRSETLOCK lives in the transaction's `flags` word, the same word
+ * that carries TRDONE. It must be set before the database lock is
+ * dropped below so a concurrent end-of-transaction can see that this
+ * thread is blocked waiting for the lock.
+ */
+ if (atrans->flags & TRSETLOCK) {
+ printf ("Ubik: Internal Error: TRSETLOCK already set?\n");
+ return EBUSY;
+ }
+ atrans->flags |= TRSETLOCK;
+#else
atrans->locktype = LOCKWAIT;
+#endif /* UBIK_PAUSE */
DBRELE(dbase);
if (atype == LOCKREAD) {
ObtainReadLock(&rwlock);
}
DBHOLD(dbase);
atrans->locktype = atype;
+#if defined(UBIK_PAUSE)
+ /* Lock obtained: this thread is no longer waiting inside SetLock. */
+ atrans->flags &= ~TRSETLOCK;
+#if 0
+ /* We don't do this here, because this can only happen in SDISK_Lock,
+ * and there's already code there to catch this condition.
+ */
+ if (atrans->flags & TRSTALE) {
+ udisk_end(atrans);
+ return UINTERNAL;
+ }
+#endif
+#endif /* UBIK_PAUSE */
/*
*ubik_print("Ubik: DEBUG: Thread 0x%x took %s lock\n", lwp_cpptr,
}
}
+#if defined(UBIK_PAUSE)
+/* Find the TID of the current write lock (or the best approximation thereof).
+ *
+ * Walks dbase->activeTrans looking only at UBIK_WRITETRANS transactions.
+ * The first such transaction that holds a LOCKWRITE entry on its
+ * activeLocks list wins and its tid is copied to *atid. If no write
+ * transaction holds a write lock, the write transaction with the lowest
+ * tid.counter seen is reported instead.
+ *
+ * Returns 0 when *atid has been filled in, EINVAL when there is no write
+ * transaction at all (*atid is left untouched in that case).
+ *
+ * NOTE(review): no locking is done here; presumably the caller already
+ * holds the database lock -- confirm against callers.
+ */
+int
+ulock_FindWLock(struct ubik_dbase *dbase, struct ubik_tid *atid)
+{
+ struct ubik_lock *tl;
+ struct ubik_trans *tt, *best;
+
+ best = (struct ubik_trans *) 0;
+ for(tt=dbase->activeTrans; tt; tt=tt->next) {
+ if (tt->type != UBIK_WRITETRANS) continue;
+ /* remember the write transaction with the smallest counter as fallback */
+ if (!best || best->tid.counter > tt->tid.counter) {
+ best = tt;
+ }
+ for(tl=tt->activeLocks; tl; tl=tl->next) {
+ if (tl->type == LOCKWRITE) {
+ *atid = tt->tid;
+#ifdef GRAND_PAUSE_DEBUGGING
+ ubik_print ("Found real write lock tid %d.%d\n",
+ atid->epoch, atid->counter);
+#endif
+ return 0;
+ }
+ }
+ }
+ /* if we get here, no locks pending, return the best guess */
+ if (best) {
+ *atid = best->tid;
+#ifdef GRAND_PAUSE_DEBUGGING
+ ubik_print ("Found possible write transaction tid %d.%d\n",
+ atid->epoch, atid->counter);
+#endif
+ return 0;
+ }
+ return EINVAL;
+}
+#endif /* UBIK_PAUSE */
+
if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
/* don't match, abort it */
/* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
+#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
}
+#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
}
}
urecovery_state |= UBIK_RECFOUNDDB;
urecovery_state &= ~UBIK_RECSENTDB;
}
+#if defined(UBIK_PAUSE)
+ /* it's not possible for UBIK_RECFOUNDDB not to be set here.
+ * However, we might have lost UBIK_RECSYNCSITE, and that
+ * IS important.
+ */
+ if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
+#else
if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
+#endif /* UBIK_PAUSE */
/* If we, the sync site, do not have the best db version, then
* go and get it from the server that does.
urecovery_state |= UBIK_RECHAVEDB;
} else {
/* we don't have the best version; we should fetch it. */
+#if defined(UBIK_PAUSE)
+ DBHOLD(ubik_dbase);
+#else
ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
urecovery_AbortAll(ubik_dbase);
/* Rx code to do the Bulk fetch */
}
udisk_Invalidate(ubik_dbase, 0); /* data has changed */
LWP_NoYieldSignal(&ubik_dbase->version);
+#if defined(UBIK_PAUSE)
+ DBRELE(ubik_dbase);
+#else
ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
}
+#if defined(UBIK_PAUSE)
+ if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
+#endif /* UBIK_PAUSE */
if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
/* If the database was newly initialized, then when we establish quorum, write
* database and overwrite this one.
*/
if (ubik_dbase->version.epoch == 1) {
+#if defined(UBIK_PAUSE)
+ DBHOLD(ubik_dbase);
+#else
ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
urecovery_AbortAll(ubik_dbase);
ubik_epochTime = 2;
ubik_dbase->version.epoch = ubik_epochTime;
code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
LWP_NoYieldSignal(&ubik_dbase->version);
+#if defined(UBIK_PAUSE)
+ DBRELE(ubik_dbase);
+#else
ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
}
/* Check the other sites and send the database to them if they
/* now propagate out new version to everyone else */
dbok = 1; /* start off assuming they all worked */
+#if defined(UBIK_PAUSE)
+ DBHOLD(ubik_dbase);
+#else
ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
/*
* Check if a write transaction is in progress. We can't send the
* db when a write is in progress here because the db would be
tv.tv_sec = 0;
tv.tv_usec = 50000;
while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
+#if defined(UBIK_PAUSE)
+ DBRELE(ubik_dbase);
+#else
ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
/* sleep for a little while */
IOMGR_Select(0, 0, 0, 0, &tv);
tv.tv_usec += 10000; safety++;
+#if defined(UBIK_PAUSE)
+ DBHOLD(ubik_dbase);
+#else
ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
}
}
ts->currentDB = 1;
}
}
+#if defined(UBIK_PAUSE)
+ DBRELE(ubik_dbase);
+#else
ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
if (dbok) urecovery_state |= UBIK_RECSENTDB;
}
}
urecovery_CheckTid(atid);
if (ubik_currentTrans) {
/* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
+#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
}
+#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
}
code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
}
/* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
- udisk_end(ubik_currentTrans);
- }
+#endif /* UBIK_PAUSE */
+ udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
+ }
+#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
DBRELE(dbase);
return 0;
code = udisk_abort(ubik_currentTrans);
/* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
- udisk_end(ubik_currentTrans);
- }
+#endif /* UBIK_PAUSE */
+ udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
+ }
+#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
DBRELE(dbase);
return code;
for ( i=0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
ubik_print("\n");
+ fflush(stdout); fflush(stderr);
printServerInfo();
return UBADHOST;
}
struct ubik_trans *jt;
register struct ubik_trans *tt;
register afs_int32 code;
+#if defined(UBIK_PAUSE)
+ int count;
+#endif /* UBIK_PAUSE */
if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
DBHOLD(dbase);
+#if defined(UBIK_PAUSE)
+ /* if we're polling the slave sites, wait until the returns
+ * are all in. Otherwise, the urecovery_CheckTid call may
+ * glitch us.
+ *
+ * The database lock is dropped around each sleep so the voting
+ * code can make progress. After 75 polls with DBVOTING still set
+ * we give up and return with the lock released.
+ */
+ if (transMode == UBIK_WRITETRANS) {
+ for (count = 75; dbase->flags & DBVOTING; --count) {
+ DBRELE(dbase);
+#ifdef GRAND_PAUSE_DEBUGGING
+ if (count==75)
+ fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", (long) time(0), ntohs(ubik_callPortal));
+ else
+#endif
+ if (count <= 0) {
+#if 1
+ /* time_t is not guaranteed to be long; cast to match %ld */
+ fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", (long) time(0), ntohs(ubik_callPortal));
+#endif
+ return UNOQUORUM; /* a white lie */
+ }
+ IOMGR_Sleep(2);
+ DBHOLD(dbase);
+ }
+ }
+#endif /* UBIK_PAUSE */
if (urecovery_AllBetter(dbase, readAny)==0) {
DBRELE(dbase);
return UNOQUORUM;
if (transMode == UBIK_WRITETRANS) {
/* for a write trans, we have to keep track of the write tid counter too */
+#if defined(UBIK_PAUSE)
+ dbase->writeTidCounter = tt->tid.counter;
+#else
dbase->writeTidCounter += 2;
+#endif /* UBIK_PAUSE */
/* next try to start transaction on appropriate number of machines */
code = ContactQuorum(DISK_Begin, tt, 0);
/* ubik_lock types */
#define LOCKREAD 1
#define LOCKWRITE 2
+#if !defined(UBIK_PAUSE)
#define LOCKWAIT 3
+#endif /* UBIK_PAUSE */
/* ubik client flags */
#define UPUBIKONLY 1 /* only check servers presumed functional */
/* ubik_dbase flags */
#define DBWRITING 1 /* are any write trans. in progress */
+#if defined(UBIK_PAUSE)
+#define DBVOTING 2 /* the beacon task is polling */
+#endif /* UBIK_PAUSE */
/* ubik trans flags */
#define TRDONE 1 /* commit or abort done */
#define TRABORT 2 /* if TRDONE, tells if aborted */
#define TRREADANY 4 /* read any data available in trans */
+#if defined(UBIK_PAUSE)
+#define TRSETLOCK 8 /* SetLock is using trans */
+#define TRSTALE 16 /* udisk_end during getLock */
+#endif /* UBIK_PAUSE */
/* ubik_lock flags */
#define LWANT 1