/*
* Copyright 2006-2008, Sine Nomine Associates and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
#include <afsconfig.h>
#include <afs/param.h>
-RCSID
- ("$Header$");
-
-#include <sys/types.h>
-#include <stdio.h>
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
-#include <time.h>
-#else
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <sys/time.h>
-#endif
-#include <errno.h>
-#include <assert.h>
-#include <signal.h>
-#include <string.h>
+#include <afs/procmgmt.h>
+#include <roken.h>
+#include <stddef.h>
-#include <rx/xdr.h>
+#include <afs/opr.h>
+#include <opr/lock.h>
#include <afs/afsint.h>
+#include <rx/rx_queue.h>
+
#include "nfs.h"
#include <afs/errors.h>
#include "salvsync.h"
-#include "lwp.h"
#include "lock.h"
#include <afs/afssyscalls.h>
#include "ihandle.h"
#include "vnode.h"
#include "volume.h"
#include "partition.h"
+#include "common.h"
#include <rx/rx_queue.h>
-#include <afs/procmgmt.h>
-
-#if !defined(offsetof)
-#include <stddef.h>
-#endif
#ifdef USE_UNIX_SOCKETS
#include <afs/afsutil.h>
#include <sys/un.h>
#endif
-
-/*@printflike@*/ extern void Log(const char *format, ...);
+#ifndef WCOREDUMP
+#define WCOREDUMP(x) ((x) & 0200)
+#endif
#define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
* move = dump+restore can run on single server */
/* Forward declarations */
static void * SALVSYNC_syncThread(void *);
-static void SALVSYNC_newconnection(int fd);
-static void SALVSYNC_com(int fd);
-static void SALVSYNC_Drop(int fd);
+static void SALVSYNC_newconnection(osi_socket fd);
+static void SALVSYNC_com(osi_socket fd);
+static void SALVSYNC_Drop(osi_socket fd);
static void AcceptOn(void);
static void AcceptOff(void);
static void InitHandler(void);
static void CallHandler(fd_set * fdsetp);
-static int AddHandler(int afd, void (*aproc) (int));
-static int FindHandler(register int afd);
-static int FindHandler_r(register int afd);
-static int RemoveHandler(register int afd);
+static int AddHandler(osi_socket afd, void (*aproc) (int));
+static int FindHandler(osi_socket afd);
+static int FindHandler_r(osi_socket afd);
+static int RemoveHandler(osi_socket afd);
static void GetHandler(fd_set * fdsetp, int *maxfdp);
static int AllocNode(struct SalvageQueueNode ** node);
static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
static void AddToPendingQueue(struct SalvageQueueNode * node);
static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
-static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
static void UpdateCommandPrio(struct SalvageQueueNode * node);
-static void HandlePrio(struct SalvageQueueNode * clone,
+static void HandlePrio(struct SalvageQueueNode * clone,
struct SalvageQueueNode * parent,
afs_uint32 new_prio);
static int LinkNode(struct SalvageQueueNode * parent,
struct SalvageQueueNode * clone);
-static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
+static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
struct SalvageQueueNode ** parent);
static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
struct SalvageQueueNode ** parent);
static void AddNodeToHash(struct SalvageQueueNode * node);
-static void DeleteNodeFromHash(struct SalvageQueueNode * node);
static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
-extern int LogLevel;
extern int VInit;
extern pthread_mutex_t vol_salvsync_mutex;
/**
* salvsync server socket handle.
*/
-static SYNC_server_state_t salvsync_server_state =
- { -1, /* file descriptor */
+static SYNC_server_state_t salvsync_server_state =
+ { OSI_NULLSOCKET, /* file descriptor */
SALVSYNC_ENDPOINT_DECL, /* server endpoint */
SALVSYNC_PROTO_VERSION, /* protocol version */
5, /* bind() retry limit */
*/
static int partition_salvaging[VOLMAXPARTS+1];
+static int HandlerFD[MAXHANDLERS];
+static void (*HandlerProc[MAXHANDLERS]) (int);
+
#define VSHASH_SIZE 64
#define VSHASH_MASK (VSHASH_SIZE-1)
#define VSHASH(vid) ((vid)&VSHASH_MASK)
static struct QueueHead SalvageHashTable[VSHASH_SIZE];
static struct SalvageQueueNode *
-LookupNode(afs_uint32 vid, char * partName,
+LookupNode(VolumeId vid, char * partName,
struct SalvageQueueNode ** parent)
{
struct rx_queue *qp, *nqp;
- struct SalvageQueueNode *vsp;
+ struct SalvageQueueNode *vsp = NULL;
int idx = VSHASH(vid);
for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
SalvageHashTable[idx].len++;
}
-static void
-DeleteNodeFromHash(struct SalvageQueueNode * node)
-{
- int idx = VSHASH(node->command.sop.volume);
-
- if (queue_IsNotOnQueue(&node->hash_chain)) {
- return;
- }
-
- queue_Remove(&node->hash_chain);
- SalvageHashTable[idx].len--;
-}
-
void
SALVSYNC_salvInit(void)
{
/* initialize the queues */
Lock_Init(&SALVSYNC_handler_lock);
- assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
+ CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
for (i = 0; i <= VOLMAXPARTS; i++) {
queue_Init(&salvageQueue.part[i]);
salvageQueue.len[i] = 0;
}
- assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
+ CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
queue_Init(&pendingQueue);
salvageQueue.total_len = pendingQueue.len = 0;
salvageQueue.last_insert = -1;
memset(partition_salvaging, 0, sizeof(partition_salvaging));
for (i = 0; i < VSHASH_SIZE; i++) {
- assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
+ CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
SalvageHashTable[i].len = 0;
queue_Init(&SalvageHashTable[i]);
}
/* start the salvsync thread */
- assert(pthread_attr_init(&tattr) == 0);
- assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
- assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
+ opr_Verify(pthread_attr_init(&tattr) == 0);
+ opr_Verify(pthread_attr_setdetachstate(&tattr,
+ PTHREAD_CREATE_DETACHED) == 0);
+ opr_Verify(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
}
+static void
+CleanFDs(void)
+{
+ int i;
+ for (i = 0; i < MAXHANDLERS; ++i) {
+ if (HandlerFD[i] >= 0) {
+ SALVSYNC_Drop(HandlerFD[i]);
+ }
+ }
+
+ /* just in case we were in AcceptOff mode, and thus this fd wouldn't
+ * have a handler */
+ close(salvsync_server_state.fd);
+ salvsync_server_state.fd = OSI_NULLSOCKET;
+}
static fd_set SALVSYNC_readfds;
static void *
SALVSYNC_syncThread(void * args)
{
- int on = 1;
int code;
- int numTries;
- int tid;
SYNC_server_state_t * state = &salvsync_server_state;
+ /* when we fork, the child needs to close the salvsync server sockets,
+ * otherwise, it may get salvsync requests, instead of the parent
+ * salvageserver */
+ opr_Verify(pthread_atfork(NULL, NULL, CleanFDs) == 0);
+
SYNC_getAddr(&state->endpoint, &state->addr);
SYNC_cleanupSock(state);
state->fd = SYNC_getSock(&state->endpoint);
code = SYNC_bindSock(state);
- assert(!code);
+ opr_Assert(!code);
InitHandler();
AcceptOn();
for (;;) {
int maxfd;
+ struct timeval s_timeout;
GetHandler(&SALVSYNC_readfds, &maxfd);
+ s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
+ s_timeout.tv_usec = 0;
/* Note: check for >= 1 below is essential since IOMGR_select
* doesn't have exactly same semantics as select.
*/
- if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
+ if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
CallHandler(&SALVSYNC_readfds);
}
- return NULL;
+ AFS_UNREACHED(return(NULL));
}
static void
#else /* USE_UNIX_SOCKETS */
struct sockaddr_in other;
#endif
- int junk, fd;
+ int fd;
+ socklen_t junk;
+
junk = sizeof(other);
fd = accept(afd, (struct sockaddr *)&other, &junk);
- if (fd == -1) {
- Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
- assert(1 == 2);
+ if (fd == OSI_NULLSOCKET) {
+ osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
} else if (!AddHandler(fd, SALVSYNC_com)) {
AcceptOff();
- assert(AddHandler(fd, SALVSYNC_com));
+ opr_Verify(AddHandler(fd, SALVSYNC_com));
}
}
/* this function processes commands from an salvsync file descriptor (fd) */
static afs_int32 SALV_cnt = 0;
static void
-SALVSYNC_com(int fd)
+SALVSYNC_com(osi_socket fd)
{
SYNC_command com;
SYNC_response res;
SALVSYNC_command scom;
SALVSYNC_response sres;
SYNC_PROTO_BUF_DECL(buf);
-
+
+ memset(&com, 0, sizeof(com));
+ memset(&res, 0, sizeof(res));
+ memset(&scom, 0, sizeof(scom));
+ memset(&sres, 0, sizeof(sres));
+ memset(&sres_hdr, 0, sizeof(sres_hdr));
+
com.payload.buf = (void *)buf;
com.payload.len = SYNC_PROTO_MAX_LEN;
res.payload.buf = (void *) &sres_hdr;
if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
res.hdr.response = SYNC_OK;
res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
- goto respond;
+
+ /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
+ * never wait for a response. */
+ goto done;
}
if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
respond:
SYNC_putRes(&salvsync_server_state, fd, &res);
+
+ done:
if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
SALVSYNC_Drop(fd);
}
*
* @internal
*
- * @post the volume is enqueued in the to-be-salvaged queue.
- * if the volume was already in the salvage queue, its
- * priority (and thus its location in the queue) are
+ * @post the volume is enqueued in the to-be-salvaged queue.
+ * if the volume was already in the salvage queue, its
+ * priority (and thus its location in the queue) are
* updated.
*/
static afs_int32
memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
- /*
+ /*
* make sure volgroup parent partition path is kept coherent
*
* If we ever want to support non-COW clones on a machine holding
res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
res->sop->prio = node->command.sop.prio;
res->sop->state = node->state;
- if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
+ if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
(node->state == SALVSYNC_STATE_QUEUED)) {
DeleteFromSalvageQueue(node);
}
}
static void
-SALVSYNC_Drop(int fd)
+SALVSYNC_Drop(osi_socket fd)
{
RemoveHandler(fd);
-#ifdef AFS_NT40_ENV
- closesocket(fd);
-#else
- close(fd);
-#endif
+ rk_closesocket(fd);
AcceptOn();
}
AcceptOn(void)
{
if (AcceptHandler == -1) {
- assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
+ opr_Verify(AddHandler(salvsync_server_state.fd,
+ SALVSYNC_newconnection));
AcceptHandler = FindHandler(salvsync_server_state.fd);
}
}
AcceptOff(void)
{
if (AcceptHandler != -1) {
- assert(RemoveHandler(salvsync_server_state.fd));
+ opr_Verify(RemoveHandler(salvsync_server_state.fd));
AcceptHandler = -1;
}
}
/* The multiple FD handling code. */
-static int HandlerFD[MAXHANDLERS];
-static void (*HandlerProc[MAXHANDLERS]) (int);
-
static void
InitHandler(void)
{
- register int i;
+ int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++) {
- HandlerFD[i] = -1;
+ HandlerFD[i] = OSI_NULLSOCKET;
HandlerProc[i] = NULL;
}
ReleaseWriteLock(&SALVSYNC_handler_lock);
static void
CallHandler(fd_set * fdsetp)
{
- register int i;
+ int i;
ObtainReadLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++) {
if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
}
static int
-AddHandler(int afd, void (*aproc) (int))
+AddHandler(osi_socket afd, void (*aproc) (int))
{
- register int i;
+ int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++)
- if (HandlerFD[i] == -1)
+ if (HandlerFD[i] == OSI_NULLSOCKET)
break;
if (i >= MAXHANDLERS) {
ReleaseWriteLock(&SALVSYNC_handler_lock);
}
static int
-FindHandler(register int afd)
+FindHandler(osi_socket afd)
{
- register int i;
+ int i;
ObtainReadLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++)
if (HandlerFD[i] == afd) {
return i;
}
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
- assert(1 == 2);
- return -1; /* satisfy compiler */
+ osi_Panic("Failed to find handler\n");
+ AFS_UNREACHED(return -1);
}
static int
-FindHandler_r(register int afd)
+FindHandler_r(osi_socket afd)
{
- register int i;
+ int i;
for (i = 0; i < MAXHANDLERS; i++)
if (HandlerFD[i] == afd) {
return i;
}
- assert(1 == 2);
- return -1; /* satisfy compiler */
+ osi_Panic("Failed to find handler\n");
+ AFS_UNREACHED(return -1);
}
static int
-RemoveHandler(register int afd)
+RemoveHandler(osi_socket afd)
{
ObtainWriteLock(&SALVSYNC_handler_lock);
- HandlerFD[FindHandler_r(afd)] = -1;
+ HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
ReleaseWriteLock(&SALVSYNC_handler_lock);
return 1;
}
static void
GetHandler(fd_set * fdsetp, int *maxfdp)
{
- register int i;
- register int maxfd = -1;
+ int i;
+ int maxfd = -1;
FD_ZERO(fdsetp);
ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
for (i = 0; i < MAXHANDLERS; i++)
- if (HandlerFD[i] != -1) {
+ if (HandlerFD[i] != OSI_NULLSOCKET) {
FD_SET(HandlerFD[i], fdsetp);
- if (maxfd < HandlerFD[i])
+#ifndef AFS_NT40_ENV
+ /* On Windows the nfds parameter to select() is ignored */
+ if (maxfd < HandlerFD[i] || maxfd == (int)-1)
maxfd = HandlerFD[i];
+#endif
}
*maxfdp = maxfd;
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
int code = 0;
struct SalvageQueueNode * node;
- *node_out = node = (struct SalvageQueueNode *)
- malloc(sizeof(struct SalvageQueueNode));
+ *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
if (node == NULL) {
code = 1;
goto done;
}
- memset(node, 0, sizeof(struct SalvageQueueNode));
node->type = SALVSYNC_VOLGROUP_PARENT;
node->state = SALVSYNC_STATE_UNKNOWN;
switch (clone->state) {
case SALVSYNC_STATE_QUEUED:
DeleteFromSalvageQueue(clone);
-
+ AFS_FALLTHROUGH;
case SALVSYNC_STATE_SALVAGING:
switch (parent->state) {
case SALVSYNC_STATE_UNKNOWN:
}
static void
-HandlePrio(struct SalvageQueueNode * clone,
+HandlePrio(struct SalvageQueueNode * clone,
struct SalvageQueueNode * node,
afs_uint32 new_prio)
{
case SALVSYNC_STATE_UNKNOWN:
node->command.sop.prio = 0;
break;
+ default:
+ break;
}
if (new_prio < clone->command.sop.prio) {
UpdateCommandPrio(node);
}
- assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
+ CV_BROADCAST(&salvageQueue.cv);
return 0;
}
salvageQueue.len[node->partition_id]--;
salvageQueue.total_len--;
node->state = SALVSYNC_STATE_UNKNOWN;
- assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
+ CV_BROADCAST(&salvageQueue.cv);
}
}
queue_Append(&pendingQueue, node);
pendingQueue.len++;
node->state = SALVSYNC_STATE_SALVAGING;
- assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
+ CV_BROADCAST(&pendingQueue.queue_change_cv);
}
static void
queue_Remove(node);
pendingQueue.len--;
node->state = SALVSYNC_STATE_UNKNOWN;
- assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
+ CV_BROADCAST(&pendingQueue.queue_change_cv);
}
}
static struct SalvageQueueNode *
-LookupPendingCommand(SALVSYNC_command_hdr * qry)
-{
- struct SalvageQueueNode * np, * nnp;
-
- for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
- if ((np->command.sop.volume == qry->volume) &&
- !strncmp(np->command.sop.partName, qry->partName,
- sizeof(qry->partName)))
- break;
- }
-
- if (queue_IsEnd(&pendingQueue, np))
- np = NULL;
- return np;
-}
-
-static struct SalvageQueueNode *
LookupPendingCommandByPid(int pid)
{
struct SalvageQueueNode * np, * nnp;
afs_int32 id;
afs_uint32 prio;
- assert(queue_IsOnQueue(node));
+ opr_Assert(queue_IsOnQueue(node));
prio = node->command.sop.prio;
id = node->partition_id;
/* this will need to be rearchitected if we ever want more than one thread
* to wait for new salvage nodes */
-struct SalvageQueueNode *
+struct SalvageQueueNode *
SALVSYNC_getWork(void)
{
- int i, ret;
+ int i;
struct DiskPartition64 * dp = NULL, * fdp;
static afs_int32 next_part_sched = 0;
- struct SalvageQueueNode *node = NULL, *np;
+ struct SalvageQueueNode *node = NULL;
VOL_LOCK;
VOL_CV_WAIT(&salvageQueue.cv);
}
- /*
+ /*
* short circuit for simple case where only one partition has
* scheduled salvages
*/
}
- /*
+ /*
* ok, more than one partition has scheduled salvages.
- * now search for partitions with scheduled salvages, but no pending salvages.
+ * now search for partitions with scheduled salvages, but no pending salvages.
*/
dp = VGetPartitionById_r(next_part_sched, 0);
if (!dp) {
}
fdp = dp;
- for (i=0 ;
- !i || dp != fdp ;
+ for (i=0 ;
+ !i || dp != fdp ;
dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
*/
dp = fdp;
- for (i=0 ;
- !i || dp != fdp ;
+ for (i=0 ;
+ !i || dp != fdp ;
dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
if (salvageQueue.len[dp->index]) {
node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
}
/* we should never reach this line */
- assert(1==2);
+ osi_Panic("Node not found\n");
have_node:
- assert(node != NULL);
+ opr_Assert(node != NULL);
node->pid = 0;
partition_salvaging[node->partition_id]++;
DeleteFromSalvageQueue(node);
}
}
- bail:
VOL_UNLOCK;
return node;
}
static int
ChildFailed(int status)
{
- return (WCOREDUMP(status) ||
- WIFSIGNALED(status) ||
- ((WEXITSTATUS(status) != 0) &&
+ return (WCOREDUMP(status) ||
+ WIFSIGNALED(status) ||
+ ((WEXITSTATUS(status) != 0) &&
(WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
}
{
struct SalvageQueueNode * node;
char partName[16];
- afs_uint32 volids[VOLMAXTYPES+1];
+ VolumeId volids[VOLMAXTYPES+1];
unsigned int idx;
memset(volids, 0, sizeof(volids));