2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
/*
 * NOTE(review): FD_SET()/FD_ISSET() on a descriptor >= FD_SETSIZE is
 * undefined behaviour. GetHandler() and CallHandler() apply them to every
 * registered handler fd without a range check, so all accepted
 * descriptors must stay below this value.
 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
29 #include <afs/procmgmt.h>
38 #include <afs/afs_assert.h>
40 #include <afs/afsint.h>
42 #include <afs/errors.h>
46 #include <afs/afssyscalls.h>
50 #include "partition.h"
52 #include <rx/rx_queue.h>
54 #ifdef USE_UNIX_SOCKETS
55 #include <afs/afsutil.h>
/*
 * Fallback core-dump test for a wait() status word: tests bit 0200 (octal).
 * NOTE(review): presumably only used where the system headers do not
 * provide WCOREDUMP; confirm the surrounding guard.
 */
60 #define WCOREDUMP(x) ((x) & 0200)
/*
 * Number of slots in HandlerFD[]/HandlerProc[]. While accepting is
 * enabled, the listening socket (salvsync_server_state.fd, handled by
 * SALVSYNC_newconnection) occupies one of these slots.
 */
63 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
64 * move = dump+restore can run on single server */
68 * This lock controls access to the handler array.
70 struct Lock SALVSYNC_handler_lock;
/*
 * Read-locked by CallHandler(), FindHandler() and GetHandler().
 * Write-locked when the handler table is initialised, and by AddHandler()
 * and RemoveHandler().
 */
73 #ifdef AFS_DEMAND_ATTACH_FS
75 * SALVSYNC is a feature specific to the demand attach fileserver
78 /* Forward declarations */
79 static void * SALVSYNC_syncThread(void *);
80 static void SALVSYNC_newconnection(osi_socket fd);
/*
 * NOTE(review): SALVSYNC_newconnection() is defined with an int parameter.
 * Both it and SALVSYNC_com() are passed to AddHandler(), whose callback
 * type takes an int, so this assumes osi_socket is int on this platform.
 */
81 static void SALVSYNC_com(osi_socket fd);
82 static void SALVSYNC_Drop(osi_socket fd);
83 static void AcceptOn(void);
84 static void AcceptOff(void);
85 static void InitHandler(void);
86 static void CallHandler(fd_set * fdsetp);
87 static int AddHandler(osi_socket afd, void (*aproc) (int));
88 static int FindHandler(osi_socket afd);
89 static int FindHandler_r(osi_socket afd);
90 static int RemoveHandler(osi_socket afd);
91 static void GetHandler(fd_set * fdsetp, int *maxfdp);
93 static int AllocNode(struct SalvageQueueNode ** node);
95 static int AddToSalvageQueue(struct SalvageQueueNode * node);
96 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
97 static void AddToPendingQueue(struct SalvageQueueNode * node);
98 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
99 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
100 static void UpdateCommandPrio(struct SalvageQueueNode * node);
101 static void HandlePrio(struct SalvageQueueNode * clone,
102 struct SalvageQueueNode * parent,
103 afs_uint32 new_prio);
105 static int LinkNode(struct SalvageQueueNode * parent,
106 struct SalvageQueueNode * clone);
108 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
109 struct SalvageQueueNode ** parent);
/* NOTE(review): LookupNode() is defined with an afs_uint32 vid, not VolumeId. */
110 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
111 struct SalvageQueueNode ** parent);
112 static void AddNodeToHash(struct SalvageQueueNode * node);
114 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
116 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
117 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
118 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
/*
 * NOTE(review): no use of vol_salvsync_mutex appears in the code below;
 * confirm this declaration is still needed.
 */
123 extern pthread_mutex_t vol_salvsync_mutex;
126 * salvsync server socket handle.
/*
 * Positional initializer: the entries must follow the member order of
 * SYNC_server_state_t. SALVSYNC_syncThread() (re)creates fd, and the
 * fork child handler closes it.
 */
