vos: vos release -force-reclone option
[openafs.git] / src / volser / vsprocs.c
index 37b2f1d..18d47f3 100644 (file)
@@ -21,6 +21,7 @@
 #include <afs/voldefs.h>
 #include <rx/xdr.h>
 #include <rx/rx.h>
+#include <rx/rx_queue.h>
 #include <afs/vlserver.h>
 #include <afs/nfs.h>
 #include <afs/cellconfig.h>
@@ -166,11 +167,13 @@ static afs_int32 CheckAndDeleteVolume(struct rx_connection *aconn,
 static int GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
                    struct rx_connection **connPtr, afs_int32 * transPtr,
                    afs_uint32 * crtimePtr, afs_uint32 * uptimePtr,
-                   afs_int32 *origflags);
+                   afs_int32 *origflags, afs_uint32 tmpVolId);
 static int SimulateForwardMultiple(struct rx_connection *fromconn,
                                   afs_int32 fromtid, afs_int32 fromdate,
                                   manyDests * tr, afs_int32 flags,
                                   void *cookie, manyResults * results);
+static int DoVolOnline(struct nvldbentry *vldbEntryPtr, afs_uint32 avolid,
+                      int index, char *vname, struct rx_connection *connPtr);
 static int DoVolClone(struct rx_connection *aconn, afs_uint32 avolid,
                      afs_int32 apart, int type, afs_uint32 cloneid,
                      char *typestring, char *pname, char *vname, char *suffix,
@@ -640,12 +643,10 @@ UV_PartitionInfo64(afs_uint32 server, char *pname,
     struct rx_connection *aconn;
     afs_int32 code = 0;
 
-    aconn = (struct rx_connection *)0;
     aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
     code = AFSVolPartitionInfo64(aconn, pname, partition);
     if (code == RXGEN_OPCODE) {
-       struct diskPartition *dpp =
-           (struct diskPartition *)malloc(sizeof(struct diskPartition));
+       struct diskPartition *dpp = malloc(sizeof(struct diskPartition));
        code = AFSVolPartitionInfo(aconn, pname, dpp);
        if (!code) {
            strncpy(partition->name, dpp->name, 32);
@@ -720,7 +721,6 @@ UV_CreateVolume3(afs_uint32 aserver, afs_int32 apart, char *aname,
     struct volintInfo tstatus;
 
     tid = 0;
-    aconn = (struct rx_connection *)0;
     error = 0;
 
     init_volintInfo(&tstatus);
@@ -760,6 +760,12 @@ UV_CreateVolume3(afs_uint32 aserver, afs_int32 apart, char *aname,
     /* rw,ro, bk id are related in the default case */
     /* If caller specified RW id, but not RO/BK ids, have them be RW+1 and RW+2 */
     lastid = *anewid;
+    if (aroid && *aroid != 0) {
+       lastid = max(lastid, *aroid);
+    }
+    if (abkid && *abkid != 0) {
+       lastid = max(lastid, *abkid);
+    }
     if (aroid && *aroid == 0) {
        *aroid = ++lastid;
     }
@@ -993,12 +999,14 @@ UV_DeleteVolume(afs_uint32 aserver, afs_int32 apart, afs_uint32 avolid)
            ERROR_EXIT(0);
        }
 
-       /* Delete backup if it exists */
-       code = DoVolDelete(aconn, entry.volumeId[BACKVOL], apart,
-                          "the backup", 0, NULL, NULL);
-       if (code && code != VNOVOL) {
-           error = code;
-           goto error_exit;
+       if (entry.volumeId[BACKVOL]) {
+           /* Delete backup if it exists */
+           code = DoVolDelete(aconn, entry.volumeId[BACKVOL], apart,
+                              "the backup", 0, NULL, NULL);
+           if (code && code != VNOVOL) {
+               error = code;
+               goto error_exit;
+           }
        }
 
        if (verbose)
@@ -1163,6 +1171,12 @@ DoVolDelete(struct rx_connection *aconn, afs_uint32 avolid,
     code =
        AFSVolTransCreate_retry(aconn, avolid, apart, ITOffline, &ttid);
 
+    /* return early and quietly for VNOVOL; don't continue the attempt to delete. */
+    if (code == VNOVOL) {
+       error = code;
+       goto dfail;
+    }
+
     EGOTO2(dfail, code, "%sFailed to start transaction on %u\n",
           prefix, avolid);
 
@@ -1225,7 +1239,7 @@ DoVolClone(struct rx_connection *aconn, afs_uint32 avolid,
     code = AFSVolTransCreate_retry(aconn, cloneid, apart, ITOffline, &btid);
     if (code) {
         if (code != VNOVOL) {
-            fprintf(STDERR, "Could not reach the %s volume %lu\n",
+           EPRINT2(code, "Could not reach the %s volume %lu\n",
                     typestring, (unsigned long)cloneid);
             error = code;
             goto cfail;
@@ -1263,7 +1277,7 @@ DoVolClone(struct rx_connection *aconn, afs_uint32 avolid,
 
         code = AFSVolReClone(aconn, ttid, cloneid);
         if (code) {
-            fprintf(STDERR, "Could not re-clone %s volume %lu\n",
+            EPRINT2(code, "Could not re-clone %s volume %lu\n",
                     typestring, (unsigned long)cloneid);
             error = code;
             goto cfail;
@@ -1286,6 +1300,8 @@ DoVolClone(struct rx_connection *aconn, afs_uint32 avolid,
         }
     }
 
+    VDONE;
+
     if (volstatus) {
        VPRINT1("Getting status of parent volume %u...", avolid);
        code = AFSVolGetStatus(aconn, ttid, volstatus);
@@ -1298,23 +1314,6 @@ DoVolClone(struct rx_connection *aconn, afs_uint32 avolid,
        VDONE;
     }
 
-    if (transPtr) {
-       *transPtr = ttid;
-    } else {
-       /* End the transaction on the RW volume */
-       code = AFSVolEndTrans(aconn, ttid, &rcode);
-       ttid = 0;
-       if (code || rcode) {
-           fprintf(STDERR,
-                   "Failed to end the transaction on the parent volume %lu\n",
-                   (unsigned long)avolid);
-           error = (code ? code : rcode);
-           goto cfail;
-       }
-    }
-    /* so we don't wipe ttid */
-    return error;
-
 cfail:
     if (ttid) {
         code = AFSVolEndTrans(aconn, ttid, &rcode);
@@ -1330,7 +1329,7 @@ cfail:
         code = AFSVolEndTrans(aconn, btid, &rcode);
         if (code || rcode) {
             fprintf(STDERR,
-                    "Could not end transaction the %s volume %lu\n",
+                    "Could not end transaction on the %s volume %lu\n",
                     typestring, (unsigned long)cloneid);
             if (!error)
                 error = (code ? code : rcode);
@@ -1339,6 +1338,159 @@ cfail:
     return error;
 }
 
+/* Convert volume from RO to RW; adjust the VLDB entry to match.
+ * The nvldbentry passed to us has already been MapHostToNetwork'd
+ * by the caller.
+ */
+
+int
+UV_ConvertRO(afs_uint32 server, afs_uint32 partition, afs_uint32 volid,
+               struct nvldbentry *entry)
+{
+    afs_int32 code, i, same;
+    struct nvldbentry checkEntry, storeEntry;
+    afs_int32 vcode;
+    afs_int32 rwindex = 0;
+    afs_uint32 rwserver = 0;
+    afs_int32 roindex = 0;
+    afs_uint32 roserver = 0;
+    struct rx_connection *aconn;
+
+    vcode =
+       ubik_VL_SetLock(cstruct, 0, entry->volumeId[RWVOL], RWVOL,
+                 VLOP_MOVE);
+    if (vcode) {
+       fprintf(STDERR,
+               "Unable to lock volume %lu, code %d\n",
+               (unsigned long)entry->volumeId[RWVOL],vcode);
+       PrintError("", vcode);
+       return -1;
+    }
+
+    /* make sure the VLDB entry hasn't changed since we started */
+    memset(&checkEntry, 0, sizeof(checkEntry));
+    vcode = VLDB_GetEntryByID(volid, -1, &checkEntry);
+    if (vcode) {
+       fprintf(STDERR,
+                "Could not fetch the entry for volume %lu from VLDB\n",
+                (unsigned long)volid);
+       PrintError("convertROtoRW ", vcode);
+       code = vcode;
+       goto error_exit;
+    }
+
+    MapHostToNetwork(&checkEntry);
+    entry->flags &= ~VLOP_ALLOPERS;  /* clear any stale lock operation flags */
+    entry->flags |= VLOP_MOVE;        /* set to match SetLock operation above */
+    if (memcmp(entry, &checkEntry, sizeof(*entry)) != 0) {
+        fprintf(STDERR,
+                "VLDB entry for volume %lu has changed; please reissue the command.\n",
+                (unsigned long)volid);
+        code = -1;
+        goto error_exit;
+    }
+
+    /* extract information from the original entry */
+    for (i = 0; i < entry->nServers; i++) {
+       if (entry->serverFlags[i] & ITSRWVOL) {
+           rwindex = i;
+           rwserver = entry->serverNumber[i];
+       /*  rwpartition = entry->serverPartition[i]; */
+           if (roserver)
+               break;
+       } else if ((entry->serverFlags[i] & ITSROVOL) && !roserver) {
+           same = VLDB_IsSameAddrs(server, entry->serverNumber[i], &code);
+           if (code) {
+               fprintf(STDERR,
+                       "Failed to get info about server's %d address(es) from vlserver (err=%d); aborting call!\n",
+                       server, code);
+               code = ENOENT;
+               goto error_exit;
+           }
+           if (same) {
+               roindex = i;
+               roserver = entry->serverNumber[i];
+       /*      ropartition = entry->serverPartition[i]; */
+               if (rwserver)
+                    break;
+           }
+       }
+    }
+
+    aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
+    code = AFSVolConvertROtoRWvolume(aconn, partition, volid);
+    if (code) {
+       fprintf(STDERR,
+               "Converting RO volume %lu to RW volume failed with code %d\n",
+               (unsigned long)volid, code);
+       PrintError("convertROtoRW ", code);
+       goto error_exit;
+    }
+    /* Update the VLDB to match what we did on disk as much as possible.  */
+    /* If the converted RO was in the VLDB, make it look like the new RW. */
+    if (roserver) {
+       entry->serverFlags[roindex] = ITSRWVOL;
+    } else {
+       /* Add a new site entry for the newly created RW.  It's possible
+        * (but unlikely) that we are already at MAXNSERVERS and that this
+        * new site will invalidate the whole VLDB entry;  however,
+        * VLDB_ReplaceEntry will detect this and return VL_BADSERVER,
+        * so we need no extra guard logic here.
+        */
+       afs_int32 newrwindex = entry->nServers;
+       (entry->nServers)++;
+       entry->serverNumber[newrwindex] = server;
+       entry->serverPartition[newrwindex] = partition;
+       entry->serverFlags[newrwindex] = ITSRWVOL;
+    }
+    entry->flags |= RW_EXISTS;
+    entry->flags &= ~BACK_EXISTS;
+
+    /* if the old RW was in the VLDB, remove it by decrementing the number */
+    /* of servers, replacing the RW entry with the last entry, and zeroing */
+    /* out the last entry. */
+    if (rwserver) {
+       (entry->nServers)--;
+       if (rwindex != entry->nServers) {
+           entry->serverNumber[rwindex] = entry->serverNumber[entry->nServers];
+           entry->serverPartition[rwindex] =
+               entry->serverPartition[entry->nServers];
+           entry->serverFlags[rwindex] = entry->serverFlags[entry->nServers];
+           entry->serverNumber[entry->nServers] = 0;
+           entry->serverPartition[entry->nServers] = 0;
+           entry->serverFlags[entry->nServers] = 0;
+       }
+    }
+    entry->flags &= ~RO_EXISTS;
+    for (i = 0; i < entry->nServers; i++) {
+       if (entry->serverFlags[i] & ITSROVOL) {
+           if (!(entry->serverFlags[i] & (RO_DONTUSE | NEW_REPSITE)))
+               entry->flags |= RO_EXISTS;
+       }
+    }
+    MapNetworkToHost(entry, &storeEntry);
+    code =
+       VLDB_ReplaceEntry(entry->volumeId[RWVOL], RWVOL, &storeEntry,
+                         (LOCKREL_OPCODE | LOCKREL_AFSID |
+                          LOCKREL_TIMESTAMP));
+    if (code) {
+       fprintf(STDERR,
+               "Warning: volume converted, but vldb update failed with code %d!\n",
+               code);
+    }
+
+  error_exit:
+    vcode = UV_LockRelease(entry->volumeId[RWVOL]);
+    if (vcode) {
+       fprintf(STDERR,
+               "Unable to unlock volume %lu, code %d\n",
+               (unsigned long)entry->volumeId[RWVOL],vcode);
+       PrintError("", vcode);
+    }
+    return code;
+}
+
+
 /* Move volume <afromvol> on <afromserver> <afrompart> to <atoserver>
  * <atopart>.  The operation is almost idempotent.  The following
  * flags are recognized:
@@ -1520,7 +1672,7 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
     pntg = 1;
     toconn = UV_Bind(atoserver, AFSCONF_VOLUMEPORT);   /* get connections to the servers */
     fromconn = UV_Bind(afromserver, AFSCONF_VOLUMEPORT);
-    fromtid = totid = 0;       /* initialize to uncreated */
+    totid = 0; /* initialize to uncreated */
 
     /* ***
      * clone the read/write volume locally.
@@ -1537,7 +1689,7 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        /* Get a clone id */
        VPRINT1("Allocating new volume id for clone of volume %u ...",
                afromvol);
-       newVol = tmpVol = 0;
+       tmpVol = 0;
        vcode = ubik_VL_GetNewVolumeId(cstruct, 0, 1, &tmpVol);
        newVol = tmpVol;
        EGOTO1(mfail, vcode,
@@ -1867,6 +2019,9 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        code = DoVolDelete(fromconn, newVol, afrompart,
                           "cloned", 0, NULL, NULL);
        if (code) {
+           if (code == VNOVOL) {
+               EPRINT1(code, "Failed to start transaction on %u\n", newVol);
+           }
            error = code;
            goto mfail;
        }
@@ -2029,9 +2184,13 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
            fflush(STDOUT);
        }
 
-       if (volid && toconn)
+       if (volid && toconn) {
            code = DoVolDelete(toconn, volid, atopart,
                               "destination", 0, NULL, "Recovery:");
+           if (code == VNOVOL) {
+               EPRINT1(code, "Recovery: Failed to start transaction on %u\n", volid);
+           }
+        }
 
        /* put source volume on-line */
        if (fromconn) {
@@ -2072,17 +2231,26 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        if (fromconn) {
            code = DoVolDelete(fromconn, backupId, afrompart,
                               "backup", 0, NULL, "Recovery:");
+           if (code == VNOVOL) {
+               EPRINT1(code, "Recovery: Failed to start transaction on %u\n", backupId);
+           }
 
            code = DoVolDelete(fromconn, afromvol, afrompart, "source",
                               (atoserver != afromserver)?atoserver:0,
-                              NULL, NULL);
+                       NULL, NULL);
+           if (code == VNOVOL) {
+               EPRINT1(code, "Failed to start transaction on %u\n", afromvol);
+           }
        }
     }
 
     /* common cleanup - delete local clone */
     if (newVol) {
        code = DoVolDelete(fromconn, newVol, afrompart,
-                          "clone", 0, NULL, "Recovery:");
+                          "clone", 0, NULL, "Recovery:");
+       if (code == VNOVOL) {
+           EPRINT1(code, "Recovery: Failed to start transaction on %u\n", newVol);
+       }
     }
 
     /* unlock VLDB entry */
@@ -2092,7 +2260,6 @@ UV_MoveVolume2(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
        ubik_VL_ReleaseLock(cstruct, 0, afromvol, -1,
                            (LOCKREL_OPCODE | LOCKREL_AFSID | LOCKREL_TIMESTAMP));
        VDONE;
-       islocked = 0;
     }
   done:                        /* routine cleanup */
     if (volName)
@@ -2461,6 +2628,9 @@ cpincr:
        code = DoVolDelete(fromconn, cloneVol, afrompart,
                           "cloned", 0, NULL, NULL);
        if (code) {
+           if (code == VNOVOL) {
+               EPRINT1(code, "Failed to start transaction on %u\n", cloneVol);
+           }
            error = code;
            goto mfail;
        }
@@ -2586,9 +2756,13 @@ cpincr:
     MapHostToNetwork(&entry);
 
     /* common cleanup - delete local clone */
-    if (cloneVol)
+    if (cloneVol) {
        code = DoVolDelete(fromconn, cloneVol, afrompart,
-                          "clone", 0, NULL, "Recovery:");
+                          "clone", 0, NULL, "Recovery:");
+       if (code == VNOVOL) {
+           EPRINT1(code, "Recovery: Failed to start transaction on %u\n", cloneVol);
+       }
+    }
 
   done:                        /* routine cleanup */
     if (fromconn)
@@ -2956,7 +3130,7 @@ static int
 GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
         struct rx_connection **connPtr, afs_int32 * transPtr,
         afs_uint32 * crtimePtr, afs_uint32 * uptimePtr,
-        afs_int32 *origflags)
+        afs_int32 *origflags, afs_uint32 tmpVolId)
 {
     afs_uint32 volid;
     struct volser_status tstatus;
@@ -3023,7 +3197,7 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
 
     /* If the volume does not exist, create it */
     if (!volid || code) {
-       char volname[64];
+       char volname[VL_MAXNAMELEN];
         char hoststr[16];
 
        if (volid && (code != VNOVOL)) {
@@ -3032,13 +3206,20 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
            goto fail;
        }
 
-       strcpy(volname, vldbEntryPtr->name);
-       strcat(volname, ".readonly");
+       strlcpy(volname, vldbEntryPtr->name, sizeof(volname));
+
+       if (strlcat(volname,
+                   tmpVolId?".roclone":".readonly",
+                   sizeof(volname)) >= sizeof(volname)) {
+           code = ENOMEM;
+           PrintError("Volume name is too long\n", code);
+           goto fail;
+       }
 
        if (verbose) {
            fprintf(STDOUT,
                    "Creating new volume %lu on replication site %s: ",
-                   (unsigned long)volid,
+                   tmpVolId?(unsigned long)tmpVolId:(unsigned long)volid,
                     noresolve ? afs_inet_ntoa_r(vldbEntryPtr->
                                                 serverNumber[index], hoststr) :
                     hostutil_GetNameByINet(vldbEntryPtr->
@@ -3049,7 +3230,8 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
        code =
          AFSVolCreateVolume(*connPtr, vldbEntryPtr->serverPartition[index],
                             volname, volser_RO,
-                            vldbEntryPtr->volumeId[RWVOL], &volid,
+                            vldbEntryPtr->volumeId[RWVOL],
+                            tmpVolId?&tmpVolId:&volid,
                             transPtr);
        if (code) {
            PrintError("Failed to create the ro volume: ", code);
@@ -3084,6 +3266,20 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
                       code);
            goto fail;
        }
+       if (tmpVolId) {
+           code = AFSVolEndTrans(*connPtr, *transPtr, &rcode);
+           *transPtr = 0;
+           if (!code)
+               code = rcode;
+           if (!code)
+               code = DoVolClone(*connPtr, volid,
+                                 vldbEntryPtr->serverPartition[index],
+                                 readonlyVolume, tmpVolId, "temporary",
+                                 vldbEntryPtr->name, NULL, ".roclone", NULL,
+                                 transPtr);
+           if (code)
+               goto fail;
+       }
        *crtimePtr = CLOCKADJ(tstatus.creationDate);
        *uptimePtr = CLOCKADJ(tstatus.updateDate);
     }
@@ -3096,7 +3292,7 @@ GetTrans(struct nvldbentry *vldbEntryPtr, afs_int32 index,
        *transPtr = 0;
        if (!tcode)
            tcode = rcode;
-       if (tcode)
+       if (tcode && tcode != ENOENT)
            PrintError("Could not end transaction on a ro volume: ", tcode);
     }
 
@@ -3185,24 +3381,99 @@ CheckTrans(struct rx_connection *aconn, afs_int32 *atid, afs_int32 apart,
     return 0;
 }
 
+static void
+PutTrans(afs_int32 *vldbindex, struct replica *replicas,
+        struct rx_connection **toconns, struct release *times,
+        afs_int32 volcount)
+{
+    afs_int32 s, code = 0, rcode = 0;
+    /* End the transactions and destroy the connections */
+    for (s = 0; s < volcount; s++) {
+       if (replicas[s].trans) {
+           code = AFSVolEndTrans(toconns[s], replicas[s].trans, &rcode);
+
+           replicas[s].trans = 0;
+           if (!code)
+               code = rcode;
+           if (code) {
+               if ((s == 0) || (code != ENOENT)) {
+                   PrintError("Could not end transaction on a ro volume: ",
+                              code);
+               } else {
+                   PrintError
+                       ("Transaction timed out on a ro volume. Will retry.\n",
+                        0);
+                   if (times[s].vldbEntryIndex < *vldbindex)
+                       *vldbindex = times[s].vldbEntryIndex;
+               }
+           }
+       }
+       if (toconns[s])
+           rx_DestroyConnection(toconns[s]);
+       toconns[s] = 0;
+    }
+}
+
+static int
+DoVolOnline(struct nvldbentry *vldbEntryPtr, afs_uint32 avolid, int index,
+           char *vname, struct rx_connection *connPtr)
+{
+    afs_int32 code = 0, rcode = 0, onlinetid = 0;
+
+    code =
+       AFSVolTransCreate_retry(connPtr, avolid,
+                               vldbEntryPtr->serverPartition[index],
+                               ITOffline,
+                               &onlinetid);
+    if (code)
+      EPRINT(code, "Could not create transaction on readonly...\n");
+
+    else {
+       code = AFSVolSetFlags(connPtr, onlinetid, 0);
+       if (code)
+           EPRINT(code, "Could not set flags on readonly...\n");
+    }
 
-/* UV_ReleaseVolume()
- *    Release volume <afromvol> on <afromserver> <afrompart> to all
- *    its RO sites (full release). Unless the previous release was
- *    incomplete: in which case we bring the remaining incomplete
- *    volumes up to date with the volumes that were released
- *    successfully.
- *    forceflag: Performs a full release.
+    if (!code) {
+       code =
+           AFSVolSetIdsTypes(connPtr, onlinetid, vname,
+                             ROVOL, vldbEntryPtr->volumeId[RWVOL],
+                             0, 0);
+       if (code)
+           EPRINT(code, "Could not set ids on readonly...\n");
+    }
+    if (!code)
+       code = AFSVolEndTrans(connPtr, onlinetid, &rcode);
+    if (!code)
+       code = rcode;
+    return code;
+}
+
+/**
+ * Release a volume to read-only sites
+ *
+ * Release volume <afromvol> on <afromserver> <afrompart> to all
+ * its RO sites (full release). Unless the previous release was
+ * incomplete: in which case we bring the remaining incomplete
+ * volumes up to date with the volumes that were released
+ * successfully.
+ *
+ * Will create a clone from the RW, then dump the clone out to
+ * the remaining replicas. If there is more than 1 RO sites,
+ * ensure that the VLDB says at least one RO is available all
+ * the time: Influences when we write back the VLDB entry.
  *
- *    Will create a clone from the RW, then dump the clone out to
- *    the remaining replicas. If there is more than 1 RO sites,
- *    ensure that the VLDB says at least one RO is available all
- *    the time: Influences when we write back the VLDB entry.
+ * @param[in] afromvol      volume to be released
+ * @param[in] afromserver   server containing afromvol
+ * @param[in] afrompart     partition containing afromvol
+ * @param[in] flags         bitmap of options
+ *                            REL_COMPLETE  - force a complete release
+ *                            REL_FULLDUMPS - force full dumps
+ *                            REL_STAYUP    - dump to clones to avoid offline time
  */
-
 int
 UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
-                afs_int32 afrompart, int forceflag)
+                afs_int32 afrompart, int flags)
 {
     char vname[64];
     afs_int32 code = 0;
@@ -3210,7 +3481,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     afs_uint32 cloneVolId = 0, roVolId;
     struct replica *replicas = 0;
     struct nvldbentry entry, storeEntry;
-    int i, volcount, m, fullrelease, vldbindex;
+    int i, volcount = 0, m, vldbindex;
     int failure;
     struct restoreCookie cookie;
     struct rx_connection **toconns = 0;
@@ -3228,7 +3499,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     manyDests tr;
     manyResults results;
     int rwindex, roindex, roclone, roexists;
-    afs_uint32 rwcrdate = 0;
+    afs_uint32 rwcrdate = 0, rwupdate = 0;
     afs_uint32 clcrdate;
     struct rtime {
        int validtime;
@@ -3239,6 +3510,23 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     char hoststr[16];
     afs_int32 origflags[NMAXNSERVERS];
     struct volser_status orig_status;
+    int notreleased = 0;
+    int tried_justnewsites = 0;
+    int justnewsites = 0; /* are we just trying to release to new RO sites? */
+    int sites = 0; /* number of ro sites */
+    int new_sites = 0; /* number of ro sites markes as new */
+    int stayUp = (flags & REL_STAYUP);
+
+    typedef enum {
+        CR_RECOVER    = 0x0000, /**< not complete: a recovery from a previous failed release */
+        CR_FORCED     = 0x0001, /**< complete: forced by caller */
+        CR_LAST_OK    = 0x0002, /**< complete: no sites have been marked as new release */
+        CR_ALL_NEW    = 0x0004, /**< complete: all sites have been marked as new release */
+        CR_NEW_RW     = 0x0008, /**< complete: read-write has changed */
+        CR_RO_MISSING = 0x0010, /**< complete: ro clone is missing */
+    } complete_release_t;
+
+    complete_release_t complete_release = CR_RECOVER;
 
     memset(remembertime, 0, sizeof(remembertime));
     memset(&results, 0, sizeof(results));
@@ -3295,33 +3583,86 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        ONERROR(vcode, entry.name, "Could not update vldb entry for %s.\n");
     }
 
-    /* Will we be completing a previously unfinished release. -force overrides */
-    for (s = 0, m = 0, fullrelease=0, i=0; (i<entry.nServers); i++) {
+    /*
+     * Determine if this is to be a complete release or a recovery of a
+     * previous unfinished release. The previous release is considered to be
+     * unfinished when the clone was successfully distributed to at least one
+     * (but not all) of the read-only sites, as indicated by the NEW_REPSITE
+     * vldb flags.
+     *
+     * The caller can override the vldb flags check using the -force
+     * or -force-reclone flag, to force this to be a complete release.
+     */
+    for (i = 0; i < entry.nServers; i++) {
        if (entry.serverFlags[i] & ITSROVOL) {
-           m++;
-           if (entry.serverFlags[i] & NEW_REPSITE) s++;
+           sites++;
+           if (entry.serverFlags[i] & NEW_REPSITE)
+               new_sites++;
+           if (entry.serverFlags[i] & RO_DONTUSE)
+               notreleased++;
        }
        origflags[i] = entry.serverFlags[i];
     }
-    if ((forceflag && !fullrelease) || (s == m) || (s == 0))
-       fullrelease = 1;
+
+    if (flags & REL_COMPLETE) {
+       complete_release |= CR_FORCED;
+    }
+
+    if (new_sites == 0) {
+       complete_release |= CR_LAST_OK;
+    } else if (new_sites == sites) {
+       complete_release |= CR_ALL_NEW;
+    }
+
+    if ((complete_release & (CR_LAST_OK | CR_ALL_NEW))
+       && !(complete_release & CR_FORCED)) {
+       if (notreleased && notreleased != sites) {
+           /* we have some new unreleased sites. try to just release to those,
+            * if the RW has not changed. The caller can override with -force
+            * or with -force-reclone. */
+           justnewsites = 1;
+       }
+    }
 
     /* Determine which volume id to use and see if it exists */
-    cloneVolId =
-       ((fullrelease
-         || (entry.cloneId == 0)) ? entry.volumeId[ROVOL] : entry.cloneId);
+    cloneVolId = (complete_release || entry.cloneId == 0)
+                 ? entry.volumeId[ROVOL] : entry.cloneId;
+
     code = VolumeExists(afromserver, afrompart, cloneVolId);
     roexists = ((code == ENODEV) ? 0 : 1);
 
+    /* For stayUp case, if roclone is the only site, bypass special handling */
+    if (stayUp && roclone) {
+       int e;
+       error = 0;
+
+       for (e = 0; (e < entry.nServers) && !error; e++) {
+           if ((entry.serverFlags[e] & ITSROVOL)) {
+               if (!(VLDB_IsSameAddrs(entry.serverNumber[e], afromserver,
+                                      &error)))
+                   break;
+           }
+       }
+       if (e >= entry.nServers)
+           stayUp = 0;
+    }
+
+    /* If we had a previous release to complete, do so, else: */
+    if (stayUp && (cloneVolId == entry.volumeId[ROVOL])) {
+       code = ubik_VL_GetNewVolumeId(cstruct, 0, 1, &cloneVolId);
+       ONERROR(code, afromvol,
+               "Cannot get temporary clone id for volume %u\n");
+    }
+
     fromconn = UV_Bind(afromserver, AFSCONF_VOLUMEPORT);
     if (!fromconn)
        ONERROR(-1, afromserver,
                "Cannot establish connection with server 0x%x\n");
 
-    if (!fullrelease) {
-       if (!roexists)
-           fullrelease = 1;    /* Do a full release if RO clone does not exist */
-       else {
+    if (!complete_release) {
+       if (!roexists) {
+           complete_release |= CR_RO_MISSING;  /* Do a complete release if RO clone does not exist */
+       } else {
            /* Begin transaction on RW and mark it busy while we query it */
            code = AFSVolTransCreate_retry(
                        fromconn, afromvol, afrompart, ITBusy, &fromtid
@@ -3361,73 +3702,220 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
                    "Failed to end transaction on RW clone %u\n");
 
            if (rwcrdate > clcrdate)
-               fullrelease = 2;/* Do a full release if RO clone older than RW */
+               complete_release |= CR_NEW_RW; /* Do a complete release if RO clone older than RW */
        }
     }
 
+    if (!complete_release || (complete_release & CR_NEW_RW)) {
+       /* in case the RW has changed, and just to be safe */
+       justnewsites = 0;
+    }
+
     if (verbose) {
-       switch (fullrelease) {
-           case 2:
-               fprintf(STDOUT, "RW %lu changed, doing a complete release\n",
-                       (unsigned long)afromvol);
-               break;
-           case 1:
-               fprintf(STDOUT, "This is a complete release of volume %lu\n",
-                       (unsigned long)afromvol);
-               break;
-           case 0:
-               fprintf(STDOUT, "This is a completion of a previous release\n");
-               break;
+       if (!complete_release) {
+           fprintf(STDOUT,
+                   "This is a recovery of previously failed release\n");
+       } else {
+           fprintf(STDOUT, "This is a complete release of volume %u", afromvol);
+           /* Give the reasons for a complete release, except if only CR_LAST_OK. */
+           if (complete_release != CR_LAST_OK) {
+               char *sep = " (";
+               if (complete_release & CR_FORCED) {
+                   fprintf(STDOUT, "%sforced", sep);
+                   sep = ", ";
+               }
+               if (complete_release & CR_LAST_OK) {
+                   fprintf(STDOUT, "%slast ok", sep);
+                   sep = ", ";
+               }
+               if (complete_release & CR_ALL_NEW) {
+                   fprintf(STDOUT, "%sall sites are new", sep);
+                   sep = ", ";
+               }
+               if (complete_release & CR_NEW_RW) {
+                   fprintf(STDOUT, "%srw %u changed", sep, afromvol);
+                   sep = ", ";
+               }
+               if (complete_release & CR_RO_MISSING) {
+                   fprintf(STDOUT, "%sro clone missing", sep);
+               }
+               fprintf(STDOUT, ")");
+           }
+           fprintf(STDOUT, "\n");
+           if (justnewsites) {
+               tried_justnewsites = 1;
+               fprintf(STDOUT, "There are new RO sites; we will try to "
+                       "only release to new sites\n");
+           }
        }
     }
 
-    if (fullrelease) {
+    if (complete_release) {
+       afs_int32 oldest = 0;
        /* If the RO clone exists, then if the clone is a temporary
         * clone, delete it. Or if the RO clone is marked RO_DONTUSE
         * (it was recently added), then also delete it. We do not
         * want to "reclone" a temporary RO clone.
         */
+       if (stayUp) {
+           code = VolumeExists(afromserver, afrompart, cloneVolId);
+           if (!code) {
+               code = DoVolDelete(fromconn, cloneVolId, afrompart, "previous clone", 0,
+                                  NULL, NULL);
+               if (code && (code != VNOVOL))
+                   ERROREXIT(code);
+               VDONE;
+           }
+       }
+       /* clean up any previous tmp clone before starting if staying up */
        if (roexists
            && (!roclone || (entry.serverFlags[roindex] & RO_DONTUSE))) {
-           code = DoVolDelete(fromconn, cloneVolId, afrompart, "the", 0,
-                              NULL, NULL);
+           code = DoVolDelete(fromconn,
+                              stayUp ? entry.volumeId[ROVOL] : cloneVolId,
+                              afrompart, "the", 0, NULL, NULL);
            if (code && (code != VNOVOL))
                ERROREXIT(code);
            roexists = 0;
        }
 
+       if (justnewsites) {
+           VPRINT("Querying old RO sites for update times...");
+           for (vldbindex = 0; vldbindex < entry.nServers; vldbindex++) {
+               volEntries volumeInfo;
+               struct rx_connection *conn;
+               afs_int32 crdate;
+
+               if (!(entry.serverFlags[vldbindex] & ITSROVOL)) {
+                   continue;
+               }
+               if ((entry.serverFlags[vldbindex] & RO_DONTUSE)) {
+                   continue;
+               }
+               conn = UV_Bind(entry.serverNumber[vldbindex], AFSCONF_VOLUMEPORT);
+               if (!conn) {
+                   fprintf(STDERR, "Cannot establish connection to server %s\n",
+                                   hostutil_GetNameByINet(entry.serverNumber[vldbindex]));
+                   justnewsites = 0;
+                   break;
+               }
+               volumeInfo.volEntries_val = NULL;
+               volumeInfo.volEntries_len = 0;
+               code = AFSVolListOneVolume(conn, entry.serverPartition[vldbindex],
+                                          entry.volumeId[ROVOL],
+                                          &volumeInfo);
+               if (code) {
+                   fprintf(STDERR, "Could not fetch information about RO vol %lu from server %s\n",
+                                   (unsigned long)entry.volumeId[ROVOL],
+                                   hostutil_GetNameByINet(entry.serverNumber[vldbindex]));
+                   PrintError("", code);
+                   justnewsites = 0;
+                   rx_DestroyConnection(conn);
+                   break;
+               }
+
+               crdate = CLOCKADJ(volumeInfo.volEntries_val[0].creationDate);
+
+               if (oldest == 0 || crdate < oldest) {
+                   oldest = crdate;
+               }
+
+               rx_DestroyConnection(conn);
+               free(volumeInfo.volEntries_val);
+               volumeInfo.volEntries_val = NULL;
+               volumeInfo.volEntries_len = 0;
+           }
+           VDONE;
+       }
+       if (justnewsites) {
+           volEntries volumeInfo;
+           volumeInfo.volEntries_val = NULL;
+           volumeInfo.volEntries_len = 0;
+           code = AFSVolListOneVolume(fromconn, afrompart, afromvol,
+                                      &volumeInfo);
+           if (code) {
+               fprintf(STDERR, "Could not fetch information about RW vol %lu from server %s\n",
+                               (unsigned long)afromvol,
+                               hostutil_GetNameByINet(afromserver));
+               PrintError("", code);
+               justnewsites = 0;
+           } else {
+               rwupdate = volumeInfo.volEntries_val[0].updateDate;
+
+               free(volumeInfo.volEntries_val);
+               volumeInfo.volEntries_val = NULL;
+               volumeInfo.volEntries_len = 0;
+           }
+       }
+       if (justnewsites && oldest <= rwupdate) {
+           /* RW has changed */
+           justnewsites = 0;
+       }
+
        /* Mark all the ROs in the VLDB entry as RO_DONTUSE. We don't
         * write this entry out to the vlserver until after the first
         * RO volume is released (temp RO clones don't count).
+        *
+        * If 'justnewsites' is set, we're only updating sites that have
+        * RO_DONTUSE set, so set NEW_REPSITE for all of the others.
         */
        for (i = 0; i < entry.nServers; i++) {
-           entry.serverFlags[i] &= ~NEW_REPSITE;
-           entry.serverFlags[i] |= RO_DONTUSE;
+           if (justnewsites) {
+               if ((entry.serverFlags[i] & RO_DONTUSE)) {
+                   entry.serverFlags[i] &= ~NEW_REPSITE;
+               } else {
+                   entry.serverFlags[i] |= NEW_REPSITE;
+               }
+           } else {
+               entry.serverFlags[i] &= ~NEW_REPSITE;
+               entry.serverFlags[i] |= RO_DONTUSE;
+           }
        }
        entry.serverFlags[rwindex] |= NEW_REPSITE;
        entry.serverFlags[rwindex] &= ~RO_DONTUSE;
+    }
+
+    if (justnewsites && roexists) {
+       /* if 'justnewsites' and 'roexists' are set, we don't need to do
+        * anything with the RO clone, so skip the reclone */
+       /* noop */
+
+    } else if (complete_release) {
 
        if (roclone) {
            strcpy(vname, entry.name);
-           strcat(vname, ".readonly");
+           if (stayUp)
+               strcat(vname, ".roclone");
+           else
+               strcat(vname, ".readonly");
        } else {
            strcpy(vname, "readonly-clone-temp");
        }
 
        code = DoVolClone(fromconn, afromvol, afrompart, readonlyVolume,
-                         cloneVolId, roclone ? "permanent RO":
+                         cloneVolId, (roclone && !stayUp)?"permanent RO":
                          "temporary RO", NULL, vname, NULL, &volstatus, NULL);
        if (code) {
            error = code;
            goto rfail;
        }
 
+       if (justnewsites && rwupdate != volstatus.updateDate) {
+           justnewsites = 0;
+           /* reset the serverFlags as if 'justnewsites' had never been set */
+           for (i = 0; i < entry.nServers; i++) {
+               entry.serverFlags[i] &= ~NEW_REPSITE;
+               entry.serverFlags[i] |= RO_DONTUSE;
+           }
+           entry.serverFlags[rwindex] |= NEW_REPSITE;
+           entry.serverFlags[rwindex] &= ~RO_DONTUSE;
+       }
+
        rwcrdate = volstatus.creationDate;
 
        /* Remember clone volume ID in case we fail or are interrupted */
        entry.cloneId = cloneVolId;
 
-       if (roclone) {
+       if (roclone && !stayUp) {
            /* Bring the RO clone online - though not if it's a temporary clone */
            VPRINT1("Starting transaction on RO clone volume %u...",
                    cloneVolId);
@@ -3456,7 +3944,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
             * There is a fix in the 3.4 client that does not need this sleep
             * anymore, but we don't know what clients we have.
             */
-           if (entry.nServers > 2)
+           if (entry.nServers > 2 && !justnewsites)
                sleep(5);
 
            /* Mark the RO clone in the VLDB as a good site (already released) */
@@ -3480,6 +3968,14 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        }
     }
 
+    if (justnewsites) {
+       VPRINT("RW vol has not changed; only releasing to new RO sites\n");
+       /* act like this is a completion of a previous release */
+       complete_release = CR_RECOVER;
+    } else if (tried_justnewsites) {
+       VPRINT("RW vol has changed; releasing to all sites\n");
+    }
+
     /* Now we will release from the clone to the remaining RO replicas.
      * The first 2 ROs (counting the non-temporary RO clone) are released
      * individually: releasecount. This is to reduce the race condition
@@ -3489,31 +3985,29 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
      */
 
     strcpy(vname, entry.name);
-    strcat(vname, ".readonly");
+    if (stayUp)
+       strcat(vname, ".roclone");
+    else
+       strcat(vname, ".readonly");
     memset(&cookie, 0, sizeof(cookie));
     strncpy(cookie.name, vname, VOLSER_OLDMAXVOLNAME);
     cookie.type = ROVOL;
     cookie.parent = entry.volumeId[RWVOL];
     cookie.clone = 0;
 
-    nservers = entry.nServers / 2;     /* how many to do at once, excluding clone */
-    replicas =
-       (struct replica *)malloc(sizeof(struct replica) * nservers + 1);
-    times = (struct release *)malloc(sizeof(struct release) * nservers + 1);
-    toconns =
-       (struct rx_connection **)malloc(sizeof(struct rx_connection *) *
-                                       nservers + 1);
-    results.manyResults_val =
-       (afs_int32 *) malloc(sizeof(afs_int32) * nservers + 1);
+    /* how many to do at once, excluding clone */
+    if (stayUp || justnewsites)
+       nservers = entry.nServers; /* can do all, none offline */
+    else
+       nservers = entry.nServers / 2;
+    replicas = calloc(nservers + 1, sizeof(struct replica));
+    times = calloc(nservers + 1, sizeof(struct release));
+    toconns = calloc(nservers + 1, sizeof(struct rx_connection *));
+    results.manyResults_val = calloc(nservers + 1, sizeof(afs_int32));
     if (!replicas || !times || !results.manyResults_val || !toconns)
        ONERROR0(ENOMEM,
                "Failed to create transaction on the release clone\n");
 
-    memset(replicas, 0, (sizeof(struct replica) * nservers + 1));
-    memset(times, 0, (sizeof(struct release) * nservers + 1));
-    memset(toconns, 0, (sizeof(struct rx_connection *) * nservers + 1));
-    memset(results.manyResults_val, 0, (sizeof(afs_int32) * nservers + 1));
-
     /* Create a transaction on the cloned volume */
     VPRINT1("Starting transaction on cloned volume %u...", cloneVolId);
     code =
@@ -3522,26 +4016,36 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        memset(&orig_status, 0, sizeof(orig_status));
        code = AFSVolGetStatus(fromconn, fromtid, &orig_status);
     }
-    if (!fullrelease && code)
+    if (!complete_release && code)
        ONERROR(VOLSERNOVOL, afromvol,
                "Old clone is inaccessible. Try vos release -f %u.\n");
     ONERROR0(code, "Failed to create transaction on the release clone\n");
     VDONE;
 
+    /* if we have a clone, treat this as done, for now */
+    if (stayUp && !complete_release) {
+       entry.serverFlags[roindex] |= NEW_REPSITE;
+       entry.serverFlags[roindex] &= ~RO_DONTUSE;
+       entry.flags |= RO_EXISTS;
+
+       releasecount++;
+    }
+
     /* For each index in the VLDB */
     for (vldbindex = 0; vldbindex < entry.nServers;) {
-
-       /* Get a transaction on the replicas. Pick replacas which have an old release. */
+       /* Get a transaction on the replicas. Pick replicas which have an old release. */
        for (volcount = 0;
             ((volcount < nservers) && (vldbindex < entry.nServers));
             vldbindex++) {
-           /* The first two RO volumes will be released individually.
-            * The rest are then released in parallel. This is a hack
-            * for clients not recognizing right away when a RO volume
-            * comes back on-line.
-            */
-           if ((volcount == 1) && (releasecount < 2))
-               break;
+           if (!stayUp && !justnewsites) {
+               /* The first two RO volumes will be released individually.
+                * The rest are then released in parallel. This is a hack
+                * for clients not recognizing right away when a RO volume
+                * comes back on-line.
+                */
+               if ((volcount == 1) && (releasecount < 2))
+                   break;
+           }
 
            if (vldbindex == roindex)
                continue;       /* the clone    */
@@ -3568,20 +4072,33 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
                         &(replicas[volcount].trans),
                         &(times[volcount].crtime),
                         &(times[volcount].uptime),
-                        origflags);
+                        origflags, stayUp?cloneVolId:0);
            if (code)
                continue;
 
            /* Thisdate is the date from which we want to pick up all changes */
-           if (forceflag || !fullrelease
-               || (rwcrdate > times[volcount].crtime)) {
-               /* If the forceflag is set, then we want to do a full dump.
-                * If it's not a full release, we can't be sure that the creation
-                *  date is good (so we also do a full dump).
-                * If the RW volume was replaced (its creation date is newer than
-                *  the last release), then we can't be sure what has changed (so
-                *  we do a full dump).
+           if (flags & REL_FULLDUMPS) {
+               /* Do a full dump when forced by the caller. */
+               VPRINT("This will be a full dump: forced\n");
+               thisdate = 0;
+           } else if (!complete_release) {
+               /* If this release is a recovery of a failed release, we can't be
+                * sure the creation date is good, so do a full dump.
+                */
+               VPRINT("This will be a full dump: previous release failed\n");
+               thisdate = 0;
+           } else if (times[volcount].crtime == 0) {
+               /* A full dump is needed for a new read-only volume. */
+               VPRINT
+                   ("This will be a full dump: read-only volume needs to be created\n");
+               thisdate = 0;
+           } else if ((rwcrdate > times[volcount].crtime)) {
+               /* If the RW volume was replaced (its creation date is newer than
+                * the last release), then we can't be sure what has changed (so
+                * we do a full dump).
                 */
+               VPRINT
+                   ("This will be a full dump: read-write volume was replaced\n");
                thisdate = 0;
            } else if (remembertime[vldbindex].validtime) {
                /* Trans was prev ended. Use the time from the prev trans
@@ -3626,7 +4143,8 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
 
        if (verbose) {
            fprintf(STDOUT, "Starting ForwardMulti from %lu to %u on %s",
-                   (unsigned long)cloneVolId, entry.volumeId[ROVOL],
+                   (unsigned long)cloneVolId, stayUp?
+                   cloneVolId:entry.volumeId[ROVOL],
                     noresolve ? afs_inet_ntoa_r(entry.serverNumber[times[0].
                                                 vldbEntryIndex], hoststr) :
                     hostutil_GetNameByINet(entry.
@@ -3643,7 +4161,7 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
            }
 
            if (fromdate == 0)
-               fprintf(STDOUT, " (full release)");
+               fprintf(STDOUT, " (entire volume)");
            else {
                tmv = fromdate;
                fprintf(STDOUT, " (as of %.24s)", ctime(&tmv));
@@ -3713,36 +4231,166 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
            }
        }
 
-       /* End the transactions and destroy the connections */
+       if (!stayUp) {
+           PutTrans(&vldbindex, replicas, toconns, times, volcount);
+           MapNetworkToHost(&entry, &storeEntry);
+           vcode = VLDB_ReplaceEntry(afromvol, RWVOL, &storeEntry, 0);
+           ONERROR(vcode, afromvol,
+                   " Could not update VLDB entry for volume %u\n");
+       }
+    }                          /* for each index in the vldb */
+
+    /* for the stayup case, put back at the end */
+    if (stayUp) {
+       afs_uint32 tmpVol = entry.volumeId[ROVOL];
+       strcpy(vname, entry.name);
+       strcat(vname, ".readonly");
+
+       if (roclone) {
+           /* have to clear flags to ensure new vol goes online
+            */
+           code = AFSVolSetFlags(fromconn, fromtid, 0);
+           if (code && (code != ENOENT)) {
+               PrintError("Failed to set flags on ro volume: ",
+                          code);
+           }
+
+           VPRINT3("%sloning to permanent RO %u on %s...", roexists?"Re-c":"C", tmpVol,
+                   noresolve ?
+                   afs_inet_ntoa_r(entry.serverNumber[roindex],
+                                   hoststr) :
+                   hostutil_GetNameByINet(entry.serverNumber[roindex]));
+
+           code = AFSVolClone(fromconn, fromtid, roexists?tmpVol:0,
+                              readonlyVolume, vname, &tmpVol);
+
+           if (!code) {
+               VDONE;
+               VPRINT("Bringing readonly online...");
+               code = DoVolOnline(&entry, tmpVol, roindex, vname,
+                                  fromconn);
+           }
+           if (code) {
+               EPRINT(code, "Failed: ");
+               entry.serverFlags[roindex] &= ~NEW_REPSITE;
+               entry.serverFlags[roindex] |= RO_DONTUSE;
+           } else {
+               entry.serverFlags[roindex] |= NEW_REPSITE;
+               entry.serverFlags[roindex] &= ~RO_DONTUSE;
+               entry.flags |= RO_EXISTS;
+               VDONE;
+           }
+
+       }
        for (s = 0; s < volcount; s++) {
-           if (replicas[s].trans)
+           if (replicas[s].trans) {
+               vldbindex = times[s].vldbEntryIndex;
+
+               /* ok, so now we have to end the previous transaction */
                code = AFSVolEndTrans(toconns[s], replicas[s].trans, &rcode);
-           replicas[s].trans = 0;
-           if (!code)
-               code = rcode;
-           if (code) {
-               if ((s == 0) || (code != ENOENT)) {
-                   PrintError("Could not end transaction on a ro volume: ",
-                              code);
+               if (!code)
+                   code = rcode;
+
+               if (!code) {
+                   code = AFSVolTransCreate_retry(toconns[s],
+                                                  cloneVolId,
+                                                  entry.serverPartition[vldbindex],
+                                                  ITBusy,
+                                                  &(replicas[s].trans));
+                   if (code) {
+                       PrintError("Unable to begin transaction on temporary clone: ", code);
+                   }
                } else {
-                   PrintError
-                       ("Transaction timed out on a ro volume. Will retry.\n",
-                        0);
-                   if (times[s].vldbEntryIndex < vldbindex)
-                       vldbindex = times[s].vldbEntryIndex;
+                   PrintError("Unable to end transaction on temporary clone: ", code);
                }
-           }
 
-           if (toconns[s])
-               rx_DestroyConnection(toconns[s]);
-           toconns[s] = 0;
+               VPRINT3("%sloning to permanent RO %u on %s...", times[s].crtime?"Re-c":"C",
+                       tmpVol, noresolve ?
+                       afs_inet_ntoa_r(htonl(replicas[s].server.destHost),
+                                       hoststr) :
+                       hostutil_GetNameByINet(htonl(replicas[s].server.destHost)));
+               if (times[s].crtime)
+                   code = AFSVolClone(toconns[s], replicas[s].trans, tmpVol,
+                                      readonlyVolume, vname, &tmpVol);
+               else
+                   code = AFSVolClone(toconns[s], replicas[s].trans, 0,
+                                      readonlyVolume, vname, &tmpVol);
+
+               if (code) {
+                   if (!times[s].crtime) {
+                       entry.serverFlags[vldbindex] |= RO_DONTUSE;
+                   }
+                   entry.serverFlags[vldbindex] &= ~NEW_REPSITE;
+                   PrintError("Failed: ",
+                              code);
+               } else
+                   VDONE;
+
+               if (entry.serverFlags[vldbindex] != RO_DONTUSE) {
+                   /* bring it online (mark it InService) */
+                   VPRINT1("Bringing readonly online on %s...",
+                           noresolve ?
+                           afs_inet_ntoa_r(
+                               htonl(replicas[s].server.destHost),
+                               hoststr) :
+                           hostutil_GetNameByINet(
+                               htonl(replicas[s].server.destHost)));
+
+                   code = DoVolOnline(&entry, tmpVol, vldbindex, vname,
+                                      toconns[s]);
+                   /* needed to come online for cloning */
+                   if (code) {
+                       /* technically it's still new, just not online */
+                       entry.serverFlags[s] &= ~NEW_REPSITE;
+                       entry.serverFlags[s] |= RO_DONTUSE;
+                       if (code != ENOENT) {
+                           PrintError("Failed to set correct names and ids: ",
+                                      code);
+                       }
+                   } else
+                       VDONE;
+               }
+
+               VPRINT("Marking temporary clone for deletion...\n");
+               code = AFSVolSetFlags(toconns[s],
+                                     replicas[s].trans,
+                                     VTDeleteOnSalvage |
+                                     VTOutOfService);
+               if (code)
+                 EPRINT(code, "Failed: ");
+               else
+                 VDONE;
+
+               VPRINT("Ending transaction on temporary clone...\n");
+               code = AFSVolEndTrans(toconns[s], replicas[s].trans, &rcode);
+               if (!code)
+                   rcode = code;
+               if (code)
+                   PrintError("Failed: ", code);
+               else {
+                   VDONE;
+                   /* ended successfully */
+                   replicas[s].trans = 0;
+
+                   VPRINT2("Deleting temporary clone %u on %s...", cloneVolId,
+                           noresolve ?
+                           afs_inet_ntoa_r(htonl(replicas[s].server.destHost),
+                                           hoststr) :
+                           hostutil_GetNameByINet(htonl(replicas[s].server.destHost)));
+                   code = DoVolDelete(toconns[s], cloneVolId,
+                                      entry.serverPartition[vldbindex],
+                                      NULL, 0, NULL, NULL);
+                   if (code) {
+                       EPRINT(code, "Failed: ");
+                   } else
+                       VDONE;
+               }
+           }
        }
 
-       MapNetworkToHost(&entry, &storeEntry);
-       vcode = VLDB_ReplaceEntry(afromvol, RWVOL, &storeEntry, 0);
-       ONERROR(vcode, afromvol,
-               " Could not update VLDB entry for volume %u\n");
-    }                          /* for each index in the vldb */
+       /* done. put the vldb entry in the success tail case*/
+       PutTrans(&vldbindex, replicas, toconns, times, volcount);
+    }
 
     /* End the transaction on the cloned volume */
     code = AFSVolEndTrans(fromconn, fromtid, &rcode);
@@ -3770,7 +4418,6 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
                         hostutil_GetNameByINet(entry.serverNumber[i]), pname);
            }
        }
-
        MapNetworkToHost(&entry, &storeEntry);
        vcode =
            VLDB_ReplaceEntry(afromvol, RWVOL, &storeEntry,
@@ -3781,19 +4428,19 @@ UV_ReleaseVolume(afs_uint32 afromvol, afs_uint32 afromserver,
        ERROREXIT(VOLSERBADRELEASE);
     }
 
+    entry.cloneId = 0;
     /* All the ROs were release successfully. Remove the temporary clone */
-    if (!roclone) {
+    if (!roclone || stayUp) {
        if (verbose) {
            fprintf(STDOUT, "Deleting the releaseClone %lu ...",
                    (unsigned long)cloneVolId);
            fflush(STDOUT);
        }
-       code = DoVolDelete(fromconn, cloneVolId, afrompart, "the", 0, NULL,
+       code = DoVolDelete(fromconn, cloneVolId, afrompart, NULL, 0, NULL,
                           NULL);
        ONERROR(code, cloneVolId, "Failed to delete volume %u.\n");
        VDONE;
     }
-    entry.cloneId = 0;
 
     for (i = 0; i < entry.nServers; i++)
        entry.serverFlags[i] &= ~NEW_REPSITE;
@@ -3905,7 +4552,7 @@ UV_DumpVolume(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
     struct rx_connection * volatile fromconn = NULL;
     afs_int32 volatile fromtid = 0;
 
-    afs_int32 rxError = 0, rcode = 0;
+    afs_int32 rcode = 0;
     afs_int32 code, error = 0;
     afs_int32 tmp;
     time_t tmv = fromdate;
@@ -3955,7 +4602,7 @@ UV_DumpVolume(afs_uint32 afromvol, afs_uint32 afromserver, afs_int32 afrompart,
 
   error_exit:
     if (fromcall) {
-       code = rx_EndCall(fromcall, rxError);
+       code = rx_EndCall(fromcall, 0);
        if (code && code != RXGEN_OPCODE)
            fprintf(STDERR, "Error in rx_EndCall\n");
        if (code && !error)
@@ -4000,7 +4647,7 @@ UV_DumpClonedVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     afs_uint32 volatile clonevol = 0;
 
     afs_int32 tmp;
-    afs_int32 fromtid = 0, rxError = 0, rcode = 0;
+    afs_int32 fromtid = 0, rcode = 0;
     afs_int32 code = 0, error = 0;
     afs_uint32 tmpVol;
     char vname[64];
@@ -4108,7 +4755,7 @@ UV_DumpClonedVolume(afs_uint32 afromvol, afs_uint32 afromserver,
     }
 
     if (fromcall) {
-       code = rx_EndCall(fromcall, rxError);
+       code = rx_EndCall(fromcall, 0);
        if (code) {
            fprintf(STDERR, "Error in rx_EndCall\n");
            if (!error)
@@ -4150,14 +4797,12 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
     struct rx_connection *toconn, *tempconn;
     struct rx_call *tocall;
     afs_int32 totid, code, rcode, vcode, terror = 0;
-    afs_int32 rxError = 0;
     struct volser_status tstatus;
     struct volintInfo vinfo;
     char partName[10];
     char tovolreal[VOLSER_OLDMAXVOLNAME];
     afs_uint32 pvolid;
     afs_int32 temptid, pparentid;
-    int success;
     struct nvldbentry entry, storeEntry;
     afs_int32 error;
     int islocked;
@@ -4171,11 +4816,9 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
 
     memset(&cookie, 0, sizeof(cookie));
     islocked = 0;
-    success = 0;
     error = 0;
     reuseID = 1;
     tocall = (struct rx_call *)0;
-    toconn = (struct rx_connection *)0;
     tempconn = (struct rx_connection *)0;
     totid = 0;
     temptid = 0;
@@ -4299,7 +4942,7 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
        error = code;
        goto refail;
     }
-    terror = rx_EndCall(tocall, rxError);
+    terror = rx_EndCall(tocall, 0);
     tocall = (struct rx_call *)0;
     if (terror) {
        fprintf(STDERR, "rx_EndCall Failed \n");
@@ -4372,10 +5015,9 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
        goto refail;
     }
 
-    success = 1;
     fprintf(STDOUT, " done\n");
     fflush(STDOUT);
-    if (success && (!reuseID || (flags & RV_FULLRST))) {
+    if (!reuseID || (flags & RV_FULLRST)) {
        /* Volume was restored on the file server, update the
         * VLDB to reflect the change.
         */
@@ -4524,7 +5166,7 @@ UV_RestoreVolume2(afs_uint32 toserver, afs_int32 topart, afs_uint32 tovolid,
     }
   refail:
     if (tocall) {
-       code = rx_EndCall(tocall, rxError);
+       code = rx_EndCall(tocall, 0);
        if (!error)
            error = code;
     }
@@ -4950,9 +5592,7 @@ UV_ZapVolumeClones(afs_uint32 aserver, afs_int32 apart,
     int curPos;
     afs_int32 code = 0;
     afs_int32 success = 1;
-    afs_int32 tid;
 
-    aconn = (struct rx_connection *)0;
     aconn = UV_Bind(aserver, AFSCONF_VOLUMEPORT);
     curPos = 0;
     for (curPtr = volPtr; curPos < arraySize; curPtr++) {
@@ -4974,7 +5614,6 @@ UV_ZapVolumeClones(afs_uint32 aserver, afs_int32 apart,
                VPRINT2("Clone of %s %u deleted\n", curPtr->volName,
                        curPtr->volCloneId);
            curPos++;
-           tid = 0;
        }
     }
     if (aconn)
@@ -4998,7 +5637,6 @@ UV_GenerateVolumeClones(afs_uint32 aserver, afs_int32 apart,
     afs_uint32 curCloneId = 0;
     char cloneName[256];       /*max vol name */
 
-    aconn = (struct rx_connection *)0;
     aconn = UV_Bind(aserver, AFSCONF_VOLUMEPORT);
     curPos = 0;
     if ((volPtr->volFlags & REUSECLONEID) && (volPtr->volFlags & ENTRYVALID))
@@ -5150,7 +5788,6 @@ UV_XListVolumes(afs_uint32 a_serverID, afs_int32 a_partID, int a_all,
      * We set the val field to a null pointer as a hint for the stub to
      * allocate space.
      */
-    code = 0;
     *a_numEntsInResultP = 0;
     *a_resultPP = (volintXInfo *) 0;
     volumeXInfo.volXEntries_val = (volintXInfo *) 0;
@@ -5192,8 +5829,6 @@ UV_ListOneVolume(afs_uint32 aserver, afs_int32 apart, afs_uint32 volid,
     afs_int32 code = 0;
     volEntries volumeInfo;
 
-    code = 0;
-
     *resultPtr = (volintInfo *) 0;
     volumeInfo.volEntries_val = (volintInfo *) 0;      /*this hints the stub to allocate space */
     volumeInfo.volEntries_len = 0;
@@ -5256,7 +5891,6 @@ UV_XListOneVolume(afs_uint32 a_serverID, afs_int32 a_partID, afs_uint32 a_volID,
      * the info.  Setting the val field to a null pointer tells the stub
      * to allocate space for us.
      */
-    code = 0;
     *a_resultPP = (volintXInfo *) 0;
     volumeXInfo.volXEntries_val = (volintXInfo *) 0;
     volumeXInfo.volXEntries_len = 0;
@@ -5354,7 +5988,7 @@ CheckVolume(volintInfo * volumeinfo, afs_uint32 aserver, afs_int32 apart,
        if (code) {
            if (code != VL_NOENT) {
                fprintf(STDOUT,
-                       "Could not retreive the VLDB entry for volume %lu \n",
+                       "Could not retrieve the VLDB entry for volume %lu \n",
                        (unsigned long)rwvolid);
                ERROR_EXIT(code);
            }
@@ -6894,6 +7528,9 @@ UV_VolumeZap(afs_uint32 server, afs_int32 part, afs_uint32 volid)
     aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
     error = DoVolDelete(aconn, volid, part,
                        "the", 0, NULL, NULL);
+    if (error == VNOVOL) {
+       EPRINT1(error, "Failed to start transaction on %u\n", volid);
+    }
 
     PrintError("", error);
     if (aconn)