DEVEL15-urecovery-invalidate-without-truncate-20080220
authorDerrick Brashear <shadow@dementia.org>
Wed, 20 Feb 2008 20:09:05 +0000 (20:09 +0000)
committerDerrick Brashear <shadow@dementia.org>
Wed, 20 Feb 2008 20:09:05 +0000 (20:09 +0000)
LICENSE IPL10

critical to label the database invalid when we don't truncate so recovery finishes.

issue in 84609 remains.

(cherry picked from commit d5c32c97a01f02cd69e2167964b43d83c6d413fb)

src/ubik/recovery.c
src/ubik/remote.c

index c48a174..b0b8e9c 100644 (file)
@@ -432,6 +432,7 @@ urecovery_Interact(void)
 #ifndef OLD_URECOVERY
     char pbuffer[1028];
     int flen, fd = -1;
+    afs_int32 epoch, pass;
 #endif
 
     /* otherwise, begin interaction */
@@ -566,16 +567,16 @@ urecovery_Interact(void)
                ubik_dprint("truncate io error=%d\n", code);
                goto FetchEndCall;
            }
-
+           tversion.counter = 0;
+#endif
            /* give invalid label during file transit */
            tversion.epoch = 0;
-           tversion.counter = 0;
            code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
            if (code) {
                ubik_dprint("setlabel io error=%d\n", code);
                goto FetchEndCall;
            }
-#else
+#ifndef OLD_URECOVERY
            flen = length;
            afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
            fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
@@ -592,6 +593,10 @@ urecovery_Interact(void)
 
            while (length > 0) {
                tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
+#ifndef AFS_PTHREAD_ENV
+               if (pass % 4 == 0)
+                   IOMGR_Poll();
+#endif
                nbytes = rx_Read(rxcall, tbuffer, tlen);
                if (nbytes != tlen) {
                    ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
@@ -605,6 +610,7 @@ urecovery_Interact(void)
                                          tlen);
 #else
                nbytes = write(fd, tbuffer, tlen);
+               pass++;
 #endif
                if (nbytes != tlen) {
                    code = UIOERROR;
@@ -658,6 +664,10 @@ urecovery_Interact(void)
            if (code) {
 #ifndef OLD_URECOVERY
                unlink(pbuffer);
+               /* 
+                * We will effectively invalidate the old data forever now.
+                * Unclear if we *should* but we do.
+                */
 #endif
                ubik_dbase->version.epoch = 0;
                ubik_dbase->version.counter = 0;
@@ -665,6 +675,7 @@ urecovery_Interact(void)
                           code);
            } else {
                ubik_print("Ubik: Synchronize database completed\n");
+               urecovery_state |= UBIK_RECHAVEDB;
            }
            udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
            LWP_NoYieldSignal(&ubik_dbase->version);
index e80875a..c81cfb2 100644 (file)
@@ -500,6 +500,7 @@ SDISK_SendFile(rxcall, file, length, avers)
 #ifndef OLD_URECOVERY
     char pbuffer[1028];
     int flen, fd = -1;
+    afs_int32 epoch, pass;
 #endif
 
     /* send the file back to the requester */
@@ -540,10 +541,13 @@ SDISK_SendFile(rxcall, file, length, avers)
     offset = 0;
 #ifdef OLD_URECOVERY
     (*dbase->truncate) (dbase, file, 0);       /* truncate first */
-    tversion.epoch = 0;                /* start off by labelling in-transit db as invalid */
     tversion.counter = 0;
-    (*dbase->setlabel) (dbase, file, &tversion);       /* setlabel does sync */
 #else
+    epoch =
+#endif
+    tversion.epoch = 0;                /* start off by labelling in-transit db as invalid */
+    (*dbase->setlabel) (dbase, file, &tversion);       /* setlabel does sync */
+#ifndef OLD_URECOVERY
     flen = length;
     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
@@ -556,10 +560,16 @@ SDISK_SendFile(rxcall, file, length, avers)
        close(fd);
        goto failed;
     }
+#else
+    pass = 0;
 #endif
     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
     while (length > 0) {
        tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
+#if !defined(OLD_URECOVERY) && defined(AFS_PTHREAD_ENV)
+       if (pass % 4 == 0)
+           IOMGR_Poll();
+#endif
        code = rx_Read(rxcall, tbuffer, tlen);
        if (code != tlen) {
            DBRELE(dbase);
@@ -572,6 +582,7 @@ SDISK_SendFile(rxcall, file, length, avers)
        code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
 #else
        code = write(fd, tbuffer, tlen);
+       pass++;
 #endif
        if (code != tlen) {
            DBRELE(dbase);
@@ -621,6 +632,9 @@ SDISK_SendFile(rxcall, file, length, avers)
     if (code) {
 #ifndef OLD_URECOVERY
        unlink(pbuffer);
+       /* Failed to sync. Allow reads again for now. */
+       tversion.epoch = epoch;
+       (*dbase->setlabel) (dbase, file, &tversion);
 #endif
        ubik_print
            ("Ubik: Synchronize database with server %s failed (error = %d)\n",