ubik-pause-collapsing-20020624
authorMarcus Watts <mdw@umich.edu>
Mon, 24 Jun 2002 17:30:34 +0000 (17:30 +0000)
committerDerrick Brashear <shadow@dementia.org>
Mon, 24 Jun 2002 17:30:34 +0000 (17:30 +0000)
I'm told that after we introduce ptserver nested groups we can expect to
see periodic pauses in ubik operations, and this fixes the problem. if it
happens, we can start with the UBIK_PAUSE code and go from there

src/ubik/beacon.c
src/ubik/disk.c
src/ubik/lock.c
src/ubik/recovery.c
src/ubik/remote.c
src/ubik/ubik.c
src/ubik/ubik.p.h

index 598f5d3..140927f 100644 (file)
@@ -364,6 +364,9 @@ ubeacon_Interact() {
          }
        else
            ttid.counter = ubik_dbase->tidCounter+1;
+#if defined(UBIK_PAUSE)
+       ubik_dbase->flags |= DBVOTING;
+#endif /* UBIK_PAUSE */
 
        /* now analyze return codes, counting up our votes */
        yesVotes = 0;               /* count how many to ensure we have quorum */
@@ -417,6 +420,9 @@ ubeacon_Interact() {
            if (amIMagic) yesVotes++;   /* extra epsilon */
            if (i < oldestYesVote) oldestYesVote = i;
        }
+#if defined(UBIK_PAUSE)
+       ubik_dbase->flags &= ~DBVOTING;
+#endif /* UBIK_PAUSE */
 
        /* now decide if we have enough votes to become sync site.
            Note that we can still get enough votes even if we didn't for ourself. */
index a41aef4..15fb858 100644 (file)
@@ -820,6 +820,19 @@ udisk_end(atrans)
     struct ubik_trans *atrans; {
     struct ubik_dbase *dbase;
 
+#if defined(UBIK_PAUSE)
+       /* Another thread is trying to lock this transaction.
+        * That can only be an RPC doing SDISK_Lock.
+        * Unlock the transaction, 'cause otherwise the other
+        * thread will never wake up.  Don't free it because
+        * the caller will do that already.
+        */
+    if (atrans->flags & TRSETLOCK) {
+       atrans->flags |= TRSTALE;
+       ulock_relLock(atrans);
+       return;
+    }
+#endif /* UBIK_PAUSE */
     if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
     dbase = atrans->dbase;
 
index e5d8a5b..b764bc0 100644 (file)
@@ -96,10 +96,23 @@ ulock_getLock(atrans, atype, await)
   }
 
   /* Create new lock record and add to spec'd transaction:
+#if defined(UBIK_PAUSE)
+   * locktype.  Before doing that, set TRSETLOCK,
+   * to tell udisk_end that another thread (us) is waiting.
+#else
    * locktype. This field also tells us if the thread is 
    * waiting for a lock: It will be equal to LOCKWAIT.
+#endif 
    */
+#if defined(UBIK_PAUSE)
+  if (atrans->flag & TRSETLOCK) {
+     printf ("Ubik: Internal Error: TRSETLOCK already set?\n");
+     return EBUSY;
+  }
+  atrans->flag |= TRSETLOCK;
+#else
   atrans->locktype = LOCKWAIT;
+#endif /* UBIK_PAUSE */
   DBRELE(dbase);
   if (atype == LOCKREAD) {
      ObtainReadLock(&rwlock);
@@ -108,6 +121,18 @@ ulock_getLock(atrans, atype, await)
   }
   DBHOLD(dbase);
   atrans->locktype = atype;
+#if defined(UBIK_PAUSE)
+  atrans->flag &= ~TRSETLOCK;
+#if 0
+  /* We don't do this here, because this can only happen in SDISK_Lock,
+   *  and there's already code there to catch this condition.
+   */
+  if (atrans->flag & TRSTALE) {
+     udisk_end(atrans);
+     return UINTERNAL;
+  }
+#endif
+#endif /* UBIK_PAUSE */
 
 /*
  *ubik_print("Ubik: DEBUG: Thread 0x%x took %s lock\n", lwp_cpptr,
@@ -150,3 +175,40 @@ ulock_Debug(aparm)
   }
 }
 
+#if defined(UBIK_PAUSE)
+/* Find the TID of the current write lock (or the best approximation thereof) */
+ulock_FindWLock(struct ubik_dbase *dbase, struct ubik_tid *atid)
+{
+    register struct ubik_lock *tl;
+    register struct ubik_trans *tt, *best;
+
+    best = 0;
+    for(tt=dbase->activeTrans; tt; tt=tt->next) {
+       if (tt->type != UBIK_WRITETRANS) continue;
+       if (!best || best->tid.counter > tt->tid.counter) {
+           best = tt;
+       }
+       for(tl=tt->activeLocks; tl; tl=tl->next) {
+           if (tl->type == LOCKWRITE) {
+               *atid = tt->tid;
+#ifdef GRAND_PAUSE_DEBUGGING
+               ubik_print ("Found real write lock tid %d.%d\n",
+                           atid->epoch, atid->counter);
+#endif
+               return 0;
+           }
+       }
+    }
+    /* if we get here, no locks pending, return the best guess */
+    if (best) {
+       *atid = best->tid;
+#ifdef GRAND_PAUSE_DEBUGGING
+       ubik_print ("Found possible write transaction tid %d.%d\n",
+                   atid->epoch, atid->counter);
+#endif
+       return 0;
+    }
+    return EINVAL;
+}
+#endif /* UBIK_PAUSE */
+
index f4b0a40..6878534 100644 (file)
@@ -147,9 +147,13 @@ urecovery_CheckTid(atid)
        if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
            /* don't match, abort it */
            /* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
            if (ubik_currentTrans->locktype != LOCKWAIT) {
+#endif /* UBIK_PAUSE */
               udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
            }
