ubik-pause-collapsing-20020624
[openafs.git] / src / ubik / recovery.c
index f4b0a40..6878534 100644 (file)
@@ -147,9 +147,13 @@ urecovery_CheckTid(atid)
        if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
            /* don't match, abort it */
            /* If the thread is not waiting for lock - ok to end it */
+#if !defined(UBIK_PAUSE)
            if (ubik_currentTrans->locktype != LOCKWAIT) {
+#endif /* UBIK_PAUSE */
               udisk_end(ubik_currentTrans);
+#if !defined(UBIK_PAUSE)
            }
+#endif /* UBIK_PAUSE */
            ubik_currentTrans = (struct ubik_trans *) 0;
        }
     }
@@ -466,7 +470,15 @@ urecovery_Interact() {
            urecovery_state |=  UBIK_RECFOUNDDB;
            urecovery_state &= ~UBIK_RECSENTDB;
        }
+#if defined(UBIK_PAUSE)
+       /* it's not possible for UBIK_RECFOUNDDB not to be set here.
+        * However, we might have lost UBIK_RECSYNCSITE, and that
+        * IS important.
+        */
+       if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
+#else
        if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
+#endif /* UBIK_PAUSE */
 
        /* If we, the sync site, do not have the best db version, then
         * go and get it from the server that does.
@@ -475,7 +487,11 @@ urecovery_Interact() {
           urecovery_state |= UBIK_RECHAVEDB;
        } else {
           /* we don't have the best version; we should fetch it. */
+#if defined(UBIK_PAUSE)
+           DBHOLD(ubik_dbase);
+#else
           ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
           urecovery_AbortAll(ubik_dbase);
 
           /* Rx code to do the Bulk fetch */
@@ -551,8 +567,15 @@ FetchEndCall:
           }
           udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
           LWP_NoYieldSignal(&ubik_dbase->version);
+#if defined(UBIK_PAUSE)
+          DBRELE(ubik_dbase);
+#else
           ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
        }
+#if defined(UBIK_PAUSE)
+       if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
+#endif /* UBIK_PAUSE */
        if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
        
        /* If the database was newly initialized, then when we establish quorum, write
@@ -561,7 +584,11 @@ FetchEndCall:
         * database and overwrite this one.
         */
        if (ubik_dbase->version.epoch == 1) {
+#if defined(UBIK_PAUSE)
+           DBHOLD(ubik_dbase);
+#else
           ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
           urecovery_AbortAll(ubik_dbase);
           ubik_epochTime = 2;
           ubik_dbase->version.epoch   = ubik_epochTime;
@@ -569,7 +596,11 @@ FetchEndCall:
           code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
           udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
           LWP_NoYieldSignal(&ubik_dbase->version);
+#if defined(UBIK_PAUSE)
+          DBRELE(ubik_dbase);
+#else
           ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
        }
 
        /* Check the other sites and send the database to them if they
@@ -579,7 +610,11 @@ FetchEndCall:
            /* now propagate out new version to everyone else */
            dbok = 1;       /* start off assuming they all worked */
 
+#if defined(UBIK_PAUSE)
+           DBHOLD(ubik_dbase);
+#else
            ObtainWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
            /*
             * Check if a write transaction is in progress. We can't send the
             * db when a write is in progress here because the db would be
@@ -595,11 +630,19 @@ FetchEndCall:
              tv.tv_sec =  0;
              tv.tv_usec = 50000;
              while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
+#if defined(UBIK_PAUSE)
+               DBRELE(ubik_dbase);
+#else
                ReleaseWriteLock(&ubik_dbase->versionLock);     
+#endif /* UBIK_PAUSE */
                /* sleep for a little while */
                IOMGR_Select(0, 0, 0, 0, &tv);
                tv.tv_usec += 10000; safety++;
+#if defined(UBIK_PAUSE)
+               DBHOLD(ubik_dbase);
+#else
                ObtainWriteLock(&ubik_dbase->versionLock);                   
+#endif /* UBIK_PAUSE */
              }
            }
 
@@ -658,7 +701,11 @@ StoreEndCall:
                    ts->currentDB = 1;
                }
            }
+#if defined(UBIK_PAUSE)
+           DBRELE(ubik_dbase);
+#else
            ReleaseWriteLock(&ubik_dbase->versionLock);
+#endif /* UBIK_PAUSE */
            if (dbok) urecovery_state |= UBIK_RECSENTDB;
        }
     }