128 static SYNC_server_state_t salvsync_server_state =
129 { OSI_NULLSOCKET, /* file descriptor */
130 SALVSYNC_ENDPOINT_DECL, /* server endpoint */
131 SALVSYNC_PROTO_VERSION, /* protocol version */
132 5, /* bind() retry limit */
133 100, /* listen() queue depth */
134 "SALVSYNC", /* protocol name string */
139 * queue of all volumes waiting to be salvaged.
/*
 * NOTE(review): volatile does not make these counters or queues safe for
 * concurrent access. They appear to rely on an external lock
 * (SALVSYNC_getWork() waits with VOL_CV_WAIT); confirm every access is
 * made under it.
 */
141 struct SalvageQueue {
142 volatile int total_len;
143 volatile afs_int32 last_insert; /**< id of last partition to have a salvage node inserted */
144 volatile int len[VOLMAXPARTS+1];
145 volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
/*
 * total_len tracks the sum of len[]. AddToSalvageQueue() and
 * DeleteFromSalvageQueue() update both, indexing len[] by partition id.
 */
148 static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
151 * queue of all volumes currently being salvaged.
154 volatile struct rx_queue q; /**< queue of salvages in progress */
155 volatile int len; /**< length of in-progress queue */
156 pthread_cond_t queue_change_cv;
158 static struct QueueHead pendingQueue; /* volumes being salvaged */
161 * whether a partition has a salvage in progress
163 * the salvager code only permits one salvage per partition at a time
165 * the following hack tries to keep salvage parallelism high by
166 * only permitting one salvage dispatch per partition at a time
168 * unfortunately, the parallel salvager currently
169 * has a rather braindead routine that won't permit
170 * multiple salvages on the same "device". this
171 * function happens to break pretty badly on lvm, raid luns, etc.
173 * this hack isn't good enough to stop the device limiting code from
174 * crippling performance. someday that code needs to be rewritten
176 static int partition_salvaging[VOLMAXPARTS+1];
/* incremented by SALVSYNC_getWork(), decremented by SALVSYNC_doneWork_r() */
178 static int HandlerFD[MAXHANDLERS];
179 static void (*HandlerProc[MAXHANDLERS]) (int);
/* a HandlerFD[] slot holding OSI_NULLSOCKET is free */
181 #define VSHASH_SIZE 64
182 #define VSHASH_MASK (VSHASH_SIZE-1)
183 #define VSHASH(vid) ((vid)&VSHASH_MASK)
/* VSHASH_SIZE must stay a power of two for VSHASH_MASK to act as a modulus. */
/* Salvage nodes bucketed by volume id, chained through their hash_chain link. */
185 static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/*
 * Find the salvage node for volume vid on partition partName.
 *
 * A node matches on an exact volume id and on at most sizeof(partName)
 * bytes of the partition name. When parent is used, *parent receives the
 * node's volume-group parent: vsp->volgroup.parent for a linked clone,
 * otherwise the node itself.
 *
 * NOTE(review): SALVSYNC_com_Cancel and SALVSYNC_com_Link pass
 * parent == NULL. SALVSYNC_com_Query reads *parent without checking the
 * return value. Confirm that *parent is NULL-guarded and is cleared when
 * no node matches.
 */
187 static struct SalvageQueueNode *
188 LookupNode(afs_uint32 vid, char * partName,
189 struct SalvageQueueNode ** parent)
191 struct rx_queue *qp, *nqp;
192 struct SalvageQueueNode *vsp;
193 int idx = VSHASH(vid);
195 for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
/* hash_chain is embedded in the node; step back to the enclosing struct */
196 vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
197 if ((vsp->command.sop.volume == vid) &&
198 !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
203 if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
209 *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
210 vsp->volgroup.parent : vsp;
/*
 * Look up a salvage node by the volume id and partition name carried in
 * a SALVSYNC command header. parent is forwarded unchanged to LookupNode().
 */
219 static struct SalvageQueueNode *
220 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
221 struct SalvageQueueNode ** parent)
223 return LookupNode(qry->volume, qry->partName, parent);
/*
 * Link node into the SalvageHashTable bucket for its volume id and bump
 * that bucket's length. A node already on a hash chain is detected first
 * (queue_IsOnQueue), presumably so that it is not linked twice.
 */
227 AddNodeToHash(struct SalvageQueueNode * node)
229 int idx = VSHASH(node->command.sop.volume);
231 if (queue_IsOnQueue(&node->hash_chain)) {
235 queue_Append(&SalvageHashTable[idx], &node->hash_chain);
236 SalvageHashTable[idx].len++;
/*
 * Unlink node from its SalvageHashTable bucket and decrement that
 * bucket's length. A node not on any hash chain is detected first,
 * presumably making the call a no-op.
 */
241 DeleteNodeFromHash(struct SalvageQueueNode * node)
243 int idx = VSHASH(node->command.sop.volume);
245 if (queue_IsNotOnQueue(&node->hash_chain)) {
249 queue_Remove(&node->hash_chain);
250 SalvageHashTable[idx].len--;
/*
 * One-time initialisation of the SALVSYNC server. It sets up:
 *  - the handler-table lock,
 *  - the per-partition salvage queues and their condition variable,
 *  - the pending queue,
 *  - the partition_salvaging[] counters,
 *  - the volume-id hash table.
 * It then starts SALVSYNC_syncThread() as a detached thread. Failures in
 * thread-attribute setup or thread creation are fatal (osi_Assert).
 */
255 SALVSYNC_salvInit(void)
259 pthread_attr_t tattr;
261 /* initialize the queues */
262 Lock_Init(&SALVSYNC_handler_lock);
263 CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
264 for (i = 0; i <= VOLMAXPARTS; i++) {
265 queue_Init(&salvageQueue.part[i]);
266 salvageQueue.len[i] = 0;
268 CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
269 queue_Init(&pendingQueue);
270 salvageQueue.total_len = pendingQueue.len = 0;
271 salvageQueue.last_insert = -1;
272 memset(partition_salvaging, 0, sizeof(partition_salvaging));
274 for (i = 0; i < VSHASH_SIZE; i++) {
275 CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
276 SalvageHashTable[i].len = 0;
277 queue_Init(&SalvageHashTable[i]);
280 /* start the salvsync thread */
281 osi_Assert(pthread_attr_init(&tattr) == 0);
282 osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
283 osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
/*
 * fork() child handler (registered with pthread_atfork() in
 * SALVSYNC_syncThread()). It drops every registered handler descriptor and
 * closes the listening socket, so the child cannot pick up salvsync
 * requests meant for the parent.
 * NOTE(review): only the forking thread survives in the child. If
 * SALVSYNC_Drop() takes SALVSYNC_handler_lock and another parent thread
 * held that lock at fork time, this could block forever; confirm.
 */
290 for (i = 0; i < MAXHANDLERS; ++i) {
291 if (HandlerFD[i] >= 0) {
292 SALVSYNC_Drop(HandlerFD[i]);
296 /* just in case we were in AcceptOff mode, and thus this fd wouldn't
298 close(salvsync_server_state.fd);
299 salvsync_server_state.fd = OSI_NULLSOCKET;
/*
 * Read set handed to select() by SALVSYNC_syncThread(); GetHandler()
 * repopulates it on every iteration. Presumably kept at file scope because
 * FD_SETSIZE (65536 bits) makes an fd_set roughly 8 KiB.
 */
302 static fd_set SALVSYNC_readfds;
/*
 * SALVSYNC server thread. It:
 *  1. registers CleanFDs as the fork child handler,
 *  2. ignores SIGPIPE, so a write to a disconnected peer does not kill
 *     the process,
 *  3. creates and binds the listening socket,
 *  4. repeatedly waits in select() with a SYNC_SELECT_TIMEOUT-second
 *     timeout, dispatching readable descriptors through CallHandler().
 */
305 SALVSYNC_syncThread(void * args)
308 SYNC_server_state_t * state = &salvsync_server_state;
310 /* when we fork, the child needs to close the salvsync server sockets,
311 * otherwise, it may get salvsync requests, instead of the parent
313 osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
315 SYNC_getAddr(&state->endpoint, &state->addr);
316 SYNC_cleanupSock(state);
319 (void)signal(SIGPIPE, SIG_IGN);
322 state->fd = SYNC_getSock(&state->endpoint);
323 code = SYNC_bindSock(state);
331 struct timeval s_timeout;
332 GetHandler(&SALVSYNC_readfds, &maxfd);
333 s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
334 s_timeout.tv_usec = 0;
/*
 * NOTE(review): a select() error (-1, e.g. EINTR) is treated like a
 * timeout. The IOMGR_select remark below appears stale, since this loop
 * calls select() directly.
 */
335 /* Note: check for >= 1 below is essential since IOMGR_select
336 * doesn't have exactly the same semantics as select.
338 if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
339 CallHandler(&SALVSYNC_readfds);
/*
 * Handler for the listening socket: accept one client connection and
 * register SALVSYNC_com() as its request handler.
 *
 * NOTE(review): any accept() failure panics the whole process, including
 * transient ones such as EINTR or ECONNABORTED. Consider logging and
 * returning instead.
 * NOTE(review): afd is declared int here but osi_socket in the forward
 * declaration.
 * NOTE(review): when the handler table is full, the retry below asserts
 * success, so the step in between must free a slot (presumably
 * AcceptOff()); confirm.
 */
346 SALVSYNC_newconnection(int afd)
348 #ifdef USE_UNIX_SOCKETS
349 struct sockaddr_un other;
350 #else /* USE_UNIX_SOCKETS */
351 struct sockaddr_in other;
356 junk = sizeof(other);
357 fd = accept(afd, (struct sockaddr *)&other, &junk);
358 if (fd == OSI_NULLSOCKET) {
359 osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
360 } else if (!AddHandler(fd, SALVSYNC_com)) {
362 osi_Assert(AddHandler(fd, SALVSYNC_com));
366 /* this function processes commands from a salvsync file descriptor (fd) */
/* count included in the SALVSYNC_com() read-failure log message */
367 static afs_int32 SALV_cnt = 0;
/*
 * Service one request arriving on client connection fd.
 *
 * The command is read with SYNC_getCom() and must be at least a SYNC
 * header long and carry SALVSYNC_PROTO_VERSION.
 * SYNC_COM_CHANNEL_CLOSE shuts the channel down without a reply. Every
 * other command must be exactly one SYNC header plus one
 * SALVSYNC_command_hdr, and is dispatched to the matching SALVSYNC_com_*
 * handler. The response header also reports the current lengths of the
 * salvage queue and of the pending (in-progress) queue. Replies flagged
 * SYNC_FLAG_CHANNEL_SHUTDOWN presumably end the connection.
 */
369 SALVSYNC_com(osi_socket fd)
373 SALVSYNC_response_hdr sres_hdr;
374 SALVSYNC_command scom;
375 SALVSYNC_response sres;
376 SYNC_PROTO_BUF_DECL(buf);
378 memset(&com, 0, sizeof(com));
379 memset(&res, 0, sizeof(res));
380 memset(&scom, 0, sizeof(scom));
381 memset(&sres, 0, sizeof(sres));
382 memset(&sres_hdr, 0, sizeof(sres_hdr));
384 com.payload.buf = (void *)buf;
385 com.payload.len = SYNC_PROTO_MAX_LEN;
386 res.payload.buf = (void *) &sres_hdr;
387 res.payload.len = sizeof(sres_hdr);
388 res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
389 res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
392 scom.sop = (SALVSYNC_command_hdr *) buf;
395 sres.sop = &sres_hdr;
399 if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
400 Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);
405 if (com.recv_len < sizeof(com.hdr)) {
406 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
407 res.hdr.response = SYNC_COM_ERROR;
408 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
409 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
413 if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
414 Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
415 res.hdr.response = SYNC_COM_ERROR;
416 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
420 if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
421 res.hdr.response = SYNC_OK;
422 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
424 /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
425 * never wait for a response. */
429 if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
430 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
431 res.hdr.response = SYNC_COM_ERROR;
432 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
433 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
437 res.hdr.com_seq = com.hdr.com_seq;
440 switch (com.hdr.command) {
443 case SALVSYNC_SALVAGE:
444 case SALVSYNC_RAISEPRIO:
445 res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
447 case SALVSYNC_CANCEL:
448 /* cancel a salvage */
449 res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
451 case SALVSYNC_CANCELALL:
452 /* cancel all queued salvages */
453 res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
456 /* query whether a volume is done salvaging */
457 res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
459 case SALVSYNC_OP_LINK:
460 /* link a clone to its parent in the scheduler */
461 res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
464 res.hdr.response = SYNC_BAD_COMMAND;
/* report current scheduler queue lengths back to the client */
468 sres_hdr.sq_len = salvageQueue.total_len;
469 sres_hdr.pq_len = pendingQueue.len;
473 SYNC_putRes(&salvsync_server_state, fd, &res);
476 if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
482 * request that a volume be salvaged.
484 * @param[in] com inbound command object
485 * @param[out] res outbound response object
487 * @return operation status
488 * @retval SYNC_OK success
489 * @retval SYNC_DENIED failed to enqueue request
490 * @retval SYNC_FAILED malformed command packet
492 * @note this is a SALVSYNC protocol rpc handler
496 * @post the volume is enqueued in the to-be-salvaged queue.
497 * if the volume was already in the salvage queue, its
498 * priority (and thus its location in the queue) are
502 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
504 afs_int32 code = SYNC_OK;
505 struct SalvageQueueNode * node, * clone;
/*
 * clone is the node of the volume named in the request, and node is its
 * volume-group parent (the same node for a standalone volume). Priority
 * changes are folded into the parent, and this handler puts only the
 * parent on the salvage queue.
 */
508 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
510 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
514 clone = LookupNodeByCommand(com->sop, &node);
517 if (AllocNode(&node)) {
519 res->hdr->reason = SYNC_REASON_NOMEM;
526 HandlePrio(clone, node, com->sop->prio);
528 switch (node->state) {
529 case SALVSYNC_STATE_QUEUED:
530 UpdateCommandPrio(node);
533 case SALVSYNC_STATE_ERROR:
534 case SALVSYNC_STATE_DONE:
535 case SALVSYNC_STATE_UNKNOWN:
/*
 * Record this request in the requested volume's node; the
 * volume-group parent is then (re)queued below.
 */
536 memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
537 memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
540 * make sure volgroup parent partition path is kept coherent
542 * If we ever want to support non-COW clones on a machine holding
543 * the RW site, please note that this code does not work under the
544 * conditions where someone zaps a COW clone on partition X, and
545 * subsequently creates a full clone on partition Y -- we'd need
546 * an inverse to SALVSYNC_com_Link.
547 * -- tkeiser 11/28/2007
549 strcpy(node->command.sop.partName, com->sop->partName);
551 if (AddToSalvageQueue(node)) {
564 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
565 res->sop->state = node->state;
566 res->sop->prio = node->command.sop.prio;
573 * cancel a pending salvage request.
575 * @param[in] com inbound command object
576 * @param[out] res outbound response object
578 * @return operation status
579 * @retval SYNC_OK success
580 * @retval SYNC_FAILED malformed command packet
582 * @note this is a SALVSYNC protocol rpc handler
587 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
589 afs_int32 code = SYNC_OK;
590 struct SalvageQueueNode * node;
/*
 * Only a volume-group parent that is still QUEUED is removed from the
 * salvage queue. For a clone, or for a volume already being salvaged,
 * the node's current state and priority are only reported.
 */
592 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
594 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
598 node = LookupNodeByCommand(com->sop, NULL);
601 res->sop->state = SALVSYNC_STATE_UNKNOWN;
604 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
605 res->sop->prio = node->command.sop.prio;
606 res->sop->state = node->state;
607 if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
608 (node->state == SALVSYNC_STATE_QUEUED)) {
609 DeleteFromSalvageQueue(node);
618 * cancel all pending salvage requests.
620 * @param[in] com incoming command object
621 * @param[out] res outbound response object
623 * @return operation status
624 * @retval SYNC_OK success
626 * @note this is a SALVSYNC protocol rpc handler
631 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
633 struct SalvageQueueNode * np, *nnp;
634 struct DiskPartition64 * dp;
/*
 * Empty the salvage queue of every partition on DiskPartitionList.
 * queue_Scan() is given nnp as a look-ahead pointer, so removing np
 * inside the loop is safe. Salvages already in progress (pendingQueue)
 * are not touched.
 */
636 for (dp = DiskPartitionList ; dp ; dp = dp->next) {
637 for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
638 DeleteFromSalvageQueue(np);
646 * link a queue node for a clone to its parent volume.
648 * @param[in] com inbound command object
649 * @param[out] res outbound response object
651 * @return operation status
652 * @retval SYNC_OK success
653 * @retval SYNC_FAILED malformed command packet
654 * @retval SYNC_DENIED the request could not be completed
656 * @note this is a SALVSYNC protocol rpc handler
658 * @post the requested volume is marked as a child of another volume.
659 * thus, future salvage requests for this volume will result in the
660 * parent of the volume group being scheduled for salvage instead
666 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
668 afs_int32 code = SYNC_OK;
669 struct SalvageQueueNode * clone, * parent;
/*
 * A clone without an existing node is rejected (SALVSYNC_REASON_ERROR).
 * A parent without a node gets a new one, built from this request with
 * its volume id replaced by the parent id, and added to the hash table
 * but not queued. LinkNode() then decides whether the parent must be
 * queued.
 * NOTE(review): if LinkNode() fails, a parent node created here stays in
 * SalvageHashTable; confirm that is intended.
 */
671 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
673 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
677 /* lookup clone's salvage scheduling node */
678 clone = LookupNodeByCommand(com->sop, NULL);
681 res->hdr->reason = SALVSYNC_REASON_ERROR;
685 /* lookup parent's salvage scheduling node */
686 parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
687 if (parent == NULL) {
688 if (AllocNode(&parent)) {
690 res->hdr->reason = SYNC_REASON_NOMEM;
693 memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
694 memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
695 parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
696 AddNodeToHash(parent);
699 if (LinkNode(parent, clone)) {
709 * query the status of a volume salvage request.
711 * @param[in] com inbound command object
712 * @param[out] res outbound response object
714 * @return operation status
715 * @retval SYNC_OK success
716 * @retval SYNC_FAILED malformed command packet
718 * @note this is a SALVSYNC protocol rpc handler
723 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
725 afs_int32 code = SYNC_OK;
726 struct SalvageQueueNode * node;
/*
 * &node is passed as LookupNodeByCommand()'s parent out-parameter, so a
 * linked clone reports the state and priority of its volume-group parent.
 * NOTE(review): the return value is ignored and node is read afterwards.
 * This relies on LookupNode() setting *parent to NULL when nothing
 * matches; confirm, otherwise node is uninitialized on that path.
 */
728 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
730 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
734 LookupNodeByCommand(com->sop, &node);
736 /* query whether a volume is done salvaging */
738 res->sop->state = SALVSYNC_STATE_UNKNOWN;
741 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
742 res->sop->state = node->state;
743 res->sop->prio = node->command.sop.prio;
/* NOTE(review): presumably unregisters and closes client fd; confirm. */
751 SALVSYNC_Drop(osi_socket fd)
758 static int AcceptHandler = -1; /* handler id for accept, if turned on */
/* Enable accepting: register the listening socket with SALVSYNC_newconnection(). */
763 if (AcceptHandler == -1) {
764 osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
765 AcceptHandler = FindHandler(salvsync_server_state.fd);
/*
 * Disable accepting: unregister the listening socket.
 * NOTE(review): confirm AcceptHandler is reset to -1 afterwards.
 */
772 if (AcceptHandler != -1) {
773 osi_Assert(RemoveHandler(salvsync_server_state.fd));
778 /* The multiple FD handling code. */
/* Handler-table initialisation: mark every slot free and clear its callback. */
784 ObtainWriteLock(&SALVSYNC_handler_lock);
785 for (i = 0; i < MAXHANDLERS; i++) {
786 HandlerFD[i] = OSI_NULLSOCKET;
787 HandlerProc[i] = NULL;
789 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Invoke the handler of every registered descriptor that is set in
 * fdsetp. The read lock is released around each callback so the callback
 * may add or remove handlers, which takes the write lock.
 * NOTE(review): HandlerFD[i] and HandlerProc[i] are read at the call site
 * after the lock is released. This appears to rely on only this thread
 * mutating the table; confirm.
 */
793 CallHandler(fd_set * fdsetp)
796 ObtainReadLock(&SALVSYNC_handler_lock);
797 for (i = 0; i < MAXHANDLERS; i++) {
798 if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
799 ReleaseReadLock(&SALVSYNC_handler_lock);
800 (*HandlerProc[i]) (HandlerFD[i]);
801 ObtainReadLock(&SALVSYNC_handler_lock);
804 ReleaseReadLock(&SALVSYNC_handler_lock);
/*
 * Store afd and aproc in the first free handler slot, under the write
 * lock. When all MAXHANDLERS slots are in use it fails, returning 0 as
 * the callers' !AddHandler() and osi_Assert() checks expect.
 */
808 AddHandler(osi_socket afd, void (*aproc) (int))
811 ObtainWriteLock(&SALVSYNC_handler_lock);
812 for (i = 0; i < MAXHANDLERS; i++)
813 if (HandlerFD[i] == OSI_NULLSOCKET)
815 if (i >= MAXHANDLERS) {
816 ReleaseWriteLock(&SALVSYNC_handler_lock);
820 HandlerProc[i] = aproc;
821 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Return the handler-table index holding afd, taking the read lock.
 * Panics if afd is not registered.
 */
826 FindHandler(osi_socket afd)
829 ObtainReadLock(&SALVSYNC_handler_lock);
830 for (i = 0; i < MAXHANDLERS; i++)
831 if (HandlerFD[i] == afd) {
832 ReleaseReadLock(&SALVSYNC_handler_lock);
835 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
836 osi_Panic("Failed to find handler\n");
837 return -1; /* satisfy compiler */
/*
 * Variant of FindHandler() that takes no lock: the caller must already
 * hold SALVSYNC_handler_lock (RemoveHandler() calls it under the write
 * lock). Panics if afd is not registered.
 */
841 FindHandler_r(osi_socket afd)
844 for (i = 0; i < MAXHANDLERS; i++)
845 if (HandlerFD[i] == afd) {
848 osi_Panic("Failed to find handler\n");
849 return -1; /* satisfy compiler */
/*
 * Free the slot holding afd, under the write lock; panics (via
 * FindHandler_r) if afd is not registered. Only HandlerFD[] marks a slot
 * as used, so the stale HandlerProc[] entry is left in place.
 */
853 RemoveHandler(osi_socket afd)
855 ObtainWriteLock(&SALVSYNC_handler_lock);
856 HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
857 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Add every registered descriptor to fdsetp and (presumably via *maxfdp)
 * report the largest one; SALVSYNC_syncThread() passes maxfd + 1 to
 * select().
 * NOTE(review): fdsetp is reused across select() calls, so it must be
 * cleared (FD_ZERO) before the loop; confirm.
 */
862 GetHandler(fd_set * fdsetp, int *maxfdp)
867 ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
868 for (i = 0; i < MAXHANDLERS; i++)
869 if (HandlerFD[i] != OSI_NULLSOCKET) {
870 FD_SET(HandlerFD[i], fdsetp);
872 /* On Windows the nfds parameter to select() is ignored */
873 if (maxfd < HandlerFD[i] || maxfd == (int)-1)
874 maxfd = HandlerFD[i];
878 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
882 * allocate a salvage queue node.
884 * @param[out] node_out address in which to store new node pointer
886 * @return operation status
888 * @retval 1 failed to allocate node
893 AllocNode(struct SalvageQueueNode ** node_out)
896 struct SalvageQueueNode * node;
/*
 * On success the node is zero-filled and starts as a standalone
 * volume-group parent in UNKNOWN state. Zero-filling also leaves its
 * queue links NULL, which the queue_IsOnQueue() checks elsewhere
 * presumably treat as "not on a queue"; confirm against rx_queue.h.
 */
898 *node_out = node = (struct SalvageQueueNode *)
899 malloc(sizeof(struct SalvageQueueNode));
905 memset(node, 0, sizeof(struct SalvageQueueNode));
906 node->type = SALVSYNC_VOLGROUP_PARENT;
907 node->state = SALVSYNC_STATE_UNKNOWN;
914 * link a salvage queue node to its parent.
916 * @param[in] parent pointer to queue node for parent of volume group
917 * @param[in] clone pointer to queue node for a clone
919 * @return operation status
926 LinkNode(struct SalvageQueueNode * parent,
927 struct SalvageQueueNode * clone)
/*
 * Make clone a member of parent's volume group.
 *
 * The link is rejected when parent is itself a clone, or when parent has
 * no free child slot (VOLMAXTYPES). An already registered clone is
 * detected.
 *
 * If the clone is queued (it is dequeued first) or being salvaged, the
 * parent takes over the work: an idle parent is queued with the clone's
 * priority, and an already queued parent has the clone's priority added.
 *
 * NOTE(review): the return value of AddToSalvageQueue() is ignored here.
 */
932 /* check for attaching a clone to a clone */
933 if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
938 /* check for pre-existing registration and openings */
939 for (idx = 0; idx < VOLMAXTYPES; idx++) {
940 if (parent->volgroup.children[idx] == clone) {
943 if (parent->volgroup.children[idx] == NULL) {
947 if (idx == VOLMAXTYPES) {
952 /* link parent and child */
953 parent->volgroup.children[idx] = clone;
954 clone->type = SALVSYNC_VOLGROUP_CLONE;
955 clone->volgroup.parent = parent;
959 switch (clone->state) {
960 case SALVSYNC_STATE_QUEUED:
961 DeleteFromSalvageQueue(clone);
963 case SALVSYNC_STATE_SALVAGING:
964 switch (parent->state) {
965 case SALVSYNC_STATE_UNKNOWN:
966 case SALVSYNC_STATE_ERROR:
967 case SALVSYNC_STATE_DONE:
968 parent->command.sop.prio = clone->command.sop.prio;
969 AddToSalvageQueue(parent);
972 case SALVSYNC_STATE_QUEUED:
973 if (clone->command.sop.prio) {
974 parent->command.sop.prio += clone->command.sop.prio;
975 UpdateCommandPrio(parent);
/*
 * Fold a newly requested priority for clone into node, the volume-group
 * parent (the same node for a standalone volume).
 *
 * A parent in ERROR, DONE or UNKNOWN state restarts from priority 0.
 * The parent's priority then grows by the increase new_prio represents
 * over clone's current priority (by 1 if new_prio is lower). A linked
 * clone records new_prio as its own priority.
 */
993 HandlePrio(struct SalvageQueueNode * clone,
994 struct SalvageQueueNode * node,
999 switch (node->state) {
1000 case SALVSYNC_STATE_ERROR:
1001 case SALVSYNC_STATE_DONE:
1002 case SALVSYNC_STATE_UNKNOWN:
1003 node->command.sop.prio = 0;
1009 if (new_prio < clone->command.sop.prio) {
1010 /* strange. let's just set our delta to 1 */
1013 delta = new_prio - clone->command.sop.prio;
1016 if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1017 clone->command.sop.prio = new_prio;
1020 node->command.sop.prio += delta;
/*
 * Append node to the salvage queue of its partition, record the
 * partition id in node->partition_id and salvageQueue.last_insert, mark
 * node QUEUED, and wake SALVSYNC_getWork() through salvageQueue.cv.
 *
 * A nonzero return (as tested by SALVSYNC_com_Salvage()) presumably
 * signals rejection. Rejection happens when:
 *  - the partition name does not map to an id in [0, VOLMAXPARTS],
 *  - that partition is not mounted, or
 *  - node is already on a queue.
 * Confirm the early-exit return values.
 *
 * A node whose priority exceeds that of the previous tail is moved
 * forward with UpdateCommandPrio().
 *
 * NOTE(review): queue_IsOnQueue() is also true while node sits on
 * pendingQueue, because it uses the same embedded link.
 */
1024 AddToSalvageQueue(struct SalvageQueueNode * node)
1027 struct SalvageQueueNode * last = NULL;
1029 id = volutil_GetPartitionID(node->command.sop.partName);
1030 if (id < 0 || id > VOLMAXPARTS) {
1033 if (!VGetPartitionById_r(id, 0)) {
1034 /* don't enqueue salvage requests for unmounted partitions */
1037 if (queue_IsOnQueue(node)) {
1041 if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1042 last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1044 queue_Append(&salvageQueue.part[id], node);
1045 salvageQueue.len[id]++;
1046 salvageQueue.total_len++;
1047 salvageQueue.last_insert = id;
1048 node->partition_id = id;
1049 node->state = SALVSYNC_STATE_QUEUED;
1051 /* reorder, if necessary */
1052 if (last && last->command.sop.prio < node->command.sop.prio) {
1053 UpdateCommandPrio(node);
1056 CV_BROADCAST(&salvageQueue.cv);
/*
 * If node is linked on a queue: remove it, decrement its partition's and
 * the total salvage-queue lengths, reset it to UNKNOWN, and wake waiters
 * on salvageQueue.cv.
 * NOTE(review): queue_IsOnQueue() is also true for a node on pendingQueue
 * (AddToPendingQueue() uses the same link). In that case the salvage-queue
 * counters would be decremented wrongly. All callers in this file pass
 * only QUEUED nodes or nodes taken from a salvage queue.
 */
1061 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1063 if (queue_IsOnQueue(node)) {
1065 salvageQueue.len[node->partition_id]--;
1066 salvageQueue.total_len--;
1067 node->state = SALVSYNC_STATE_UNKNOWN;
1068 CV_BROADCAST(&salvageQueue.cv);
/*
 * Append node to pendingQueue, mark it SALVAGING, and wake
 * queue_change_cv waiters. Both queues use the same embedded link, so
 * node must already be off the salvage queue; SALVSYNC_getWork() removes
 * it first.
 */
1073 AddToPendingQueue(struct SalvageQueueNode * node)
1075 queue_Append(&pendingQueue, node);
1077 node->state = SALVSYNC_STATE_SALVAGING;
1078 CV_BROADCAST(&pendingQueue.queue_change_cv);
/*
 * If node is linked on a queue, take it off pendingQueue, reset it to
 * UNKNOWN, and wake queue_change_cv waiters.
 */
1082 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1084 if (queue_IsOnQueue(node)) {
1087 node->state = SALVSYNC_STATE_UNKNOWN;
1088 CV_BROADCAST(&pendingQueue.queue_change_cv);
/*
 * Find the in-progress (pendingQueue) node for qry's volume id and
 * partition name, comparing at most sizeof(qry->partName) bytes of the
 * name. The queue_IsEnd() check handles the no-match case (presumably
 * returning NULL).
 */
1093 static struct SalvageQueueNode *
1094 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1096 struct SalvageQueueNode * np, * nnp;
1098 for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1099 if ((np->command.sop.volume == qry->volume) &&
1100 !strncmp(np->command.sop.partName, qry->partName,
1101 sizeof(qry->partName)))
1105 if (queue_IsEnd(&pendingQueue, np))
/*
 * Find the pendingQueue node whose worker process has the given pid.
 * The queue_IsEnd() check handles the no-match case (presumably returning
 * NULL).
 */
1111 static struct SalvageQueueNode *
1112 LookupPendingCommandByPid(int pid)
1114 struct SalvageQueueNode * np, * nnp;
1116 for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1121 if (queue_IsEnd(&pendingQueue, np))
1127 /* raise the priority of a previously scheduled salvage */
1129 UpdateCommandPrio(struct SalvageQueueNode * node)
1131 struct SalvageQueueNode *np, *nnp;
1135 osi_Assert(queue_IsOnQueue(node));
/*
 * Keep the partition queue ordered by descending priority. If node now
 * outranks the head, it moves to the front. Otherwise it is placed right
 * after the nearest earlier entry with a strictly higher priority.
 * The scan only looks from node toward the head, so this handles
 * priority increases only, never decreases.
 */
1137 prio = node->command.sop.prio;
1138 id = node->partition_id;
1139 if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1141 queue_Prepend(&salvageQueue.part[id], node);
1143 for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1144 if (np->command.sop.prio > prio)
1147 if (queue_IsEnd(&salvageQueue.part[id], np)) {
1149 queue_Prepend(&salvageQueue.part[id], node);
1150 } else if (node != np) {
1152 queue_InsertAfter(np, node);
1157 /* this will need to be rearchitected if we ever want more than one thread
1158 * to wait for new salvage nodes */
/*
 * Block until a salvage is scheduled, then move the chosen node from the
 * salvage queue to pendingQueue, count it in partition_salvaging[], and
 * return it.
 *
 * Selection order:
 *  1. If every queued request belongs to the partition that last received
 *     an insert, take that partition's head.
 *  2. Otherwise, starting at the round-robin cursor next_part_sched,
 *     prefer the first partition with queued work and no salvage in
 *     progress.
 *  3. Failing that, take the first partition with any queued work.
 * Panics if no node is found.
 *
 * NOTE(review): waits via VOL_CV_WAIT, so presumably called with the
 * volume package lock held; confirm.
 */
1159 struct SalvageQueueNode *
1160 SALVSYNC_getWork(void)
1163 struct DiskPartition64 * dp = NULL, * fdp;
1164 static afs_int32 next_part_sched = 0;
1165 struct SalvageQueueNode *node = NULL;
1170 * wait for work to be scheduled
1171 * if there are no disk partitions, just sit in this wait loop forever
1173 while (!salvageQueue.total_len || !DiskPartitionList) {
1174 VOL_CV_WAIT(&salvageQueue.cv);
1178 * short circuit for simple case where only one partition has
1179 * scheduled salvages
1181 if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1182 (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1183 node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1189 * ok, more than one partition has scheduled salvages.
1190 * now search for partitions with scheduled salvages, but no pending salvages.
1192 dp = VGetPartitionById_r(next_part_sched, 0);
1194 dp = DiskPartitionList;
1200 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1201 if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1202 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1209 * all partitions with scheduled salvages have at least one pending.
1210 * now do an exhaustive search for a scheduled salvage.
1216 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1217 if (salvageQueue.len[dp->index]) {
1218 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1223 /* we should never reach this line */
1224 osi_Panic("Node not found\n");
1227 osi_Assert(node != NULL);
1229 partition_salvaging[node->partition_id]++;
1230 DeleteFromSalvageQueue(node);
1231 AddToPendingQueue(node);
/*
 * NOTE(review): dp is still NULL when the single-partition short cut
 * above was taken; confirm the cursor update below is guarded.
 */
1234 /* update next_part_sched field */
1236 next_part_sched = dp->next->index;
1237 } else if (DiskPartitionList) {
1238 next_part_sched = DiskPartitionList->index;
1240 next_part_sched = -1;
1249 * update internal scheduler state to reflect completion of a work unit.
1251 * @param[in] node salvage queue node object pointer
1252 * @param[in] result worker process result code
1254 * @post scheduler state is updated.
1259 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
/*
 * result is compared against process exit codes:
 *  - success yields DONE;
 *  - SALSRV_EXIT_VOLGROUP_LINK leaves the state that
 *    DeleteFromPendingQueue() set (UNKNOWN);
 *  - anything else yields ERROR.
 * Callers must therefore pass a decoded exit code (e.g. WEXITSTATUS() of
 * a wait() status), not the raw status word, or SALSRV_EXIT_VOLGROUP_LINK
 * is not recognised.
 *
 * The node's partition_salvaging[] count is released, and a parent's final
 * state is copied to every linked clone.
 * NOTE(review): the condition selecting DONE is presumably result == 0;
 * confirm.
 */
1264 DeleteFromPendingQueue(node);
1265 partid = node->partition_id;
1266 if (partid >=0 && partid <= VOLMAXPARTS) {
1267 partition_salvaging[partid]--;
1270 node->state = SALVSYNC_STATE_DONE;
1271 } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1272 node->state = SALVSYNC_STATE_ERROR;
1275 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1276 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1277 if (node->volgroup.children[idx]) {
1278 node->volgroup.children[idx]->state = node->state;
1285 * check whether worker child failed.
1287 * @param[in] status status bitfield return by wait()
1289 * @return boolean failure code
1290 * @retval 0 child succeeded
1291 * @retval 1 child failed
1296 ChildFailed(int status)
/*
 * The child failed if it dumped core, was killed by a signal, or exited
 * with a nonzero code other than SALSRV_EXIT_VOLGROUP_LINK (an expected
 * outcome).
 * NOTE(review): WEXITSTATUS() is only meaningful when WIFEXITED() is true.
 * This assumes status never describes a stopped child (no WUNTRACED);
 * confirm.
 */
1298 return (WCOREDUMP(status) ||
1299 WIFSIGNALED(status) ||
1300 ((WEXITSTATUS(status) != 0) &&
1301 (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1306 * notify salvsync scheduler of node completion, by child pid.
1308 * @param[in] pid pid of worker child
1309 * @param[in] status worker status bitfield from wait()
1311 * @post scheduler state is updated.
1312 * if status code is a failure, fileserver notification was attempted
1314 * @see SALVSYNC_doneWork_r
1317 SALVSYNC_doneWorkByPid(int pid, int status)
1319 struct SalvageQueueNode * node;
1321 afs_uint32 volids[VOLMAXTYPES+1];
/*
 * status is a raw wait() status word, but SALVSYNC_doneWork_r() compares
 * its result argument against process exit codes (0 and
 * SALSRV_EXIT_VOLGROUP_LINK). The status is therefore decoded first:
 *  - a normal exit passes its exit code;
 *  - any other termination (e.g. by a signal) passes -1, so the node is
 *    marked as failed.
 * Passing the raw status made a worker that exited with
 * SALSRV_EXIT_VOLGROUP_LINK land in the ERROR branch. ChildFailed(), which
 * decodes with WEXITSTATUS(), treats that exit as a success.
 *
 * Scheduler bookkeeping happens first. For a failed group, the volume ids
 * are then copied into volids and the partition name into partName, so
 * the fileserver can be told to force the volumes offline afterwards
 * (outside the glock) without touching node again.
 * NOTE(review): confirm node is NULL-checked after
 * LookupPendingCommandByPid() and before it is dereferenced.
 */
1324 memset(volids, 0, sizeof(volids));
1327 node = LookupPendingCommandByPid(pid);
1329 SALVSYNC_doneWork_r(node, WIFEXITED(status) ? WEXITSTATUS(status) : -1);
1331 if (ChildFailed(status)) {
1332 /* populate volume id list for later processing outside the glock */
1333 volids[0] = node->command.sop.volume;
1334 strcpy(partName, node->command.sop.partName);
1335 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1336 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1337 if (node->volgroup.children[idx]) {
1338 volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1347 * if necessary, notify fileserver of
1348 * failure to salvage volume group
1349 * [we cannot guarantee that the child made the
1350 * appropriate notifications (e.g. SIGSEGV)]
1351 * -- tkeiser 11/28/2007
1353 if (ChildFailed(status)) {
1354 for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1356 FSYNC_VolOp(volids[idx],
1358 FSYNC_VOL_FORCE_ERROR,
1366 #endif /* AFS_DEMAND_ATTACH_FS */