SALVSYNC_com initialization typo
[openafs.git] / src / vol / salvsync-server.c
index 70d2a4a..c7bc09b 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2006, Sine Nomine Associates and others.
+ * Copyright 2006-2008, Sine Nomine Associates and others.
  * All Rights Reserved.
  * 
  * This software has been released under the terms of the IBM Public
@@ -26,8 +26,6 @@
 #include <afsconfig.h>
 #include <afs/param.h>
 
-RCSID
-    ("$Header$");
 
 #include <sys/types.h>
 #include <stdio.h>
@@ -60,6 +58,7 @@ RCSID
 #include "volume.h"
 #include "partition.h"
 #include <rx/rx_queue.h>
+#include <afs/procmgmt.h>
 
 #if !defined(offsetof)
 #include <stddef.h>
@@ -73,33 +72,9 @@ RCSID
 
 /*@printflike@*/ extern void Log(const char *format, ...);
 
-#ifdef osi_Assert
-#undef osi_Assert
-#endif
-#define osi_Assert(e) (void)(e)
-
 #define MAXHANDLERS    4       /* Up to 4 clients; must be at least 2, so that
                                 * move = dump+restore can run on single server */
 
-#define MAX_BIND_TRIES 5       /* Number of times to retry socket bind */
-
-
-
-/* Forward declarations */
-static void * SALVSYNC_syncThread(void *);
-static void SALVSYNC_newconnection(int fd);
-static void SALVSYNC_com(int fd);
-static void SALVSYNC_Drop(int fd);
-static void AcceptOn(void);
-static void AcceptOff(void);
-static void InitHandler(void);
-static void CallHandler(fd_set * fdsetp);
-static int AddHandler(int afd, void (*aproc) (int));
-static int FindHandler(register int afd);
-static int FindHandler_r(register int afd);
-static int RemoveHandler(register int afd);
-static void GetHandler(fd_set * fdsetp, int *maxfdp);
-
 
 /*
  * This lock controls access to the handler array.
@@ -112,48 +87,86 @@ struct Lock SALVSYNC_handler_lock;
  * SALVSYNC is a feature specific to the demand attach fileserver
  */
 
+/* Forward declarations */
+static void * SALVSYNC_syncThread(void *);
+static void SALVSYNC_newconnection(osi_socket fd);
+static void SALVSYNC_com(osi_socket fd);
+static void SALVSYNC_Drop(osi_socket fd);
+static void AcceptOn(void);
+static void AcceptOff(void);
+static void InitHandler(void);
+static void CallHandler(fd_set * fdsetp);
+static int AddHandler(osi_socket afd, void (*aproc) (int));
+static int FindHandler(register osi_socket afd);
+static int FindHandler_r(register osi_socket afd);
+static int RemoveHandler(register osi_socket afd);
+static void GetHandler(fd_set * fdsetp, int *maxfdp);
+
+static int AllocNode(struct SalvageQueueNode ** node);
+
 static int AddToSalvageQueue(struct SalvageQueueNode * node);
 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
 static void AddToPendingQueue(struct SalvageQueueNode * node);
 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
-static void RaiseCommandPrio(struct SalvageQueueNode * node, SALVSYNC_command_hdr * com);
-
-static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName);
-static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry);
+static void UpdateCommandPrio(struct SalvageQueueNode * node);
+static void HandlePrio(struct SalvageQueueNode * clone, 
+                      struct SalvageQueueNode * parent,
+                      afs_uint32 new_prio);
+
+static int LinkNode(struct SalvageQueueNode * parent,
+                   struct SalvageQueueNode * clone);
+
+static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
+                                           struct SalvageQueueNode ** parent);
+static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
+                                                    struct SalvageQueueNode ** parent);
 static void AddNodeToHash(struct SalvageQueueNode * node);
 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
 
 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
-static afs_int32 SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res);
 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
+static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
 
 
 extern int LogLevel;
 extern int VInit;
 extern pthread_mutex_t vol_salvsync_mutex;
 
