2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
30 #include <sys/types.h>
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
49 #include <afs/afsint.h>
51 #include <afs/errors.h>
55 #include <afs/afssyscalls.h>
59 #include "partition.h"
60 #include <rx/rx_queue.h>
61 #include <afs/procmgmt.h>
63 #if !defined(offsetof)
67 #ifdef USE_UNIX_SOCKETS
68 #include <afs/afsutil.h>
73 /*@printflike@*/ extern void Log(const char *format, ...);
75 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
76 * move = dump+restore can run on single server */
80 * This lock controls access to the handler array.
82 struct Lock SALVSYNC_handler_lock;
85 #ifdef AFS_DEMAND_ATTACH_FS
87 * SALVSYNC is a feature specific to the demand attach fileserver
90 /* Forward declarations */
91 static void * SALVSYNC_syncThread(void *);
92 static void SALVSYNC_newconnection(osi_socket fd);
93 static void SALVSYNC_com(osi_socket fd);
94 static void SALVSYNC_Drop(osi_socket fd);
95 static void AcceptOn(void);
96 static void AcceptOff(void);
97 static void InitHandler(void);
98 static void CallHandler(fd_set * fdsetp);
99 static int AddHandler(osi_socket afd, void (*aproc) (int));
100 static int FindHandler(register osi_socket afd);
101 static int FindHandler_r(register osi_socket afd);
102 static int RemoveHandler(register osi_socket afd);
103 static void GetHandler(fd_set * fdsetp, int *maxfdp);
105 static int AllocNode(struct SalvageQueueNode ** node);
107 static int AddToSalvageQueue(struct SalvageQueueNode * node);
108 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
109 static void AddToPendingQueue(struct SalvageQueueNode * node);
110 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
111 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
112 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
113 static void UpdateCommandPrio(struct SalvageQueueNode * node);
114 static void HandlePrio(struct SalvageQueueNode * clone,
115 struct SalvageQueueNode * parent,
116 afs_uint32 new_prio);
118 static int LinkNode(struct SalvageQueueNode * parent,
119 struct SalvageQueueNode * clone);
121 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
122 struct SalvageQueueNode ** parent);
123 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
124 struct SalvageQueueNode ** parent);
125 static void AddNodeToHash(struct SalvageQueueNode * node);
126 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
128 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
137 extern pthread_mutex_t vol_salvsync_mutex;
140 * salvsync server socket handle.
142 static SYNC_server_state_t salvsync_server_state =
143 { -1, /* file descriptor */
144 SALVSYNC_ENDPOINT_DECL, /* server endpoint */
145 SALVSYNC_PROTO_VERSION, /* protocol version */
146 5, /* bind() retry limit */
147 100, /* listen() queue depth */
148 "SALVSYNC", /* protocol name string */
/* queue of all volumes waiting to be salvaged. */
struct SalvageQueue {
    volatile int total_len;                     /* total scheduled salvages across all partitions */
    volatile afs_int32 last_insert;             /**< id of last partition to have a salvage node inserted */
    volatile int len[VOLMAXPARTS+1];            /* per-partition queue lengths */
    volatile struct rx_queue part[VOLMAXPARTS+1];       /**< per-partition queues of pending salvages */
static struct SalvageQueue salvageQueue;        /* volumes waiting to be salvaged */

/* queue of all volumes currently being salvaged. */
/* NOTE(review): the fields below appear to belong to a struct QueueHead
 * definition whose opening line is not visible in this view -- confirm. */
    volatile struct rx_queue q;         /**< queue of salvages in progress */
    volatile int len;                   /**< length of in-progress queue */
    pthread_cond_t queue_change_cv;     /* broadcast on every queue change */
static struct QueueHead pendingQueue;   /* volumes being salvaged */
/*
 * whether a partition has a salvage in progress
 *
 * the salvager code only permits one salvage per partition at a time
 *
 * the following hack tries to keep salvaged parallelism high by
 * only permitting one salvage dispatch per partition at a time
 *
 * unfortunately, the parallel salvager currently
 * has a rather braindead routine that won't permit
 * multiple salvages on the same "device". this
 * function happens to break pretty badly on lvm, raid luns, etc.
 *
 * this hack isn't good enough to stop the device limiting code from
 * crippling performance. someday that code needs to be rewritten
 */
static int partition_salvaging[VOLMAXPARTS+1];

/* client fd / callback tables; slots guarded by SALVSYNC_handler_lock */
static int HandlerFD[MAXHANDLERS];
static void (*HandlerProc[MAXHANDLERS]) (int);

/* hash table mapping volume id -> scheduling node; power-of-two sized */
#define VSHASH_SIZE 64
#define VSHASH_MASK (VSHASH_SIZE-1)
#define VSHASH(vid) ((vid)&VSHASH_MASK)

static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/*
 * look up a salvage scheduling node by (volume id, partition name).
 *
 * If a match is found and parent is non-NULL, *parent is set to the node's
 * volume-group parent when the node is a clone, otherwise to the node itself.
 *
 * NOTE(review): the forward declaration types vid as VolumeId while this
 * definition uses afs_uint32 -- confirm these are identical typedefs.
 */
static struct SalvageQueueNode *
LookupNode(afs_uint32 vid, char * partName,
           struct SalvageQueueNode ** parent)
    struct rx_queue *qp, *nqp;
    struct SalvageQueueNode *vsp;
    int idx = VSHASH(vid);      /* hash bucket index */

    for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
        /* recover the containing node from its embedded hash_chain link */
        vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
        if ((vsp->command.sop.volume == vid) &&
            !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
    /* queue_IsEnd means the scan completed without a match */
    if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
        /* for clones, report the volume-group parent */
        *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
            vsp->volgroup.parent : vsp;
/* convenience wrapper: look up a scheduling node using the volume id and
 * partition name carried in a SALVSYNC command header. */
static struct SalvageQueueNode *
LookupNodeByCommand(SALVSYNC_command_hdr * qry,
                    struct SalvageQueueNode ** parent)
    return LookupNode(qry->volume, qry->partName, parent);
/* insert a scheduling node into the volume hash table, unless it is
 * already on a hash chain. */
AddNodeToHash(struct SalvageQueueNode * node)
    int idx = VSHASH(node->command.sop.volume);

    /* no-op if already hashed */
    if (queue_IsOnQueue(&node->hash_chain)) {

    queue_Append(&SalvageHashTable[idx], &node->hash_chain);
    SalvageHashTable[idx].len++;
/* remove a scheduling node from the volume hash table, unless it is
 * not currently hashed. */
DeleteNodeFromHash(struct SalvageQueueNode * node)
    int idx = VSHASH(node->command.sop.volume);

    /* no-op if not hashed */
    if (queue_IsNotOnQueue(&node->hash_chain)) {

    queue_Remove(&node->hash_chain);
    SalvageHashTable[idx].len--;
/*
 * initialize the SALVSYNC scheduler: handler lock, salvage/pending queues,
 * per-partition state, the volume hash table, and finally the detached
 * listener thread (SALVSYNC_syncThread).
 */
SALVSYNC_salvInit(void)
    pthread_attr_t tattr;

    /* initialize the queues */
    Lock_Init(&SALVSYNC_handler_lock);
    assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
    for (i = 0; i <= VOLMAXPARTS; i++) {
        queue_Init(&salvageQueue.part[i]);
        salvageQueue.len[i] = 0;
    assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
    queue_Init(&pendingQueue);
    salvageQueue.total_len = pendingQueue.len = 0;
    salvageQueue.last_insert = -1;      /* no partition has received a node yet */
    memset(partition_salvaging, 0, sizeof(partition_salvaging));

    for (i = 0; i < VSHASH_SIZE; i++) {
        assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
        SalvageHashTable[i].len = 0;
        queue_Init(&SalvageHashTable[i]);

    /* start the salvsync thread */
    assert(pthread_attr_init(&tattr) == 0);
    assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
    assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
    /* NOTE(review): fragment of CleanFDs (registered via pthread_atfork in
     * SALVSYNC_syncThread) -- drops every registered client fd and closes
     * the listener socket; function header not visible in this view. */
    for (i = 0; i < MAXHANDLERS; ++i) {
        if (HandlerFD[i] >= 0) {
            SALVSYNC_Drop(HandlerFD[i]);

    /* just in case we were in AcceptOff mode, and thus this fd wouldn't
     * have been closed by the loop above */
    close(salvsync_server_state.fd);
    salvsync_server_state.fd = -1;
/* fd_set rebuilt before every select() call in the listener loop */
static fd_set SALVSYNC_readfds;

/*
 * listener thread main loop: bind the SALVSYNC server socket, then
 * repeatedly select() over all registered fds and dispatch ready handlers.
 */
SALVSYNC_syncThread(void * args)
    SYNC_server_state_t * state = &salvsync_server_state;

    /* when we fork, the child needs to close the salvsync server sockets,
     * otherwise, it may get salvsync requests, instead of the parent */
    assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);

    SYNC_getAddr(&state->endpoint, &state->addr);
    SYNC_cleanupSock(state);

    /* ignore SIGPIPE so a dropped client cannot kill the server */
    (void)signal(SIGPIPE, SIG_IGN);

    state->fd = SYNC_getSock(&state->endpoint);
    code = SYNC_bindSock(state);

        GetHandler(&SALVSYNC_readfds, &maxfd);
        /* Note: check for >= 1 below is essential since IOMGR_select
         * doesn't have exactly same semantics as select. */
        if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
            CallHandler(&SALVSYNC_readfds);
/*
 * accept() a new client connection on the listening socket afd and
 * register the resulting fd with the SALVSYNC_com handler.
 *
 * NOTE(review): the forward declaration takes osi_socket while this
 * definition takes int -- confirm the types match on all platforms.
 */
SALVSYNC_newconnection(int afd)
#ifdef USE_UNIX_SOCKETS
    struct sockaddr_un other;
#else /* USE_UNIX_SOCKETS */
    struct sockaddr_in other;

    junk = sizeof(other);
    fd = accept(afd, (struct sockaddr *)&other, &junk);
        Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
    } else if (!AddHandler(fd, SALVSYNC_com)) {
        /* table was full; a handler slot should have been freed first */
        assert(AddHandler(fd, SALVSYNC_com));
/* this function processes commands from an salvsync file descriptor (fd) */
static afs_int32 SALV_cnt = 0;  /* running command count; appears in log messages */

/*
 * read one SALVSYNC command from fd, validate its header (length, protocol
 * version), dispatch to the per-command handler, attach current queue
 * lengths to the response, and send it back to the client.
 */
SALVSYNC_com(osi_socket fd)
    SALVSYNC_response_hdr sres_hdr;
    SALVSYNC_command scom;
    SALVSYNC_response sres;
    SYNC_PROTO_BUF_DECL(buf);

    memset(&com, 0, sizeof(com));
    memset(&res, 0, sizeof(res));
    memset(&scom, 0, sizeof(scom));
    memset(&sres, 0, sizeof(sres));
    memset(&sres_hdr, 0, sizeof(sres_hdr));

    /* wire the generic SYNC payload buffers */
    com.payload.buf = (void *)buf;
    com.payload.len = SYNC_PROTO_MAX_LEN;
    res.payload.buf = (void *) &sres_hdr;
    res.payload.len = sizeof(sres_hdr);
    res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
    res.hdr.proto_version = SALVSYNC_PROTO_VERSION;

    /* overlay the SALVSYNC-specific headers onto the SYNC buffers */
    scom.sop = (SALVSYNC_command_hdr *) buf;
    sres.sop = &sres_hdr;

    if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
        Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);

    if (com.recv_len < sizeof(com.hdr)) {
        Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
        res.hdr.response = SYNC_COM_ERROR;
        res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
        Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
        res.hdr.response = SYNC_COM_ERROR;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
        res.hdr.response = SYNC_OK;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

        /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
         * never wait for a response. */

    /* every remaining command carries exactly one SALVSYNC_command_hdr */
    if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
        Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
        res.hdr.response = SYNC_COM_ERROR;
        res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    res.hdr.com_seq = com.hdr.com_seq;  /* echo sequence number for matching */

    switch (com.hdr.command) {
    case SALVSYNC_SALVAGE:
    case SALVSYNC_RAISEPRIO:
        res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
    case SALVSYNC_CANCEL:
        /* cancel a salvage */
        res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
    case SALVSYNC_CANCELALL:
        /* cancel all queued salvages */
        res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
        /* query whether a volume is done salvaging */
        res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
    case SALVSYNC_OP_LINK:
        /* link a clone to its parent in the scheduler */
        res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
        res.hdr.response = SYNC_BAD_COMMAND;

    /* report current scheduler queue lengths with every response */
    sres_hdr.sq_len = salvageQueue.total_len;
    sres_hdr.pq_len = pendingQueue.len;

    SYNC_putRes(&salvsync_server_state, fd, &res);

    if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
/**
 * request that a volume be salvaged.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_DENIED failed to enqueue request
 *    @retval SYNC_FAILED malformed command packet
 *
 * @note this is a SALVSYNC protocol rpc handler
 *
 * @post the volume is enqueued in the to-be-salvaged queue.
 *       if the volume was already in the salvage queue, its
 *       priority (and thus its location in the queue) are updated.
 */
SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node, * clone;

    /* reject unterminated / malformed partition name strings */
    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    clone = LookupNodeByCommand(com->sop, &node);

        /* no scheduling node yet for this volume group; create one */
        if (AllocNode(&node)) {
            res->hdr->reason = SYNC_REASON_NOMEM;

    HandlePrio(clone, node, com->sop->prio);

    switch (node->state) {
    case SALVSYNC_STATE_QUEUED:
        /* already scheduled; only its queue position may need to move */
        UpdateCommandPrio(node);

    case SALVSYNC_STATE_ERROR:
    case SALVSYNC_STATE_DONE:
    case SALVSYNC_STATE_UNKNOWN:
        /* record the latest command on the addressed node */
        memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
        memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));

        /*
         * make sure volgroup parent partition path is kept coherent
         *
         * If we ever want to support non-COW clones on a machine holding
         * the RW site, please note that this code does not work under the
         * conditions where someone zaps a COW clone on partition X, and
         * subsequently creates a full clone on partition Y -- we'd need
         * an inverse to SALVSYNC_com_Link.
         *  -- tkeiser 11/28/2007
         */
        /* NOTE(review): strcpy into a fixed-size partName buffer; safe only
         * because SYNC_verifyProtocolString bounded the source above. */
        strcpy(node->command.sop.partName, com->sop->partName);

        if (AddToSalvageQueue(node)) {

    /* report resulting state and priority back to the caller */
    res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
    res->sop->state = node->state;
    res->sop->prio = node->command.sop.prio;
/**
 * cancel a pending salvage request.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_FAILED malformed command packet
 *
 * @note this is a SALVSYNC protocol rpc handler
 */
SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    /* reject unterminated / malformed partition name strings */
    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    node = LookupNodeByCommand(com->sop, NULL);

        /* unknown volume: report state UNKNOWN */
        res->sop->state = SALVSYNC_STATE_UNKNOWN;
        res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
        res->sop->prio = node->command.sop.prio;
        res->sop->state = node->state;
        /* only volume-group parents that are still queued can be dequeued */
        if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
            (node->state == SALVSYNC_STATE_QUEUED)) {
            DeleteFromSalvageQueue(node);
/**
 * cancel all pending salvage requests.
 *
 * @param[in]  com  incoming command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *
 * @note this is a SALVSYNC protocol rpc handler
 */
SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
    struct SalvageQueueNode * np, *nnp;
    struct DiskPartition64 * dp;

    /* walk each mounted partition's queue and drop every scheduled salvage */
    for (dp = DiskPartitionList ; dp ; dp = dp->next) {
        for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
            DeleteFromSalvageQueue(np);
/**
 * link a queue node for a clone to its parent volume.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_FAILED malformed command packet
 *    @retval SYNC_DENIED the request could not be completed
 *
 * @note this is a SALVSYNC protocol rpc handler
 *
 * @post the requested volume is marked as a child of another volume.
 *       thus, future salvage requests for this volume will result in the
 *       parent of the volume group being scheduled for salvage instead.
 */
SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * clone, * parent;

    /* reject unterminated / malformed partition name strings */
    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    /* lookup clone's salvage scheduling node */
    clone = LookupNodeByCommand(com->sop, NULL);
        res->hdr->reason = SALVSYNC_REASON_ERROR;

    /* lookup parent's salvage scheduling node */
    parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
    if (parent == NULL) {
        if (AllocNode(&parent)) {
            res->hdr->reason = SYNC_REASON_NOMEM;
        /* a fresh parent node inherits the command, re-keyed to the
         * parent volume id */
        memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
        memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
        parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
        AddNodeToHash(parent);

    if (LinkNode(parent, clone)) {
/**
 * query the status of a volume salvage request.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_FAILED malformed command packet
 *
 * @note this is a SALVSYNC protocol rpc handler
 */
SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    /* reject unterminated / malformed partition name strings */
    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    LookupNodeByCommand(com->sop, &node);

    /* query whether a volume is done salvaging */
        /* no node: report state UNKNOWN */
        res->sop->state = SALVSYNC_STATE_UNKNOWN;
        res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
        res->sop->state = node->state;
        res->sop->prio = node->command.sop.prio;
/* drop a client connection: close fd and deregister its handler
 * (body elided in this view). */
SALVSYNC_Drop(osi_socket fd)

static int AcceptHandler = -1;  /* handler id for accept, if turned on */

    /* NOTE(review): fragment of AcceptOn -- registers the listening socket
     * with SALVSYNC_newconnection if not already registered. */
    if (AcceptHandler == -1) {
        assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
        AcceptHandler = FindHandler(salvsync_server_state.fd);

    /* NOTE(review): fragment of AcceptOff -- removes the listening socket's
     * handler registration. */
    if (AcceptHandler != -1) {
        assert(RemoveHandler(salvsync_server_state.fd));
/* The multiple FD handling code. */

    /* NOTE(review): fragment of InitHandler -- clears every slot of the
     * handler table under the write lock; header line not visible here. */
    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
        HandlerProc[i] = NULL;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * invoke the registered callback for every fd that select() marked ready.
 * the read lock is dropped around each callback so a handler may modify
 * the handler table (e.g. drop its own connection).
 */
CallHandler(fd_set * fdsetp)
    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
        if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
            ReleaseReadLock(&SALVSYNC_handler_lock);
            (*HandlerProc[i]) (HandlerFD[i]);
            ObtainReadLock(&SALVSYNC_handler_lock);
    ReleaseReadLock(&SALVSYNC_handler_lock);
/*
 * register callback aproc for fd afd in the first free handler slot.
 * fails when all MAXHANDLERS slots are occupied.
 */
AddHandler(osi_socket afd, void (*aproc) (int))
    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] == -1)
    if (i >= MAXHANDLERS) {
        /* table full */
        ReleaseWriteLock(&SALVSYNC_handler_lock);
    HandlerProc[i] = aproc;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
/* find the handler-table slot registered for fd afd (locking variant). */
FindHandler(register osi_socket afd)
    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] == afd) {
            ReleaseReadLock(&SALVSYNC_handler_lock);
    ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
    return -1;                  /* satisfy compiler */
/* find the handler-table slot registered for fd afd; caller must already
 * hold SALVSYNC_handler_lock. returns -1 when afd is not registered. */
FindHandler_r(register osi_socket afd)
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] == afd) {
    return -1;                  /* satisfy compiler */
/* deregister fd afd from the handler table.
 * NOTE(review): FindHandler_r can return -1 when afd is not registered,
 * which would make this write to HandlerFD[-1] -- confirm callers only
 * pass registered fds. */
RemoveHandler(register osi_socket afd)
    ObtainWriteLock(&SALVSYNC_handler_lock);
    HandlerFD[FindHandler_r(afd)] = -1;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * populate *fdsetp with every registered fd and report the largest fd
 * via *maxfdp, for use as select()'s nfds argument.
 */
GetHandler(fd_set * fdsetp, int *maxfdp)
    register int maxfd = -1;

    ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] != -1) {
            FD_SET(HandlerFD[i], fdsetp);
            if (maxfd < HandlerFD[i])
                maxfd = HandlerFD[i];
    ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
/**
 * allocate a salvage queue node.
 *
 * @param[out] node_out  address in which to store new node pointer
 *
 * @return operation status
 *    @retval 1 failed to allocate node
 */
AllocNode(struct SalvageQueueNode ** node_out)
    struct SalvageQueueNode * node;

    *node_out = node = (struct SalvageQueueNode *)
        malloc(sizeof(struct SalvageQueueNode));

    /* zero the node; new nodes start life as group parents in an
     * unknown scheduling state */
    memset(node, 0, sizeof(struct SalvageQueueNode));
    node->type = SALVSYNC_VOLGROUP_PARENT;
    node->state = SALVSYNC_STATE_UNKNOWN;
/**
 * link a salvage queue node to its parent.
 *
 * @param[in] parent  pointer to queue node for parent of volume group
 * @param[in] clone   pointer to queue node for a clone
 *
 * @return operation status
 */
LinkNode(struct SalvageQueueNode * parent,
         struct SalvageQueueNode * clone)
    /* check for attaching a clone to a clone */
    if (parent->type != SALVSYNC_VOLGROUP_PARENT) {

    /* check for pre-existing registration and openings */
    for (idx = 0; idx < VOLMAXTYPES; idx++) {
        if (parent->volgroup.children[idx] == clone) {
        if (parent->volgroup.children[idx] == NULL) {
    if (idx == VOLMAXTYPES) {
        /* no free child slot */

    /* link parent and child */
    parent->volgroup.children[idx] = clone;
    clone->type = SALVSYNC_VOLGROUP_CLONE;
    clone->volgroup.parent = parent;

    /* fold the clone's scheduling state into the parent */
    switch (clone->state) {
    case SALVSYNC_STATE_QUEUED:
        /* a queued clone is now redundant; drop it from the queue */
        DeleteFromSalvageQueue(clone);
    case SALVSYNC_STATE_SALVAGING:
        switch (parent->state) {
        case SALVSYNC_STATE_UNKNOWN:
        case SALVSYNC_STATE_ERROR:
        case SALVSYNC_STATE_DONE:
            /* idle parent: schedule it at the clone's priority */
            parent->command.sop.prio = clone->command.sop.prio;
            AddToSalvageQueue(parent);

        case SALVSYNC_STATE_QUEUED:
            /* queued parent: accumulate the clone's priority */
            if (clone->command.sop.prio) {
                parent->command.sop.prio += clone->command.sop.prio;
                UpdateCommandPrio(parent);
/*
 * fold a requested priority into a scheduling node (and, for clones,
 * into the clone node the command actually addressed).
 *
 * clone is the node named by the command; node is the node whose queue
 * priority is adjusted (the group parent for clones).
 */
HandlePrio(struct SalvageQueueNode * clone,
           struct SalvageQueueNode * node,
           afs_uint32 new_prio)
    switch (node->state) {
    case SALVSYNC_STATE_ERROR:
    case SALVSYNC_STATE_DONE:
    case SALVSYNC_STATE_UNKNOWN:
        /* not currently scheduled: restart priority accounting at zero */
        node->command.sop.prio = 0;

    if (new_prio < clone->command.sop.prio) {
        /* strange. let's just set our delta to 1 */

        delta = new_prio - clone->command.sop.prio;

    if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
        clone->command.sop.prio = new_prio;

    node->command.sop.prio += delta;
/*
 * enqueue a node on its partition's salvage queue and mark it QUEUED.
 * rejects out-of-range partition ids, unmounted partitions, and nodes
 * that are already queued; wakes any worker waiting in SALVSYNC_getWork.
 */
AddToSalvageQueue(struct SalvageQueueNode * node)
    struct SalvageQueueNode * last = NULL;

    id = volutil_GetPartitionID(node->command.sop.partName);
    if (id < 0 || id > VOLMAXPARTS) {
    if (!VGetPartitionById_r(id, 0)) {
        /* don't enqueue salvage requests for unmounted partitions */
    if (queue_IsOnQueue(node)) {

    /* remember the old tail so we can detect a needed reorder below */
    if (queue_IsNotEmpty(&salvageQueue.part[id])) {
        last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
    queue_Append(&salvageQueue.part[id], node);
    salvageQueue.len[id]++;
    salvageQueue.total_len++;
    salvageQueue.last_insert = id;
    node->partition_id = id;
    node->state = SALVSYNC_STATE_QUEUED;

    /* reorder, if necessary */
    if (last && last->command.sop.prio < node->command.sop.prio) {
        UpdateCommandPrio(node);

    assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/* remove a node from the salvage queue (if queued), reset its state to
 * UNKNOWN, and wake any waiters on the queue condition variable. */
DeleteFromSalvageQueue(struct SalvageQueueNode * node)
    if (queue_IsOnQueue(node)) {
        salvageQueue.len[node->partition_id]--;
        salvageQueue.total_len--;
        node->state = SALVSYNC_STATE_UNKNOWN;
        assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/* append a node to the in-progress (pending) queue, mark it SALVAGING,
 * and wake any waiters on the pending-queue condition variable. */
AddToPendingQueue(struct SalvageQueueNode * node)
    queue_Append(&pendingQueue, node);
    node->state = SALVSYNC_STATE_SALVAGING;
    assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/* remove a node from the in-progress (pending) queue if present, reset its
 * state to UNKNOWN, and wake any waiters on the condition variable. */
DeleteFromPendingQueue(struct SalvageQueueNode * node)
    if (queue_IsOnQueue(node)) {
        node->state = SALVSYNC_STATE_UNKNOWN;
        assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/* find the in-progress node matching qry's (volume id, partition name);
 * NULL when no pending salvage matches. */
static struct SalvageQueueNode *
LookupPendingCommand(SALVSYNC_command_hdr * qry)
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
        if ((np->command.sop.volume == qry->volume) &&
            !strncmp(np->command.sop.partName, qry->partName,
                     sizeof(qry->partName)))

    /* scan fell off the end: no match */
    if (queue_IsEnd(&pendingQueue, np))
/* find the in-progress node whose worker child has the given pid;
 * NULL when no pending salvage matches. */
static struct SalvageQueueNode *
LookupPendingCommandByPid(int pid)
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {

    /* scan fell off the end: no match */
    if (queue_IsEnd(&pendingQueue, np))
/* raise the priority of a previously scheduled salvage */
UpdateCommandPrio(struct SalvageQueueNode * node)
    struct SalvageQueueNode *np, *nnp;

    /* node must already be on its partition queue */
    assert(queue_IsOnQueue(node));

    prio = node->command.sop.prio;
    id = node->partition_id;
    /* fast path: node now outranks the queue head -- move to front */
    if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
        queue_Prepend(&salvageQueue.part[id], node);
        /* otherwise scan backwards for the first node with higher priority */
        for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
            if (np->command.sop.prio > prio)
        if (queue_IsEnd(&salvageQueue.part[id], np)) {
            /* everything ranked lower: node becomes the new head */
            queue_Prepend(&salvageQueue.part[id], node);
        } else if (node != np) {
            queue_InsertAfter(np, node);
/* this will need to be rearchitected if we ever want more than one thread
 * to wait for new salvage nodes */
/*
 * block until a salvage is scheduled, then pick the next node to dispatch,
 * preferring partitions with no salvage currently in progress. the chosen
 * node is moved from the salvage queue to the pending queue.
 */
struct SalvageQueueNode *
SALVSYNC_getWork(void)
    struct DiskPartition64 * dp = NULL, * fdp;
    static afs_int32 next_part_sched = 0;       /* round-robin cursor across partitions */
    struct SalvageQueueNode *node = NULL, *np;

    /*
     * wait for work to be scheduled
     * if there are no disk partitions, just sit in this wait loop forever
     */
    while (!salvageQueue.total_len || !DiskPartitionList) {
        VOL_CV_WAIT(&salvageQueue.cv);

    /*
     * short circuit for simple case where only one partition has
     * scheduled salvages
     */
    if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
        (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
        node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);

    /*
     * ok, more than one partition has scheduled salvages.
     * now search for partitions with scheduled salvages, but no pending salvages.
     */
    dp = VGetPartitionById_r(next_part_sched, 0);
        dp = DiskPartitionList;

         dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
        if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
            node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);

    /*
     * all partitions with scheduled salvages have at least one pending.
     * now do an exhaustive search for a scheduled salvage.
     */
         dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
        if (salvageQueue.len[dp->index]) {
            node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);

    /* we should never reach this line */

    assert(node != NULL);
    /* claim the partition and move the node to the in-progress queue */
    partition_salvaging[node->partition_id]++;
    DeleteFromSalvageQueue(node);
    AddToPendingQueue(node);

    /* update next_part_sched field */
        next_part_sched = dp->next->index;
    } else if (DiskPartitionList) {
        next_part_sched = DiskPartitionList->index;
        next_part_sched = -1;
/**
 * update internal scheduler state to reflect completion of a work unit.
 *
 * @param[in] node    salvage queue node object pointer
 * @param[in] result  worker process result code
 *
 * @post scheduler state is updated.
 */
SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
    DeleteFromPendingQueue(node);
    partid = node->partition_id;
    /* release the per-partition dispatch slot */
    if (partid >=0 && partid <= VOLMAXPARTS) {
        partition_salvaging[partid]--;

        node->state = SALVSYNC_STATE_DONE;
    } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
        node->state = SALVSYNC_STATE_ERROR;

    /* propagate the final state to every child in the volume group */
    if (node->type == SALVSYNC_VOLGROUP_PARENT) {
        for (idx = 0; idx < VOLMAXTYPES; idx++) {
            if (node->volgroup.children[idx]) {
                node->volgroup.children[idx]->state = node->state;
/**
 * check whether worker child failed.
 *
 * @param[in] status  status bitfield return by wait()
 *
 * @return boolean failure code
 *    @retval 0 child succeeded
 *    @retval 1 child failed
 */
ChildFailed(int status)
    /* failure == core dump, death by signal, or any nonzero exit code other
     * than the special volume-group-link result */
    return (WCOREDUMP(status) ||
            WIFSIGNALED(status) ||
            ((WEXITSTATUS(status) != 0) &&
             (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
/**
 * notify salvsync scheduler of node completion, by child pid.
 *
 * @param[in] pid     pid of worker child
 * @param[in] status  worker status bitfield from wait()
 *
 * @post scheduler state is updated.
 *       if status code is a failure, fileserver notification was attempted
 *
 * @see SALVSYNC_doneWork_r
 */
SALVSYNC_doneWorkByPid(int pid, int status)
    struct SalvageQueueNode * node;
    afs_uint32 volids[VOLMAXTYPES+1];   /* parent + one slot per clone type */

    memset(volids, 0, sizeof(volids));

    node = LookupPendingCommandByPid(pid);
        SALVSYNC_doneWork_r(node, status);

        if (ChildFailed(status)) {
            /* populate volume id list for later processing outside the glock */
            volids[0] = node->command.sop.volume;
            /* NOTE(review): strcpy into a fixed-size partName buffer; source
             * is the same-sized field on the node, so lengths should match --
             * confirm. */
            strcpy(partName, node->command.sop.partName);
            if (node->type == SALVSYNC_VOLGROUP_PARENT) {
                for (idx = 0; idx < VOLMAXTYPES; idx++) {
                    if (node->volgroup.children[idx]) {
                        volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;

    /*
     * if necessary, notify fileserver of
     * failure to salvage volume group
     * [we cannot guarantee that the child made the
     *  appropriate notifications (e.g. SIGSEGV)]
     *  -- tkeiser 11/28/2007
     */
    if (ChildFailed(status)) {
        for (idx = 0; idx <= VOLMAXTYPES; idx++) {
                FSYNC_VolOp(volids[idx],
                            FSYNC_VOL_FORCE_ERROR,

#endif /* AFS_DEMAND_ATTACH_FS */