/*
- * Copyright 2006, Sine Nomine Associates and others.
+ * Copyright 2006-2008, Sine Nomine Associates and others.
* All Rights Reserved.
*
* This software has been released under the terms of the IBM Public
#include <afsconfig.h>
#include <afs/param.h>
-RCSID
- ("$Header$");
#include <sys/types.h>
#include <stdio.h>
#include "volume.h"
#include "partition.h"
#include <rx/rx_queue.h>
+#include <afs/procmgmt.h>
#if !defined(offsetof)
#include <stddef.h>
/*@printflike@*/ extern void Log(const char *format, ...);
-#ifdef osi_Assert
-#undef osi_Assert
-#endif
-#define osi_Assert(e) (void)(e)
-
#define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
* move = dump+restore can run on single server */
-#define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
-
-
-
-/* Forward declarations */
-static void * SALVSYNC_syncThread(void *);
-static void SALVSYNC_newconnection(int fd);
-static void SALVSYNC_com(int fd);
-static void SALVSYNC_Drop(int fd);
-static void AcceptOn(void);
-static void AcceptOff(void);
-static void InitHandler(void);
-static void CallHandler(fd_set * fdsetp);
-static int AddHandler(int afd, void (*aproc) (int));
-static int FindHandler(register int afd);
-static int FindHandler_r(register int afd);
-static int RemoveHandler(register int afd);
-static void GetHandler(fd_set * fdsetp, int *maxfdp);
-
/*
* This lock controls access to the handler array.
* SALVSYNC is a feature specific to the demand attach fileserver
*/
+/* Forward declarations */
+static void * SALVSYNC_syncThread(void *);
+static void SALVSYNC_newconnection(osi_socket fd);
+static void SALVSYNC_com(osi_socket fd);
+static void SALVSYNC_Drop(osi_socket fd);
+static void AcceptOn(void);
+static void AcceptOff(void);
+static void InitHandler(void);
+static void CallHandler(fd_set * fdsetp);
+static int AddHandler(osi_socket afd, void (*aproc) (int));
+static int FindHandler(register osi_socket afd);
+static int FindHandler_r(register osi_socket afd);
+static int RemoveHandler(register osi_socket afd);
+static void GetHandler(fd_set * fdsetp, int *maxfdp);
+
+static int AllocNode(struct SalvageQueueNode ** node);
+
static int AddToSalvageQueue(struct SalvageQueueNode * node);
static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
static void AddToPendingQueue(struct SalvageQueueNode * node);
static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
-static void RaiseCommandPrio(struct SalvageQueueNode * node, SALVSYNC_command_hdr * com);
-
-static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName);
-static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry);
+static void UpdateCommandPrio(struct SalvageQueueNode * node);
+static void HandlePrio(struct SalvageQueueNode * clone,
+ struct SalvageQueueNode * parent,
+ afs_uint32 new_prio);
+
+static int LinkNode(struct SalvageQueueNode * parent,
+ struct SalvageQueueNode * clone);
+
+static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
+ struct SalvageQueueNode ** parent);
+static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
+ struct SalvageQueueNode ** parent);
static void AddNodeToHash(struct SalvageQueueNode * node);
static void DeleteNodeFromHash(struct SalvageQueueNode * node);
static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
-static afs_int32 SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
+static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
extern int LogLevel;
extern int VInit;
extern pthread_mutex_t vol_salvsync_mutex;
-static int AcceptSd = -1; /* Socket used by server for accepting connections */
-
-
-/* be careful about rearranging elements in this structure.
- * element placement has been optimized for locality of reference
- * in SALVSYNC_getWork() */
+/**
+ * salvsync server socket handle.
+ *
+ * The file descriptor starts out as -1 (no socket).  SALVSYNC_syncThread
+ * stores the descriptor returned by SYNC_getSock() in it and then binds
+ * it with SYNC_bindSock(); CleanFDs, which is registered as the
+ * pthread_atfork() child handler, closes it and resets it to -1.
+ * A pointer to this object is also handed to SYNC_getCom() and
+ * SYNC_putRes() whenever a client connection is serviced.
+ */
+static SYNC_server_state_t salvsync_server_state =
+ { -1, /* file descriptor */
+ SALVSYNC_ENDPOINT_DECL, /* server endpoint */
+ SALVSYNC_PROTO_VERSION, /* protocol version */
+ 5, /* bind() retry limit */
+ 100, /* listen() queue depth */
+ "SALVSYNC", /* protocol name string */
+ };
+
+
+/**
+ * queue of all volumes waiting to be salvaged.
+ *
+ * AddToSalvageQueue appends a node to part[id], increments len[id]
+ * and total_len, records id in last_insert, and then broadcasts cv.
+ * SALVSYNC_getWork waits on cv for as long as total_len is zero (or
+ * there is no disk partition list).
+ */
struct SalvageQueue {
volatile int total_len;
- volatile afs_int32 last_insert; /* id of last partition to have a salvage node insert */
+ volatile afs_int32 last_insert; /**< id of last partition to have a salvage node inserted */
volatile int len[VOLMAXPARTS+1];
- volatile struct rx_queue part[VOLMAXPARTS+1];
+ volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
pthread_cond_t cv;
};
static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
+/**
+ * queue of all volumes currently being salvaged.
+ *
+ * @note besides pendingQueue, this type is also used for each bucket
+ *       of SalvageHashTable, which LookupNode scans to find a node.
+ *       NOTE(review): in that role q is presumably the bucket's chain
+ *       of nodes rather than a list of in-progress salvages -- confirm
+ *       against AddNodeToHash.
+ */
struct QueueHead {
- volatile struct rx_queue q;
- volatile int len;
+ volatile struct rx_queue q; /**< queue of salvages in progress */
+ volatile int len; /**< length of in-progress queue */
pthread_cond_t queue_change_cv;
};
static struct QueueHead pendingQueue; /* volumes being salvaged */
*/
static int partition_salvaging[VOLMAXPARTS+1];
+static int HandlerFD[MAXHANDLERS];
+static void (*HandlerProc[MAXHANDLERS]) (int);
+
#define VSHASH_SIZE 64
#define VSHASH_MASK (VSHASH_SIZE-1)
#define VSHASH(vid) ((vid)&VSHASH_MASK)
static struct QueueHead SalvageHashTable[VSHASH_SIZE];
static struct SalvageQueueNode *
-LookupNode(afs_uint32 vid, char * partName)
+LookupNode(afs_uint32 vid, char * partName,
+ struct SalvageQueueNode ** parent)
{
struct rx_queue *qp, *nqp;
struct SalvageQueueNode *vsp;
if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
vsp = NULL;
}
+
+ if (parent) {
+ if (vsp) {
+ *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
+ vsp->volgroup.parent : vsp;
+ } else {
+ *parent = NULL;
+ }
+ }
+
return vsp;
}
static struct SalvageQueueNode *
-LookupNodeByCommand(SALVSYNC_command_hdr * qry)
+LookupNodeByCommand(SALVSYNC_command_hdr * qry,
+ struct SalvageQueueNode ** parent)
{
- return LookupNode(qry->volume, qry->partName);
+ return LookupNode(qry->volume, qry->partName, parent);
}
static void
pthread_attr_t tattr;
/* initialize the queues */
+ Lock_Init(&SALVSYNC_handler_lock);
assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
for (i = 0; i <= VOLMAXPARTS; i++) {
queue_Init(&salvageQueue.part[i]);
assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
}
-#ifdef USE_UNIX_SOCKETS
-static int
-getport(struct sockaddr_un *addr)
-{
- int sd;
- char tbuffer[AFSDIR_PATH_MAX];
-
- strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
- "fssync.sock", NULL);
-
- memset(addr, 0, sizeof(*addr));
- addr->sun_family = AF_UNIX;
- strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
- assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
- return sd;
-}
-#else
-static int
-getport(struct sockaddr_in *addr)
+static void
+CleanFDs(void)
{
- int sd;
-
- memset(addr, 0, sizeof(*addr));
- assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
-#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
- addr->sin_len = sizeof(struct sockaddr_in);
-#endif
- addr->sin_addr.s_addr = htonl(0x7f000001);
- addr->sin_family = AF_INET; /* was localhost->h_addrtype */
- addr->sin_port = htons(2041); /* XXXX htons not _really_ neccessary */
+ int i;
+ for (i = 0; i < MAXHANDLERS; ++i) {
+ if (HandlerFD[i] >= 0) {
+ SALVSYNC_Drop(HandlerFD[i]);
+ }
+ }
- return sd;
+ /* just in case we were in AcceptOff mode, and thus this fd wouldn't
+ * have a handler */
+ close(salvsync_server_state.fd);
+ salvsync_server_state.fd = -1;
}
-#endif
static fd_set SALVSYNC_readfds;
static void *
SALVSYNC_syncThread(void * args)
{
- struct sockaddr_in addr;
int on = 1;
int code;
int numTries;
int tid;
-#ifdef USE_UNIX_SOCKETS
- char tbuffer[AFSDIR_PATH_MAX];
-#endif
+ SYNC_server_state_t * state = &salvsync_server_state;
+
+ /* when we fork, the child needs to close the salvsync server sockets,
+ * otherwise, it may get salvsync requests, instead of the parent
+ * salvageserver */
+ assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
+
+ SYNC_getAddr(&state->endpoint, &state->addr);
+ SYNC_cleanupSock(state);
#ifndef AFS_NT40_ENV
(void)signal(SIGPIPE, SIG_IGN);
#endif
- /* set our 'thread-id' so that the host hold table works */
- MUTEX_ENTER(&rx_stats_mutex); /* protects rxi_pthread_hinum */
- tid = ++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
- pthread_setspecific(rx_thread_id_key, (void *)tid);
- Log("Set thread id %d for SALVSYNC_syncThread\n", tid);
-
-#ifdef USE_UNIX_SOCKETS
- strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
- "fssync.sock", NULL);
- /* ignore errors */
- remove(tbuffer);
-#endif /* USE_UNIX_SOCKETS */
-
- AcceptSd = getport(&addr);
- /* Reuseaddr needed because system inexplicably leaves crud lying around */
- code =
- setsockopt(AcceptSd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
- sizeof(on));
- if (code)
- Log("SALVSYNC_sync: setsockopt failed with (%d)\n", errno);
-
- for (numTries = 0; numTries < MAX_BIND_TRIES; numTries++) {
- if ((code =
- bind(AcceptSd, (struct sockaddr *)&addr, sizeof(addr))) == 0)
- break;
- Log("SALVSYNC_sync: bind failed with (%d), will sleep and retry\n",
- errno);
- sleep(5);
- }
+ state->fd = SYNC_getSock(&state->endpoint);
+ code = SYNC_bindSock(state);
assert(!code);
- listen(AcceptSd, 100);
+
InitHandler();
AcceptOn();
/* this function processes commands from an salvsync file descriptor (fd) */
static afs_int32 SALV_cnt = 0;
static void
-SALVSYNC_com(int fd)
+SALVSYNC_com(osi_socket fd)
{
SYNC_command com;
SYNC_response res;
SALVSYNC_command scom;
SALVSYNC_response sres;
SYNC_PROTO_BUF_DECL(buf);
+
+ memset(&com, 0, sizeof(com));
+ memset(&res, 0, sizeof(res));
+ memset(&scom, 0, sizeof(scom));
+ memset(&sres, 0, sizeof(sres));
+ memset(&sres_hdr, 0, sizeof(sres_hdr));
com.payload.buf = (void *)buf;
com.payload.len = SYNC_PROTO_MAX_LEN;
sres.res = &res;
SALV_cnt++;
- if (SYNC_getCom(fd, &com)) {
+ if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);
SALVSYNC_Drop(fd);
return;
}
+ if (com.recv_len < sizeof(com.hdr)) {
+ Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
+ res.hdr.response = SYNC_COM_ERROR;
+ res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
+ res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
+ goto respond;
+ }
+
if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
res.hdr.response = SYNC_COM_ERROR;
goto respond;
}
+ if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
+ res.hdr.response = SYNC_OK;
+ res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
+ goto respond;
+ }
+
if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
res.hdr.response = SYNC_COM_ERROR;
goto respond;
}
+ res.hdr.com_seq = com.hdr.com_seq;
+
VOL_LOCK;
switch (com.hdr.command) {
case SALVSYNC_NOP:
break;
case SALVSYNC_SALVAGE:
+ case SALVSYNC_RAISEPRIO:
res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
break;
case SALVSYNC_CANCEL:
/* cancel all queued salvages */
res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
break;
- case SALVSYNC_RAISEPRIO:
- /* raise the priority of a salvage */
- res.hdr.response = SALVSYNC_com_RaisePrio(&scom, &sres);
- break;
case SALVSYNC_QUERY:
/* query whether a volume is done salvaging */
res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
break;
- case SYNC_COM_CHANNEL_CLOSE:
- res.hdr.response = SYNC_OK;
- res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
+ case SALVSYNC_OP_LINK:
+ /* link a clone to its parent in the scheduler */
+ res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
break;
default:
res.hdr.response = SYNC_BAD_COMMAND;
VOL_UNLOCK;
respond:
- SYNC_putRes(fd, &res);
+ SYNC_putRes(&salvsync_server_state, fd, &res);
if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
SALVSYNC_Drop(fd);
}
}
+/**
+ * request that a volume be salvaged.
+ *
+ * @param[in] com inbound command object
+ * @param[out] res outbound response object
+ *
+ * @return operation status
+ * @retval SYNC_OK success
+ * @retval SYNC_DENIED failed to enqueue request
+ * @retval SYNC_FAILED malformed command packet
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ *
+ * @post the volume is enqueued in the to-be-salvaged queue.
+ * if the volume was already in the salvage queue, its
+ * priority (and thus its location in the queue) are
+ * updated.
+ */
static afs_int32
SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
{
afs_int32 code = SYNC_OK;
- struct SalvageQueueNode * node;
+ struct SalvageQueueNode * node, * clone;
+ int hash = 0;
if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
code = SYNC_FAILED;
goto done;
}
- node = LookupNodeByCommand(com->sop);
+ clone = LookupNodeByCommand(com->sop, &node);
- /* schedule a salvage for this volume */
- if (node != NULL) {
- switch (node->state) {
- case SALVSYNC_STATE_ERROR:
- case SALVSYNC_STATE_DONE:
- memcpy(&node->command.com, com->hdr, sizeof(SYNC_command_hdr));
- memcpy(&node->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
- node->command.sop.prio = 0;
- if (AddToSalvageQueue(node)) {
- code = SYNC_DENIED;
- }
- break;
- default:
- break;
- }
- } else {
- node = (struct SalvageQueueNode *) malloc(sizeof(struct SalvageQueueNode));
- if (node == NULL) {
+ if (node == NULL) {
+ if (AllocNode(&node)) {
code = SYNC_DENIED;
+ res->hdr->reason = SYNC_REASON_NOMEM;
goto done;
}
- memset(node, 0, sizeof(struct SalvageQueueNode));
- memcpy(&node->command.com, com->hdr, sizeof(SYNC_command_hdr));
- memcpy(&node->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
- AddNodeToHash(node);
+ clone = node;
+ hash = 1;
+ }
+
+ HandlePrio(clone, node, com->sop->prio);
+
+ switch (node->state) {
+ case SALVSYNC_STATE_QUEUED:
+ UpdateCommandPrio(node);
+ break;
+
+ case SALVSYNC_STATE_ERROR:
+ case SALVSYNC_STATE_DONE:
+ case SALVSYNC_STATE_UNKNOWN:
+ memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
+ memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
+
+ /*
+ * make sure volgroup parent partition path is kept coherent
+ *
+ * If we ever want to support non-COW clones on a machine holding
+ * the RW site, please note that this code does not work under the
+ * conditions where someone zaps a COW clone on partition X, and
+ * subsequently creates a full clone on partition Y -- we'd need
+ * an inverse to SALVSYNC_com_Link.
+ * -- tkeiser 11/28/2007
+ */
+ strcpy(node->command.sop.partName, com->sop->partName);
+
if (AddToSalvageQueue(node)) {
- /* roll back */
- DeleteNodeFromHash(node);
- free(node);
- node = NULL;
code = SYNC_DENIED;
- goto done;
}
+ break;
+
+ default:
+ break;
+ }
+
+ if (hash) {
+ AddNodeToHash(node);
}
res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
return code;
}
+/**
+ * cancel a pending salvage request.
+ *
+ * @param[in] com inbound command object
+ * @param[out] res outbound response object
+ *
+ * @return operation status
+ * @retval SYNC_OK success
+ * @retval SYNC_FAILED malformed command packet
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ */
static afs_int32
SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
{
goto done;
}
- node = LookupNodeByCommand(com->sop);
+ node = LookupNodeByCommand(com->sop, NULL);
if (node == NULL) {
res->sop->state = SALVSYNC_STATE_UNKNOWN;
res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
res->sop->prio = node->command.sop.prio;
res->sop->state = node->state;
- if (node->state == SALVSYNC_STATE_QUEUED) {
+ if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
+ (node->state == SALVSYNC_STATE_QUEUED)) {
DeleteFromSalvageQueue(node);
}
}
return code;
}
+/**
+ * cancel all pending salvage requests.
+ *
+ * @param[in] com incoming command object
+ * @param[out] res outbound response object
+ *
+ * @return operation status
+ * @retval SYNC_OK success
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ */
static afs_int32
SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
{
struct SalvageQueueNode * np, *nnp;
- struct DiskPartition * dp;
+ struct DiskPartition64 * dp;
for (dp = DiskPartitionList ; dp ; dp = dp->next) {
for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
return SYNC_OK;
}
+/**
+ * link a queue node for a clone to its parent volume.
+ *
+ * @param[in] com inbound command object
+ * @param[out] res outbound response object
+ *
+ * @return operation status
+ * @retval SYNC_OK success
+ * @retval SYNC_FAILED malformed command packet
+ * @retval SYNC_DENIED the request could not be completed
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @note SYNC_DENIED is returned in three cases: the clone has no
+ *       scheduling node (reason SALVSYNC_REASON_ERROR), a node for the
+ *       parent could not be allocated (reason SYNC_REASON_NOMEM), or
+ *       LinkNode() rejects the pairing (the reason field is not set
+ *       by this function in that case).
+ *
+ * @note when the parent has no scheduling node yet, one is allocated,
+ *       seeded with a copy of this command whose volume and parent ids
+ *       are both set to com->sop->parent, and added to the hash table
+ *       before the link is attempted.  the parent is looked up on the
+ *       partition named in the request.
+ *
+ * @post the requested volume is marked as a child of another volume.
+ * thus, future salvage requests for this volume will result in the
+ * parent of the volume group being scheduled for salvage instead
+ * of this clone.
+ *
+ * @internal
+ */
static afs_int32
-SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res)
+SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
{
afs_int32 code = SYNC_OK;
- struct SalvageQueueNode * node;
+ struct SalvageQueueNode * clone, * parent;
if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
code = SYNC_FAILED;
goto done;
}
- node = LookupNodeByCommand(com->sop);
+ /* lookup clone's salvage scheduling node */
+ clone = LookupNodeByCommand(com->sop, NULL);
+ if (clone == NULL) {
+ code = SYNC_DENIED;
+ res->hdr->reason = SALVSYNC_REASON_ERROR;
+ goto done;
+ }
- /* raise the priority of a salvage */
- if (node == NULL) {
- code = SALVSYNC_com_Salvage(com, res);
- node = LookupNodeByCommand(com->sop);
- } else {
- switch (node->state) {
- case SALVSYNC_STATE_QUEUED:
- RaiseCommandPrio(node, com->sop);
- break;
- case SALVSYNC_STATE_SALVAGING:
- break;
- case SALVSYNC_STATE_ERROR:
- case SALVSYNC_STATE_DONE:
- code = SALVSYNC_com_Salvage(com, res);
- break;
- default:
- break;
+ /* lookup parent's salvage scheduling node */
+ parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
+ if (parent == NULL) {
+ if (AllocNode(&parent)) {
+ code = SYNC_DENIED;
+ res->hdr->reason = SYNC_REASON_NOMEM;
+ goto done;
}
+ memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
+ memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
+ parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
+ AddNodeToHash(parent);
}
- if (node == NULL) {
- res->sop->prio = 0;
- res->sop->state = SALVSYNC_STATE_UNKNOWN;
- } else {
- res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
- res->sop->prio = node->command.sop.prio;
- res->sop->state = node->state;
+ if (LinkNode(parent, clone)) {
+ code = SYNC_DENIED;
+ goto done;
}
done:
return code;
}
+/**
+ * query the status of a volume salvage request.
+ *
+ * @param[in] com inbound command object
+ * @param[out] res outbound response object
+ *
+ * @return operation status
+ * @retval SYNC_OK success
+ * @retval SYNC_FAILED malformed command packet
+ *
+ * @note this is a SALVSYNC protocol rpc handler
+ *
+ * @internal
+ */
static afs_int32
SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
{
goto done;
}
- node = LookupNodeByCommand(com->sop);
+ LookupNodeByCommand(com->sop, &node);
/* query whether a volume is done salvaging */
if (node == NULL) {
}
static void
-SALVSYNC_Drop(int fd)
+SALVSYNC_Drop(osi_socket fd)
{
RemoveHandler(fd);
#ifdef AFS_NT40_ENV
AcceptOn(void)
{
if (AcceptHandler == -1) {
- assert(AddHandler(AcceptSd, SALVSYNC_newconnection));
- AcceptHandler = FindHandler(AcceptSd);
+ assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
+ AcceptHandler = FindHandler(salvsync_server_state.fd);
}
}
AcceptOff(void)
{
if (AcceptHandler != -1) {
- assert(RemoveHandler(AcceptSd));
+ assert(RemoveHandler(salvsync_server_state.fd));
AcceptHandler = -1;
}
}
/* The multiple FD handling code. */
-static int HandlerFD[MAXHANDLERS];
-static void (*HandlerProc[MAXHANDLERS]) (int);
-
static void
InitHandler(void)
{
}
static int
-AddHandler(int afd, void (*aproc) (int))
+AddHandler(osi_socket afd, void (*aproc) (int))
{
register int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
}
static int
-FindHandler(register int afd)
+FindHandler(register osi_socket afd)
{
register int i;
ObtainReadLock(&SALVSYNC_handler_lock);
}
static int
-FindHandler_r(register int afd)
+FindHandler_r(register osi_socket afd)
{
register int i;
for (i = 0; i < MAXHANDLERS; i++)
}
static int
-RemoveHandler(register int afd)
+RemoveHandler(register osi_socket afd)
{
ObtainWriteLock(&SALVSYNC_handler_lock);
HandlerFD[FindHandler_r(afd)] = -1;
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
}
+/**
+ * allocate a salvage queue node.
+ *
+ * @param[out] node_out address in which to store new node pointer
+ *
+ * @return operation status
+ * @retval 0 success
+ * @retval 1 failed to allocate node
+ *
+ * @post on success *node_out points at a node that was zero-filled and
+ *       then given type SALVSYNC_VOLGROUP_PARENT and state
+ *       SALVSYNC_STATE_UNKNOWN.  on failure *node_out is NULL, because
+ *       the malloc() result is stored before it is checked.
+ *
+ * @note the new node is not added to the hash table or to any queue
+ *       by this function.
+ *
+ * @internal
+ */
+static int
+AllocNode(struct SalvageQueueNode ** node_out)
+{
+ int code = 0;
+ struct SalvageQueueNode * node;
+
+ *node_out = node = (struct SalvageQueueNode *)
+ malloc(sizeof(struct SalvageQueueNode));
+ if (node == NULL) {
+ code = 1;
+ goto done;
+ }
+
+ memset(node, 0, sizeof(struct SalvageQueueNode));
+ node->type = SALVSYNC_VOLGROUP_PARENT;
+ node->state = SALVSYNC_STATE_UNKNOWN;
+
+ done:
+ return code;
+}
+
+/**
+ * link a salvage queue node to its parent.
+ *
+ * @param[in] parent pointer to queue node for parent of volume group
+ * @param[in] clone pointer to queue node for a clone
+ *
+ * @return operation status
+ * @retval 0 success
+ * @retval 1 failure
+ *
+ * @note failure is returned when @p parent is not itself of type
+ *       SALVSYNC_VOLGROUP_PARENT, or when every slot of
+ *       parent->volgroup.children[] is already taken by another node.
+ *
+ * @post a newly linked clone occupies the first free slot of the
+ *       parent's children array, is typed SALVSYNC_VOLGROUP_CLONE and
+ *       points back at the parent; a clone that was already registered
+ *       is left as it is.  in both cases: a queued clone is removed
+ *       from the salvage queue, and for a queued or salvaging clone the
+ *       parent is either enqueued with the clone's priority (parent
+ *       state unknown, error or done) or, if already queued and the
+ *       clone's priority is non-zero, has that priority added to its
+ *       own before UpdateCommandPrio() is called.
+ *
+ * @note NOTE(review): the return value of AddToSalvageQueue(parent) is
+ *       not checked, so a parent that could not be enqueued still
+ *       yields a success code from this function -- confirm intended.
+ *
+ * @internal
+ */
+static int
+LinkNode(struct SalvageQueueNode * parent,
+ struct SalvageQueueNode * clone)
+{
+ int code = 0;
+ int idx;
+
+ /* check for attaching a clone to a clone */
+ if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
+ code = 1;
+ goto done;
+ }
+
+ /* check for pre-existing registration and openings */
+ for (idx = 0; idx < VOLMAXTYPES; idx++) {
+ if (parent->volgroup.children[idx] == clone) {
+ goto linked;
+ }
+ if (parent->volgroup.children[idx] == NULL) {
+ break;
+ }
+ }
+ if (idx == VOLMAXTYPES) {
+ code = 1;
+ goto done;
+ }
+
+ /* link parent and child */
+ parent->volgroup.children[idx] = clone;
+ clone->type = SALVSYNC_VOLGROUP_CLONE;
+ clone->volgroup.parent = parent;
+
+
+ linked:
+ switch (clone->state) {
+ case SALVSYNC_STATE_QUEUED:
+ DeleteFromSalvageQueue(clone);
+ /* NOTE(review): no break here -- this looks like a deliberate fall
+  * through, so that dequeuing the clone is followed by scheduling
+  * the parent below; confirm */
+ case SALVSYNC_STATE_SALVAGING:
+ switch (parent->state) {
+ case SALVSYNC_STATE_UNKNOWN:
+ case SALVSYNC_STATE_ERROR:
+ case SALVSYNC_STATE_DONE:
+ parent->command.sop.prio = clone->command.sop.prio;
+ AddToSalvageQueue(parent);
+ break;
+
+ case SALVSYNC_STATE_QUEUED:
+ if (clone->command.sop.prio) {
+ parent->command.sop.prio += clone->command.sop.prio;
+ UpdateCommandPrio(parent);
+ }
+ break;
+
+ default:
+ break;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ done:
+ return code;
+}
+/**
+ * fold the priority carried by a request into a node's scheduling
+ * priority.
+ *
+ * @param[in,out] clone  node for the volume named in the request; the
+ *                       same object as @p node when the volume has no
+ *                       separate parent node
+ * @param[in,out] node   node whose priority is adjusted (this parameter
+ *                       is named "parent" in the forward declaration)
+ * @param[in] new_prio   priority carried by the request
+ *
+ * @post 1. if @p node is in state error, done or unknown, its priority
+ *          is first reset to 0; other states keep their priority (the
+ *          switch has no default case).
+ *       2. a delta is computed as new_prio minus the clone's recorded
+ *          priority, or 1 when new_prio is lower than that priority.
+ *       3. if @p clone is typed SALVSYNC_VOLGROUP_CLONE, its recorded
+ *          priority becomes new_prio.
+ *       4. the delta is added to the priority of @p node.
+ *
+ * @note step 1 runs before step 2, so when @p clone and @p node are the
+ *       same object and the reset fires, the delta is measured against 0.
+ *
+ * @internal
+ */
+static void
+HandlePrio(struct SalvageQueueNode * clone,
+ struct SalvageQueueNode * node,
+ afs_uint32 new_prio)
+{
+ afs_uint32 delta;
+
+ switch (node->state) {
+ case SALVSYNC_STATE_ERROR:
+ case SALVSYNC_STATE_DONE:
+ case SALVSYNC_STATE_UNKNOWN:
+ node->command.sop.prio = 0;
+ break;
+ }
+
+ if (new_prio < clone->command.sop.prio) {
+ /* strange. let's just set our delta to 1 */
+ delta = 1;
+ } else {
+ delta = new_prio - clone->command.sop.prio;
+ }
+
+ if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
+ clone->command.sop.prio = new_prio;
+ }
+
+ node->command.sop.prio += delta;
+}
+
static int
AddToSalvageQueue(struct SalvageQueueNode * node)
{
afs_int32 id;
+ struct SalvageQueueNode * last = NULL;
id = volutil_GetPartitionID(node->command.sop.partName);
if (id < 0 || id > VOLMAXPARTS) {
/* don't enqueue salvage requests for unmounted partitions */
return 1;
}
+ if (queue_IsOnQueue(node)) {
+ return 0;
+ }
+
+ if (queue_IsNotEmpty(&salvageQueue.part[id])) {
+ last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
+ }
queue_Append(&salvageQueue.part[id], node);
salvageQueue.len[id]++;
salvageQueue.total_len++;
salvageQueue.last_insert = id;
node->partition_id = id;
node->state = SALVSYNC_STATE_QUEUED;
+
+ /* reorder, if necessary */
+ if (last && last->command.sop.prio < node->command.sop.prio) {
+ UpdateCommandPrio(node);
+ }
+
assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
return 0;
}
/* raise the priority of a previously scheduled salvage */
static void
-RaiseCommandPrio(struct SalvageQueueNode * node, SALVSYNC_command_hdr * com)
+UpdateCommandPrio(struct SalvageQueueNode * node)
{
struct SalvageQueueNode *np, *nnp;
afs_int32 id;
+ afs_uint32 prio;
assert(queue_IsOnQueue(node));
- node->command.sop.prio = com->prio;
+ prio = node->command.sop.prio;
id = node->partition_id;
- if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < com->prio) {
+ if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
queue_Remove(node);
queue_Prepend(&salvageQueue.part[id], node);
} else {
for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
- if (np->command.sop.prio > com->prio)
+ if (np->command.sop.prio > prio)
break;
}
if (queue_IsEnd(&salvageQueue.part[id], np)) {
SALVSYNC_getWork(void)
{
int i, ret;
- struct DiskPartition * dp = NULL, * fdp;
+ struct DiskPartition64 * dp = NULL, * fdp;
static afs_int32 next_part_sched = 0;
struct SalvageQueueNode *node = NULL, *np;
* if there are no disk partitions, just sit in this wait loop forever
*/
while (!salvageQueue.total_len || !DiskPartitionList) {
- assert(pthread_cond_wait(&salvageQueue.cv, &vol_glock_mutex) == 0);
+ VOL_CV_WAIT(&salvageQueue.cv);
}
-
/*
* short circuit for simple case where only one partition has
* scheduled salvages
return node;
}
+/**
+ * update internal scheduler state to reflect completion of a work unit.
+ *
+ * @param[in] node salvage queue node object pointer
+ * @param[in] result worker process result code
+ *
+ * @post scheduler state is updated.
+ *
+ * @internal
+ */
static void
SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
{
afs_int32 partid;
+ int idx;
+
DeleteFromPendingQueue(node);
partid = node->partition_id;
if (partid >=0 && partid <= VOLMAXPARTS) {
}
if (result == 0) {
node->state = SALVSYNC_STATE_DONE;
- } else {
+ } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
node->state = SALVSYNC_STATE_ERROR;
}
+
+ if (node->type == SALVSYNC_VOLGROUP_PARENT) {
+ for (idx = 0; idx < VOLMAXTYPES; idx++) {
+ if (node->volgroup.children[idx]) {
+ node->volgroup.children[idx]->state = node->state;
+ }
+ }
+ }
}
-void
-SALVSYNC_doneWork(struct SalvageQueueNode * node, int result)
+/**
+ * check whether worker child failed.
+ *
+ * @param[in] status status bitfield returned by wait()
+ *
+ * @return boolean failure code
+ * @retval 0 child succeeded
+ * @retval 1 child failed
+ *
+ * @note the child counts as failed when it dumped core, was terminated
+ *       by a signal, or has an exit status other than 0 and other than
+ *       SALSRV_EXIT_VOLGROUP_LINK.
+ *
+ * @note NOTE(review): WEXITSTATUS() is consulted without a preceding
+ *       WIFEXITED() check; the signalled case is caught by the earlier
+ *       terms, but confirm no other kind of status can reach here.
+ *
+ * @internal
+ */
+static int
+ChildFailed(int status)
{
- VOL_LOCK;
- SALVSYNC_doneWork_r(node, result);
- VOL_UNLOCK;
+ return (WCOREDUMP(status) ||
+ WIFSIGNALED(status) ||
+ ((WEXITSTATUS(status) != 0) &&
+ (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
}
+
+/**
+ * notify salvsync scheduler of node completion, by child pid.
+ *
+ * @param[in] pid pid of worker child
+ * @param[in] status worker status bitfield from wait()
+ *
+ * @post scheduler state is updated.
+ * if status code is a failure, fileserver notification was attempted
+ *
+ * @note the volume id of the failed node (and of each of its children,
+ *       when the node is a volume group parent) is copied into a local
+ *       array between VOL_LOCK and VOL_UNLOCK; FSYNC_VolOp() with
+ *       FSYNC_VOL_FORCE_ERROR is then issued for every non-zero id
+ *       after VOL_UNLOCK.
+ *
+ * @note if no pending node matches @p pid, nothing is updated and no
+ *       notification is sent, because the id array stays zeroed.
+ *
+ * @note NOTE(review): the raw wait() status is passed to
+ *       SALVSYNC_doneWork_r() as its result argument, which that
+ *       function compares directly against 0 and
+ *       SALSRV_EXIT_VOLGROUP_LINK, whereas ChildFailed() compares
+ *       WEXITSTATUS(status) against the same constant -- confirm which
+ *       encoding SALVSYNC_doneWork_r() is meant to receive.
+ *
+ * @note NOTE(review): partName is a 16-byte buffer filled with strcpy();
+ *       confirm node->command.sop.partName can never be longer.
+ *
+ * @see SALVSYNC_doneWork_r
+ */
void
-SALVSYNC_doneWorkByPid(int pid, int result)
+SALVSYNC_doneWorkByPid(int pid, int status)
{
struct SalvageQueueNode * node;
+ char partName[16];
+ afs_uint32 volids[VOLMAXTYPES+1];
+ unsigned int idx;
+
+ memset(volids, 0, sizeof(volids));
VOL_LOCK;
node = LookupPendingCommandByPid(pid);
if (node != NULL) {
- SALVSYNC_doneWork_r(node, result);
+ SALVSYNC_doneWork_r(node, status);
+
+ if (ChildFailed(status)) {
+ /* populate volume id list for later processing outside the glock */
+ volids[0] = node->command.sop.volume;
+ strcpy(partName, node->command.sop.partName);
+ if (node->type == SALVSYNC_VOLGROUP_PARENT) {
+ for (idx = 0; idx < VOLMAXTYPES; idx++) {
+ if (node->volgroup.children[idx]) {
+ volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
+ }
+ }
+ }
+ }
}
VOL_UNLOCK;
+
+ /*
+ * if necessary, notify fileserver of
+ * failure to salvage volume group
+ * [we cannot guarantee that the child made the
+ * appropriate notifications (e.g. SIGSEGV)]
+ * -- tkeiser 11/28/2007
+ */
+ if (ChildFailed(status)) {
+ for (idx = 0; idx <= VOLMAXTYPES; idx++) {
+ if (volids[idx]) {
+ FSYNC_VolOp(volids[idx],
+ partName,
+ FSYNC_VOL_FORCE_ERROR,
+ FSYNC_WHATEVER,
+ NULL);
+ }
+ }
+ }
}
#endif /* AFS_DEMAND_ATTACH_FS */