-static int AcceptSd = -1;              /* Socket used by server for accepting connections */
-
-
-/* be careful about rearranging elements in this structure.
- * element placement has been optimized for locality of reference
- * in SALVSYNC_getWork() */
+/**
+ * salvsync server socket handle.
+ */
+static SYNC_server_state_t salvsync_server_state = 
+    { -1,                       /* file descriptor */
+      SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
+      SALVSYNC_PROTO_VERSION,   /* protocol version */
+      5,                        /* bind() retry limit */
+      100,                      /* listen() queue depth */
+      "SALVSYNC",               /* protocol name string */
+    };
+
+
+/**
+ * queue of all volumes waiting to be salvaged.
+ */
 struct SalvageQueue {
     volatile int total_len;
-    volatile afs_int32 last_insert;    /* id of last partition to have a salvage node insert */
+    volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
     volatile int len[VOLMAXPARTS+1];
-    volatile struct rx_queue part[VOLMAXPARTS+1];
+    volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
     pthread_cond_t cv;
 };
 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
 
+/**
+ * queue of all volumes currently being salvaged.
+ */
 struct QueueHead {
-    volatile struct rx_queue q;
-    volatile int len;
+    volatile struct rx_queue q;  /**< queue of salvages in progress */
+    volatile int len;            /**< length of in-progress queue */
     pthread_cond_t queue_change_cv;
 };
 static struct QueueHead pendingQueue;  /* volumes being salvaged */
@@ -176,6 +189,9 @@ static struct QueueHead pendingQueue;  /* volumes being salvaged */
  */
 static int partition_salvaging[VOLMAXPARTS+1];
 
+static int HandlerFD[MAXHANDLERS];
+static void (*HandlerProc[MAXHANDLERS]) (int);
+
 #define VSHASH_SIZE 64
 #define VSHASH_MASK (VSHASH_SIZE-1)
 #define VSHASH(vid) ((vid)&VSHASH_MASK)
@@ -183,7 +199,8 @@ static int partition_salvaging[VOLMAXPARTS+1];
 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
 
 static struct SalvageQueueNode *
-LookupNode(afs_uint32 vid, char * partName)
+LookupNode(afs_uint32 vid, char * partName,
+          struct SalvageQueueNode ** parent)
 {
     struct rx_queue *qp, *nqp;
     struct SalvageQueueNode *vsp;
@@ -200,13 +217,24 @@ LookupNode(afs_uint32 vid, char * partName)
     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
        vsp = NULL;
     }
+
+    if (parent) {
+       if (vsp) {
+           *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
+               vsp->volgroup.parent : vsp;
+       } else {
+           *parent = NULL;
+       }
+    }
+
     return vsp;
 }
 
 static struct SalvageQueueNode *
-LookupNodeByCommand(SALVSYNC_command_hdr * qry)
+LookupNodeByCommand(SALVSYNC_command_hdr * qry,
+                   struct SalvageQueueNode ** parent)
 {
-    return LookupNode(qry->volume, qry->partName);
+    return LookupNode(qry->volume, qry->partName, parent);
 }
 
 static void
@@ -243,6 +271,7 @@ SALVSYNC_salvInit(void)
     pthread_attr_t tattr;
 
     /* initialize the queues */
+    Lock_Init(&SALVSYNC_handler_lock);
     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
     for (i = 0; i <= VOLMAXPARTS; i++) {
        queue_Init(&salvageQueue.part[i]);
@@ -266,91 +295,49 @@ SALVSYNC_salvInit(void)
     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
 }
 
-#ifdef USE_UNIX_SOCKETS
-static int
-getport(struct sockaddr_un *addr)
-{
-    int sd;
-    char tbuffer[AFSDIR_PATH_MAX]; 
-    
-    strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
-               "fssync.sock", NULL);
-    
-    memset(addr, 0, sizeof(*addr));
-    addr->sun_family = AF_UNIX;
-    strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
-    assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
-    return sd;
-}
-#else
-static int
-getport(struct sockaddr_in *addr)
+static void
+CleanFDs(void)
 {
-    int sd;
-
-    memset(addr, 0, sizeof(*addr));
-    assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
-#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
-    addr->sin_len = sizeof(struct sockaddr_in);
-#endif
-    addr->sin_addr.s_addr = htonl(0x7f000001);
-    addr->sin_family = AF_INET;        /* was localhost->h_addrtype */
-    addr->sin_port = htons(2041);      /* XXXX htons not _really_ neccessary */
+    int i;
+    for (i = 0; i < MAXHANDLERS; ++i) {
+       if (HandlerFD[i] >= 0) {
+           SALVSYNC_Drop(HandlerFD[i]);
+       }
+    }
 
-    return sd;
+    /* just in case we were in AcceptOff mode, and thus this fd wouldn't
+     * have a handler */
+    close(salvsync_server_state.fd);
+    salvsync_server_state.fd = -1;
 }
