DEVEL15-kill-ubik-pthread-env-20080718
[openafs.git] / src / ubik / recovery.c
index 7c86de9..9b76e0d 100644 (file)
@@ -17,6 +17,7 @@ RCSID
 #ifdef AFS_NT40_ENV
 #include <winsock2.h>
 #include <time.h>
+#include <fcntl.h>
 #else
 #include <sys/file.h>
 #include <netinet/in.h>
@@ -24,13 +25,7 @@ RCSID
 #endif
 #include <assert.h>
 #include <lock.h>
-#ifdef HAVE_STRING_H
 #include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
 #include <rx/xdr.h>
 #include <rx/rx.h>
 #include <errno.h>
@@ -77,7 +72,10 @@ int
 urecovery_ResetState(void)
 {
     urecovery_state = 0;
+#if !defined(AFS_PTHREAD_ENV)
+    /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
     LWP_NoYieldSignal(&urecovery_state);
+#endif
     return 0;
 }
 
@@ -88,8 +86,11 @@ urecovery_ResetState(void)
 int
 urecovery_LostServer(void)
 {
+#if !defined(AFS_PTHREAD_ENV)
+    /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
     LWP_NoYieldSignal(&urecovery_state);
     return 0;
+#endif
 }
 
 /* return true iff we have a current database (called by both sync
@@ -368,7 +369,11 @@ InitializeDB(register struct ubik_dbase *adbase)
            adbase->version.counter = 0;
            (*adbase->setlabel) (adbase, 0, &adbase->version);
        }
+#ifdef AFS_PTHREAD_ENV
+       assert(pthread_cond_broadcast(&adbase->version_cond) == 0);
+#else
        LWP_NoYieldSignal(&adbase->version);
+#endif
     }
     return 0;
 }
@@ -417,8 +422,8 @@ urecovery_Initialize(register struct ubik_dbase *adbase)
  * requests.  However, the recovery module still has one more task:
  * propagating the dbase out to everyone who is up in the network.
  */
-int
-urecovery_Interact(void)
+void *
+urecovery_Interact(void *dummy)
 {
     afs_int32 code, tcode;
     struct ubik_server *bestServer = NULL;
@@ -431,9 +436,14 @@ urecovery_Interact(void)
     struct timeval tv;
     int length, tlen, offset, file, nbytes;
     struct rx_call *rxcall;
-    char tbuffer[256];
+    char tbuffer[1024];
     struct ubik_stat ubikstat;
     struct in_addr inAddr;
+#ifndef OLD_URECOVERY
+    char pbuffer[1028];
+    int flen, fd = -1;
+    afs_int32 epoch, pass;
+#endif
 
     /* otherwise, begin interaction */
     urecovery_state = 0;
@@ -443,7 +453,11 @@ urecovery_Interact(void)
        /* Run through this loop every 4 seconds */
        tv.tv_sec = 4;
        tv.tv_usec = 0;
+#ifdef AFS_PTHREAD_ENV
+       select(0, 0, 0, 0, &tv);
+#else
        IOMGR_Select(0, 0, 0, 0, &tv);
+#endif
 
        ubik_dprint("recovery running in state %x\n", urecovery_state);
 
@@ -536,11 +550,7 @@ urecovery_Interact(void)
            urecovery_state |= UBIK_RECHAVEDB;
        } else {
            /* we don't have the best version; we should fetch it. */
-#if defined(UBIK_PAUSE)
            DBHOLD(ubik_dbase);
-#else
-           ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
            urecovery_AbortAll(ubik_dbase);
 
            /* Rx code to do the Bulk fetch */
@@ -564,40 +574,71 @@ urecovery_Interact(void)
                goto FetchEndCall;
            }
 
-           /* Truncate the file firest */
+#ifdef OLD_URECOVERY
+           /* Truncate the file first */
            code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
            if (code) {
                ubik_dprint("truncate io error=%d\n", code);
                goto FetchEndCall;
            }
-
+           tversion.counter = 0;
+#endif
            /* give invalid label during file transit */
            tversion.epoch = 0;
-           tversion.counter = 0;
            code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
            if (code) {
                ubik_dprint("setlabel io error=%d\n", code);
                goto FetchEndCall;
            }
+#ifndef OLD_URECOVERY
+           flen = length;
+           afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
+           fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
+           if (fd < 0) {
+               code = errno;
+               goto FetchEndCall;
+           }
+           code = lseek(fd, HDRSIZE, 0);
+           if (code != HDRSIZE) {
+               close(fd);
+               goto FetchEndCall;
+           }
+#endif
 
            while (length > 0) {
                tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
+#ifndef AFS_PTHREAD_ENV
+               if (pass % 4 == 0)
+                   IOMGR_Poll();
+#endif
                nbytes = rx_Read(rxcall, tbuffer, tlen);
                if (nbytes != tlen) {
                    ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
                    code = EIO;
+                   close(fd);
                    goto FetchEndCall;
                }
+#ifdef OLD_URECOVERY
                nbytes =
                    (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
                                          tlen);
+#else
+               nbytes = write(fd, tbuffer, tlen);
+               pass++;
+#endif
                if (nbytes != tlen) {
                    code = UIOERROR;
+                   close(fd);
                    goto FetchEndCall;
                }
                offset += tlen;
                length -= tlen;
            }
