vos: convertROtoRW - prevent VLDB corruption
[openafs.git] / src / volser / vos.c
index 27be7d6..273b6f6 100644 (file)
@@ -104,7 +104,7 @@ qPut(struct tqHead *ahead, afs_uint32 volid)
 {
     struct tqElem *elem;
 
-    elem = (struct tqElem *)malloc(sizeof(struct tqElem));
+    elem = malloc(sizeof(struct tqElem));
     elem->next = ahead->next;
     elem->volid = volid;
     ahead->next = elem;
@@ -282,7 +282,7 @@ SendFile(usd_handle_t ufd, struct rx_call *call, long blksize)
     int done = 0;
     afs_uint32 nbytes;
 
-    buffer = (char *)malloc(blksize);
+    buffer = malloc(blksize);
     if (!buffer) {
        fprintf(STDERR, "malloc failed\n");
        return -1;
@@ -392,7 +392,7 @@ ReceiveFile(usd_handle_t ufd, struct rx_call *call, long blksize)
     afs_uint32 bytesleft, w;
     afs_int32 error = 0;
 
-    buffer = (char *)malloc(blksize);
+    buffer = malloc(blksize);
     if (!buffer) {
        fprintf(STDERR, "memory allocation failed\n");
        ERROR_EXIT(-1);
@@ -4604,9 +4604,7 @@ ListVLDB(struct cmd_syndesc *as, void *arock)
            } else {
                /* Grow the tarray to keep the extra entries */
                parraysize = (centries * sizeof(struct nvldbentry));
-               ttarray =
-                   (struct nvldbentry *)realloc(tarray,
-                                                tarraysize + parraysize);
+               ttarray = realloc(tarray, tarraysize + parraysize);
                if (!ttarray) {
                    fprintf(STDERR,
                            "Could not allocate enough space for  the VLDB entries\n");
@@ -5499,7 +5497,7 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
     afs_uint32 volid;
     afs_uint32 server;
     afs_int32 code, i, same;
-    struct nvldbentry entry, storeEntry;
+    struct nvldbentry entry, checkEntry, storeEntry;
     afs_int32 vcode;
     afs_int32 rwindex = 0;
     afs_uint32 rwserver = 0;
@@ -5538,23 +5536,23 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
            PrintError("", code);
        else
            fprintf(STDERR, "Unknown volume ID or name '%s'\n",
-                   as->parms[0].items->data);
+                   as->parms[2].items->data);
        return -1;
     }
     if (as->parms[3].items)
        force = 1;
 
+    memset(&entry, 0, sizeof(entry));
     vcode = VLDB_GetEntryByID(volid, -1, &entry);
     if (vcode) {
        fprintf(STDERR,
                "Could not fetch the entry for volume %lu from VLDB\n",
                (unsigned long)volid);
-       PrintError("convertROtoRW", code);
+       PrintError("convertROtoRW ", vcode);
        return vcode;
     }
 
     /* use RO volid even if user specified RW or BK volid */
-
     if (volid != entry.volumeId[ROVOL])
        volid = entry.volumeId[ROVOL];
 
@@ -5564,8 +5562,9 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
            rwindex = i;
            rwserver = entry.serverNumber[i];
            rwpartition = entry.serverPartition[i];
-       }
-       if (entry.serverFlags[i] & ITSROVOL) {
+           if (roserver)
+               break;
+       } else if ((entry.serverFlags[i] & ITSROVOL) && !roserver) {
            same = VLDB_IsSameAddrs(server, entry.serverNumber[i], &code);
            if (code) {
                fprintf(STDERR,
@@ -5577,14 +5576,15 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
                roindex = i;
                roserver = entry.serverNumber[i];
                ropartition = entry.serverPartition[i];
-               break;
+               if (rwserver)
+                    break;
            }
        }
     }
     if (!roserver) {
        fprintf(STDERR, "Warning: RO volume didn't exist in vldb!\n");
     }
-    if (ropartition != partition) {
+    if (roserver && (ropartition != partition)) {
        fprintf(STDERR,
                "Warning: RO volume should be in partition %d instead of %d (vldb)\n",
                ropartition, partition);
@@ -5610,6 +5610,37 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
     vcode =
        ubik_VL_SetLock(cstruct, 0, entry.volumeId[RWVOL], RWVOL,
                  VLOP_MOVE);
+    if (vcode) {
+       fprintf(STDERR,
+               "Unable to lock volume %lu, code %d\n",
+               (unsigned long)entry.volumeId[RWVOL],vcode);
+       PrintError("", vcode);
+       return -1;
+    }
+
+    /* make sure the VLDB entry hasn't changed since we started */
+    memset(&checkEntry, 0, sizeof(checkEntry));
+    vcode = VLDB_GetEntryByID(volid, -1, &checkEntry);
+    if (vcode) {
+       fprintf(STDERR,
+                "Could not fetch the entry for volume %lu from VLDB\n",
+                (unsigned long)volid);
+       PrintError("convertROtoRW ", vcode);
+       code = vcode;
+       goto error_exit;
+    }
+
+    MapHostToNetwork(&checkEntry);
+    entry.flags &= ~VLOP_ALLOPERS;  /* clear any stale lock operation flags */
+    entry.flags |= VLOP_MOVE;        /* set to match SetLock operation above */
+    if (memcmp(&entry, &checkEntry, sizeof(entry)) != 0) {
+        fprintf(STDERR,
+                "VLDB entry for volume %lu has changed; please reissue the command.\n",
+                (unsigned long)volid);
+        code = -1;
+        goto error_exit;
+    }
+
     aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
     code = AFSVolConvertROtoRWvolume(aconn, partition, volid);
     if (code) {
@@ -5617,11 +5648,31 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
                "Converting RO volume %lu to RW volume failed with code %d\n",
                (unsigned long)volid, code);
        PrintError("convertROtoRW ", code);
-       return -1;
+       goto error_exit;
+    }
+    /* Update the VLDB to match what we did on disk as much as possible.  */
+    /* If the converted RO was in the VLDB, make it look like the new RW. */
+    if (roserver) {
+       entry.serverFlags[roindex] = ITSRWVOL;
+    } else {
+       /* Add a new site entry for the newly created RW.  It's possible
+        * (but unlikely) that we are already at MAXNSERVERS and that this
+        * new site will invalidate the whole VLDB entry;  however,
+        * VLDB_ReplaceEntry will detect this and return VL_BADSERVER,
+        * so we need no extra guard logic here.
+        */
+       afs_int32 newrwindex = entry.nServers;
+       (entry.nServers)++;
+       entry.serverNumber[newrwindex] = server;
+       entry.serverPartition[newrwindex] = partition;
+       entry.serverFlags[newrwindex] = ITSRWVOL;
     }
-    entry.serverFlags[roindex] = ITSRWVOL;
     entry.flags |= RW_EXISTS;
     entry.flags &= ~BACK_EXISTS;
+
+    /* if the old RW was in the VLDB, remove it by decrementing the number */
+    /* of servers, replacing the RW entry with the last entry, and zeroing */
+    /* out the last entry. */
     if (rwserver) {
        (entry.nServers)--;
        if (rwindex != entry.nServers) {
@@ -5651,9 +5702,14 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
                "Warning: volume converted, but vldb update failed with code %d!\n",
                code);
     }
+
+  error_exit:
     vcode = UV_LockRelease(entry.volumeId[RWVOL]);
     if (vcode) {
-       PrintDiagnostics("unlock", vcode);
+       fprintf(STDERR,
+               "Unable to unlock volume %lu, code %d\n",
+               (unsigned long)entry.volumeId[RWVOL],vcode);
+       PrintError("", vcode);
     }
     return code;
 }