-#endif
 
 static fd_set SALVSYNC_readfds;
 
 static void *
 SALVSYNC_syncThread(void * args)
 {
-    struct sockaddr_in addr;
     int on = 1;
     int code;
     int numTries;
     int tid;
-#ifdef USE_UNIX_SOCKETS
-    char tbuffer[AFSDIR_PATH_MAX]; 
-#endif
+    SYNC_server_state_t * state = &salvsync_server_state;
+
+    /* when we fork, the child needs to close the salvsync server sockets,
+     * otherwise, it may get salvsync requests, instead of the parent
+     * salvageserver */
+    assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
+
+    SYNC_getAddr(&state->endpoint, &state->addr);
+    SYNC_cleanupSock(state);
 
 #ifndef AFS_NT40_ENV
     (void)signal(SIGPIPE, SIG_IGN);
 #endif
 
-    /* set our 'thread-id' so that the host hold table works */
-    MUTEX_ENTER(&rx_stats_mutex);      /* protects rxi_pthread_hinum */
-    tid = ++rxi_pthread_hinum;
-    MUTEX_EXIT(&rx_stats_mutex);
-    pthread_setspecific(rx_thread_id_key, (void *)tid);
-    Log("Set thread id %d for SALVSYNC_syncThread\n", tid);
-
-#ifdef USE_UNIX_SOCKETS
-    strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
-               "fssync.sock", NULL);
-    /* ignore errors */
-    remove(tbuffer);
-#endif /* USE_UNIX_SOCKETS */
-
-    AcceptSd = getport(&addr);
-    /* Reuseaddr needed because system inexplicably leaves crud lying around */
-    code =
-       setsockopt(AcceptSd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
-                  sizeof(on));
-    if (code)
-       Log("SALVSYNC_sync: setsockopt failed with (%d)\n", errno);
-
-    for (numTries = 0; numTries < MAX_BIND_TRIES; numTries++) {
-       if ((code =
-            bind(AcceptSd, (struct sockaddr *)&addr, sizeof(addr))) == 0)
-           break;
-       Log("SALVSYNC_sync: bind failed with (%d), will sleep and retry\n",
-           errno);
-       sleep(5);
-    }
+    state->fd = SYNC_getSock(&state->endpoint);
+    code = SYNC_bindSock(state);
     assert(!code);
-    listen(AcceptSd, 100);
+
     InitHandler();
     AcceptOn();
 
@@ -390,7 +377,7 @@ SALVSYNC_newconnection(int afd)
 /* this function processes commands from an salvsync file descriptor (fd) */
 static afs_int32 SALV_cnt = 0;
 static void
-SALVSYNC_com(int fd)
+SALVSYNC_com(osi_socket fd)
 {
     SYNC_command com;
     SYNC_response res;
@@ -398,6 +385,12 @@ SALVSYNC_com(int fd)
     SALVSYNC_command scom;
     SALVSYNC_response sres;
     SYNC_PROTO_BUF_DECL(buf);
+
+    memset(&com, 0, sizeof(com));
+    memset(&res, 0, sizeof(res));
+    memset(&scom, 0, sizeof(scom));
+    memset(&sres, 0, sizeof(sres));
+    memset(&sres_hdr, 0, sizeof(sres_hdr));
     
     com.payload.buf = (void *)buf;
     com.payload.len = SYNC_PROTO_MAX_LEN;
@@ -414,12 +407,20 @@ SALVSYNC_com(int fd)
     sres.res = &res;
 
     SALV_cnt++;
-    if (SYNC_getCom(fd, &com)) {
+    if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
        Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
        SALVSYNC_Drop(fd);
        return;
     }
 
+    if (com.recv_len < sizeof(com.hdr)) {
+       Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
+       res.hdr.response = SYNC_COM_ERROR;
+       res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
+       res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
+       goto respond;
+    }
+
     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
        Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
        res.hdr.response = SYNC_COM_ERROR;
@@ -427,6 +428,12 @@ SALVSYNC_com(int fd)
        goto respond;
     }
 
+    if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
+       res.hdr.response = SYNC_OK;
+       res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
+       goto respond;
+    }
+
     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
        Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
        res.hdr.response = SYNC_COM_ERROR;
@@ -435,11 +442,14 @@ SALVSYNC_com(int fd)
        goto respond;
     }
 