+#ifndef OLD_URECOVERY
+           code = close(fd);
+           if (code)
+               goto FetchEndCall;
+#endif     
            code = EndDISK_GetFile(rxcall, &tversion);
          FetchEndCall:
            tcode = rx_EndCall(rxcall, code);
@@ -608,27 +649,57 @@ urecovery_Interact(void)
                urecovery_state |= UBIK_RECHAVEDB;
                memcpy(&ubik_dbase->version, &tversion,
                       sizeof(struct ubik_version));
+#ifdef OLD_URECOVERY
                (*ubik_dbase->sync) (ubik_dbase, 0);    /* get data out first */
+#else
+               afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
+#ifdef AFS_NT40_ENV
+               afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
+               code = unlink(pbuffer);
+               if (!code)
+                   code = rename(tbuffer, pbuffer);
+               afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
+#endif
+               if (!code) 
+                   code = rename(pbuffer, tbuffer);
+               if (!code) 
+                   code = (*ubik_dbase->open) (ubik_dbase, 0);
+               if (!code)
+#endif
                /* after data is good, sync disk with correct label */
                code =
                    (*ubik_dbase->setlabel) (ubik_dbase, 0,
                                             &ubik_dbase->version);
+#ifndef OLD_URECOVERY
+#ifdef AFS_NT40_ENV
+               afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
+               unlink(pbuffer);
+#endif
+#endif
            }
            if (code) {
+#ifndef OLD_URECOVERY
+               unlink(pbuffer);
+               /* 
+                * We will effectively invalidate the old data forever now.
+                * Unclear if we *should* but we do.
+                */
+#endif
                ubik_dbase->version.epoch = 0;
                ubik_dbase->version.counter = 0;
                ubik_print("Ubik: Synchronize database failed (error = %d)\n",
                           code);
            } else {
                ubik_print("Ubik: Synchronize database completed\n");
+               urecovery_state |= UBIK_RECHAVEDB;
            }
            udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
+#ifdef AFS_PTHREAD_ENV
+           assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
+#else
            LWP_NoYieldSignal(&ubik_dbase->version);
-#if defined(UBIK_PAUSE)
+#endif
            DBRELE(ubik_dbase);
-#else
-           ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
        }
 #if defined(UBIK_PAUSE)
        if (!(urecovery_state & UBIK_RECSYNCSITE))
@@ -643,11 +714,7 @@ urecovery_Interact(void)
         * database and overwrite this one.
         */
        if (ubik_dbase->version.epoch == 1) {
-#if defined(UBIK_PAUSE)
            DBHOLD(ubik_dbase);
-#else
-           ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
            urecovery_AbortAll(ubik_dbase);
            ubik_epochTime = 2;
            ubik_dbase->version.epoch = ubik_epochTime;
@@ -655,12 +722,12 @@ urecovery_Interact(void)
            code =
                (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
            udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
+#ifdef AFS_PTHREAD_ENV
+           assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
+#else
            LWP_NoYieldSignal(&ubik_dbase->version);
-#if defined(UBIK_PAUSE)
+#endif
            DBRELE(ubik_dbase);
-#else
-           ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
        }
 
        /* Check the other sites and send the database to them if they
@@ -670,11 +737,7 @@ urecovery_Interact(void)
            /* now propagate out new version to everyone else */
            dbok = 1;           /* start off assuming they all worked */
 
-#if defined(UBIK_PAUSE)
            DBHOLD(ubik_dbase);
-#else
-           ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
            /*
             * Check if a write transaction is in progress. We can't send the
             * db when a write is in progress here because the db would be
@@ -690,20 +753,16 @@ urecovery_Interact(void)
                tv.tv_sec = 0;
                tv.tv_usec = 50000;
                while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
-#if defined(UBIK_PAUSE)
                    DBRELE(ubik_dbase);
-#else
-                   ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
                    /* sleep for a little while */
+#ifdef AFS_PTHREAD_ENV
+                   select(0, 0, 0, 0, &tv);
+#else
                    IOMGR_Select(0, 0, 0, 0, &tv);
+#endif
                    tv.tv_usec += 10000;
                    safety++;
-#if defined(UBIK_PAUSE)
                    DBHOLD(ubik_dbase);
-#else
-                   ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
                }
            }
 
@@ -770,15 +829,12 @@ urecovery_Interact(void)
                    ts->currentDB = 1;
                }
            }
-#if defined(UBIK_PAUSE)
            DBRELE(ubik_dbase);
-#else
-           ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
            if (dbok)
                urecovery_state |= UBIK_RECSENTDB;
        }
     }
+    return NULL;
 }
 
 /*