+#endif /* UBIK_PAUSE */
            ubik_currentTrans = (struct ubik_trans *) 0;
        }
     }
@@ -466,7 +470,15 @@ urecovery_Interact() {
            urecovery_state |=  UBIK_RECFOUNDDB;
            urecovery_state &= ~UBIK_RECSENTDB;
        }
+#if defined(UBIK_PAUSE)
+       /* it's not possible for UBIK_RECFOUNDDB not to be set here.
+        * However, we might have lost UBIK_RECSYNCSITE, and that
+        * IS important.
+        */
+       if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
+#else
        if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
+#endif /* UBIK_PAUSE */
 
        /* If we, the sync site, do not have the best db version, then
         * go and get it from the server that does.
@@ -475,7 +487,11 @@ urecovery_Interact() {
           urecovery_state |= UBIK_RECHAVEDB;
        } else {
           /* we don't have the best version; we should fetch it. */
+#if defined(UBIK_PAUSE)
+           DBHOLD(ubik_dbase);
+#else
           ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
           urecovery_AbortAll(ubik_dbase);
 
           /* Rx code to do the Bulk fetch */
@@ -551,8 +567,15 @@ FetchEndCall:
           }
           udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
           LWP_NoYieldSignal(&ubik_dbase->version);
+#if defined(UBIK_PAUSE)
+          DBRELE(ubik_dbase);
+#else
           ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
        }
+#if defined(UBIK_PAUSE)
+       if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
+#endif /* UBIK_PAUSE */
        if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
        
        /* If the database was newly initialized, then when we establish quorum, write
@@ -561,7 +584,11 @@ FetchEndCall:
         * database and overwrite this one.
         */
        if (ubik_dbase->version.epoch == 1) {
+#if defined(UBIK_PAUSE)
+           DBHOLD(ubik_dbase);
+#else
           ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
           urecovery_AbortAll(ubik_dbase);
           ubik_epochTime = 2;
           ubik_dbase->version.epoch   = ubik_epochTime;
@@ -569,7 +596,11 @@ FetchEndCall:
           code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
           udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
           LWP_NoYieldSignal(&ubik_dbase->version);
+#if defined(UBIK_PAUSE)
+          DBRELE(ubik_dbase);
+#else
           ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
        }
 
        /* Check the other sites and send the database to them if they
@@ -579,7 +610,11 @@ FetchEndCall:
            /* now propagate out new version to everyone else */
            dbok = 1;       /* start off assuming they all worked */
 
+#if defined(UBIK_PAUSE)
+           DBHOLD(ubik_dbase);
+#else
            ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
            /*
             * Check if a write transaction is in progress. We can't send the
             * db when a write is in progress here because the db would be
@@ -595,11 +630,19 @@ FetchEndCall:
              tv.tv_sec =  0;
              tv.tv_usec = 50000;
              while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
+#if defined(UBIK_PAUSE)
+               DBRELE(ubik_dbase);
+#else
                ReleaseWriteLock(&ubik_dbase->versionLock);     
+#endif /* UBIK_PAUSE */
                /* sleep for a little while */
                IOMGR_Select(0, 0, 0, 0, &tv);
                tv.tv_usec += 10000; safety++;
+#if defined(UBIK_PAUSE)
+               DBHOLD(ubik_dbase);
+#else
                ObtainWriteLock(&ubik_dbase->versionLock);                   
+#endif /* UBIK_PAUSE */
              }
            }
 
@@ -658,7 +701,11 @@ StoreEndCall:
                    ts->currentDB = 1;
                }
            }
+#if defined(UBIK_PAUSE)
+           DBRELE(ubik_dbase);
+#else
            ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
            if (dbok) urecovery_state |= UBIK_RECSENTDB;
        }
     }