+    res.hdr.com_seq = com.hdr.com_seq;
+
     VOL_LOCK;
     switch (com.hdr.command) {
     case SALVSYNC_NOP:
        break;
     case SALVSYNC_SALVAGE:
+    case SALVSYNC_RAISEPRIO:
        res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
        break;
     case SALVSYNC_CANCEL:
@@ -450,17 +460,13 @@ SALVSYNC_com(int fd)
        /* cancel all queued salvages */
        res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
        break;
-    case SALVSYNC_RAISEPRIO:
-       /* raise the priority of a salvage */
-       res.hdr.response = SALVSYNC_com_RaisePrio(&scom, &sres);
-       break;
     case SALVSYNC_QUERY:
        /* query whether a volume is done salvaging */
        res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
        break;
-    case SYNC_COM_CHANNEL_CLOSE:
-       res.hdr.response = SYNC_OK;
-       res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
+    case SALVSYNC_OP_LINK:
+       /* link a clone to its parent in the scheduler */
+       res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
        break;
     default:
        res.hdr.response = SYNC_BAD_COMMAND;
@@ -472,17 +478,38 @@ SALVSYNC_com(int fd)
     VOL_UNLOCK;
 
  respond:
-    SYNC_putRes(fd, &res);
+    SYNC_putRes(&salvsync_server_state, fd, &res);
     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
        SALVSYNC_Drop(fd);
     }
 }
 
+/**
+ * request that a volume be salvaged.
+ *
+ * @param[in]  com  inbound command object
+ * @param[out] res  outbound response object
+ *
+ * @return operation status
+ *    @retval SYNC_OK success
+ *    @retval SYNC_DENIED failed to enqueue request
+ *    @retval SYNC_FAILED malformed command packet
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ *
+ * @post the volume is enqueued in the to-be-salvaged queue.  
+ *       if the volume was already in the salvage queue, its 
+ *       priority (and thus its location in the queue) are 
+ *       updated.
+ */
 static afs_int32
 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
 {
     afs_int32 code = SYNC_OK;
-    struct SalvageQueueNode * node;
+    struct SalvageQueueNode * node, * clone;
+    int hash = 0;
 
     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        code = SYNC_FAILED;
@@ -490,41 +517,54 @@ SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
        goto done;
     }
 
-    node = LookupNodeByCommand(com->sop);
+    clone = LookupNodeByCommand(com->sop, &node);
 
