2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
29 #include <afs/procmgmt.h>
36 #include <afs/afsint.h>
37 #include <rx/rx_queue.h>
40 #include <afs/errors.h>
43 #include <afs/afssyscalls.h>
47 #include "partition.h"
49 #include <rx/rx_queue.h>
51 #ifdef USE_UNIX_SOCKETS
52 #include <afs/afsutil.h>
57 #define WCOREDUMP(x) ((x) & 0200)
60 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
61 * move = dump+restore can run on single server */
65 * This lock controls access to the handler array.
67 struct Lock SALVSYNC_handler_lock;
70 #ifdef AFS_DEMAND_ATTACH_FS
72 * SALVSYNC is a feature specific to the demand attach fileserver
75 /* Forward declarations */
76 static void * SALVSYNC_syncThread(void *);
77 static void SALVSYNC_newconnection(osi_socket fd);
78 static void SALVSYNC_com(osi_socket fd);
79 static void SALVSYNC_Drop(osi_socket fd);
80 static void AcceptOn(void);
81 static void AcceptOff(void);
82 static void InitHandler(void);
83 static void CallHandler(fd_set * fdsetp);
84 static int AddHandler(osi_socket afd, void (*aproc) (int));
85 static int FindHandler(osi_socket afd);
86 static int FindHandler_r(osi_socket afd);
87 static int RemoveHandler(osi_socket afd);
88 static void GetHandler(fd_set * fdsetp, int *maxfdp);
90 static int AllocNode(struct SalvageQueueNode ** node);
92 static int AddToSalvageQueue(struct SalvageQueueNode * node);
93 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
94 static void AddToPendingQueue(struct SalvageQueueNode * node);
95 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
96 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
97 static void UpdateCommandPrio(struct SalvageQueueNode * node);
98 static void HandlePrio(struct SalvageQueueNode * clone,
99 struct SalvageQueueNode * parent,
100 afs_uint32 new_prio);
102 static int LinkNode(struct SalvageQueueNode * parent,
103 struct SalvageQueueNode * clone);
105 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
106 struct SalvageQueueNode ** parent);
107 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
108 struct SalvageQueueNode ** parent);
109 static void AddNodeToHash(struct SalvageQueueNode * node);
111 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
112 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
113 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
114 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
119 extern pthread_mutex_t vol_salvsync_mutex;
122 * salvsync server socket handle.
124 static SYNC_server_state_t salvsync_server_state =
125 { OSI_NULLSOCKET, /* file descriptor */
126 SALVSYNC_ENDPOINT_DECL, /* server endpoint */
127 SALVSYNC_PROTO_VERSION, /* protocol version */
128 5, /* bind() retry limit */
129 100, /* listen() queue depth */
130 "SALVSYNC", /* protocol name string */
135 * queue of all volumes waiting to be salvaged.
137 struct SalvageQueue {
/* total number of queued nodes; AddToSalvageQueue/DeleteFromSalvageQueue adjust it together with len[] */
138 volatile int total_len;
139 volatile afs_int32 last_insert; /**< id of last partition to have a salvage node inserted */
/* number of queued nodes per partition id, indexed like part[] */
140 volatile int len[VOLMAXPARTS+1];
141 volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
144 static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
147 * queue of all volumes currently being salvaged.
150 volatile struct rx_queue q; /**< queue of salvages in progress */
151 volatile int len; /**< length of in-progress queue */
152 pthread_cond_t queue_change_cv;
154 static struct QueueHead pendingQueue; /* volumes being salvaged */
157 * whether a partition has a salvage in progress
159 * the salvager code only permits one salvage per partition at a time
161 * the following hack tries to keep salvaged parallelism high by
162 * only permitting one salvage dispatch per partition at a time
164 * unfortunately, the parallel salvager currently
165 * has a rather braindead routine that won't permit
166 * multiple salvages on the same "device". this
167 * function happens to break pretty badly on lvm, raid luns, etc.
169 * this hack isn't good enough to stop the device limiting code from
170 * crippling performance. someday that code needs to be rewritten
172 static int partition_salvaging[VOLMAXPARTS+1];
174 static int HandlerFD[MAXHANDLERS];
175 static void (*HandlerProc[MAXHANDLERS]) (int);
177 #define VSHASH_SIZE 64
178 #define VSHASH_MASK (VSHASH_SIZE-1)
179 #define VSHASH(vid) ((vid)&VSHASH_MASK)
181 static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/*
 * Find the salvage node for volume `vid` on partition `partName` in
 * SalvageHashTable.  When `parent` is written, it receives the node's
 * volume-group parent if the match is a clone, otherwise the match itself.
 * NOTE(review): callers in this file pass parent == NULL; the guard for that
 * and the return statements sit on lines not visible in this listing.
 */
183 static struct SalvageQueueNode *
184 LookupNode(VolumeId vid, char * partName,
185 struct SalvageQueueNode ** parent)
187 struct rx_queue *qp, *nqp;
188 struct SalvageQueueNode *vsp = NULL;
/* VSHASH keeps the low bits of vid, so only one bucket is searched */
189 int idx = VSHASH(vid);
191 for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
/* qp points at the embedded hash_chain link; step back to the containing node */
192 vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
/* a match needs both the volume id and the partition name */
193 if ((vsp->command.sop.volume == vid) &&
194 !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
/* the scan ran off the end of the bucket */
199 if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
/* a clone reports its volume-group parent; any other node reports itself */
205 *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
206 vsp->volgroup.parent : vsp;
/*
 * Convenience wrapper around LookupNode: the lookup key is the volume id and
 * partition name carried in the command header `qry`.
 */
215 static struct SalvageQueueNode *
216 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
217 struct SalvageQueueNode ** parent)
219 return LookupNode(qry->volume, qry->partName, parent);
/*
 * Insert `node` into the hash bucket selected by its volume id and bump that
 * bucket's length counter.
 */
223 AddNodeToHash(struct SalvageQueueNode * node)
225 int idx = VSHASH(node->command.sop.volume);
/* checks whether the node's hash link is already on a queue (the handling lines are not visible here) */
227 if (queue_IsOnQueue(&node->hash_chain)) {
231 queue_Append(&SalvageHashTable[idx], &node->hash_chain);
232 SalvageHashTable[idx].len++;
/*
 * One-time SALVSYNC server setup: initializes the handler lock, the salvage
 * queue, the pending queue, the per-partition bookkeeping and the hash table,
 * then starts SALVSYNC_syncThread as a detached thread.
 */
236 SALVSYNC_salvInit(void)
240 pthread_attr_t tattr;
242 /* initialize the queues */
243 Lock_Init(&SALVSYNC_handler_lock);
244 CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
/* one queue and one length counter per partition slot (0..VOLMAXPARTS inclusive) */
245 for (i = 0; i <= VOLMAXPARTS; i++) {
246 queue_Init(&salvageQueue.part[i]);
247 salvageQueue.len[i] = 0;
249 CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
250 queue_Init(&pendingQueue);
251 salvageQueue.total_len = pendingQueue.len = 0;
/* -1 fails the ">= 0" test in SALVSYNC_getWork's single-partition shortcut */
252 salvageQueue.last_insert = -1;
253 memset(partition_salvaging, 0, sizeof(partition_salvaging));
/* every hash bucket has its own condition variable, length and queue head */
255 for (i = 0; i < VSHASH_SIZE; i++) {
256 CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
257 SalvageHashTable[i].len = 0;
258 queue_Init(&SalvageHashTable[i]);
261 /* start the salvsync thread */
/* any failure while setting up or creating the thread trips opr_Verify */
262 opr_Verify(pthread_attr_init(&tattr) == 0);
263 opr_Verify(pthread_attr_setdetachstate(&tattr,
264 PTHREAD_CREATE_DETACHED) == 0);
265 opr_Verify(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
/*
 * NOTE(review): the signature line is not visible in this listing; this looks
 * like the body of CleanFDs, the handler SALVSYNC_syncThread passes as the
 * third argument of pthread_atfork -- confirm.
 * Drops every active handler fd, then closes the server socket and resets it
 * to OSI_NULLSOCKET.
 */
272 for (i = 0; i < MAXHANDLERS; ++i) {
273 if (HandlerFD[i] >= 0) {
274 SALVSYNC_Drop(HandlerFD[i]);
278 /* just in case we were in AcceptOff mode, and thus this fd wouldn't
280 close(salvsync_server_state.fd);
281 salvsync_server_state.fd = OSI_NULLSOCKET;
284 static fd_set SALVSYNC_readfds;
/*
 * Body of the SALVSYNC server thread (started from SALVSYNC_salvInit).
 * Registers CleanFDs with pthread_atfork, resolves the endpoint address,
 * cleans up any old socket, ignores SIGPIPE, then creates and binds the
 * server socket.  The service loop rebuilds the fd set with GetHandler,
 * waits in select() for up to SYNC_SELECT_TIMEOUT seconds, and calls
 * CallHandler only when select() reports at least one ready descriptor.
 * `args` is not referenced in the lines visible here.
 */
287 SALVSYNC_syncThread(void * args)
290 SYNC_server_state_t * state = &salvsync_server_state;
292 /* when we fork, the child needs to close the salvsync server sockets,
293 * otherwise, it may get salvsync requests, instead of the parent
295 opr_Verify(pthread_atfork(NULL, NULL, CleanFDs) == 0);
297 SYNC_getAddr(&state->endpoint, &state->addr);
298 SYNC_cleanupSock(state);
301 (void)signal(SIGPIPE, SIG_IGN);
304 state->fd = SYNC_getSock(&state->endpoint);
305 code = SYNC_bindSock(state);
313 struct timeval s_timeout;
314 GetHandler(&SALVSYNC_readfds, &maxfd);
315 s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
316 s_timeout.tv_usec = 0;
317 /* Note: check for >= 1 below is essential since IOMGR_select
318 * doesn't have exactly same semantics as select.
320 if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
321 CallHandler(&SALVSYNC_readfds);
324 AFS_UNREACHED(return(NULL));
/*
 * Handler for the listening socket `afd`: accepts one client connection and
 * registers SALVSYNC_com as the handler for the new descriptor.
 */
328 SALVSYNC_newconnection(int afd)
/* the peer address type follows the socket family chosen at build time */
330 #ifdef USE_UNIX_SOCKETS
331 struct sockaddr_un other;
332 #else /* USE_UNIX_SOCKETS */
333 struct sockaddr_in other;
338 junk = sizeof(other);
339 fd = accept(afd, (struct sockaddr *)&other, &junk);
/* a failed accept is treated as fatal */
340 if (fd == OSI_NULLSOCKET) {
341 osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
/*
 * AddHandler returned 0, i.e. no slot was taken; the retry below must succeed.
 * NOTE(review): a line between these two is not visible in this listing;
 * presumably it frees a slot first -- confirm.
 */
342 } else if (!AddHandler(fd, SALVSYNC_com)) {
344 opr_Verify(AddHandler(fd, SALVSYNC_com));
348 /* this function processes commands from a salvsync file descriptor (fd) */
349 static afs_int32 SALV_cnt = 0;
/*
 * Per-connection handler: reads one command from `fd`, validates it,
 * dispatches it to the matching SALVSYNC_com_* routine and sends the
 * response with SYNC_putRes.  Responses flagged SYNC_FLAG_CHANNEL_SHUTDOWN
 * are checked at the end (the action taken is on lines not visible here).
 */
351 SALVSYNC_com(osi_socket fd)
355 SALVSYNC_response_hdr sres_hdr;
356 SALVSYNC_command scom;
357 SALVSYNC_response sres;
358 SYNC_PROTO_BUF_DECL(buf);
360 memset(&com, 0, sizeof(com));
361 memset(&res, 0, sizeof(res));
362 memset(&scom, 0, sizeof(scom));
363 memset(&sres, 0, sizeof(sres));
364 memset(&sres_hdr, 0, sizeof(sres_hdr));
/* inbound payload lands in buf; outbound payload is always the fixed-size sres_hdr */
366 com.payload.buf = (void *)buf;
367 com.payload.len = SYNC_PROTO_MAX_LEN;
368 res.payload.buf = (void *) &sres_hdr;
369 res.payload.len = sizeof(sres_hdr);
370 res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
371 res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
/* scom.sop aliases the receive buffer, so handlers read the payload in place */
374 scom.sop = (SALVSYNC_command_hdr *) buf;
/* sres.sop aliases sres_hdr, so handler writes go straight into the outbound payload */
377 sres.sop = &sres_hdr;
381 if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
382 Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);
/* anything shorter than a bare command header is malformed */
387 if (com.recv_len < sizeof(com.hdr)) {
388 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
389 res.hdr.response = SYNC_COM_ERROR;
390 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
391 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
/* a version mismatch shuts the channel down; no reason code is set on this path */
395 if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
396 Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
397 res.hdr.response = SYNC_COM_ERROR;
398 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
402 if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
403 res.hdr.response = SYNC_OK;
404 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
406 /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
407 * never wait for a response. */
/* every remaining command must carry exactly one SALVSYNC_command_hdr as payload */
411 if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
412 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
413 res.hdr.response = SYNC_COM_ERROR;
414 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
415 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
/* echo the sequence number of the command in the response */
419 res.hdr.com_seq = com.hdr.com_seq;
422 switch (com.hdr.command) {
/* SALVAGE and RAISEPRIO share one handler */
425 case SALVSYNC_SALVAGE:
426 case SALVSYNC_RAISEPRIO:
427 res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
429 case SALVSYNC_CANCEL:
430 /* cancel a salvage */
431 res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
433 case SALVSYNC_CANCELALL:
434 /* cancel all queued salvages */
435 res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
438 /* query whether a volume is done salvaging */
439 res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
441 case SALVSYNC_OP_LINK:
442 /* link a clone to its parent in the scheduler */
443 res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
446 res.hdr.response = SYNC_BAD_COMMAND;
/* report the current scheduler queue lengths in the response payload */
450 sres_hdr.sq_len = salvageQueue.total_len;
451 sres_hdr.pq_len = pendingQueue.len;
455 SYNC_putRes(&salvsync_server_state, fd, &res);
458 if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
464 * request that a volume be salvaged.
466 * @param[in] com inbound command object
467 * @param[out] res outbound response object
469 * @return operation status
470 * @retval SYNC_OK success
471 * @retval SYNC_DENIED failed to enqueue request
472 * @retval SYNC_FAILED malformed command packet
474 * @note this is a SALVSYNC protocol rpc handler
478 * @post the volume is enqueued in the to-be-salvaged queue.
479 * if the volume was already in the salvage queue, its
480 * priority (and thus its location in the queue) are
484 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
486 afs_int32 code = SYNC_OK;
487 struct SalvageQueueNode * node, * clone;
/* reject the request before partName is used as a lookup key */
490 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
492 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
/* clone: the node matching the request; node: its volume-group parent, or the same node when it is not a clone */
496 clone = LookupNodeByCommand(com->sop, &node);
/* allocate a fresh node (the enclosing condition is on a line not visible here) */
499 if (AllocNode(&node)) {
501 res->hdr->reason = SYNC_REASON_NOMEM;
/* fold the requested priority into the clone and its parent */
508 HandlePrio(clone, node, com->sop->prio);
510 switch (node->state) {
/* already queued: only its position in the partition queue may need to change */
511 case SALVSYNC_STATE_QUEUED:
512 UpdateCommandPrio(node);
/* not currently scheduled: record the request on the clone and queue the parent */
515 case SALVSYNC_STATE_ERROR:
516 case SALVSYNC_STATE_DONE:
517 case SALVSYNC_STATE_UNKNOWN:
518 memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
519 memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
522 * make sure volgroup parent partition path is kept coherent
524 * If we ever want to support non-COW clones on a machine holding
525 * the RW site, please note that this code does not work under the
526 * conditions where someone zaps a COW clone on partition X, and
527 * subsequently creates a full clone on partition Y -- we'd need
528 * an inverse to SALVSYNC_com_Link.
529 * -- tkeiser 11/28/2007
/* NOTE(review): unbounded copy; presumably safe because SYNC_verifyProtocolString validated the source above -- confirm */
531 strcpy(node->command.sop.partName, com->sop->partName);
533 if (AddToSalvageQueue(node)) {
/* report the state and priority of the node that was (or is being) scheduled */
546 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
547 res->sop->state = node->state;
548 res->sop->prio = node->command.sop.prio;
555 * cancel a pending salvage request.
557 * @param[in] com inbound command object
558 * @param[out] res outbound response object
560 * @return operation status
561 * @retval SYNC_OK success
562 * @retval SYNC_FAILED malformed command packet
564 * @note this is a SALVSYNC protocol rpc handler
569 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
571 afs_int32 code = SYNC_OK;
572 struct SalvageQueueNode * node;
574 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
576 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
/* only the exact node is wanted here, so no parent out-parameter is passed */
580 node = LookupNodeByCommand(com->sop, NULL);
583 res->sop->state = SALVSYNC_STATE_UNKNOWN;
/* the response carries the priority and state as they were before any removal below */
586 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
587 res->sop->prio = node->command.sop.prio;
588 res->sop->state = node->state;
/* only a volume-group parent that is still queued is actually dequeued */
589 if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
590 (node->state == SALVSYNC_STATE_QUEUED)) {
591 DeleteFromSalvageQueue(node);
600 * cancel all pending salvage requests.
602 * @param[in] com incoming command object
603 * @param[out] res outbound response object
605 * @return operation status
606 * @retval SYNC_OK success
608 * @note this is a SALVSYNC protocol rpc handler
/* Neither `com` nor `res` is referenced in the lines visible here. */
613 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
615 struct SalvageQueueNode * np, *nnp;
616 struct DiskPartition64 * dp;
/* walk every known disk partition and empty its per-partition salvage queue */
618 for (dp = DiskPartitionList ; dp ; dp = dp->next) {
/* NOTE(review): queue_Scan carries a second cursor (nnp), presumably so removing np mid-scan is safe -- confirm */
619 for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
620 DeleteFromSalvageQueue(np);
628 * link a queue node for a clone to its parent volume.
630 * @param[in] com inbound command object
631 * @param[out] res outbound response object
633 * @return operation status
634 * @retval SYNC_OK success
635 * @retval SYNC_FAILED malformed command packet
636 * @retval SYNC_DENIED the request could not be completed
638 * @note this is a SALVSYNC protocol rpc handler
640 * @post the requested volume is marked as a child of another volume.
641 * thus, future salvage requests for this volume will result in the
642 * parent of the volume group being scheduled for salvage instead
648 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
650 afs_int32 code = SYNC_OK;
651 struct SalvageQueueNode * clone, * parent;
653 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
655 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
659 /* lookup clone's salvage scheduling node */
660 clone = LookupNodeByCommand(com->sop, NULL);
663 res->hdr->reason = SALVSYNC_REASON_ERROR;
667 /* lookup parent's salvage scheduling node */
/* the parent is looked up under the same partition name as the clone */
668 parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
669 if (parent == NULL) {
670 if (AllocNode(&parent)) {
672 res->hdr->reason = SYNC_REASON_NOMEM;
/* seed the new parent node from this command, then re-key it to the parent's volume id before hashing */
675 memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
676 memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
677 parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
678 AddNodeToHash(parent);
/* a non-zero return from LinkNode takes the branch below (its body is not visible here) */
681 if (LinkNode(parent, clone)) {
691 * query the status of a volume salvage request.
693 * @param[in] com inbound command object
694 * @param[out] res outbound response object
696 * @return operation status
697 * @retval SYNC_OK success
698 * @retval SYNC_FAILED malformed command packet
700 * @note this is a SALVSYNC protocol rpc handler
705 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
707 afs_int32 code = SYNC_OK;
708 struct SalvageQueueNode * node;
710 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
712 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
/*
 * The return value is discarded on purpose: `node` is filled through the
 * parent out-parameter, so a query on a clone reports its volume-group
 * parent's state and priority.
 */
716 LookupNodeByCommand(com->sop, &node);
718 /* query whether a volume is done salvaging */
720 res->sop->state = SALVSYNC_STATE_UNKNOWN;
723 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
724 res->sop->state = node->state;
725 res->sop->prio = node->command.sop.prio;
/*
 * Drops the connection associated with `fd`.  Only the signature is visible
 * in this listing; the fork-cleanup loop calls this once for every active
 * HandlerFD entry.
 */
733 SALVSYNC_Drop(osi_socket fd)
740 static int AcceptHandler = -1; /* handler id for accept, if turned on */
/*
 * NOTE(review): the signature line is not visible in this listing; this looks
 * like the body of AcceptOn (see the forward declarations) -- confirm.
 * If accepting is currently off, register SALVSYNC_newconnection on the
 * server socket and remember which handler slot it landed in.
 */
745 if (AcceptHandler == -1) {
746 opr_Verify(AddHandler(salvsync_server_state.fd,
747 SALVSYNC_newconnection));
748 AcceptHandler = FindHandler(salvsync_server_state.fd);
/*
 * NOTE(review): the signature line is not visible in this listing; this looks
 * like the body of AcceptOff -- confirm.
 * If accepting is currently on, unregister the server socket's handler.
 */
755 if (AcceptHandler != -1) {
756 opr_Verify(RemoveHandler(salvsync_server_state.fd));
761 /* The multiple FD handling code. */
/*
 * NOTE(review): the signature line is not visible in this listing; this looks
 * like the body of InitHandler -- confirm.
 * Marks every handler slot as free (no fd, no callback) under the write lock.
 */
767 ObtainWriteLock(&SALVSYNC_handler_lock);
768 for (i = 0; i < MAXHANDLERS; i++) {
769 HandlerFD[i] = OSI_NULLSOCKET;
770 HandlerProc[i] = NULL;
772 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Invoke the registered callback for every handler slot whose descriptor is
 * set in `fdsetp`.
 */
776 CallHandler(fd_set * fdsetp)
779 ObtainReadLock(&SALVSYNC_handler_lock);
780 for (i = 0; i < MAXHANDLERS; i++) {
781 if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
/*
 * The read lock is dropped around the callback: handlers such as
 * SALVSYNC_newconnection call AddHandler, which takes the write lock.
 * HandlerProc[i] and HandlerFD[i] are therefore read without the lock held.
 */
782 ReleaseReadLock(&SALVSYNC_handler_lock);
783 (*HandlerProc[i]) (HandlerFD[i]);
784 ObtainReadLock(&SALVSYNC_handler_lock);
787 ReleaseReadLock(&SALVSYNC_handler_lock);
/*
 * Register callback `aproc` for descriptor `afd` in the first free handler
 * slot.  Callers in this file treat a zero return as "not registered".
 */
791 AddHandler(osi_socket afd, void (*aproc) (int))
794 ObtainWriteLock(&SALVSYNC_handler_lock);
/* look for the first free slot */
795 for (i = 0; i < MAXHANDLERS; i++)
796 if (HandlerFD[i] == OSI_NULLSOCKET)
/* every slot is in use: release the lock before leaving */
798 if (i >= MAXHANDLERS) {
799 ReleaseWriteLock(&SALVSYNC_handler_lock);
803 HandlerProc[i] = aproc;
804 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Search the handler table for `afd` while holding the read lock.
 * A descriptor that is not registered is a fatal error (osi_Panic).
 */
809 FindHandler(osi_socket afd)
812 ObtainReadLock(&SALVSYNC_handler_lock);
813 for (i = 0; i < MAXHANDLERS; i++)
814 if (HandlerFD[i] == afd) {
815 ReleaseReadLock(&SALVSYNC_handler_lock);
818 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
819 osi_Panic("Failed to find handler\n");
820 AFS_UNREACHED(return -1);
/*
 * Same search as FindHandler, but takes no lock itself; RemoveHandler calls
 * it with the write lock already held.  Panics if `afd` is not registered.
 */
824 FindHandler_r(osi_socket afd)
827 for (i = 0; i < MAXHANDLERS; i++)
828 if (HandlerFD[i] == afd) {
831 osi_Panic("Failed to find handler\n");
832 AFS_UNREACHED(return -1);
/*
 * Free the handler slot that holds `afd`.  FindHandler_r panics when the
 * descriptor is not registered.  Only the fd entry is reset here; the
 * HandlerProc entry is left as it was.
 */
836 RemoveHandler(osi_socket afd)
838 ObtainWriteLock(&SALVSYNC_handler_lock);
839 HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
840 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Add every registered descriptor to `fdsetp` and track the largest one, for
 * use as the basis of select()'s nfds argument by the caller.
 */
845 GetHandler(fd_set * fdsetp, int *maxfdp)
850 ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
851 for (i = 0; i < MAXHANDLERS; i++)
852 if (HandlerFD[i] != OSI_NULLSOCKET) {
853 FD_SET(HandlerFD[i], fdsetp);
855 /* On Windows the nfds parameter to select() is ignored */
/* maxfd == -1 is treated as "nothing recorded yet" */
856 if (maxfd < HandlerFD[i] || maxfd == (int)-1)
857 maxfd = HandlerFD[i];
861 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
865 * allocate a salvage queue node.
867 * @param[out] node_out address in which to store new node pointer
869 * @return operation status
871 * @retval 1 failed to allocate node
876 AllocNode(struct SalvageQueueNode ** node_out)
879 struct SalvageQueueNode * node;
/* calloc zero-fills the node; the caller's pointer is set even when allocation fails (to NULL) */
881 *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
/* a new node starts as a volume-group parent in the UNKNOWN state */
887 node->type = SALVSYNC_VOLGROUP_PARENT;
888 node->state = SALVSYNC_STATE_UNKNOWN;
895 * link a salvage queue node to its parent.
897 * @param[in] parent pointer to queue node for parent of volume group
898 * @param[in] clone pointer to queue node for a clone
900 * @return operation status
907 LinkNode(struct SalvageQueueNode * parent,
908 struct SalvageQueueNode * clone)
913 /* check for attaching a clone to a clone */
914 if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
919 /* check for pre-existing registration and openings */
/* the scan looks for either an existing link to this clone or the first empty child slot */
920 for (idx = 0; idx < VOLMAXTYPES; idx++) {
921 if (parent->volgroup.children[idx] == clone) {
924 if (parent->volgroup.children[idx] == NULL) {
/* idx ran off the end: no existing link and no free slot */
928 if (idx == VOLMAXTYPES) {
933 /* link parent and child */
934 parent->volgroup.children[idx] = clone;
935 clone->type = SALVSYNC_VOLGROUP_CLONE;
936 clone->volgroup.parent = parent;
/* carry over any scheduling the clone already had to the parent */
940 switch (clone->state) {
941 case SALVSYNC_STATE_QUEUED:
942 DeleteFromSalvageQueue(clone);
/* NOTE(review): no break is visible between the QUEUED and SALVAGING cases in this listing -- confirm the fall-through is intended */
944 case SALVSYNC_STATE_SALVAGING:
945 switch (parent->state) {
/* parent not scheduled: it inherits the clone's priority and is queued */
946 case SALVSYNC_STATE_UNKNOWN:
947 case SALVSYNC_STATE_ERROR:
948 case SALVSYNC_STATE_DONE:
949 parent->command.sop.prio = clone->command.sop.prio;
950 AddToSalvageQueue(parent);
/* parent already queued: add the clone's priority on top and reposition it */
953 case SALVSYNC_STATE_QUEUED:
954 if (clone->command.sop.prio) {
955 parent->command.sop.prio += clone->command.sop.prio;
956 UpdateCommandPrio(parent);
/*
 * Fold a newly requested priority into the scheduler: `clone` is the node the
 * request names, `node` is the node that actually gets scheduled (the
 * volume-group parent, or the same node when it is not a clone).
 */
974 HandlePrio(struct SalvageQueueNode * clone,
975 struct SalvageQueueNode * node,
980 switch (node->state) {
/* a node that is not currently scheduled starts again from priority 0 */
981 case SALVSYNC_STATE_ERROR:
982 case SALVSYNC_STATE_DONE:
983 case SALVSYNC_STATE_UNKNOWN:
984 node->command.sop.prio = 0;
/* delta is how much the request raises the clone's recorded priority */
990 if (new_prio < clone->command.sop.prio) {
991 /* strange. let's just set our delta to 1 */
994 delta = new_prio - clone->command.sop.prio;
/* only a real clone keeps its own copy of the requested priority */
997 if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
998 clone->command.sop.prio = new_prio;
/* the scheduled node accumulates the increase */
1001 node->command.sop.prio += delta;
/*
 * Queue `node` on the salvage queue of the partition named in its command,
 * update the queue counters and wake anyone waiting on salvageQueue.cv
 * (SALVSYNC_getWork waits on it).
 */
1005 AddToSalvageQueue(struct SalvageQueueNode * node)
1008 struct SalvageQueueNode * last = NULL;
1010 id = volutil_GetPartitionID(node->command.sop.partName);
/* partition ids outside 0..VOLMAXPARTS cannot index the per-partition arrays */
1011 if (id < 0 || id > VOLMAXPARTS) {
1014 if (!VGetPartitionById_r(id, 0)) {
1015 /* don't enqueue salvage requests for unmounted partitions */
1018 if (queue_IsOnQueue(node)) {
/* remember the current tail so the new node's priority can be compared against it after appending */
1022 if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1023 last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1025 queue_Append(&salvageQueue.part[id], node);
1026 salvageQueue.len[id]++;
1027 salvageQueue.total_len++;
1028 salvageQueue.last_insert = id;
1029 node->partition_id = id;
1030 node->state = SALVSYNC_STATE_QUEUED;
1032 /* reorder, if necessary */
/* appended behind a lower-priority entry: move the node forward */
1033 if (last && last->command.sop.prio < node->command.sop.prio) {
1034 UpdateCommandPrio(node);
1037 CV_BROADCAST(&salvageQueue.cv);
/*
 * Take `node` off the salvage queue, if it is on a queue: the per-partition
 * and total counters are decremented, the node's state goes back to UNKNOWN
 * and waiters on salvageQueue.cv are woken.
 */
1042 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1044 if (queue_IsOnQueue(node)) {
/* partition_id was recorded when the node was queued */
1046 salvageQueue.len[node->partition_id]--;
1047 salvageQueue.total_len--;
1048 node->state = SALVSYNC_STATE_UNKNOWN;
1049 CV_BROADCAST(&salvageQueue.cv);
/*
 * Append `node` to the queue of in-progress salvages, mark it SALVAGING and
 * broadcast on the pending queue's condition variable.
 */
1054 AddToPendingQueue(struct SalvageQueueNode * node)
1056 queue_Append(&pendingQueue, node);
1058 node->state = SALVSYNC_STATE_SALVAGING;
1059 CV_BROADCAST(&pendingQueue.queue_change_cv);
/*
 * Counterpart of AddToPendingQueue: acts only when `node` is on a queue, then
 * resets its state to UNKNOWN and broadcasts on the pending queue's condition
 * variable.
 */
1063 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1065 if (queue_IsOnQueue(node)) {
1068 node->state = SALVSYNC_STATE_UNKNOWN;
1069 CV_BROADCAST(&pendingQueue.queue_change_cv);
/*
 * Search the in-progress queue for the node associated with worker `pid`.
 * The per-node comparison is on lines not visible in this listing.
 */
1073 static struct SalvageQueueNode *
1074 LookupPendingCommandByPid(int pid)
1076 struct SalvageQueueNode * np, * nnp;
1078 for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
/* the scan ran off the end of the queue */
1083 if (queue_IsEnd(&pendingQueue, np))
1089 /* raise the priority of a previously scheduled salvage */
/*
 * Reposition an already-queued `node` inside its partition's salvage queue
 * after its priority has changed.  The node must be on a queue (asserted).
 */
1091 UpdateCommandPrio(struct SalvageQueueNode * node)
1093 struct SalvageQueueNode *np, *nnp;
1097 opr_Assert(queue_IsOnQueue(node));
1099 prio = node->command.sop.prio;
1100 id = node->partition_id;
/* outranks the current head: move straight to the front */
1101 if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1103 queue_Prepend(&salvageQueue.part[id], node);
/* otherwise walk towards the front looking for the first entry with a strictly higher priority */
1105 for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1106 if (np->command.sop.prio > prio)
/* no higher-priority entry found: the node goes to the front */
1109 if (queue_IsEnd(&salvageQueue.part[id], np)) {
1111 queue_Prepend(&salvageQueue.part[id], node);
/* found one: the node is placed directly behind it */
1112 } else if (node != np) {
1114 queue_InsertAfter(np, node);
1119 /* this will need to be rearchitected if we ever want more than one thread
1120 * to wait for new salvage nodes */
/*
 * Block until a salvage is queued and at least one disk partition exists,
 * pick the next node to run, move it from the salvage queue to the pending
 * queue and count it against its partition.  Selection order: (1) the
 * single-partition shortcut, (2) a partition with queued work but nothing
 * currently salvaging, (3) any partition with queued work.
 */
1121 struct SalvageQueueNode *
1122 SALVSYNC_getWork(void)
1125 struct DiskPartition64 * dp = NULL, * fdp;
/* static: the starting partition for the next scan survives across calls */
1126 static afs_int32 next_part_sched = 0;
1127 struct SalvageQueueNode *node = NULL;
1132 * wait for work to be scheduled
1133 * if there are no disk partitions, just sit in this wait loop forever
1135 while (!salvageQueue.total_len || !DiskPartitionList) {
1136 VOL_CV_WAIT(&salvageQueue.cv);
1140 * short circuit for simple case where only one partition has
1141 * scheduled salvages
/* all queued nodes sit on the partition that received the most recent insert */
1143 if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1144 (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1145 node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1151 * ok, more than one partition has scheduled salvages.
1152 * now search for partitions with scheduled salvages, but no pending salvages.
1154 dp = VGetPartitionById_r(next_part_sched, 0);
1156 dp = DiskPartitionList;
/* the walk wraps from the end of the partition list back to its head */
1162 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1163 if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1164 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1171 * all partitions with scheduled salvages have at least one pending.
1172 * now do an exhaustive search for a scheduled salvage.
1178 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1179 if (salvageQueue.len[dp->index]) {
1180 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1185 /* we should never reach this line */
1186 osi_Panic("Node not found\n");
1189 opr_Assert(node != NULL);
/* partition_salvaging counts dispatched salvages per partition; SALVSYNC_doneWork_r decrements it */
1191 partition_salvaging[node->partition_id]++;
1192 DeleteFromSalvageQueue(node);
1193 AddToPendingQueue(node);
1196 /* update next_part_sched field */
1198 next_part_sched = dp->next->index;
1199 } else if (DiskPartitionList) {
1200 next_part_sched = DiskPartitionList->index;
1202 next_part_sched = -1;
1211 * update internal scheduler state to reflect completion of a work unit.
1213 * @param[in] node salvage queue node object pointer
1214 * @param[in] result worker process result code
1216 * @post scheduler state is updated.
1221 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1226 DeleteFromPendingQueue(node);
1227 partid = node->partition_id;
/* release the per-partition slot taken in SALVSYNC_getWork, but only for a valid partition id */
1228 if (partid >=0 && partid <= VOLMAXPARTS) {
1229 partition_salvaging[partid]--;
1232 node->state = SALVSYNC_STATE_DONE;
/* a SALSRV_EXIT_VOLGROUP_LINK result is not recorded as an error */
1233 } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1234 node->state = SALVSYNC_STATE_ERROR;
/* a volume-group parent pushes its final state down to every linked clone */
1237 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1238 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1239 if (node->volgroup.children[idx]) {
1240 node->volgroup.children[idx]->state = node->state;
1247 * check whether worker child failed.
1249 * @param[in] status status bitfield returned by wait()
1251 * @return boolean failure code
1252 * @retval 0 child succeeded
1253 * @retval 1 child failed
1258 ChildFailed(int status)
/*
 * Failure means any of: a core dump, death by signal, or an exit code that is
 * neither 0 nor SALSRV_EXIT_VOLGROUP_LINK.
 */
1260 return (WCOREDUMP(status) ||
1261 WIFSIGNALED(status) ||
1262 ((WEXITSTATUS(status) != 0) &&
1263 (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1268 * notify salvsync scheduler of node completion, by child pid.
1270 * @param[in] pid pid of worker child
1271 * @param[in] status worker status bitfield from wait()
1273 * @post scheduler state is updated.
1274 * if status code is a failure, fileserver notification was attempted
1276 * @see SALVSYNC_doneWork_r
1279 SALVSYNC_doneWorkByPid(int pid, int status)
1281 struct SalvageQueueNode * node;
/* slot 0 holds the node's own volume id; slot idx+1 holds child idx's, hence VOLMAXTYPES+1 entries */
1283 VolumeId volids[VOLMAXTYPES+1];
/* unused slots stay 0 */
1286 memset(volids, 0, sizeof(volids));
1289 node = LookupPendingCommandByPid(pid);
/* the raw wait() status is what gets passed as the result code */
1291 SALVSYNC_doneWork_r(node, status);
1293 if (ChildFailed(status)) {
1294 /* populate volume id list for later processing outside the glock */
1295 volids[0] = node->command.sop.volume;
/* NOTE(review): the destination buffer is declared on a line not visible here; confirm it is at least as large as sop.partName */
1296 strcpy(partName, node->command.sop.partName);
1297 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1298 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1299 if (node->volgroup.children[idx]) {
1300 volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1309 * if necessary, notify fileserver of
1310 * failure to salvage volume group
1311 * [we cannot guarantee that the child made the
1312 * appropriate notifications (e.g. SIGSEGV)]
1313 * -- tkeiser 11/28/2007
1315 if (ChildFailed(status)) {
/* "<=" is deliberate: volids has VOLMAXTYPES+1 entries */
1316 for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1318 FSYNC_VolOp(volids[idx],
1320 FSYNC_VOL_FORCE_ERROR,
1328 #endif /* AFS_DEMAND_ATTACH_FS */