index 513b4fb..35f22a1 100644 (file)
@@ -72,9 +72,13 @@ afs_int32 SDISK_Begin(rxcall, atid)
     urecovery_CheckTid(atid);
     if (ubik_currentTrans) {
         /* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
         if (ubik_currentTrans->locktype != LOCKWAIT) {
+#endif /* UBIK_PAUSE */
           udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
        }
+#endif /* UBIK_PAUSE */
        ubik_currentTrans = (struct ubik_trans *) 0;
     }
     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
@@ -153,9 +157,13 @@ afs_int32 SDISK_ReleaseLocks(rxcall, atid)
     }
 
     /* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
     if (ubik_currentTrans->locktype != LOCKWAIT) {
-       udisk_end(ubik_currentTrans);
-    }
+#endif /* UBIK_PAUSE */
+       udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
+    }    
+#endif /* UBIK_PAUSE */
     ubik_currentTrans = (struct ubik_trans *) 0;
     DBRELE(dbase);
     return 0;
@@ -190,9 +198,13 @@ afs_int32 SDISK_Abort(rxcall, atid)
 
     code = udisk_abort(ubik_currentTrans);
     /* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
     if (ubik_currentTrans->locktype != LOCKWAIT) {
-       udisk_end(ubik_currentTrans);
-    }
+#endif /* UBIK_PAUSE */
+        udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
+     }
+#endif /* UBIK_PAUSE */
     ubik_currentTrans = (struct ubik_trans *) 0;
     DBRELE(dbase);
     return code;
@@ -606,6 +618,7 @@ UbikInterfaceAddr   *inAddr, *outAddr;
        for ( i=0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
            ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
        ubik_print("\n");
+       fflush(stdout); fflush(stderr);
        printServerInfo();
         return UBADHOST;
     }
index 12772ca..f4854be 100644 (file)
@@ -323,9 +323,35 @@ static int BeginTrans(dbase, transMode, transPtr, readAny)
     struct ubik_trans *jt;
     register struct ubik_trans *tt;
     register afs_int32 code;
+#if defined(UBIK_PAUSE)
+    int count;
+#endif /* UBIK_PAUSE */
 
     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
     DBHOLD(dbase);
+#if defined(UBIK_PAUSE)
+    /* if we're polling the slave sites, wait until the returns
+     *  are all in.  Otherwise, the urecovery_CheckTid call may
+     *  glitch us. 
+     */
+    if (transMode == UBIK_WRITETRANS)
+       for (count = 75; dbase->flags & DBVOTING; --count) {
+           DBRELE(dbase);
+#ifdef GRAND_PAUSE_DEBUGGING
+           if (count==75)
+               fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", time(0), ntohs(ubik_callPortal)); 
+           else
+#endif
+               if (count <= 0) {
+#if 1
+                   fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", time(0), ntohs(ubik_callPortal));
+#endif
+                   return UNOQUORUM;   /* a white lie */
+               }
+           IOMGR_Sleep(2);
+           DBHOLD(dbase);
+       }
+#endif /* UBIK_PAUSE */
     if (urecovery_AllBetter(dbase, readAny)==0) {
        DBRELE(dbase);
        return UNOQUORUM;
@@ -364,7 +390,11 @@ static int BeginTrans(dbase, transMode, transPtr, readAny)
 
     if (transMode == UBIK_WRITETRANS) {
       /* for a write trans, we have to keep track of the write tid counter too */
+#if defined(UBIK_PAUSE)
+      dbase->writeTidCounter = tt->tid.counter;
+#else
       dbase->writeTidCounter += 2;
+#endif /* UBIK_PAUSE */
 
        /* next try to start transaction on appropriate number of machines */
        code = ContactQuorum(DISK_Begin, tt, 0);
index 9f0d1a4..5d6cb36 100644 (file)
@@ -47,7 +47,9 @@
 /* ubik_lock types */
 #define        LOCKREAD            1
 #define        LOCKWRITE           2
+#if !defined(UBIK_PAUSE)
 #define        LOCKWAIT            3
+#endif /* UBIK_PAUSE */
 
 /* ubik client flags */
 #define UPUBIKONLY         1    /* only check servers presumed functional */
@@ -187,11 +189,18 @@ extern char *ubik_CheckRXSecurityRock;
 
 /* ubik_dbase flags */
 #define        DBWRITING           1           /* are any write trans. in progress */
+#if defined(UBIK_PAUSE)
+#define DBVOTING            2           /* the beacon task is polling */
+#endif /* UBIK_PAUSE */
 
 /* ubik trans flags */
 #define        TRDONE              1           /* commit or abort done */
 #define        TRABORT             2           /* if TRDONE, tells if aborted */
 #define TRREADANY          4           /* read any data available in trans */
+#if defined(UBIK_PAUSE)
+#define TRSETLOCK           8           /* SetLock is using trans */
+#define TRSTALE             16          /* udisk_end during getLock */
+#endif /* UBIK_PAUSE */
 
 /* ubik_lock flags */
 #define        LWANT               1