DEVEL15-kill-ubik-pthread-env-20080718
[openafs.git] / src / ubik / remote.c
index c5107fa..2ed2a29 100644 (file)
@@ -16,20 +16,16 @@ RCSID
 #include <sys/types.h>
 #ifdef AFS_NT40_ENV
 #include <winsock2.h>
+#include <fcntl.h>
 #else
 #include <sys/file.h>
 #include <netinet/in.h>
 #endif
-#ifdef HAVE_STRING_H
 #include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
 #include <lock.h>
 #include <rx/xdr.h>
 #include <rx/rx.h>
+#include <errno.h>
 #include <afs/afsutil.h>
 
 #define UBIK_INTERNALS
@@ -459,6 +455,7 @@ SDISK_GetFile(rxcall, file, version)
     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
     if (code != sizeof(afs_int32)) {
        DBRELE(dbase);
+       ubik_dprint("Rx-write length error=%d\n", code);
        return BULK_ERROR;
     }
     offset = 0;
@@ -467,11 +464,13 @@ SDISK_GetFile(rxcall, file, version)
        code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
        if (code != tlen) {
            DBRELE(dbase);
+           ubik_dprint("read failed error=%d\n", code);
            return UIOERROR;
        }
        code = rx_Write(rxcall, tbuffer, tlen);
        if (code != tlen) {
            DBRELE(dbase);
+           ubik_dprint("Rx-write length error=%d\n", code);
            return BULK_ERROR;
        }
        length -= tlen;
@@ -491,13 +490,18 @@ SDISK_SendFile(rxcall, file, length, avers)
 {
     register afs_int32 code;
     register struct ubik_dbase *dbase;
-    char tbuffer[256];
+    char tbuffer[1024];
     afs_int32 offset;
     struct ubik_version tversion;
     register int tlen;
     struct rx_peer *tpeer;
     struct rx_connection *tconn;
     afs_uint32 otherHost;
+#ifndef OLD_URECOVERY
+    char pbuffer[1028];
+    int flen, fd = -1;
+    afs_int32 epoch, pass;
+#endif
 
     /* send the file back to the requester */
 
@@ -535,39 +539,109 @@ SDISK_SendFile(rxcall, file, length, avers)
               afs_inet_ntoa(otherHost));
 
     offset = 0;
+#ifdef OLD_URECOVERY
     (*dbase->truncate) (dbase, file, 0);       /* truncate first */
-    tversion.epoch = 0;                /* start off by labelling in-transit db as invalid */
     tversion.counter = 0;
+#else
+    epoch =
+#endif
+    tversion.epoch = 0;                /* start off by labelling in-transit db as invalid */
     (*dbase->setlabel) (dbase, file, &tversion);       /* setlabel does sync */
+#ifndef OLD_URECOVERY
+    flen = length;
+    afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
+    fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
+    if (fd < 0) {
+       code = errno;
+       goto failed;
+    }
+    code = lseek(fd, HDRSIZE, 0);
+    if (code != HDRSIZE) {
+       close(fd);
+       goto failed;
+    }
+#else
+    pass = 0;
+#endif
     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
     while (length > 0) {
        tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
+#if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
+       if (pass % 4 == 0)
+           IOMGR_Poll();
+#endif
        code = rx_Read(rxcall, tbuffer, tlen);
        if (code != tlen) {
            DBRELE(dbase);
+           ubik_dprint("Rx-read length error=%d\n", code);
            code = BULK_ERROR;
+           close(fd);
            goto failed;
        }
+#ifdef OLD_URECOVERY
        code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
+#else
+       code = write(fd, tbuffer, tlen);
+       pass++;
+#endif
        if (code != tlen) {
            DBRELE(dbase);
+           ubik_dprint("write failed error=%d\n", code);
            code = UIOERROR;
+           close(fd);
            goto failed;
        }
        offset += tlen;
        length -= tlen;
     }
+#ifndef OLD_URECOVERY
+    code = close(fd);
+    if (code)
+       goto failed;
+#endif     
 
     /* sync data first, then write label and resync (resync done by setlabel call).
      * This way, good label is only on good database. */
+#ifdef OLD_URECOVERY
     (*ubik_dbase->sync) (dbase, file);
+#else
+    afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
+#ifdef AFS_NT40_ENV
+    afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
+    code = unlink(pbuffer);
+    if (!code)
+       code = rename(tbuffer, pbuffer);
+    afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
+#endif
+    if (!code) 
+       code = rename(pbuffer, tbuffer);
+    if (!code) 
+       code = (*ubik_dbase->open) (ubik_dbase, 0);
+    if (!code)
+#endif
     code = (*ubik_dbase->setlabel) (dbase, file, avers);
+#ifndef OLD_URECOVERY
+#ifdef AFS_NT40_ENV
+    afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
+    unlink(pbuffer);
+#endif
+#endif
     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
     udisk_Invalidate(dbase, file);     /* new dbase, flush disk buffers */
+#ifdef AFS_PTHREAD_ENV
+    assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
+#else
     LWP_NoYieldSignal(&dbase->version);
+#endif
     DBRELE(dbase);
   failed:
     if (code) {
+#ifndef OLD_URECOVERY
+       unlink(pbuffer);
+       /* Failed to sync. Allow reads again for now. */
+       tversion.epoch = epoch;
+       (*dbase->setlabel) (dbase, file, &tversion);
+#endif
        ubik_print
            ("Ubik: Synchronize database with server %s failed (error = %d)\n",
             afs_inet_ntoa(otherHost), code);