-    /* schedule a salvage for this volume */
-    if (node != NULL) {
-       switch (node->state) {
-       case SALVSYNC_STATE_ERROR:
-       case SALVSYNC_STATE_DONE:
-           memcpy(&node->command.com, com->hdr, sizeof(SYNC_command_hdr));
-           memcpy(&node->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
-           node->command.sop.prio = 0;
-           if (AddToSalvageQueue(node)) {
-               code = SYNC_DENIED;
-           }
-           break;
-       default:
-           break;
-       }
-    } else {
-       node = (struct SalvageQueueNode *) malloc(sizeof(struct SalvageQueueNode));
-       if (node == NULL) {
+    if (node == NULL) {
+       if (AllocNode(&node)) {
            code = SYNC_DENIED;
+           res->hdr->reason = SYNC_REASON_NOMEM;
            goto done;
        }
-       memset(node, 0, sizeof(struct SalvageQueueNode));
-       memcpy(&node->command.com, com->hdr, sizeof(SYNC_command_hdr));
-       memcpy(&node->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
-       AddNodeToHash(node);
+       clone = node;
+       hash = 1;
+    }
+
+    HandlePrio(clone, node, com->sop->prio);
+
+    switch (node->state) {
+    case SALVSYNC_STATE_QUEUED:
+       UpdateCommandPrio(node);
+       break;
+
+    case SALVSYNC_STATE_ERROR:
+    case SALVSYNC_STATE_DONE:
+    case SALVSYNC_STATE_UNKNOWN:
+       memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
+       memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
+
+       /* 
+        * make sure volgroup parent partition path is kept coherent
+        *
+        * If we ever want to support non-COW clones on a machine holding
+        * the RW site, please note that this code does not work under the
+        * conditions where someone zaps a COW clone on partition X, and
+        * subsequently creates a full clone on partition Y -- we'd need
+        * an inverse to SALVSYNC_com_Link.
+        *  -- tkeiser 11/28/2007
+        */
+       strcpy(node->command.sop.partName, com->sop->partName);
+
        if (AddToSalvageQueue(node)) {
-           /* roll back */
-           DeleteNodeFromHash(node);
-           free(node);
-           node = NULL;
            code = SYNC_DENIED;
-           goto done;
        }
+       break;
+
+    default:
+       break;
+    }
+
+    if (hash) {
+       AddNodeToHash(node);
     }
 
     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
@@ -535,6 +575,20 @@ SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
     return code;
 }
 
+/**
+ * cancel a pending salvage request.
+ *
+ * @param[in]  com  inbound command object
+ * @param[out] res  outbound response object
+ *
+ * @return operation status
+ *    @retval SYNC_OK success
+ *    @retval SYNC_FAILED malformed command packet
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ */
 static afs_int32
 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
 {
@@ -547,7 +601,7 @@ SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
        goto done;
     }
 
-    node = LookupNodeByCommand(com->sop);
+    node = LookupNodeByCommand(com->sop, NULL);
 
     if (node == NULL) {
        res->sop->state = SALVSYNC_STATE_UNKNOWN;
@@ -556,7 +610,8 @@ SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
        res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
        res->sop->prio = node->command.sop.prio;
        res->sop->state = node->state;
-       if (node->state == SALVSYNC_STATE_QUEUED) {
+       if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
+           (node->state == SALVSYNC_STATE_QUEUED)) {
            DeleteFromSalvageQueue(node);
        }
     }
@@ -565,11 +620,24 @@ SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
     return code;
 }
 
+/**
+ * cancel all pending salvage requests.
+ *
+ * @param[in]  com  incoming command object
+ * @param[out] res  outbound response object
+ *
+ * @return operation status
+ *    @retval SYNC_OK success
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ */
 static afs_int32
 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
 {
     struct SalvageQueueNode * np, *nnp;
-    struct DiskPartition * dp;
+    struct DiskPartition64 * dp;
 
     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
        for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
@@ -580,11 +648,31 @@ SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
     return SYNC_OK;
 }
 
+/**
+ * link a queue node for a clone to its parent volume.
+ *
+ * @param[in]  com   inbound command object
+ * @param[out] res   outbound response object
+ *
+ * @return operation status
+ *    @retval SYNC_OK success
+ *    @retval SYNC_FAILED malformed command packet
+ *    @retval SYNC_DENIED the request could not be completed
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @post the requested volume is marked as a child of another volume.
+ *       thus, future salvage requests for this volume will result in the
+ *       parent of the volume group being scheduled for salvage instead
+ *       of this clone.
+ *
+ * @internal
+ */
 static afs_int32
-SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res)
+SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
 {
     afs_int32 code = SYNC_OK;
-    struct SalvageQueueNode * node;
+    struct SalvageQueueNode * clone, * parent;
 
     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        code = SYNC_FAILED;
@@ -592,41 +680,51 @@ SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res)
        goto done;
     }
 
-    node = LookupNodeByCommand(com->sop);
+    /* lookup clone's salvage scheduling node */
+    clone = LookupNodeByCommand(com->sop, NULL);
+    if (clone == NULL) {
+       code = SYNC_DENIED;
+       res->hdr->reason = SALVSYNC_REASON_ERROR;
+       goto done;
+    }
 
