vos: convertROtoRW - prevent VLDB corruption
[openafs.git] / src / volser / vos.c
index a4459e5..273b6f6 100644 (file)
@@ -104,7 +104,7 @@ qPut(struct tqHead *ahead, afs_uint32 volid)
 {
     struct tqElem *elem;
 
-    elem = (struct tqElem *)malloc(sizeof(struct tqElem));
+    elem = malloc(sizeof(struct tqElem));
     elem->next = ahead->next;
     elem->volid = volid;
     ahead->next = elem;
@@ -282,7 +282,7 @@ SendFile(usd_handle_t ufd, struct rx_call *call, long blksize)
     int done = 0;
     afs_uint32 nbytes;
 
-    buffer = (char *)malloc(blksize);
+    buffer = malloc(blksize);
     if (!buffer) {
        fprintf(STDERR, "malloc failed\n");
        return -1;
@@ -392,7 +392,7 @@ ReceiveFile(usd_handle_t ufd, struct rx_call *call, long blksize)
     afs_uint32 bytesleft, w;
     afs_int32 error = 0;
 
-    buffer = (char *)malloc(blksize);
+    buffer = malloc(blksize);
     if (!buffer) {
        fprintf(STDERR, "memory allocation failed\n");
        ERROR_EXIT(-1);
@@ -4604,9 +4604,7 @@ ListVLDB(struct cmd_syndesc *as, void *arock)
            } else {
                /* Grow the tarray to keep the extra entries */
                parraysize = (centries * sizeof(struct nvldbentry));
-               ttarray =
-                   (struct nvldbentry *)realloc(tarray,
-                                                tarraysize + parraysize);
+               ttarray = realloc(tarray, tarraysize + parraysize);
                if (!ttarray) {
                    fprintf(STDERR,
                            "Could not allocate enough space for  the VLDB entries\n");
@@ -5450,13 +5448,13 @@ SetAddrs(struct cmd_syndesc *as, void *arock)
     if (vcode) {
        if (vcode == VL_MULTIPADDR) {
            fprintf(STDERR, "vos: VL_RegisterAddrs rpc failed; The IP address exists on a different server; repair it\n");
-           PrintError("", vcode);
-           return vcode;
        } else if (vcode == RXGEN_OPCODE) {
            fprintf(STDERR, "vlserver doesn't support VL_RegisterAddrs rpc; ignored\n");
-           PrintError("", vcode);
-           return vcode;
+       } else {
+           fprintf(STDERR, "vos: VL_RegisterAddrs rpc failed\n");
        }
+       PrintError("", vcode);
+       return vcode;
     }
     if (verbose) {
        fprintf(STDOUT, "vos: Changed UUID with addresses:\n");
@@ -5499,7 +5497,7 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
     afs_uint32 volid;
     afs_uint32 server;
     afs_int32 code, i, same;
-    struct nvldbentry entry, storeEntry;
+    struct nvldbentry entry, checkEntry, storeEntry;
     afs_int32 vcode;
     afs_int32 rwindex = 0;
     afs_uint32 rwserver = 0;
@@ -5538,23 +5536,23 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
            PrintError("", code);
        else
            fprintf(STDERR, "Unknown volume ID or name '%s'\n",
-                   as->parms[0].items->data);
+                   as->parms[2].items->data);
        return -1;
     }
     if (as->parms[3].items)
        force = 1;
 
+    memset(&entry, 0, sizeof(entry));
     vcode = VLDB_GetEntryByID(volid, -1, &entry);
     if (vcode) {
        fprintf(STDERR,
                "Could not fetch the entry for volume %lu from VLDB\n",
                (unsigned long)volid);
-       PrintError("convertROtoRW", code);
+       PrintError("convertROtoRW ", vcode);
        return vcode;
     }
 
     /* use RO volid even if user specified RW or BK volid */
-
     if (volid != entry.volumeId[ROVOL])
        volid = entry.volumeId[ROVOL];
 
@@ -5564,8 +5562,9 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
            rwindex = i;
            rwserver = entry.serverNumber[i];
            rwpartition = entry.serverPartition[i];
-       }
-       if (entry.serverFlags[i] & ITSROVOL) {
+           if (roserver)
+               break;
+       } else if ((entry.serverFlags[i] & ITSROVOL) && !roserver) {
            same = VLDB_IsSameAddrs(server, entry.serverNumber[i], &code);
            if (code) {
                fprintf(STDERR,
@@ -5577,14 +5576,15 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
                roindex = i;
                roserver = entry.serverNumber[i];
                ropartition = entry.serverPartition[i];
-               break;
+               if (rwserver)
+                    break;
            }
        }
     }
     if (!roserver) {
        fprintf(STDERR, "Warning: RO volume didn't exist in vldb!\n");
     }
-    if (ropartition != partition) {
+    if (roserver && (ropartition != partition)) {
        fprintf(STDERR,
                "Warning: RO volume should be in partition %d instead of %d (vldb)\n",
                ropartition, partition);
@@ -5610,6 +5610,37 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
     vcode =
        ubik_VL_SetLock(cstruct, 0, entry.volumeId[RWVOL], RWVOL,
                  VLOP_MOVE);
+    if (vcode) {
+       fprintf(STDERR,
+               "Unable to lock volume %lu, code %d\n",
+               (unsigned long)entry.volumeId[RWVOL],vcode);
+       PrintError("", vcode);
+       return -1;
+    }
+
+    /* make sure the VLDB entry hasn't changed since we started */
+    memset(&checkEntry, 0, sizeof(checkEntry));
+    vcode = VLDB_GetEntryByID(volid, -1, &checkEntry);
+    if (vcode) {
+       fprintf(STDERR,
+                "Could not fetch the entry for volume %lu from VLDB\n",
+                (unsigned long)volid);
+       PrintError("convertROtoRW ", vcode);
+       code = vcode;
+       goto error_exit;
+    }
+
+    MapHostToNetwork(&checkEntry);
+    entry.flags &= ~VLOP_ALLOPERS;  /* clear any stale lock operation flags */
+    entry.flags |= VLOP_MOVE;        /* set to match SetLock operation above */
+    if (memcmp(&entry, &checkEntry, sizeof(entry)) != 0) {
+        fprintf(STDERR,
+                "VLDB entry for volume %lu has changed; please reissue the command.\n",
+                (unsigned long)volid);
+        code = -1;
+        goto error_exit;
+    }
+
     aconn = UV_Bind(server, AFSCONF_VOLUMEPORT);
     code = AFSVolConvertROtoRWvolume(aconn, partition, volid);
     if (code) {
@@ -5617,11 +5648,31 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
                "Converting RO volume %lu to RW volume failed with code %d\n",
                (unsigned long)volid, code);
        PrintError("convertROtoRW ", code);
-       return -1;
+       goto error_exit;
+    }
+    /* Update the VLDB to match what we did on disk as much as possible.  */
+    /* If the converted RO was in the VLDB, make it look like the new RW. */
+    if (roserver) {
+       entry.serverFlags[roindex] = ITSRWVOL;
+    } else {
+       /* Add a new site entry for the newly created RW.  It's possible
+        * (but unlikely) that we are already at MAXNSERVERS and that this
+        * new site will invalidate the whole VLDB entry;  however,
+        * VLDB_ReplaceEntry will detect this and return VL_BADSERVER,
+        * so we need no extra guard logic here.
+        */
+       afs_int32 newrwindex = entry.nServers;
+       (entry.nServers)++;
+       entry.serverNumber[newrwindex] = server;
+       entry.serverPartition[newrwindex] = partition;
+       entry.serverFlags[newrwindex] = ITSRWVOL;
     }
-    entry.serverFlags[roindex] = ITSRWVOL;
     entry.flags |= RW_EXISTS;
     entry.flags &= ~BACK_EXISTS;
+
+    /* if the old RW was in the VLDB, remove it by decrementing the number */
+    /* of servers, replacing the RW entry with the last entry, and zeroing */
+    /* out the last entry. */
     if (rwserver) {
        (entry.nServers)--;
        if (rwindex != entry.nServers) {
@@ -5651,9 +5702,14 @@ ConvertRO(struct cmd_syndesc *as, void *arock)
                "Warning: volume converted, but vldb update failed with code %d!\n",
                code);
     }
+
+  error_exit:
     vcode = UV_LockRelease(entry.volumeId[RWVOL]);
     if (vcode) {
-       PrintDiagnostics("unlock", vcode);
+       fprintf(STDERR,
+               "Unable to unlock volume %lu, code %d\n",
+               (unsigned long)entry.volumeId[RWVOL],vcode);
+       PrintError("", vcode);
     }
     return code;
 }
@@ -5817,31 +5873,37 @@ MyBeforeProc(struct cmd_syndesc *as, void *arock)
 {
     char *tcell;
     afs_int32 code;
-    afs_int32 sauth;
+    int secFlags;
 
     /* Initialize the ubik_client connection */
     rx_SetRxDeadTime(90);
-    cstruct = (struct ubik_client *)0;
+    cstruct = NULL;
+    secFlags = AFSCONF_SECOPTS_FALLBACK_NULL;
 
-    sauth = 0;
     tcell = NULL;
     if (as->parms[12].items)   /* if -cell specified */
        tcell = as->parms[12].items->data;
-    if (as->parms[14].items)   /* -serverauth specified */
-       sauth = 1;
+
+    if (as->parms[13].items)
+       secFlags |= AFSCONF_SECOPTS_NOAUTH;
+
+    if (as->parms[14].items) { /* -localauth specified */
+       secFlags |= AFSCONF_SECOPTS_LOCALAUTH;
+       confdir = AFSDIR_SERVER_ETC_DIRPATH;
+    }
+
     if (as->parms[16].items     /* -encrypt specified */
 #ifdef AFS_NT40_ENV
         || win32_enableCrypt()
 #endif /* AFS_NT40_ENV */
          )
-       vsu_SetCrypt(1);
+       secFlags |= AFSCONF_SECOPTS_ALWAYSENCRYPT;
 
     if (as->parms[18].items)   /* -config flag set */
        confdir = as->parms[18].items->data;
 
-    if ((code =
-        vsu_ClientInit((as->parms[13].items != 0), confdir, tcell, sauth,
-                       &cstruct, UV_SetSecurity))) {
+    if ((code = vsu_ClientInit(confdir, tcell, secFlags, UV_SetSecurity,
+                              &cstruct))) {
        fprintf(STDERR, "could not initialize VLDB library (code=%lu) \n",
                (unsigned long)code);
        exit(1);