/*
* Copyright 2006-2008, Sine Nomine Associates and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
#include <afsconfig.h>
#include <afs/param.h>
+#include <afs/procmgmt.h>
+#include <roken.h>
+
+#include <stddef.h>
-#include <sys/types.h>
-#include <stdio.h>
#ifdef AFS_NT40_ENV
#include <winsock2.h>
-#include <time.h>
-#else
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <sys/time.h>
#endif
-#include <errno.h>
-#include <assert.h>
-#include <signal.h>
-#include <string.h>
-
#include <rx/xdr.h>
#include <afs/afsint.h>
#include "partition.h"
#include "common.h"
#include <rx/rx_queue.h>
-#include <afs/procmgmt.h>
-
-#if !defined(offsetof)
-#include <stddef.h>
-#endif
#ifdef USE_UNIX_SOCKETS
#include <afs/afsutil.h>
static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
static void UpdateCommandPrio(struct SalvageQueueNode * node);
-static void HandlePrio(struct SalvageQueueNode * clone,
+static void HandlePrio(struct SalvageQueueNode * clone,
struct SalvageQueueNode * parent,
afs_uint32 new_prio);
static int LinkNode(struct SalvageQueueNode * parent,
struct SalvageQueueNode * clone);
-static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
+static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
struct SalvageQueueNode ** parent);
static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
struct SalvageQueueNode ** parent);
/**
* salvsync server socket handle.
*/
-static SYNC_server_state_t salvsync_server_state =
- { -1, /* file descriptor */
+static SYNC_server_state_t salvsync_server_state =
+ { OSI_NULLSOCKET, /* file descriptor */
SALVSYNC_ENDPOINT_DECL, /* server endpoint */
SALVSYNC_PROTO_VERSION, /* protocol version */
5, /* bind() retry limit */
struct SalvageQueueNode ** parent)
{
struct rx_queue *qp, *nqp;
- struct SalvageQueueNode *vsp;
+ struct SalvageQueueNode *vsp = NULL;
int idx = VSHASH(vid);
for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
/* initialize the queues */
Lock_Init(&SALVSYNC_handler_lock);
- assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
+ CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
for (i = 0; i <= VOLMAXPARTS; i++) {
queue_Init(&salvageQueue.part[i]);
salvageQueue.len[i] = 0;
}
- assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
+ CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
queue_Init(&pendingQueue);
salvageQueue.total_len = pendingQueue.len = 0;
salvageQueue.last_insert = -1;
memset(partition_salvaging, 0, sizeof(partition_salvaging));
for (i = 0; i < VSHASH_SIZE; i++) {
- assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
+ CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
SalvageHashTable[i].len = 0;
queue_Init(&SalvageHashTable[i]);
}
/* start the salvsync thread */
- assert(pthread_attr_init(&tattr) == 0);
- assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
- assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
+ osi_Assert(pthread_attr_init(&tattr) == 0);
+ osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
+ osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
}
static void
/* just in case we were in AcceptOff mode, and thus this fd wouldn't
* have a handler */
close(salvsync_server_state.fd);
- salvsync_server_state.fd = -1;
+ salvsync_server_state.fd = OSI_NULLSOCKET;
}
static fd_set SALVSYNC_readfds;
/* when we fork, the child needs to close the salvsync server sockets,
* otherwise, it may get salvsync requests, instead of the parent
* salvageserver */
- assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
+ osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
SYNC_getAddr(&state->endpoint, &state->addr);
SYNC_cleanupSock(state);
state->fd = SYNC_getSock(&state->endpoint);
code = SYNC_bindSock(state);
- assert(!code);
+ osi_Assert(!code);
InitHandler();
AcceptOn();
for (;;) {
int maxfd;
+ struct timeval s_timeout;
GetHandler(&SALVSYNC_readfds, &maxfd);
+ s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
+ s_timeout.tv_usec = 0;
/* Note: check for >= 1 below is essential since IOMGR_select
* doesn't have exactly same semantics as select.
*/
- if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
+ if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
CallHandler(&SALVSYNC_readfds);
}
junk = sizeof(other);
fd = accept(afd, (struct sockaddr *)&other, &junk);
- if (fd == -1) {
- Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
- assert(1 == 2);
+ if (fd == OSI_NULLSOCKET) {
+ osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
} else if (!AddHandler(fd, SALVSYNC_com)) {
AcceptOff();
- assert(AddHandler(fd, SALVSYNC_com));
+ osi_Assert(AddHandler(fd, SALVSYNC_com));
}
}
memset(&scom, 0, sizeof(scom));
memset(&sres, 0, sizeof(sres));
memset(&sres_hdr, 0, sizeof(sres_hdr));
-
+
com.payload.buf = (void *)buf;
com.payload.len = SYNC_PROTO_MAX_LEN;
res.payload.buf = (void *) &sres_hdr;
*
* @internal
*
- * @post the volume is enqueued in the to-be-salvaged queue.
- * if the volume was already in the salvage queue, its
- * priority (and thus its location in the queue) are
+ * @post the volume is enqueued in the to-be-salvaged queue.
+ * if the volume was already in the salvage queue, its
+ * priority (and thus its location in the queue) are
* updated.
*/
static afs_int32
memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
- /*
+ /*
* make sure volgroup parent partition path is kept coherent
*
* If we ever want to support non-COW clones on a machine holding
res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
res->sop->prio = node->command.sop.prio;
res->sop->state = node->state;
- if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
+ if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
(node->state == SALVSYNC_STATE_QUEUED)) {
DeleteFromSalvageQueue(node);
}
SALVSYNC_Drop(osi_socket fd)
{
RemoveHandler(fd);
-#ifdef AFS_NT40_ENV
- closesocket(fd);
-#else
- close(fd);
-#endif
+ rk_closesocket(fd);
AcceptOn();
}
AcceptOn(void)
{
if (AcceptHandler == -1) {
- assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
+ osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
AcceptHandler = FindHandler(salvsync_server_state.fd);
}
}
AcceptOff(void)
{
if (AcceptHandler != -1) {
- assert(RemoveHandler(salvsync_server_state.fd));
+ osi_Assert(RemoveHandler(salvsync_server_state.fd));
AcceptHandler = -1;
}
}
int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++) {
- HandlerFD[i] = -1;
+ HandlerFD[i] = OSI_NULLSOCKET;
HandlerProc[i] = NULL;
}
ReleaseWriteLock(&SALVSYNC_handler_lock);
int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++)
- if (HandlerFD[i] == -1)
+ if (HandlerFD[i] == OSI_NULLSOCKET)
break;
if (i >= MAXHANDLERS) {
ReleaseWriteLock(&SALVSYNC_handler_lock);
return i;
}
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
- assert(1 == 2);
+ osi_Panic("Failed to find handler\n");
return -1; /* satisfy compiler */
}
if (HandlerFD[i] == afd) {
return i;
}
- assert(1 == 2);
+ osi_Panic("Failed to find handler\n");
return -1; /* satisfy compiler */
}
RemoveHandler(osi_socket afd)
{
ObtainWriteLock(&SALVSYNC_handler_lock);
- HandlerFD[FindHandler_r(afd)] = -1;
+ HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
ReleaseWriteLock(&SALVSYNC_handler_lock);
return 1;
}
FD_ZERO(fdsetp);
ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
for (i = 0; i < MAXHANDLERS; i++)
- if (HandlerFD[i] != -1) {
+ if (HandlerFD[i] != OSI_NULLSOCKET) {
FD_SET(HandlerFD[i], fdsetp);
- if (maxfd < HandlerFD[i])
+#ifndef AFS_NT40_ENV
+ /* On Windows the nfds parameter to select() is ignored */
+ if (maxfd < HandlerFD[i] || maxfd == (int)-1)
maxfd = HandlerFD[i];
+#endif
}
*maxfdp = maxfd;
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
int code = 0;
struct SalvageQueueNode * node;
- *node_out = node = (struct SalvageQueueNode *)
+ *node_out = node = (struct SalvageQueueNode *)
malloc(sizeof(struct SalvageQueueNode));
if (node == NULL) {
code = 1;
}
static void
-HandlePrio(struct SalvageQueueNode * clone,
+HandlePrio(struct SalvageQueueNode * clone,
struct SalvageQueueNode * node,
afs_uint32 new_prio)
{
UpdateCommandPrio(node);
}
- assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
+ CV_BROADCAST(&salvageQueue.cv);
return 0;
}
salvageQueue.len[node->partition_id]--;
salvageQueue.total_len--;
node->state = SALVSYNC_STATE_UNKNOWN;
- assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
+ CV_BROADCAST(&salvageQueue.cv);
}
}
queue_Append(&pendingQueue, node);
pendingQueue.len++;
node->state = SALVSYNC_STATE_SALVAGING;
- assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
+ CV_BROADCAST(&pendingQueue.queue_change_cv);
}
static void
queue_Remove(node);
pendingQueue.len--;
node->state = SALVSYNC_STATE_UNKNOWN;
- assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
+ CV_BROADCAST(&pendingQueue.queue_change_cv);
}
}
struct SalvageQueueNode * np, * nnp;
for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
- if ((np->command.sop.volume == qry->volume) &&
+ if ((np->command.sop.volume == qry->volume) &&
!strncmp(np->command.sop.partName, qry->partName,
sizeof(qry->partName)))
break;
afs_int32 id;
afs_uint32 prio;
- assert(queue_IsOnQueue(node));
+ osi_Assert(queue_IsOnQueue(node));
prio = node->command.sop.prio;
id = node->partition_id;
/* this will need to be rearchitected if we ever want more than one thread
* to wait for new salvage nodes */
-struct SalvageQueueNode *
+struct SalvageQueueNode *
SALVSYNC_getWork(void)
{
int i;
VOL_CV_WAIT(&salvageQueue.cv);
}
- /*
+ /*
* short circuit for simple case where only one partition has
* scheduled salvages
*/
}
- /*
+ /*
* ok, more than one partition has scheduled salvages.
- * now search for partitions with scheduled salvages, but no pending salvages.
+ * now search for partitions with scheduled salvages, but no pending salvages.
*/
dp = VGetPartitionById_r(next_part_sched, 0);
if (!dp) {
}
fdp = dp;
- for (i=0 ;
- !i || dp != fdp ;
+ for (i=0 ;
+ !i || dp != fdp ;
dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
*/
dp = fdp;
- for (i=0 ;
- !i || dp != fdp ;
+ for (i=0 ;
+ !i || dp != fdp ;
dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
if (salvageQueue.len[dp->index]) {
node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
}
/* we should never reach this line */
- assert(1==2);
+ osi_Panic("Node not found\n");
have_node:
- assert(node != NULL);
+ osi_Assert(node != NULL);
node->pid = 0;
partition_salvaging[node->partition_id]++;
DeleteFromSalvageQueue(node);
static int
ChildFailed(int status)
{
- return (WCOREDUMP(status) ||
- WIFSIGNALED(status) ||
- ((WEXITSTATUS(status) != 0) &&
+ return (WCOREDUMP(status) ||
+ WIFSIGNALED(status) ||
+ ((WEXITSTATUS(status) != 0) &&
(WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
}