#include <afsconfig.h>
#include <afs/param.h>
+#include <afs/procmgmt.h>
+#include <roken.h>
+
+#include <stddef.h>
-#include <sys/types.h>
-#include <stdio.h>
#ifdef AFS_NT40_ENV
#include <winsock2.h>
-#include <time.h>
-#else
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <sys/time.h>
#endif
-#include <errno.h>
-#include <assert.h>
-#include <signal.h>
-#include <string.h>
-
#include <rx/xdr.h>
#include <afs/afsint.h>
#include "partition.h"
#include "common.h"
#include <rx/rx_queue.h>
-#include <afs/procmgmt.h>
-
-#if !defined(offsetof)
-#include <stddef.h>
-#endif
#ifdef USE_UNIX_SOCKETS
#include <afs/afsutil.h>
* salvsync server socket handle.
*/
static SYNC_server_state_t salvsync_server_state =
- { -1, /* file descriptor */
+ { OSI_NULLSOCKET, /* file descriptor */
SALVSYNC_ENDPOINT_DECL, /* server endpoint */
SALVSYNC_PROTO_VERSION, /* protocol version */
5, /* bind() retry limit */
struct SalvageQueueNode ** parent)
{
struct rx_queue *qp, *nqp;
- struct SalvageQueueNode *vsp;
+ struct SalvageQueueNode *vsp = NULL;
int idx = VSHASH(vid);
for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
/* initialize the queues */
Lock_Init(&SALVSYNC_handler_lock);
- assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
+ CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
for (i = 0; i <= VOLMAXPARTS; i++) {
queue_Init(&salvageQueue.part[i]);
salvageQueue.len[i] = 0;
}
- assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
+ CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
queue_Init(&pendingQueue);
salvageQueue.total_len = pendingQueue.len = 0;
salvageQueue.last_insert = -1;
memset(partition_salvaging, 0, sizeof(partition_salvaging));
for (i = 0; i < VSHASH_SIZE; i++) {
- assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
+ CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
SalvageHashTable[i].len = 0;
queue_Init(&SalvageHashTable[i]);
}
/* start the salvsync thread */
- assert(pthread_attr_init(&tattr) == 0);
- assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
- assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
+ osi_Assert(pthread_attr_init(&tattr) == 0);
+ osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
+ osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
}
static void
/* just in case we were in AcceptOff mode, and thus this fd wouldn't
* have a handler */
close(salvsync_server_state.fd);
- salvsync_server_state.fd = -1;
+ salvsync_server_state.fd = OSI_NULLSOCKET;
}
static fd_set SALVSYNC_readfds;
/* when we fork, the child needs to close the salvsync server sockets,
* otherwise, it may get salvsync requests, instead of the parent
* salvageserver */
- assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
+ osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
SYNC_getAddr(&state->endpoint, &state->addr);
SYNC_cleanupSock(state);
state->fd = SYNC_getSock(&state->endpoint);
code = SYNC_bindSock(state);
- assert(!code);
+ osi_Assert(!code);
InitHandler();
AcceptOn();
for (;;) {
int maxfd;
+ struct timeval s_timeout;
GetHandler(&SALVSYNC_readfds, &maxfd);
+ s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
+ s_timeout.tv_usec = 0;
/* Note: check for >= 1 below is essential since IOMGR_select
* doesn't have exactly same semantics as select.
*/
- if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
+ if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
CallHandler(&SALVSYNC_readfds);
}
junk = sizeof(other);
fd = accept(afd, (struct sockaddr *)&other, &junk);
- if (fd == -1) {
- Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
- assert(1 == 2);
+ if (fd == OSI_NULLSOCKET) {
+ osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
} else if (!AddHandler(fd, SALVSYNC_com)) {
AcceptOff();
- assert(AddHandler(fd, SALVSYNC_com));
+ osi_Assert(AddHandler(fd, SALVSYNC_com));
}
}
SALVSYNC_Drop(osi_socket fd)
{
RemoveHandler(fd);
-#ifdef AFS_NT40_ENV
- closesocket(fd);
-#else
- close(fd);
-#endif
+ rk_closesocket(fd);
AcceptOn();
}
AcceptOn(void)
{
if (AcceptHandler == -1) {
- assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
+ osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
AcceptHandler = FindHandler(salvsync_server_state.fd);
}
}
AcceptOff(void)
{
if (AcceptHandler != -1) {
- assert(RemoveHandler(salvsync_server_state.fd));
+ osi_Assert(RemoveHandler(salvsync_server_state.fd));
AcceptHandler = -1;
}
}
int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++) {
- HandlerFD[i] = -1;
+ HandlerFD[i] = OSI_NULLSOCKET;
HandlerProc[i] = NULL;
}
ReleaseWriteLock(&SALVSYNC_handler_lock);
int i;
ObtainWriteLock(&SALVSYNC_handler_lock);
for (i = 0; i < MAXHANDLERS; i++)
- if (HandlerFD[i] == -1)
+ if (HandlerFD[i] == OSI_NULLSOCKET)
break;
if (i >= MAXHANDLERS) {
ReleaseWriteLock(&SALVSYNC_handler_lock);
return i;
}
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
- assert(1 == 2);
+ osi_Panic("Failed to find handler\n");
return -1; /* satisfy compiler */
}
if (HandlerFD[i] == afd) {
return i;
}
- assert(1 == 2);
+ osi_Panic("Failed to find handler\n");
return -1; /* satisfy compiler */
}
RemoveHandler(osi_socket afd)
{
ObtainWriteLock(&SALVSYNC_handler_lock);
- HandlerFD[FindHandler_r(afd)] = -1;
+ HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
ReleaseWriteLock(&SALVSYNC_handler_lock);
return 1;
}
FD_ZERO(fdsetp);
ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
for (i = 0; i < MAXHANDLERS; i++)
- if (HandlerFD[i] != -1) {
+ if (HandlerFD[i] != OSI_NULLSOCKET) {
FD_SET(HandlerFD[i], fdsetp);
- if (maxfd < HandlerFD[i])
+#ifndef AFS_NT40_ENV
+ /* On Windows the nfds parameter to select() is ignored */
+ if (maxfd < HandlerFD[i] || maxfd == (int)-1)
maxfd = HandlerFD[i];
+#endif
}
*maxfdp = maxfd;
ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
UpdateCommandPrio(node);
}
- assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
+ CV_BROADCAST(&salvageQueue.cv);
return 0;
}
salvageQueue.len[node->partition_id]--;
salvageQueue.total_len--;
node->state = SALVSYNC_STATE_UNKNOWN;
- assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
+ CV_BROADCAST(&salvageQueue.cv);
}
}
queue_Append(&pendingQueue, node);
pendingQueue.len++;
node->state = SALVSYNC_STATE_SALVAGING;
- assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
+ CV_BROADCAST(&pendingQueue.queue_change_cv);
}
static void
queue_Remove(node);
pendingQueue.len--;
node->state = SALVSYNC_STATE_UNKNOWN;
- assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
+ CV_BROADCAST(&pendingQueue.queue_change_cv);
}
}
afs_int32 id;
afs_uint32 prio;
- assert(queue_IsOnQueue(node));
+ osi_Assert(queue_IsOnQueue(node));
prio = node->command.sop.prio;
id = node->partition_id;
}
/* we should never reach this line */
- assert(1==2);
+ osi_Panic("Node not found\n");
have_node:
- assert(node != NULL);
+ osi_Assert(node != NULL);
node->pid = 0;
partition_salvaging[node->partition_id]++;
DeleteFromSalvageQueue(node);