#ifdef AFS_NT40_ENV
#include <winsock2.h>
#include <time.h>
+#include <fcntl.h>
#else
#include <sys/file.h>
#include <netinet/in.h>
#endif
#include <assert.h>
#include <lock.h>
-#ifdef HAVE_STRING_H
#include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
#include <rx/xdr.h>
#include <rx/rx.h>
#include <errno.h>
urecovery_ResetState(void)
{
urecovery_state = 0;
+#if !defined(AFS_PTHREAD_ENV)
+ /* No corresponding LWP_WaitProcess found anywhere for this -- klm
+ * The signal below is therefore compiled only for LWP (non-pthread)
+ * builds; clearing urecovery_state above happens in every build. */
LWP_NoYieldSignal(&urecovery_state);
+#endif
return 0;
}
int
urecovery_LostServer(void)
{
+#if !defined(AFS_PTHREAD_ENV)
+ /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
LWP_NoYieldSignal(&urecovery_state);
return 0;
+#endif
}
/* return true iff we have a current database (called by both sync
adbase->version.counter = 0;
(*adbase->setlabel) (adbase, 0, &adbase->version);
}
+#ifdef AFS_PTHREAD_ENV
+ assert(pthread_cond_broadcast(&adbase->version_cond) == 0);
+#else
LWP_NoYieldSignal(&adbase->version);
+#endif
}
return 0;
}
* requests. However, the recovery module still has one more task:
* propagating the dbase out to everyone who is up in the network.
*/
-int
-urecovery_Interact(void)
+void *
+urecovery_Interact(void *dummy)
{
+ /* 'dummy' is not referenced in the lines visible here; the function
+ * returns NULL at its end. */
afs_int32 code, tcode;
struct ubik_server *bestServer = NULL;
struct timeval tv;
int length, tlen, offset, file, nbytes;
struct rx_call *rxcall;
- char tbuffer[256];
+ char tbuffer[1024];
struct ubik_stat ubikstat;
struct in_addr inAddr;
+#ifndef OLD_URECOVERY
+ char pbuffer[1028];
+ int flen, fd = -1;
+ afs_int32 epoch, pass;
+#endif
/* otherwise, begin interaction */
urecovery_state = 0;
/* Run through this loop every 4 seconds */
tv.tv_sec = 4;
tv.tv_usec = 0;
+#ifdef AFS_PTHREAD_ENV
+ select(0, 0, 0, 0, &tv);
+#else
IOMGR_Select(0, 0, 0, 0, &tv);
+#endif
ubik_dprint("recovery running in state %x\n", urecovery_state);
urecovery_state |= UBIK_RECHAVEDB;
} else {
/* we don't have the best version; we should fetch it. */
-#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
-#else
- ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
urecovery_AbortAll(ubik_dbase);
/* Rx code to do the Bulk fetch */
goto FetchEndCall;
}
- /* Truncate the file firest */
+#ifdef OLD_URECOVERY
+ /* Truncate the file first */
code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
if (code) {
ubik_dprint("truncate io error=%d\n", code);
goto FetchEndCall;
}
-
+ tversion.counter = 0;
+#endif
/* give invalid label during file transit */
tversion.epoch = 0;
- tversion.counter = 0;
code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
if (code) {
ubik_dprint("setlabel io error=%d\n", code);
goto FetchEndCall;
}
+#ifndef OLD_URECOVERY
+ flen = length;
+ /* Start the chunk counter at a known value; it is read by the
+ * "pass % 4" poll check in the transfer loop below. */
+ pass = 0;
+ afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
+ fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
+ if (fd < 0) {
+ code = errno;
+ goto FetchEndCall;
+ }
+ code = lseek(fd, HDRSIZE, 0);
+ if (code != HDRSIZE) {
+ close(fd);
+ /* Report an explicit I/O error rather than lseek's raw return. */
+ code = UIOERROR;
+ goto FetchEndCall;
+ }
+#endif
while (length > 0) {
tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
+#if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
+ /* 'pass' is only declared when OLD_URECOVERY is not defined. */
+ if (pass % 4 == 0)
+ IOMGR_Poll();
+#endif
nbytes = rx_Read(rxcall, tbuffer, tlen);
if (nbytes != tlen) {
ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
code = EIO;
+#ifndef OLD_URECOVERY
+ /* 'fd' is only declared when OLD_URECOVERY is not defined. */
+ close(fd);
+#endif
goto FetchEndCall;
}
+#ifdef OLD_URECOVERY
nbytes =
(*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
tlen);
+#else
+ nbytes = write(fd, tbuffer, tlen);
+ pass++;
+#endif
if (nbytes != tlen) {
code = UIOERROR;
+#ifndef OLD_URECOVERY
+ close(fd);
+#endif
goto FetchEndCall;
}
offset += tlen;
length -= tlen;
}
+#ifndef OLD_URECOVERY
+ code = close(fd);
+ if (code)
+ goto FetchEndCall;
+#endif
code = EndDISK_GetFile(rxcall, &tversion);
FetchEndCall:
tcode = rx_EndCall(rxcall, code);
urecovery_state |= UBIK_RECHAVEDB;
memcpy(&ubik_dbase->version, &tversion,
sizeof(struct ubik_version));
+#ifdef OLD_URECOVERY
(*ubik_dbase->sync) (ubik_dbase, 0); /* get data out first */
+#else
+ afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
+#ifdef AFS_NT40_ENV
+ afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
+ code = unlink(pbuffer);
+ if (!code)
+ code = rename(tbuffer, pbuffer);
+ afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
+#endif
+ if (!code)
+ code = rename(pbuffer, tbuffer);
+ if (!code)
+ code = (*ubik_dbase->open) (ubik_dbase, 0);
+ if (!code)
+#endif
/* after data is good, sync disk with correct label */
code =
(*ubik_dbase->setlabel) (ubik_dbase, 0,
&ubik_dbase->version);
+#ifndef OLD_URECOVERY
+#ifdef AFS_NT40_ENV
+ afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
+ unlink(pbuffer);
+#endif
+#endif
}
if (code) {
+#ifndef OLD_URECOVERY
+ unlink(pbuffer);
+ /*
+ * We will effectively invalidate the old data forever now.
+ * Unclear if we *should* but we do.
+ */
+#endif
ubik_dbase->version.epoch = 0;
ubik_dbase->version.counter = 0;
ubik_print("Ubik: Synchronize database failed (error = %d)\n",
code);
} else {
ubik_print("Ubik: Synchronize database completed\n");
+ urecovery_state |= UBIK_RECHAVEDB;
}
udisk_Invalidate(ubik_dbase, 0); /* data has changed */
+#ifdef AFS_PTHREAD_ENV
+ assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
+#else
LWP_NoYieldSignal(&ubik_dbase->version);
-#if defined(UBIK_PAUSE)
+#endif
DBRELE(ubik_dbase);
-#else
- ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
}
#if defined(UBIK_PAUSE)
if (!(urecovery_state & UBIK_RECSYNCSITE))
* database and overwrite this one.
*/
if (ubik_dbase->version.epoch == 1) {
-#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
-#else
- ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
urecovery_AbortAll(ubik_dbase);
ubik_epochTime = 2;
ubik_dbase->version.epoch = ubik_epochTime;
code =
(*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
+#ifdef AFS_PTHREAD_ENV
+ assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
+#else
LWP_NoYieldSignal(&ubik_dbase->version);
-#if defined(UBIK_PAUSE)
+#endif
DBRELE(ubik_dbase);
-#else
- ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
}
/* Check the other sites and send the database to them if they
/* now propagate out new version to everyone else */
dbok = 1; /* start off assuming they all worked */
-#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
-#else
- ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
/*
* Check if a write transaction is in progress. We can't send the
* db when a write is in progress here because the db would be
tv.tv_sec = 0;
tv.tv_usec = 50000;
while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
-#if defined(UBIK_PAUSE)
DBRELE(ubik_dbase);
-#else
- ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
/* sleep for a little while */
+#ifdef AFS_PTHREAD_ENV
+ select(0, 0, 0, 0, &tv);
+#else
IOMGR_Select(0, 0, 0, 0, &tv);
+#endif
tv.tv_usec += 10000;
safety++;
-#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
-#else
- ObtainWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
}
}
ts->currentDB = 1;
}
}
-#if defined(UBIK_PAUSE)
DBRELE(ubik_dbase);
-#else
- ReleaseWriteLock(&ubik_dbase->versionLock);
-#endif /* UBIK_PAUSE */
if (dbok)
urecovery_state |= UBIK_RECSENTDB;
}
}
+ return NULL;
}
/*