ubik: Avoid unlinking garbage during recovery
[openafs.git] / src / ubik / recovery.c
index b351c96..a53aa59 100644 (file)
@@ -133,9 +133,17 @@ int
 urecovery_AbortAll(struct ubik_dbase *adbase)
 {
     struct ubik_trans *tt;
+    int reads = 0, writes = 0;
+
     for (tt = adbase->activeTrans; tt; tt = tt->next) {
+       if (tt->type == UBIK_WRITETRANS)
+           writes++;
+       else
+           reads++;
        udisk_abort(tt);
     }
+    ViceLog(0, ("urecovery_AbortAll: just aborted %d read and %d write transactions\n",
+                   reads, writes));
     return 0;
 }
 
@@ -151,8 +159,20 @@ urecovery_CheckTid(struct ubik_tid *atid, int abortalways)
        if (atid->epoch != ubik_currentTrans->tid.epoch
            || atid->counter > ubik_currentTrans->tid.counter || abortalways) {
            /* don't match, abort it */
+           int endit = 0;
            /* If the thread is not waiting for lock - ok to end it */
            if (ubik_currentTrans->locktype != LOCKWAIT) {
+               endit = 1;
+           }
+
+           ViceLog(0, ("urecovery_CheckTid: Aborting/ending bad remote "
+                       "transaction. (tx %d.%d, atid %d.%d, abortalways %d, "
+                       "endit %d)\n",
+                       ubik_currentTrans->tid.epoch,
+                       ubik_currentTrans->tid.counter,
+                       atid->epoch, atid->counter,
+                       abortalways, endit));
+           if (endit) {
                udisk_end(ubik_currentTrans);
            }
            ubik_currentTrans = (struct ubik_trans *)0;
@@ -374,11 +394,6 @@ InitializeDB(struct ubik_dbase *adbase)
            adbase->version.counter = 0;
            (*adbase->setlabel) (adbase, 0, &adbase->version);
        }
-#ifdef AFS_PTHREAD_ENV
-       opr_cv_broadcast(&adbase->version_cond);
-#else
-       LWP_NoYieldSignal(&adbase->version);
-#endif
        UBIK_VERSION_UNLOCK;
     }
     return 0;
@@ -457,7 +472,9 @@ urecovery_Interact(void *dummy)
     int fd = -1;
     afs_int32 pass;
 
-    afs_pthread_setname_self("recovery");
+    memset(pbuffer, 0, sizeof(pbuffer));
+
+    opr_threadname_set("recovery");
 
     /* otherwise, begin interaction */
     urecovery_state = 0;
@@ -593,13 +610,16 @@ urecovery_Interact(void *dummy)
            /* we don't have the best version; we should fetch it. */
            urecovery_AbortAll(ubik_dbase);
 
+           pbuffer[0] = '\0';
+
            /* Rx code to do the Bulk fetch */
            file = 0;
            offset = 0;
            UBIK_ADDR_LOCK;
            rxcall = rx_NewCall(bestServer->disk_rxcid);
 
-           ViceLog(0, ("Ubik: Synchronize database via DISK_GetFile to server %s\n",
+           ViceLog(0, ("Ubik: Synchronize database: receive (via GetFile) "
+                       "from server %s begin\n",
                       afs_inet_ntoa_r(bestServer->addr[0], hoststr)));
            UBIK_ADDR_UNLOCK;
 
@@ -707,7 +727,9 @@ urecovery_Interact(void *dummy)
 #endif
            }
            if (code) {
-               unlink(pbuffer);
+               if (pbuffer[0] != '\0') {
+                   unlink(pbuffer);
+               }
                /*
                 * We will effectively invalidate the old data forever now.
                 * Unclear if we *should* but we do.
@@ -716,18 +738,20 @@ urecovery_Interact(void *dummy)
                ubik_dbase->version.epoch = 0;
                ubik_dbase->version.counter = 0;
                UBIK_VERSION_UNLOCK;
-               ViceLog(0, ("Ubik: Synchronize database failed (error = %d)\n",
-                          code));
+               ViceLog(0,
+                   ("Ubik: Synchronize database: receive (via GetFile) "
+                   "from server %s failed (error = %d)\n",
+                   afs_inet_ntoa_r(bestServer->addr[0], hoststr), code));
            } else {
-               ViceLog(0, ("Ubik: Synchronize database completed\n"));
+               ViceLog(0,
+                   ("Ubik: Synchronize database: receive (via GetFile) "
+                   "from server %s complete, version: %d.%d\n",
+                   afs_inet_ntoa_r(bestServer->addr[0], hoststr),
+                   ubik_dbase->version.epoch, ubik_dbase->version.counter));
+
                urecovery_state |= UBIK_RECHAVEDB;
            }
            udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
-#ifdef AFS_PTHREAD_ENV
-           opr_cv_broadcast(&ubik_dbase->version_cond);
-#else
-           LWP_NoYieldSignal(&ubik_dbase->version);
-#endif
        }
        if (!(urecovery_state & UBIK_RECHAVEDB)) {
            DBRELE(ubik_dbase);
@@ -748,11 +772,6 @@ urecovery_Interact(void *dummy)
                (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
            UBIK_VERSION_UNLOCK;
            udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
-#ifdef AFS_PTHREAD_ENV
-           opr_cv_broadcast(&ubik_dbase->version_cond);
-#else
-           LWP_NoYieldSignal(&ubik_dbase->version);
-#endif
        }
 
        /* Check the other sites and send the database to them if they
@@ -771,11 +790,11 @@ urecovery_Interact(void *dummy)
             * the write-lock above if there is a write transaction in progress,
             * but then, it won't hurt to check, will it?
             */
-           if (ubik_dbase->flags & DBWRITING) {
+           if (ubik_dbase->dbFlags & DBWRITING) {
                struct timeval tv;
                int safety = 0;
                long cur_usec = 50000;
-               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
+               while ((ubik_dbase->dbFlags & DBWRITING) && (safety < 500)) {
                    DBRELE(ubik_dbase);
                    /* sleep for a little while */
                    tv.tv_sec = 0;
@@ -810,10 +829,11 @@ urecovery_Interact(void *dummy)
                    continue;
                }
                UBIK_BEACON_UNLOCK;
-               ViceLog(5, ("recovery sending version to %s\n",
-                           afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
+
                if (vcmp(ts->version, ubik_dbase->version) != 0) {
-                   ViceLog(5, ("recovery stating local database\n"));
+                   ViceLog(0, ("Synchronize database: send (via SendFile) "
+                               "to server %s begin\n",
+                           afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
 
                    /* Rx code to do the Bulk Store */
                    code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
@@ -856,12 +876,24 @@ urecovery_Interact(void *dummy)
                      StoreEndCall:
                        code = rx_EndCall(rxcall, code);
                    }
+
                    if (code == 0) {
                        /* we set a new file, process its header */
                        ts->version = ubik_dbase->version;
                        ts->currentDB = 1;
-                   } else
+                       ViceLog(0,
+                           ("Ubik: Synchronize database: send (via SendFile) "
+                           "to server %s complete, version: %d.%d\n",
+                           afs_inet_ntoa_r(inAddr.s_addr, hoststr),
+                           ts->version.epoch, ts->version.counter));
+
+                   } else {
                        dbok = 0;
+                       ViceLog(0,
+                           ("Ubik: Synchronize database: send (via SendFile) "
+                            "to server %s failed (error = %d)\n",
+                           afs_inet_ntoa_r(inAddr.s_addr, hoststr), code));
+                   }
                } else {
                    /* mark file up to date */
                    ts->currentDB = 1;