vos: vos release -force-reclone option
[openafs.git] / src / volser / vsprocs.c
index 2d19fc5..18d47f3 100644 (file)
@@ -21,6 +21,7 @@
 #include <afs/voldefs.h>
 #include <rx/xdr.h>
 #include <rx/rx.h>
+#include <rx/rx_queue.h>
 #include <afs/vlserver.h>
 #include <afs/nfs.h>
 #include <afs/cellconfig.h>
@@ -642,12 +643,10 @@ UV_PartitionInfo64(afs_uint32 server, char *pname,
     struct rx_connection *aconn;
     afs_int32 code = 0;
 
-    aconn = (struct rx_connection *)0;
     aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
     code = AFSVolPartitionInfo64(aconn, pname, partition);
     if (code == RXGEN_OPCODE) {
-       struct diskPartition *dpp =
-           (struct diskPartition *)malloc(sizeof(struct diskPartition));
+       struct diskPartition *dpp = malloc(sizeof(struct diskPartition));
        code = AFSVolPartitionInfo(aconn, pname, dpp);
        if (!code) {
            strncpy(partition->name, dpp->name, 32);
@@ -722,7 +721,6 @@ UV_CreateVolume3(afs_uint32 aserver, afs_int32 apart, char *aname,
     struct volintInfo tstatus;
 
     tid = 0;
-    aconn = (struct rx_connection *)0;
     error = 0;
 
     init_volintInfo(&tstatus);
@@ -762,6 +760,12 @@ UV_CreateVolume3(afs_uint32 aserver, afs_int32 apart, char *aname,
     /* rw,ro, bk id are related in the default case */
     /* If caller specified RW id, but not RO/BK ids, have them be RW+1 and RW+2 */
     lastid = *anewid;
+    if (aroid && *aroid != 0) {
+       lastid = max(lastid, *aroid);
+    }
+    if (abkid && *abkid != 0) {
+       lastid = max(lastid, *abkid);
+    }
     if (aroid && *aroid == 0) {
        *aroid = ++lastid;
     }
@@ -995,12 +999,14 @@ UV_DeleteVolume(afs_uint32 aserver, afs_int32 apart, afs_uint32 avolid)
            ERROR_EXIT(0);
        }
 
-       /* Delete backup if it exists */
-       code = DoVolDelete(aconn, entry.volumeId[BACKVOL], apart,
-                          "the backup", 0, NULL, NULL);
-       if (code && code != VNOVOL) {
-           error = code;
-           goto error_exit;
+       if (entry.volumeId[BACKVOL]) {
+           /* Delete backup if it exists */
+           code = DoVolDelete(aconn, entry.volumeId[BACKVOL], apart,
+                              "the backup", 0, NULL, NULL);
+           if (code && code != VNOVOL) {
+               error = code;
+               goto error_exit;
+           }
        }
 
        if (verbose)
@@ -1165,6 +1171,12 @@ DoVolDelete(struct rx_connection *aconn, afs_uint32 avolid,
     code =
        AFSVolTransCreate_retry(aconn, avolid, apart, ITOffline, &ttid);
 
+    /* return early and quietly for VNOVOL; don't continue the attempt to delete. */
+    if (code == VNOVOL) {
+       error = code;
+       goto dfail;
+    }
+
     EGOTO2(dfail, code, "%sFailed to start transaction on %u\n",
           prefix, avolid);
 
@@ -1326,6 +1338,159 @@ cfail:
     return error;
 }
 
+/* Convert volume from RO to RW; adjust the VLDB entry to match.
+ * The nvldbentry passed to us has already been MapHostToNetwork'd
+ * by the caller.
+ */
+
+int
+UV_ConvertRO(afs_uint32 server, afs_uint32 partition, afs_uint32 volid,
+               struct nvldbentry *entry)
+{
+    afs_int32 code, i, same;
+    struct nvldbentry checkEntry, storeEntry;
+    afs_int32 vcode;
+    afs_int32 rwindex = 0;
+    afs_uint32 rwserver = 0;
+    afs_int32 roindex = 0;
+    afs_uint32 roserver = 0;
+    struct rx_connection *aconn;
+
+    vcode =
+       ubik_VL_SetLock(cstruct, 0, entry->volumeId[RWVOL], RWVOL,
+                 VLOP_MOVE);
+    if (vcode) {
+       fprintf(STDERR,
+               "Unable to lock volume %lu, code %d\n",
+               (unsigned long)entry->volumeId[RWVOL],vcode);
+       PrintError("", vcode);
+       return -1;
+    }
+
+    /* make sure the VLDB entry hasn't changed since we started */
+    memset(&checkEntry, 0, sizeof(checkEntry));
+    vcode = VLDB_GetEntryByID(volid, -1, &checkEntry);
+    if (vcode) {
+       fprintf(STDERR,
+                "Could not fetch the entry for volume %lu from VLDB\n",
+                (unsigned long)volid);
+       PrintError("convertROtoRW ", vcode);
+       code = vcode;
+       goto error_exit;
+    }
+
+    MapHostToNetwork(&checkEntry);
+    entry->flags &= ~VLOP_ALLOPERS;  /* clear any stale lock operation flags */
+    entry->flags |= VLOP_MOVE;        /* set to match SetLock operation above */
+    if (memcmp(entry, &checkEntry, sizeof(*entry)) != 0) {
+        fprintf(STDERR,
+                "VLDB entry for volume %lu has changed; please reissue the command.\n",
+                (unsigned long)volid);
+        code = -1;
+        goto error_exit;
+    }
+
+    /* extract information from the original entry */
+    for (i = 0; i < entry->nServers; i++) {
+       if (entry->serverFlags[i] & ITSRWVOL) {
+           rwindex = i;
+           rwserver = entry->serverNumber[i];
+       /*  rwpartition = entry->serverPartition[i]; */
+           if (roserver)
+               break;
+       } else if ((entry->serverFlags[i] & ITSROVOL) && !roserver) {
+           same = VLDB_IsSameAddrs(server, entry->serverNumber[i], &code);
+           if (code) {
+               fprintf(STDERR,
+                       "Failed to get info about server's %d address(es) from vlserver (err=%d); aborting call!\n",
+                       server, code);
+               code = ENOENT;
+               goto error_exit;
+           }
+           if (same) {
+               roindex = i;
+               roserver = entry->serverNumber[i];
+       /*      ropartition = entry->serverPartition[i]; */
+               if (rwserver)
+                    break;
+           }
+       }
+    }
+
+    aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
+    code = AFSVolConvertROtoRWvolume(aconn, partition, volid);
+    if (code) {
+       fprintf(STDERR,
+               "Converting RO volume %lu to RW volume failed with code %d\n",
+               (unsigned long)volid, code);
+       PrintError("convertROtoRW ", code);
+       goto error_exit;
+    }
+    /* Update the VLDB to match what we did on disk as much as possible.  */
+    /* If the converted RO was in the VLDB, make it look like the new RW. */
+    if (roserver) {
+       entry->serverFlags[roindex] = ITSRWVOL;
+    } else {
+       /* Add a new site entry for the newly created RW.  It's possible
+        * (but unlikely) that we are already at MAXNSERVERS and that this
+        * new site will invalidate the whole VLDB entry;  however,
+        * VLDB_ReplaceEntry will detect this and return VL_BADSERVER,
+        * so we need no extra guard logic here.
+        */
+       afs_int32 newrwindex = entry->nServers;
+       (entry->nServers)++;
+       entry->serverNumber[newrwindex] = server;
+       entry->serverPartition[newrwindex] = partition;
+       entry->serverFlags[newrwindex] = ITSRWVOL;
+    }
+    entry->flags |= RW_EXISTS;
+    entry->flags &= ~BACK_EXISTS;
+
+    /* if the old RW was in the VLDB, remove it by decrementing the number */
+    /* of servers, replacing the RW entry with the last entry, and zeroing */
+    /* out the last entry. */
+    if (rwserver) {
+       (entry->nServers)--;
+       if (rwindex != entry->nServers) {
+           entry->serverNumber[rwindex] = entry->serverNumber[entry->nServers];
+           entry->serverPartition[rwindex] =
+               entry->serverPartition[entry->nServers];
+           entry->serverFlags[rwindex] = entry->serverFlags[entry->nServers];
+           entry->serverNumber[entry->nServers] = 0;
+           entry->serverPartition[entry->nServers] = 0;
+           entry->serverFlags[entry->nServers] = 0;
+       }
+    }
+    entry->flags &= ~RO_EXISTS;
+    for (i = 0; i < entry->nServers; i++) {
+       if (entry->serverFlags[i] & ITSROVOL) {
+           if (!(entry->serverFlags[i] & (RO_DONTUSE | NEW_REPSITE)))
+               entry->flags |= RO_EXISTS;
+       }
+    }
+    MapNetworkToHost(entry, &storeEntry);
+    code =
+       VLDB_ReplaceEntry(entry->volumeId[RWVOL], RWVOL, &storeEntry,
+                         (LOCKREL_OPCODE | LOCKREL_AFSID |
+                          LOCKREL_TIMESTAMP));
+    if (code) {
+       fprintf(STDERR,
+               "Warning: volume converted, but vldb update failed with code %d!\n",
+               code);
+    }
+
+  error_exit:
+    vcode = UV_LockRelease(entry->volumeId[RWVOL]);
+    if (vcode) {
+       fprintf(STDERR,
+               "Unable to unlock volume %lu, code %d\n",
+               (unsigned long)entry->volumeId[RWVOL],vcode);
+       PrintError("", vcode);
+    }
+    return code;
+}
+
+
 /* Move volume <afromvol> on <afromserver> <afrompart> to <atoserver>
  * <atopart>.  The operation is almost idempotent.  The following
  * flags are recognized:
@@ -1507,7 +1672,7 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
     pntg = 1;
     toconn = UV_Bind(atoserver, AFSCONF_VOLUMEPORT);   /* get connections to the servers */
     fromconn = UV_Bind(afromserver, AFSCONF_VOLUMEPORT);
-    fromtid = totid = 0;       /* initialize to uncreated */
+    totid = 0; /* initialize to uncreated */
 
     /* ***
      * clone the read/write volume locally.
@@ -1524,7 +1689,7 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        /* Get a clone id */
        VPRINT1("Allocating new volume id for clone of volume %u ...",
                afromvol);
-       newVol = tmpVol = 0;
+       tmpVol = 0;
        vcode = ubik_VL_GetNewVolumeId(cstruct, 0, 1, &tmpVol);
        newVol = tmpVol;
        EGOTO1(mfail, vcode,
@@ -1854,6 +2019,9 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        code = DoVolDelete(fromconn, newVol, afrompart,
                           "cloned", 0, NULL, NULL);
        if (code) {
+           if (code == VNOVOL) {
+               EPRINT1(code, "Failed to start transaction on %u\n", newVol);
+           }
            error = code;
            goto mfail;
        }
@@ -2016,9 +2184,13 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
            fflush(STDOUT);
        }
 
-       if (volid && toconn)
+       if (volid && toconn) {
            code = DoVolDelete(toconn, volid, atopart,
                               "destination", 0, NULL, "Recovery:");
+           if (code == VNOVOL) {
+               EPRINT1(code, "Recovery: Failed to start transaction on %u\n", volid);
+           }
+        }
 
        /* put source volume on-line */
        if (fromconn) {
@@ -2059,17 +2231,26 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        if (fromconn) {
            code = DoVolDelete(fromconn, backupId, afrompart,
                               "backup", 0, NULL, "Recovery:");
+           if (code == VNOVOL) {
+               EPRINT1(code, "Recovery: Failed to start transaction on %u\n", backupId);
+           }
 
            code = DoVolDelete(fromconn, afromvol, afrompart, "source",
                               (atoserver != afromserver)?atoserver:0,
-                              NULL, NULL);
+                       NULL, NULL);
+           if (code == VNOVOL) {
+               EPRINT1(code, "Failed to start transaction on %u\n", afromvol);
+           }
        }
     }
 
     /* common cleanup - delete local clone */
     if (newVol) {
        code = DoVolDelete(fromconn, newVol, afrompart,
-                          "clone", 0, NULL, "Recovery:");
+                          "clone", 0, NULL, "Recovery:");
+       if (code == VNOVOL) {
+           EPRINT1(code, "Recovery: Failed to start transaction on %u\n", newVol);
+       }
     }
 
     /* unlock VLDB entry */
@@ -2079,7 +2260,6 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        ubik_VL_ReleaseLock(cstruct, 0, afromvol, -1,
                            (LOCKREL_OPCODE | LOCKREL_AFSID | LOCKREL_TIMESTAMP));
        VDONE;
-       islocked = 0;
     }
   done:                        /* routine cleanup */
     if (volName)
@@ -2448,6 +2628,9 @@ cpincr:
        code = DoVolDelete(fromconn, cloneVol, afrompart,
                           "cloned", 0, NULL, NULL);
        if (code) {
+           if (code == VNOVOL) {
+               EPRINT1(code, "Failed to start transaction on %u\n", cloneVol);
+           }
            error = code;
            goto mfail;
        }
@@ -2573,9 +2756,13 @@ cpincr:
     MapHostToNetwork(&entry);
 
     /* common cleanup - delete local clone */
-    if (cloneVol)
+    if (cloneVol) {
        code = DoVolDelete(fromconn, cloneVol, afrompart,
-                          "clone", 0, NULL, "Recovery:");
+                          "clone", 0, NULL, "Recovery:");
+       if (code == VNOVOL) {
+           EPRINT1(code, "Recovery: Failed to start transaction on %u\n", cloneVol);
+       }
+    }
 
   done:                        /* routine cleanup */
     if (fromconn)
@@ -3010,7 +3197,7 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
 
     /* If the volume does not exist, create it */
     if (!volid || code) {
-       char volname[64];
+       char volname[VL_MAXNAMELEN];
         char hoststr[16];
 
        if (volid && (code != VNOVOL)) {
@@ -3019,11 +3206,15 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
            goto fail;
        }
 
-       strcpy(volname, vldbEntryPtr->name);
-       if (tmpVolId)
-           strcat(volname, ".roclone");
-       else
-           strcat(volname, ".readonly");
+       strlcpy(volname, vldbEntryPtr->name, sizeof(volname));
+
+       if (strlcat(volname,
+                   tmpVolId?".roclone":".readonly",
+                   sizeof(volname)) >= sizeof(volname)) {
+           code = ENOMEM;
+           PrintError("Volume name is too long\n", code);
+           goto fail;
+       }
 
        if (verbose) {
            fprintf(STDOUT,
@@ -3258,23 +3449,31 @@ DoVolOnline(struct nvldbentry *vldbEntryPtr, afs_uint32 avolid, int index,
     return code;
 }
 
-/* UV_ReleaseVolume()
- *    Release volume <afromvol> on <afromserver> <afrompart> to all
- *    its RO sites (full release). Unless the previous release was
- *    incomplete: in which case we bring the remaining incomplete
- *    volumes up to date with the volumes that were released
- *    successfully.
- *    forceflag: Performs a full release.
+/**
+ * Release a volume to read-only sites
  *
- *    Will create a clone from the RW, then dump the clone out to
- *    the remaining replicas. If there is more than 1 RO sites,
- *    ensure that the VLDB says at least one RO is available all
- *    the time: Influences when we write back the VLDB entry.
+ * Release volume <afromvol> on <afromserver> <afrompart> to all
+ * its RO sites (full release). Unless the previous release was
+ * incomplete: in which case we bring the remaining incomplete
+ * volumes up to date with the volumes that were released
+ * successfully.
+ *
+ * Will create a clone from the RW, then dump the clone out to
+ * the remaining replicas. If there is more than 1 RO sites,
+ * ensure that the VLDB says at least one RO is available all
+ * the time: Influences when we write back the VLDB entry.
+ *
+ * @param[in] afromvol      volume to be released
+ * @param[in] afromserver   server containing afromvol
+ * @param[in] afrompart     partition containing afromvol
+ * @param[in] flags         bitmap of options
+ *                            REL_COMPLETE  - force a complete release
+ *                            REL_FULLDUMPS - force full dumps
+ *                            REL_STAYUP    - dump to clones to avoid offline time
  */
-
 int
 UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
-                afs_int32 afrompart, int forceflag, int stayUp)
+                afs_int32 afrompart, int flags)
 {
     char vname[64];
     afs_int32 code = 0;
@@ -3282,7 +3481,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     afs_uint32 cloneVolId = 0, roVolId;
     struct replica *replicas = 0;
     struct nvldbentry entry, storeEntry;
-    int i, volcount = 0, m, fullrelease, vldbindex;
+    int i, volcount = 0, m, vldbindex;
     int failure;
     struct restoreCookie cookie;
     struct rx_connection **toconns = 0;
@@ -3300,7 +3499,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     manyDests tr;
     manyResults results;
     int rwindex, roindex, roclone, roexists;
-    afs_uint32 rwcrdate = 0;
+    afs_uint32 rwcrdate = 0, rwupdate = 0;
     afs_uint32 clcrdate;
     struct rtime {
        int validtime;
@@ -3311,6 +3510,23 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     char hoststr[16];
     afs_int32 origflags[NMAXNSERVERS];
     struct volser_status orig_status;
+    int notreleased = 0;
+    int tried_justnewsites = 0;
+    int justnewsites = 0; /* are we just trying to release to new RO sites? */
+    int sites = 0; /* number of ro sites */
+    int new_sites = 0; /* number of ro sites markes as new */
+    int stayUp = (flags & REL_STAYUP);
+
+    typedef enum {
+        CR_RECOVER    = 0x0000, /**< not complete: a recovery from a previous failed release */
+        CR_FORCED     = 0x0001, /**< complete: forced by caller */
+        CR_LAST_OK    = 0x0002, /**< complete: no sites have been marked as new release */
+        CR_ALL_NEW    = 0x0004, /**< complete: all sites have been marked as new release */
+        CR_NEW_RW     = 0x0008, /**< complete: read-write has changed */
+        CR_RO_MISSING = 0x0010, /**< complete: ro clone is missing */
+    } complete_release_t;
+
+    complete_release_t complete_release = CR_RECOVER;
 
     memset(remembertime, 0, sizeof(remembertime));
     memset(&results, 0, sizeof(results));
@@ -3367,21 +3583,51 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        ONERROR(vcode, entry.name, "Could not update vldb entry for %s.\n");
     }
 
-    /* Will we be completing a previously unfinished release. -force overrides */
-    for (s = 0, m = 0, fullrelease=0, i=0; (i<entry.nServers); i++) {
+    /*
+     * Determine if this is to be a complete release or a recovery of a
+     * previous unfinished release. The previous release is considered to be
+     * unfinished when the clone was successfully distributed to at least one
+     * (but not all) of the read-only sites, as indicated by the NEW_REPSITE
+     * vldb flags.
+     *
+     * The caller can override the vldb flags check using the -force
+     * or -force-reclone flag, to force this to be a complete release.
+     */
+    for (i = 0; i < entry.nServers; i++) {
        if (entry.serverFlags[i] & ITSROVOL) {
-           m++;
-           if (entry.serverFlags[i] & NEW_REPSITE) s++;
+           sites++;
+           if (entry.serverFlags[i] & NEW_REPSITE)
+               new_sites++;
+           if (entry.serverFlags[i] & RO_DONTUSE)
+               notreleased++;
        }
        origflags[i] = entry.serverFlags[i];
     }
-    if ((forceflag && !fullrelease) || (s == m) || (s == 0))
-       fullrelease = 1;
+
+    if (flags & REL_COMPLETE) {
+       complete_release |= CR_FORCED;
+    }
+
+    if (new_sites == 0) {
+       complete_release |= CR_LAST_OK;
+    } else if (new_sites == sites) {
+       complete_release |= CR_ALL_NEW;
+    }
+
+    if ((complete_release & (CR_LAST_OK | CR_ALL_NEW))
+       && !(complete_release & CR_FORCED)) {
+       if (notreleased && notreleased != sites) {
+           /* we have some new unreleased sites. try to just release to those,
+            * if the RW has not changed. The caller can override with -force
+            * or with -force-reclone. */
+           justnewsites = 1;
+       }
+    }
 
     /* Determine which volume id to use and see if it exists */
-    cloneVolId =
-       ((fullrelease
-         || (entry.cloneId == 0)) ? entry.volumeId[ROVOL] : entry.cloneId);
+    cloneVolId = (complete_release || entry.cloneId == 0)
+                 ? entry.volumeId[ROVOL] : entry.cloneId;
+
     code = VolumeExists(afromserver, afrompart, cloneVolId);
     roexists = ((code == ENODEV) ? 0 : 1);
 
@@ -3413,10 +3659,10 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        ONERROR(-1, afromserver,
                "Cannot establish connection with server 0x%x\n");
 
-    if (!fullrelease) {
-       if (!roexists)
-           fullrelease = 1;    /* Do a full release if RO clone does not exist */
-       else {
+    if (!complete_release) {
+       if (!roexists) {
+           complete_release |= CR_RO_MISSING;  /* Do a complete release if RO clone does not exist */
+       } else {
            /* Begin transaction on RW and mark it busy while we query it */
            code = AFSVolTransCreate_retry(
                        fromconn, afromvol, afrompart, ITBusy, &fromtid
@@ -3456,27 +3702,56 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
                    "Failed to end transaction on RW clone %u\n");
 
            if (rwcrdate > clcrdate)
-               fullrelease = 2;/* Do a full release if RO clone older than RW */
+               complete_release |= CR_NEW_RW; /* Do a complete release if RO clone older than RW */
        }
     }
 
+    if (!complete_release || (complete_release & CR_NEW_RW)) {
+       /* in case the RW has changed, and just to be safe */
+       justnewsites = 0;
+    }
+
     if (verbose) {
-       switch (fullrelease) {
-           case 2:
-               fprintf(STDOUT, "RW %lu changed, doing a complete release\n",
-                       (unsigned long)afromvol);
-               break;
-           case 1:
-               fprintf(STDOUT, "This is a complete release of volume %lu\n",
-                       (unsigned long)afromvol);
-               break;
-           case 0:
-               fprintf(STDOUT, "This is a completion of a previous release\n");
-               break;
+       if (!complete_release) {
+           fprintf(STDOUT,
+                   "This is a recovery of previously failed release\n");
+       } else {
+           fprintf(STDOUT, "This is a complete release of volume %u", afromvol);
+           /* Give the reasons for a complete release, except if only CR_LAST_OK. */
+           if (complete_release != CR_LAST_OK) {
+               char *sep = " (";
+               if (complete_release & CR_FORCED) {
+                   fprintf(STDOUT, "%sforced", sep);
+                   sep = ", ";
+               }
+               if (complete_release & CR_LAST_OK) {
+                   fprintf(STDOUT, "%slast ok", sep);
+                   sep = ", ";
+               }
+               if (complete_release & CR_ALL_NEW) {
+                   fprintf(STDOUT, "%sall sites are new", sep);
+                   sep = ", ";
+               }
+               if (complete_release & CR_NEW_RW) {
+                   fprintf(STDOUT, "%srw %u changed", sep, afromvol);
+                   sep = ", ";
+               }
+               if (complete_release & CR_RO_MISSING) {
+                   fprintf(STDOUT, "%sro clone missing", sep);
+               }
+               fprintf(STDOUT, ")");
+           }
+           fprintf(STDOUT, "\n");
+           if (justnewsites) {
+               tried_justnewsites = 1;
+               fprintf(STDOUT, "There are new RO sites; we will try to "
+                       "only release to new sites\n");
+           }
        }
     }
 
-    if (fullrelease) {
+    if (complete_release) {
+       afs_int32 oldest = 0;
        /* If the RO clone exists, then if the clone is a temporary
         * clone, delete it. Or if the RO clone is marked RO_DONTUSE
         * (it was recently added), then also delete it. We do not
@@ -3503,16 +3778,108 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
            roexists = 0;
        }
 
+       if (justnewsites) {
+           VPRINT("Querying old RO sites for update times...");
+           for (vldbindex = 0; vldbindex < entry.nServers; vldbindex++) {
+               volEntries volumeInfo;
+               struct rx_connection *conn;
+               afs_int32 crdate;
+
+               if (!(entry.serverFlags[vldbindex] & ITSROVOL)) {
+                   continue;
+               }
+               if ((entry.serverFlags[vldbindex] & RO_DONTUSE)) {
+                   continue;
+               }
+               conn = UV_Bind(entry.serverNumber[vldbindex], AFSCONF_VOLUMEPORT);
+               if (!conn) {
+                   fprintf(STDERR, "Cannot establish connection to server %s\n",
+                                   hostutil_GetNameByINet(entry.serverNumber[vldbindex]));
+                   justnewsites = 0;
+                   break;
+               }
+               volumeInfo.volEntries_val = NULL;
+               volumeInfo.volEntries_len = 0;
+               code = AFSVolListOneVolume(conn, entry.serverPartition[vldbindex],
+                                          entry.volumeId[ROVOL],
+                                          &volumeInfo);
+               if (code) {
+                   fprintf(STDERR, "Could not fetch information about RO vol %lu from server %s\n",
+                                   (unsigned long)entry.volumeId[ROVOL],
+                                   hostutil_GetNameByINet(entry.serverNumber[vldbindex]));
+                   PrintError("", code);
+                   justnewsites = 0;
+                   rx_DestroyConnection(conn);
+                   break;
+               }
+
+               crdate = CLOCKADJ(volumeInfo.volEntries_val[0].creationDate);
+
+               if (oldest == 0 || crdate < oldest) {
+                   oldest = crdate;
+               }
+
+               rx_DestroyConnection(conn);
+               free(volumeInfo.volEntries_val);
+               volumeInfo.volEntries_val = NULL;
+               volumeInfo.volEntries_len = 0;
+           }
+           VDONE;
+       }
+       if (justnewsites) {
+           volEntries volumeInfo;
+           volumeInfo.volEntries_val = NULL;
+           volumeInfo.volEntries_len = 0;
+           code = AFSVolListOneVolume(fromconn, afrompart, afromvol,
+                                      &volumeInfo);
+           if (code) {
+               fprintf(STDERR, "Could not fetch information about RW vol %lu from server %s\n",
+                               (unsigned long)afromvol,
+                               hostutil_GetNameByINet(afromserver));
+               PrintError("", code);
+               justnewsites = 0;
+           } else {
+               rwupdate = volumeInfo.volEntries_val[0].updateDate;
+
+               free(volumeInfo.volEntries_val);
+               volumeInfo.volEntries_val = NULL;
+               volumeInfo.volEntries_len = 0;
+           }
+       }
+       if (justnewsites && oldest <= rwupdate) {
+           /* RW has changed */
+           justnewsites = 0;
+       }
+
        /* Mark all the ROs in the VLDB entry as RO_DONTUSE. We don't
         * write this entry out to the vlserver until after the first
         * RO volume is released (temp RO clones don't count).
+        *
+        * If 'justnewsites' is set, we're only updating sites that have
+        * RO_DONTUSE set, so set NEW_REPSITE for all of the others.
         */
        for (i = 0; i < entry.nServers; i++) {
-           entry.serverFlags[i] &= ~NEW_REPSITE;
-           entry.serverFlags[i] |= RO_DONTUSE;
+           if (justnewsites) {
+               if ((entry.serverFlags[i] & RO_DONTUSE)) {
+                   entry.serverFlags[i] &= ~NEW_REPSITE;
+               } else {
+                   entry.serverFlags[i] |= NEW_REPSITE;
+               }
+           } else {
+               entry.serverFlags[i] &= ~NEW_REPSITE;
+               entry.serverFlags[i] |= RO_DONTUSE;
+           }
        }
        entry.serverFlags[rwindex] |= NEW_REPSITE;
        entry.serverFlags[rwindex] &= ~RO_DONTUSE;
+    }
+
+    if (justnewsites && roexists) {
+       /* if 'justnewsites' and 'roexists' are set, we don't need to do
+        * anything with the RO clone, so skip the reclone */
+       /* noop */
+
+    } else if (complete_release) {
 
        if (roclone) {
            strcpy(vname, entry.name);
@@ -3532,6 +3899,17 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
            goto rfail;
        }
 
+       if (justnewsites && rwupdate != volstatus.updateDate) {
+           justnewsites = 0;
+           /* reset the serverFlags as if 'justnewsites' had never been set */
+           for (i = 0; i < entry.nServers; i++) {
+               entry.serverFlags[i] &= ~NEW_REPSITE;
+               entry.serverFlags[i] |= RO_DONTUSE;
+           }
+           entry.serverFlags[rwindex] |= NEW_REPSITE;
+           entry.serverFlags[rwindex] &= ~RO_DONTUSE;
+       }
+
        rwcrdate = volstatus.creationDate;
 
        /* Remember clone volume ID in case we fail or are interrupted */
@@ -3566,7 +3944,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
             * There is a fix in the 3.4 client that does not need this sleep
             * anymore, but we don't know what clients we have.
             */
-           if (entry.nServers > 2)
+           if (entry.nServers > 2 && !justnewsites)
                sleep(5);
 
            /* Mark the RO clone in the VLDB as a good site (already released) */
@@ -3590,6 +3968,14 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        }
     }
 
+    if (justnewsites) {
+       VPRINT("RW vol has not changed; only releasing to new RO sites\n");
+       /* act like this is a completion of a previous release */
+       complete_release = CR_RECOVER;
+    } else if (tried_justnewsites) {
+       VPRINT("RW vol has changed; releasing to all sites\n");
+    }
+
     /* Now we will release from the clone to the remaining RO replicas.
      * The first 2 ROs (counting the non-temporary RO clone) are released
      * individually: releasecount. This is to reduce the race condition
@@ -3610,27 +3996,18 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     cookie.clone = 0;
 
     /* how many to do at once, excluding clone */
-    if (stayUp)
+    if (stayUp || justnewsites)
        nservers = entry.nServers; /* can do all, none offline */
     else
        nservers = entry.nServers / 2;
-    replicas =
-       (struct replica *)malloc(sizeof(struct replica) * nservers + 1);
-    times = (struct release *)malloc(sizeof(struct release) * nservers + 1);
-    toconns =
-       (struct rx_connection **)malloc(sizeof(struct rx_connection *) *
-                                       nservers + 1);
-    results.manyResults_val =
-       (afs_int32 *) malloc(sizeof(afs_int32) * nservers + 1);
+    replicas = calloc(nservers + 1, sizeof(struct replica));
+    times = calloc(nservers + 1, sizeof(struct release));
+    toconns = calloc(nservers + 1, sizeof(struct rx_connection *));
+    results.manyResults_val = calloc(nservers + 1, sizeof(afs_int32));
     if (!replicas || !times || !results.manyResults_val || !toconns)
        ONERROR0(ENOMEM,
                "Failed to create transaction on the release clone\n");
 
-    memset(replicas, 0, (sizeof(struct replica) * nservers + 1));
-    memset(times, 0, (sizeof(struct release) * nservers + 1));
-    memset(toconns, 0, (sizeof(struct rx_connection *) * nservers + 1));
-    memset(results.manyResults_val, 0, (sizeof(afs_int32) * nservers + 1));
-
     /* Create a transaction on the cloned volume */
     VPRINT1("Starting transaction on cloned volume %u...", cloneVolId);
     code =
@@ -3639,14 +4016,14 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        memset(&orig_status, 0, sizeof(orig_status));
        code = AFSVolGetStatus(fromconn, fromtid, &orig_status);
     }
-    if (!fullrelease && code)
+    if (!complete_release && code)
        ONERROR(VOLSERNOVOL, afromvol,
                "Old clone is inaccessible. Try vos release -f %u.\n");
     ONERROR0(code, "Failed to create transaction on the release clone\n");
     VDONE;
 
     /* if we have a clone, treat this as done, for now */
-    if (stayUp && !fullrelease) {
+    if (stayUp && !complete_release) {
        entry.serverFlags[roindex] |= NEW_REPSITE;
        entry.serverFlags[roindex] &= ~RO_DONTUSE;
        entry.flags |= RO_EXISTS;
@@ -3660,7 +4037,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        for (volcount = 0;
             ((volcount < nservers) && (vldbindex < entry.nServers));
             vldbindex++) {
-           if (!stayUp) {
+           if (!stayUp && !justnewsites) {
                /* The first two RO volumes will be released individually.
                 * The rest are then released in parallel. This is a hack
                 * for clients not recognizing right away when a RO volume
@@ -3700,15 +4077,28 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
                continue;
 
            /* Thisdate is the date from which we want to pick up all changes */
-           if (forceflag || !fullrelease
-               || (rwcrdate > times[volcount].crtime)) {
-               /* If the forceflag is set, then we want to do a full dump.
-                * If it's not a full release, we can't be sure that the creation
-                *  date is good (so we also do a full dump).
-                * If the RW volume was replaced (its creation date is newer than
-                *  the last release), then we can't be sure what has changed (so
-                *  we do a full dump).
+           if (flags & REL_FULLDUMPS) {
+               /* Do a full dump when forced by the caller. */
+               VPRINT("This will be a full dump: forced\n");
+               thisdate = 0;
+           } else if (!complete_release) {
+               /* If this release is a recovery of a failed release, we can't be
+                * sure the creation date is good, so do a full dump.
+                */
+               VPRINT("This will be a full dump: previous release failed\n");
+               thisdate = 0;
+           } else if (times[volcount].crtime == 0) {
+               /* A full dump is needed for a new read-only volume. */
+               VPRINT
+                   ("This will be a full dump: read-only volume needs to be created\n");
+               thisdate = 0;
+           } else if ((rwcrdate > times[volcount].crtime)) {
+               /* If the RW volume was replaced (its creation date is newer than
+                * the last release), then we can't be sure what has changed (so
+                * we do a full dump).
                 */
+               VPRINT
+                   ("This will be a full dump: read-write volume was replaced\n");
                thisdate = 0;
            } else if (remembertime[vldbindex].validtime) {
                /* Trans was prev ended. Use the time from the prev trans
@@ -3771,7 +4161,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
            }
 
            if (fromdate == 0)
-               fprintf(STDOUT, " (full release)");
+               fprintf(STDOUT, " (entire volume)");
            else {
                tmv = fromdate;
                fprintf(STDOUT, " (as of %.24s)", ctime(&tmv));
@@ -4162,7 +4552,7 @@ UV_DumpVolume(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
     struct rx_connection * volatile fromconn = NULL;
     afs_int32 volatile fromtid = 0;
 
-    afs_int32 rxError = 0, rcode = 0;
+    afs_int32 rcode = 0;
     afs_int32 code, error = 0;
     afs_int32 tmp;
     time_t tmv = fromdate;
@@ -4212,7 +4602,7 @@ UV_DumpVolume(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
 
   error_exit:
     if (fromcall) {
-       code = rx_EndCall(fromcall, rxError);
+       code = rx_EndCall(fromcall, 0);
        if (code && code != RXGEN_OPCODE)
            fprintf(STDERR, "Error in rx_EndCall\n");
        if (code && !error)
@@ -4257,7 +4647,7 @@ UV_DumpClonedVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     afs_uint32 volatile clonevol = 0;
 
     afs_int32 tmp;
-    afs_int32 fromtid = 0, rxError = 0, rcode = 0;
+    afs_int32 fromtid = 0, rcode = 0;
     afs_int32 code = 0, error = 0;
     afs_uint32 tmpVol;
     char vname[64];
@@ -4365,7 +4755,7 @@ UV_DumpClonedVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     }
 
     if (fromcall) {
-       code = rx_EndCall(fromcall, rxError);
+       code = rx_EndCall(fromcall, 0);
        if (code) {
            fprintf(STDERR, "Error in rx_EndCall\n");
            if (!error)
@@ -4407,14 +4797,12 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
     struct rx_connection *toconn, *tempconn;
     struct rx_call *tocall;
     afs_int32 totid, code, rcode, vcode, terror = 0;
-    afs_int32 rxError = 0;
     struct volser_status tstatus;
     struct volintInfo vinfo;
     char partName[10];
     char tovolreal[VOLSER_OLDMAXVOLNAME];
     afs_uint32 pvolid;
     afs_int32 temptid, pparentid;
-    int success;
     struct nvldbentry entry, storeEntry;
     afs_int32 error;
     int islocked;
@@ -4428,11 +4816,9 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
 
     memset(&cookie, 0, sizeof(cookie));
     islocked = 0;
-    success = 0;
     error = 0;
     reuseID = 1;
     tocall = (struct rx_call *)0;
-    toconn = (struct rx_connection *)0;
     tempconn = (struct rx_connection *)0;
     totid = 0;
     temptid = 0;
@@ -4556,7 +4942,7 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
        error = code;
        goto refail;
     }
-    terror = rx_EndCall(tocall, rxError);
+    terror = rx_EndCall(tocall, 0);
     tocall = (struct rx_call *)0;
     if (terror) {
        fprintf(STDERR, "rx_EndCall Failed \n");
@@ -4629,10 +5015,9 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
        goto refail;
     }
 
-    success = 1;
     fprintf(STDOUT, " done\n");
     fflush(STDOUT);
-    if (success && (!reuseID || (flags & RV_FULLRST))) {
+    if (!reuseID || (flags & RV_FULLRST)) {
        /* Volume was restored on the file server, update the
         * VLDB to reflect the change.
         */
@@ -4781,7 +5166,7 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
     }
   refail:
     if (tocall) {
-       code = rx_EndCall(tocall, rxError);
+       code = rx_EndCall(tocall, 0);
        if (!error)
            error = code;
     }
@@ -5207,9 +5592,7 @@ UV_ZapVolumeClones(afs_uint32 aserver, afs_int32 apart,
     int curPos;
     afs_int32 code = 0;
     afs_int32 success = 1;
-    afs_int32 tid;
 
-    aconn = (struct rx_connection *)0;
     aconn = UV_Bind(aserver, AFSCONF_VOLUMEPORT);
     curPos = 0;
     for (curPtr = volPtr; curPos < arraySize; curPtr++) {
@@ -5231,7 +5614,6 @@ UV_ZapVolumeClones(afs_uint32 aserver, afs_int32 apart,
                VPRINT2("Clone of %s %u deleted\n", curPtr->volName,
                        curPtr->volCloneId);
            curPos++;
-           tid = 0;
        }
     }
     if (aconn)
@@ -5255,7 +5637,6 @@ UV_GenerateVolumeClones(afs_uint32 aserver, afs_int32 apart,
     afs_uint32 curCloneId = 0;
     char cloneName[256];       /*max vol name */
 
-    aconn = (struct rx_connection *)0;
     aconn = UV_Bind(aserver, AFSCONF_VOLUMEPORT);
     curPos = 0;
     if ((volPtr->volFlags & REUSECLONEID) && (volPtr->volFlags & ENTRYVALID))
@@ -5407,7 +5788,6 @@ UV_XListVolumes(afs_uint32 a_serverID, afs_int32 a_partID, int a_all,
      * We set the val field to a null pointer as a hint for the stub to
      * allocate space.
      */
-    code = 0;
     *a_numEntsInResultP = 0;
     *a_resultPP = (volintXInfo *) 0;
     volumeXInfo.volXEntries_val = (volintXInfo *) 0;
@@ -5449,8 +5829,6 @@ UV_ListOneVolume(afs_uint32 aserver, afs_int32 apart, afs_uint32 volid,
     afs_int32 code = 0;
     volEntries volumeInfo;
 
-    code = 0;
-
     *resultPtr = (volintInfo *) 0;
     volumeInfo.volEntries_val = (volintInfo *) 0;      /*this hints the stub to allocate space */
     volumeInfo.volEntries_len = 0;
@@ -5513,7 +5891,6 @@ UV_XListOneVolume(afs_uint32 a_serverID, afs_int32 a_partID, afs_uint32 a_volID,
      * the info.  Setting the val field to a null pointer tells the stub
      * to allocate space for us.
      */
-    code = 0;
     *a_resultPP = (volintXInfo *) 0;
     volumeXInfo.volXEntries_val = (volintXInfo *) 0;
     volumeXInfo.volXEntries_len = 0;
@@ -5611,7 +5988,7 @@ CheckVolume(volintInfo * volumeinfo, afs_uint32 aserver, afs_int32 apart,
        if (code) {
            if (code != VL_NOENT) {
                fprintf(STDOUT,
-                       "Could not retreive the VLDB entry for volume %lu \n",
+                       "Could not retrieve the VLDB entry for volume %lu \n",
                        (unsigned long)rwvolid);
                ERROR_EXIT(code);
            }
@@ -7151,6 +7528,9 @@ UV_VolumeZap(afs_uint32 server, afs_int32 part, afs_uint32 volid)
     aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
     error = DoVolDelete(aconn, volid, part,
                        "the", 0, NULL, NULL);
+    if (error == VNOVOL) {
+       EPRINT1(error, "Failed to start transaction on %u\n", volid);
+    }
 
     PrintError("", error);
     if (aconn)