-    /* raise the priority of a salvage */
-    if (node == NULL) {
-       code = SALVSYNC_com_Salvage(com, res);
-       node = LookupNodeByCommand(com->sop);
-    } else {
-       switch (node->state) {
-       case SALVSYNC_STATE_QUEUED:
-           RaiseCommandPrio(node, com->sop);
-           break;
-       case SALVSYNC_STATE_SALVAGING:
-           break;
-       case SALVSYNC_STATE_ERROR:
-       case SALVSYNC_STATE_DONE:
-           code = SALVSYNC_com_Salvage(com, res);
-           break;
-       default:
-           break;
+    /* lookup parent's salvage scheduling node */
+    parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
+    if (parent == NULL) {
+       if (AllocNode(&parent)) {
+           code = SYNC_DENIED;
+           res->hdr->reason = SYNC_REASON_NOMEM;
+           goto done;
        }
+       memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
+       memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
+       parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
+       AddNodeToHash(parent);
     }
 
-    if (node == NULL) {
-       res->sop->prio = 0;
-       res->sop->state = SALVSYNC_STATE_UNKNOWN;
-    } else {
-       res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
-       res->sop->prio = node->command.sop.prio;
-       res->sop->state = node->state;
+    if (LinkNode(parent, clone)) {
+       code = SYNC_DENIED;
+       goto done;
     }
 
  done:
     return code;
 }
 
+/**
+ * query the status of a volume salvage request.
+ *
+ * @param[in]  com   inbound command object
+ * @param[out] res   outbound response object
+ *
+ * @return operation status
+ *    @retval SYNC_OK success
+ *    @retval SYNC_FAILED malformed command packet
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ */
 static afs_int32
 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
 {
@@ -639,7 +737,7 @@ SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
        goto done;
     }
 
-    node = LookupNodeByCommand(com->sop);
+    LookupNodeByCommand(com->sop, &node);
 
     /* query whether a volume is done salvaging */
     if (node == NULL) {
@@ -656,7 +754,7 @@ SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
 }
 
 static void
-SALVSYNC_Drop(int fd)
+SALVSYNC_Drop(osi_socket fd)
 {
     RemoveHandler(fd);
 #ifdef AFS_NT40_ENV
@@ -673,8 +771,8 @@ static void
 AcceptOn(void)
 {
     if (AcceptHandler == -1) {
-       assert(AddHandler(AcceptSd, SALVSYNC_newconnection));
-       AcceptHandler = FindHandler(AcceptSd);
+       assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
+       AcceptHandler = FindHandler(salvsync_server_state.fd);
     }
 }
 
@@ -682,16 +780,13 @@ static void
 AcceptOff(void)
 {
     if (AcceptHandler != -1) {
-       assert(RemoveHandler(AcceptSd));
+       assert(RemoveHandler(salvsync_server_state.fd));
        AcceptHandler = -1;
     }
 }
 
 /* The multiple FD handling code. */
 
-static int HandlerFD[MAXHANDLERS];
-static void (*HandlerProc[MAXHANDLERS]) (int);
-
 static void
 InitHandler(void)
 {
@@ -720,7 +815,7 @@ CallHandler(fd_set * fdsetp)
 }
 
 static int
-AddHandler(int afd, void (*aproc) (int))
+AddHandler(osi_socket afd, void (*aproc) (int))
 {
     register int i;
     ObtainWriteLock(&SALVSYNC_handler_lock);
@@ -738,7 +833,7 @@ AddHandler(int afd, void (*aproc) (int))
 }
 
 static int
-FindHandler(register int afd)
+FindHandler(register osi_socket afd)
 {
     register int i;
     ObtainReadLock(&SALVSYNC_handler_lock);
@@ -753,7 +848,7 @@ FindHandler(register int afd)
 }
 
 static int
-FindHandler_r(register int afd)
+FindHandler_r(register osi_socket afd)
 {
     register int i;
     for (i = 0; i < MAXHANDLERS; i++)
@@ -765,7 +860,7 @@ FindHandler_r(register int afd)
 }
 
 static int
-RemoveHandler(register int afd)
+RemoveHandler(register osi_socket afd)
 {
     ObtainWriteLock(&SALVSYNC_handler_lock);
     HandlerFD[FindHandler_r(afd)] = -1;
@@ -790,10 +885,151 @@ GetHandler(fd_set * fdsetp, int *maxfdp)
     ReleaseReadLock(&SALVSYNC_handler_lock);   /* just in case */
 }
 
+/**
+ * allocate a salvage queue node.
+ *
+ * @param[out] node_out  address in which to store new node pointer
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval 1 failed to allocate node
+ *
+ * @internal
+ */
+static int
+AllocNode(struct SalvageQueueNode ** node_out)
+{
+    int code = 0;
+    struct SalvageQueueNode * node;
+
+    *node_out = node = (struct SalvageQueueNode *) 
+       malloc(sizeof(struct SalvageQueueNode));
+    if (node == NULL) {
+       code = 1;
+       goto done;
+    }
+
+    memset(node, 0, sizeof(struct SalvageQueueNode));
+    node->type = SALVSYNC_VOLGROUP_PARENT;
+    node->state = SALVSYNC_STATE_UNKNOWN;
+
+ done:
+    return code;
+}
+
+/**
+ * link a salvage queue node to its parent.
+ *
+ * @param[in] parent  pointer to queue node for parent of volume group
+ * @param[in] clone   pointer to queue node for a clone
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval 1 failure
+ *
+ * @internal
+ */
+static int
+LinkNode(struct SalvageQueueNode * parent,
+        struct SalvageQueueNode * clone)
+{
+    int code = 0;
+    int idx;
+
+    /* check for attaching a clone to a clone */
+    if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
+       code = 1;
+       goto done;
+    }
+
+    /* check for pre-existing registration and openings */
+    for (idx = 0; idx < VOLMAXTYPES; idx++) {
+       if (parent->volgroup.children[idx] == clone) {
+           goto linked;
+       }
+       if (parent->volgroup.children[idx] == NULL) {
+           break;
+       }
+    }
+    if (idx == VOLMAXTYPES) {
+       code = 1;
+       goto done;
+    }
+
+    /* link parent and child */
+    parent->volgroup.children[idx] = clone;
+    clone->type = SALVSYNC_VOLGROUP_CLONE;
+    clone->volgroup.parent = parent;
+
+
+ linked:
+    switch (clone->state) {
+    case SALVSYNC_STATE_QUEUED:
+       DeleteFromSalvageQueue(clone);
+
+    case SALVSYNC_STATE_SALVAGING:
+       switch (parent->state) {
+       case SALVSYNC_STATE_UNKNOWN:
+       case SALVSYNC_STATE_ERROR:
+       case SALVSYNC_STATE_DONE:
+           parent->command.sop.prio = clone->command.sop.prio;
+           AddToSalvageQueue(parent);
+           break;
+
+       case SALVSYNC_STATE_QUEUED:
+           if (clone->command.sop.prio) {
+               parent->command.sop.prio += clone->command.sop.prio;
+               UpdateCommandPrio(parent);
+           }
+           break;
+
+       default:
+           break;
+       }
+       break;
+
+    default:
+       break;
+    }
+
+ done:
+    return code;
+}
+
+static void
+HandlePrio(struct SalvageQueueNode * clone, 
+          struct SalvageQueueNode * node,
+          afs_uint32 new_prio)
+{
+    afs_uint32 delta;
+
+    switch (node->state) {
+    case SALVSYNC_STATE_ERROR:
+    case SALVSYNC_STATE_DONE:
+    case SALVSYNC_STATE_UNKNOWN:
+       node->command.sop.prio = 0;
+       break;
+    }
+
+    if (new_prio < clone->command.sop.prio) {
+       /* strange. let's just set our delta to 1 */
+       delta = 1;
+    } else {
+       delta = new_prio - clone->command.sop.prio;
+    }
+
+    if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
+       clone->command.sop.prio = new_prio;
+    }
+
+    node->command.sop.prio += delta;
+}
+
 static int
 AddToSalvageQueue(struct SalvageQueueNode * node)
 {
     afs_int32 id;
+    struct SalvageQueueNode * last = NULL;
 
     id = volutil_GetPartitionID(node->command.sop.partName);
     if (id < 0 || id > VOLMAXPARTS) {
@@ -803,12 +1039,25 @@ AddToSalvageQueue(struct SalvageQueueNode * node)
        /* don't enqueue salvage requests for unmounted partitions */
        return 1;
     }
+    if (queue_IsOnQueue(node)) {
+       return 0;
+    }
+
+    if (queue_IsNotEmpty(&salvageQueue.part[id])) {
+       last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
+    }
     queue_Append(&salvageQueue.part[id], node);
     salvageQueue.len[id]++;
     salvageQueue.total_len++;
     salvageQueue.last_insert = id;
     node->partition_id = id;
     node->state = SALVSYNC_STATE_QUEUED;
+
+    /* reorder, if necessary */
+    if (last && last->command.sop.prio < node->command.sop.prio) {
+       UpdateCommandPrio(node);
+    }
+
     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
     return 0;
 }
@@ -880,21 +1129,22 @@ LookupPendingCommandByPid(int pid)
 
 /* raise the priority of a previously scheduled salvage */
 static void
-RaiseCommandPrio(struct SalvageQueueNode * node, SALVSYNC_command_hdr * com)
+UpdateCommandPrio(struct SalvageQueueNode * node)
 {
     struct SalvageQueueNode *np, *nnp;
     afs_int32 id;
+    afs_uint32 prio;
 
     assert(queue_IsOnQueue(node));
 
-    node->command.sop.prio = com->prio;
+    prio = node->command.sop.prio;
     id = node->partition_id;
-    if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < com->prio) {
+    if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
        queue_Remove(node);
        queue_Prepend(&salvageQueue.part[id], node);
     } else {
        for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
-           if (np->command.sop.prio > com->prio)
+           if (np->command.sop.prio > prio)
                break;
        }
        if (queue_IsEnd(&salvageQueue.part[id], np)) {
@@ -913,7 +1163,7 @@ struct SalvageQueueNode *
 SALVSYNC_getWork(void)
 {
     int i, ret;
-    struct DiskPartition * dp = NULL, * fdp;
+    struct DiskPartition64 * dp = NULL, * fdp;
     static afs_int32 next_part_sched = 0;
     struct SalvageQueueNode *node = NULL, *np;
 
@@ -924,10 +1174,9 @@ SALVSYNC_getWork(void)
      * if there are no disk partitions, just sit in this wait loop forever
      */
     while (!salvageQueue.total_len || !DiskPartitionList) {
-      assert(pthread_cond_wait(&salvageQueue.cv, &vol_glock_mutex) == 0);
+       VOL_CV_WAIT(&salvageQueue.cv);
     }
 
-
     /* 
      * short circuit for simple case where only one partition has
      * scheduled salvages
@@ -1000,10 +1249,22 @@ SALVSYNC_getWork(void)
     return node;
 }
 
+/**
+ * update internal scheduler state to reflect completion of a work unit.
+ *
+ * @param[in]  node    salvage queue node object pointer
+ * @param[in]  result  worker process result code
+ *
+ * @post scheduler state is updated.
+ *
+ * @internal
+ */
 static void
 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
 {
     afs_int32 partid;
+    int idx;
+
     DeleteFromPendingQueue(node);
     partid = node->partition_id;
     if (partid >=0 && partid <= VOLMAXPARTS) {
@@ -1011,30 +1272,99 @@ SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
     }
     if (result == 0) {
        node->state = SALVSYNC_STATE_DONE;
-    } else {
+    } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
        node->state = SALVSYNC_STATE_ERROR;
     }
+
+    if (node->type == SALVSYNC_VOLGROUP_PARENT) {
+       for (idx = 0; idx < VOLMAXTYPES; idx++) {
+           if (node->volgroup.children[idx]) {
+               node->volgroup.children[idx]->state = node->state;
+           }
+       }
+    }
 }
 
-void 
-SALVSYNC_doneWork(struct SalvageQueueNode * node, int result)
+/**
+ * check whether worker child failed.
+ *
+ * @param[in] status  status bitfield return by wait()
+ *
+ * @return boolean failure code
+ *    @retval 0 child succeeded
+ *    @retval 1 child failed
+ *
+ * @internal
+ */
+static int
+ChildFailed(int status)
 {
-    VOL_LOCK;
-    SALVSYNC_doneWork_r(node, result);
-    VOL_UNLOCK;
+    return (WCOREDUMP(status) || 
+           WIFSIGNALED(status) || 
+           ((WEXITSTATUS(status) != 0) && 
+            (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
 }
 
+
+/**
+ * notify salvsync scheduler of node completion, by child pid.
+ *
+ * @param[in]  pid     pid of worker child
+ * @param[in]  status  worker status bitfield from wait()
+ *
+ * @post scheduler state is updated.
+ *       if status code is a failure, fileserver notification was attempted
+ *
+ * @see SALVSYNC_doneWork_r
+ */
 void
-SALVSYNC_doneWorkByPid(int pid, int result)
+SALVSYNC_doneWorkByPid(int pid, int status)
 {
     struct SalvageQueueNode * node;
+    char partName[16];
+    afs_uint32 volids[VOLMAXTYPES+1];
+    unsigned int idx;
+
+    memset(volids, 0, sizeof(volids));
 
     VOL_LOCK;
     node = LookupPendingCommandByPid(pid);
     if (node != NULL) {
-       SALVSYNC_doneWork_r(node, result);
+       SALVSYNC_doneWork_r(node, status);
+
+       if (ChildFailed(status)) {
+           /* populate volume id list for later processing outside the glock */
+           volids[0] = node->command.sop.volume;
+           strcpy(partName, node->command.sop.partName);
+           if (node->type == SALVSYNC_VOLGROUP_PARENT) {
+               for (idx = 0; idx < VOLMAXTYPES; idx++) {
+                   if (node->volgroup.children[idx]) {
+                       volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
+                   }
+               }
+           }
+       }
     }
     VOL_UNLOCK;
+
+    /*
+     * if necessary, notify fileserver of
+     * failure to salvage volume group
+     * [we cannot guarantee that the child made the
+     *  appropriate notifications (e.g. SIGSEGV)]
+     *  -- tkeiser 11/28/2007
+     */
+    if (ChildFailed(status)) {
+       for (idx = 0; idx <= VOLMAXTYPES; idx++) {
+           if (volids[idx]) {
+               FSYNC_VolOp(volids[idx],
+                           partName,
+                           FSYNC_VOL_FORCE_ERROR,
+                           FSYNC_WHATEVER,
+                           NULL);
+           }
+       }
+    }
 }
 
 #endif /* AFS_DEMAND_ATTACH_FS */