From: Marc Dionne Date: Fri, 4 Feb 2011 01:51:06 +0000 (-0500) Subject: ubik: Introduce version lock X-Git-Tag: openafs-devel-1_7_1~646 X-Git-Url: http://git.openafs.org/?p=openafs.git;a=commitdiff_plain;h=e4ac552ab79be21d90397079eaf6be7050497752 ubik: Introduce version lock The "version" lock is a new lock that protects the database version information. The goal is to allow the beacon thread to use the protected values without blocking for an extended period of time, which could occur if it was using the database lock. Reading requires holding either lock, while writing requires holding both locks. The following values are protected: ubik_epochTime db->version db->flags db->tidCounter db->writeTidCounter Based on analysis and design work from Jeffrey Hutzelman. Change-Id: Ib6e67360807eed8c36e35ec27d1eb938ac899e22 Reviewed-on: http://gerrit.openafs.org/4158 Tested-by: BuildBot Reviewed-by: Jeffrey Altman Reviewed-by: Derrick Brashear --- diff --git a/src/ubik/beacon.c b/src/ubik/beacon.c index ae482bb..bb78a93 100644 --- a/src/ubik/beacon.c +++ b/src/ubik/beacon.c @@ -396,6 +396,7 @@ ubeacon_Interact(void *dummy) struct ubik_server *ts; afs_int32 temp, yesVotes, lastWakeupTime, oldestYesVote, syncsite; struct ubik_tid ttid; + struct ubik_version tversion; afs_int32 startTime; /* loop forever getting votes */ @@ -448,7 +449,8 @@ ubeacon_Interact(void *dummy) * assume we'll be fine until SMALLTIME seconds after we start collecting votes */ /* this next is essentially an expansion of rgen's ServBeacon routine */ - ttid.epoch = ubik_epochTime; + UBIK_VERSION_LOCK; + ttid.epoch = version_globals.ubik_epochTime; if (ubik_dbase->flags & DBWRITING) { /* * if a write is in progress, we have to send the writeTidCounter @@ -459,6 +461,8 @@ ubeacon_Interact(void *dummy) ttid.counter = ubik_dbase->writeTidCounter; } else ttid.counter = ubik_dbase->tidCounter + 1; + tversion.epoch = ubik_dbase->version.epoch; + tversion.counter = ubik_dbase->version.counter; /* now analyze return codes, counting up our votes */ yesVotes = 0; /* count how many to ensure we have quorum */ @@ -468,10 +472,11 @@ ubeacon_Interact(void *dummy) /* * Don't waste time using mult Rx calls if there are no connections out there */ + UBIK_VERSION_UNLOCK; if (i > 0) { char hoststr[16]; multi_Rx(connections, i) { - multi_VOTE_Beacon(syncsite, startTime, &ubik_dbase->version, + multi_VOTE_Beacon(syncsite, startTime, &tversion, &ttid); temp = FT_ApproxTime(); /* now, more or less */ ts = servers[multi_i]; @@ -523,7 +528,7 @@ ubeacon_Interact(void *dummy) * the same restrictions apply for our voting for ourself as for our voting * for anyone else. */ i = SVOTE_Beacon((struct rx_call *)0, ubeacon_AmSyncSite(), startTime, - &ubik_dbase->version, &ttid); + &tversion, &ttid); if (i) { yesVotes += 2; if (amIMagic) diff --git a/src/ubik/disk.c b/src/ubik/disk.c index 478a111..1048edd 100644 --- a/src/ubik/disk.c +++ b/src/ubik/disk.c @@ -847,8 +847,11 @@ udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans) tt->type = atype; if (atype == UBIK_READTRANS) adbase->readers++; - else if (atype == UBIK_WRITETRANS) + else if (atype == UBIK_WRITETRANS) { + UBIK_VERSION_LOCK; adbase->flags |= DBWRITING; + UBIK_VERSION_UNLOCK; + } *atrans = tt; return 0; } @@ -871,6 +874,7 @@ udisk_commit(struct ubik_trans *atrans) /* On the first write to the database. We update the versions */ if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) { + UBIK_VERSION_LOCK; oldversion = dbase->version; newversion.epoch = FT_ApproxTime();; newversion.counter = 1; @@ -878,8 +882,9 @@ udisk_commit(struct ubik_trans *atrans) code = (*dbase->setlabel) (dbase, 0, &newversion); if (code) return (code); - ubik_epochTime = newversion.epoch; + version_globals.ubik_epochTime = newversion.epoch; dbase->version = newversion; + UBIK_VERSION_UNLOCK; /* Ignore the error here. If the call fails, the site is * marked down and when we detect it is up again, we will @@ -890,6 +895,7 @@ udisk_commit(struct ubik_trans *atrans) urecovery_state |= UBIK_RECLABELDB; } + UBIK_VERSION_LOCK; dbase->version.counter++; /* bump commit count */ #ifdef AFS_PTHREAD_ENV CV_BROADCAST(&dbase->version_cond); @@ -901,6 +907,7 @@ udisk_commit(struct ubik_trans *atrans) dbase->version.counter--; return (code); } + UBIK_VERSION_UNLOCK; /* If we fail anytime after this, then panic and let the * recovery replay the log. @@ -991,7 +998,9 @@ udisk_end(struct ubik_trans *atrans) * we could be unsetting someone else's bit. */ if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) { + UBIK_VERSION_LOCK; dbase->flags &= ~DBWRITING; + UBIK_VERSION_UNLOCK; } else { dbase->readers--; } diff --git a/src/ubik/recovery.c b/src/ubik/recovery.c index b3a0071..bc8509c 100644 --- a/src/ubik/recovery.c +++ b/src/ubik/recovery.c @@ -365,6 +365,7 @@ InitializeDB(struct ubik_dbase *adbase) code = (*adbase->getlabel) (adbase, 0, &adbase->version); if (code) { /* try setting the label to a new value */ + UBIK_VERSION_LOCK; adbase->version.epoch = 1; /* value for newly-initialized db */ adbase->version.counter = 1; code = (*adbase->setlabel) (adbase, 0, &adbase->version); @@ -379,6 +380,7 @@ InitializeDB(struct ubik_dbase *adbase) #else LWP_NoYieldSignal(&adbase->version); #endif + UBIK_VERSION_UNLOCK; } return 0; } @@ -508,6 +510,7 @@ urecovery_Interact(void *dummy) /* Mark whether we are the sync site */ if (!ubeacon_AmSyncSite()) { urecovery_state &= ~UBIK_RECSYNCSITE; + DBRELE(ubik_dbase); continue; /* nothing to do */ } urecovery_state |= UBIK_RECSYNCSITE; @@ -557,8 +560,10 @@ urecovery_Interact(void *dummy) urecovery_state |= UBIK_RECFOUNDDB; urecovery_state &= ~UBIK_RECSENTDB; } - if (!(urecovery_state & UBIK_RECFOUNDDB)) + if (!(urecovery_state & UBIK_RECFOUNDDB)) { + DBRELE(ubik_dbase); continue; /* not ready */ + } /* If we, the sync site, do not have the best db version, then * go and get it from the server that does. @@ -567,7 +572,6 @@ urecovery_Interact(void *dummy) urecovery_state |= UBIK_RECHAVEDB; } else { /* we don't have the best version; we should fetch it. */ - DBHOLD(ubik_dbase); urecovery_AbortAll(ubik_dbase); /* Rx code to do the Bulk fetch */ @@ -594,8 +598,10 @@ urecovery_Interact(void *dummy) } /* give invalid label during file transit */ + UBIK_VERSION_LOCK; tversion.epoch = 0; code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion); + UBIK_VERSION_UNLOCK; if (code) { ubik_dprint("setlabel io error=%d\n", code); goto FetchEndCall; @@ -649,6 +655,7 @@ urecovery_Interact(void *dummy) if (!code) { /* we got a new file, set up its header */ urecovery_state |= UBIK_RECHAVEDB; + UBIK_VERSION_LOCK; memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version)); snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", @@ -674,6 +681,7 @@ urecovery_Interact(void *dummy) (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version); } + UBIK_VERSION_UNLOCK; #ifdef AFS_NT40_ENV snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", @@ -687,8 +695,10 @@ urecovery_Interact(void *dummy) * We will effectively invalidate the old data forever now. * Unclear if we *should* but we do. */ + UBIK_VERSION_LOCK; ubik_dbase->version.epoch = 0; ubik_dbase->version.counter = 0; + UBIK_VERSION_UNLOCK; ubik_print("Ubik: Synchronize database failed (error = %d)\n", code); } else { @@ -703,8 +713,10 @@ urecovery_Interact(void *dummy) #endif DBRELE(ubik_dbase); } - if (!(urecovery_state & UBIK_RECHAVEDB)) + if (!(urecovery_state & UBIK_RECHAVEDB)) { + DBRELE(ubik_dbase); continue; /* not ready */ + } /* If the database was newly initialized, then when we establish quorum, write * a new label. This allows urecovery_AllBetter() to allow access for reads. @@ -712,20 +724,20 @@ urecovery_Interact(void *dummy) * database and overwrite this one. */ if (ubik_dbase->version.epoch == 1) { - DBHOLD(ubik_dbase); urecovery_AbortAll(ubik_dbase); - ubik_epochTime = 2; - ubik_dbase->version.epoch = ubik_epochTime; + UBIK_VERSION_LOCK; + version_globals.ubik_epochTime = 2; + ubik_dbase->version.epoch = version_globals.ubik_epochTime; ubik_dbase->version.counter = 1; code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version); + UBIK_VERSION_UNLOCK; udisk_Invalidate(ubik_dbase, 0); /* data may have changed */ #ifdef AFS_PTHREAD_ENV CV_BROADCAST(&ubik_dbase->version_cond); #else LWP_NoYieldSignal(&ubik_dbase->version); #endif - DBRELE(ubik_dbase); } /* Check the other sites and send the database to them if they @@ -735,7 +747,6 @@ urecovery_Interact(void *dummy) /* now propagate out new version to everyone else */ dbok = 1; /* start off assuming they all worked */ - DBHOLD(ubik_dbase); /* * Check if a write transaction is in progress. We can't send the * db when a write is in progress here because the db would be @@ -834,10 +845,10 @@ urecovery_Interact(void *dummy) ts->currentDB = 1; } } - DBRELE(ubik_dbase); if (dbok) urecovery_state |= UBIK_RECSENTDB; } + DBRELE(ubik_dbase); } return NULL; } diff --git a/src/ubik/remote.c b/src/ubik/remote.c index 7dd560c..3d3cd60 100644 --- a/src/ubik/remote.c +++ b/src/ubik/remote.c @@ -487,6 +487,7 @@ SDISK_SendFile(struct rx_call *rxcall, afs_int32 file, afs_inet_ntoa_r(otherHost, hoststr)); offset = 0; + UBIK_VERSION_LOCK; epoch = tversion.epoch = 0; /* start off by labelling in-transit db as invalid */ (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */ snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", @@ -495,15 +496,16 @@ SDISK_SendFile(struct rx_call *rxcall, afs_int32 file, fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600); if (fd < 0) { code = errno; - goto failed; + goto failed_locked; } code = lseek(fd, HDRSIZE, 0); if (code != HDRSIZE) { close(fd); - goto failed; + goto failed_locked; } pass = 0; memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version)); + UBIK_VERSION_UNLOCK; while (length > 0) { tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length); #if !defined(AFS_PTHREAD_ENV) @@ -547,6 +549,7 @@ SDISK_SendFile(struct rx_call *rxcall, afs_int32 file, #endif if (!code) code = rename(pbuffer, tbuffer); + UBIK_VERSION_LOCK; if (!code) { (*ubik_dbase->open) (ubik_dbase, file); code = (*ubik_dbase->setlabel) (dbase, file, avers); @@ -564,13 +567,18 @@ SDISK_SendFile(struct rx_call *rxcall, afs_int32 file, LWP_NoYieldSignal(&dbase->version); #endif +failed_locked: + UBIK_VERSION_UNLOCK; + failed: if (code) { unlink(pbuffer); /* Failed to sync. Allow reads again for now. */ if (dbase != NULL) { + UBIK_VERSION_LOCK; tversion.epoch = epoch; (*dbase->setlabel) (dbase, file, &tversion); + UBIK_VERSION_UNLOCK; } ubik_print ("Ubik: Synchronize database with server %s failed (error = %d)\n", @@ -715,11 +723,13 @@ SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid, /* Set the label if its version matches the sync-site's */ if (uvote_eq_dbVersion(*oldversionp)) { + UBIK_VERSION_LOCK; code = (*dbase->setlabel) (ubik_dbase, 0, newversionp); if (!code) { ubik_dbase->version = *newversionp; uvote_set_dbVersion(*newversionp); } + UBIK_VERSION_UNLOCK; } else { code = USYNC; } diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index c73e781..bd913ec 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -98,6 +98,8 @@ static void (*buildSecClassesProc)(void *, struct rx_securityClass ***, static int (*checkSecurityProc)(void *, struct rx_call *) = NULL; static void *securityRock = NULL; +struct version_data version_globals; + #define CStampVersion 1 /* meaning set ts->version */ static_inline struct rx_connection * @@ -636,6 +638,7 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode, DBRELE(dbase); return code; } + UBIK_VERSION_LOCK; if (readAny) { tt->flags |= TRREADANY; if (readAny > 1) { @@ -658,12 +661,14 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode, udisk_abort(tt); ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */ udisk_end(tt); + UBIK_VERSION_UNLOCK; DBRELE(dbase); return code; } } *transPtr = tt; + UBIK_VERSION_UNLOCK; DBRELE(dbase); return 0; } diff --git a/src/ubik/ubik.p.h b/src/ubik/ubik.p.h index 7851896..7b4f7d6 100644 --- a/src/ubik/ubik.p.h +++ b/src/ubik/ubik.p.h @@ -340,7 +340,6 @@ extern int ubik_amSyncSite; /* sleep on this waiting to be sync site */ extern struct ubik_stats { /* random stats */ afs_int32 escapes; } ubik_stats; -extern afs_int32 ubik_epochTime; /* time when this site started */ extern afs_int32 urecovery_state; /* sync site recovery process state */ extern struct ubik_trans *ubik_currentTrans; /* current trans */ extern afs_int32 ubik_debugFlag; /* ubik debug flag */ @@ -411,6 +410,20 @@ struct addr_data { #define UBIK_ADDR_LOCK MUTEX_ENTER(&addr_globals.addr_lock) #define UBIK_ADDR_UNLOCK MUTEX_EXIT(&addr_globals.addr_lock) +/*! + * \brief The version lock protects the structure member, as well as + * the database version, flags, tidCounter, writeTidCounter + */ +struct version_data { +#ifdef AFS_PTHREAD_ENV + pthread_mutex_t version_lock; +#endif + afs_int32 ubik_epochTime; /* time when this site started */ +}; + +#define UBIK_VERSION_LOCK MUTEX_ENTER(&version_globals.version_lock) +#define UBIK_VERSION_UNLOCK MUTEX_EXIT(&version_globals.version_lock) + /* phys.c */ extern int uphys_stat(struct ubik_dbase *adbase, afs_int32 afid, struct ubik_stat *astat); @@ -591,6 +604,7 @@ extern int ubik_GetVersion(struct ubik_trans *atrans, extern int ubik_CheckCache(struct ubik_trans *atrans, ubik_updatecache_func check, void *rock); +extern struct version_data version_globals; /*\}*/ /*! \name ubikclient.c */ diff --git a/src/ubik/vote.c b/src/ubik/vote.c index 87c37d0..b59eb62 100644 --- a/src/ubik/vote.c +++ b/src/ubik/vote.c @@ -438,7 +438,7 @@ SVOTE_Debug(struct rx_call * rxcall, struct ubik_debug * aparm) aparm->currentTrans = 0; } - aparm->epochTime = ubik_epochTime; + aparm->epochTime = version_globals.ubik_epochTime; return 0; } @@ -521,7 +521,7 @@ SVOTE_DebugOld(struct rx_call * rxcall, aparm->currentTrans = 0; } - aparm->epochTime = ubik_epochTime; + aparm->epochTime = version_globals.ubik_epochTime; return 0; }