2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
32 #include <sys/types.h>
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
51 #include <afs/afsint.h>
53 #include <afs/errors.h>
57 #include <afs/afssyscalls.h>
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63 #include <afs/procmgmt.h>
65 #if !defined(offsetof)
69 #ifdef USE_UNIX_SOCKETS
70 #include <afs/afsutil.h>
75 /*@printflike@*/ extern void Log(const char *format, ...);
77 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
78 * move = dump+restore can run on single server */
80 /* Forward declarations */
81 static void * SALVSYNC_syncThread(void *);
82 static void SALVSYNC_newconnection(int fd);
83 static void SALVSYNC_com(int fd);
84 static void SALVSYNC_Drop(int fd);
85 static void AcceptOn(void);
86 static void AcceptOff(void);
87 static void InitHandler(void);
88 static void CallHandler(fd_set * fdsetp);
89 static int AddHandler(int afd, void (*aproc) (int));
90 static int FindHandler(register int afd);
91 static int FindHandler_r(register int afd);
92 static int RemoveHandler(register int afd);
93 static void GetHandler(fd_set * fdsetp, int *maxfdp);
97 * This lock controls access to the handler array.
99 struct Lock SALVSYNC_handler_lock;
102 #ifdef AFS_DEMAND_ATTACH_FS
104 * SALVSYNC is a feature specific to the demand attach fileserver
107 static int AllocNode(struct SalvageQueueNode ** node);
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
114 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
115 static void UpdateCommandPrio(struct SalvageQueueNode * node);
116 static void HandlePrio(struct SalvageQueueNode * clone,
117 struct SalvageQueueNode * parent,
118 afs_uint32 new_prio);
120 static int LinkNode(struct SalvageQueueNode * parent,
121 struct SalvageQueueNode * clone);
123 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
124 struct SalvageQueueNode ** parent);
125 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
126 struct SalvageQueueNode ** parent);
127 static void AddNodeToHash(struct SalvageQueueNode * node);
128 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
130 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
133 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
134 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
139 extern pthread_mutex_t vol_salvsync_mutex;
142 * salvsync server socket handle.
144 static SYNC_server_state_t salvsync_server_state =
145 { -1, /* file descriptor */
146 SALVSYNC_ENDPOINT_DECL, /* server endpoint */
147 SALVSYNC_PROTO_VERSION, /* protocol version */
148 5, /* bind() retry limit */
149 100, /* listen() queue depth */
150 "SALVSYNC", /* protocol name string */
155 * queue of all volumes waiting to be salvaged.
157 struct SalvageQueue {
158 volatile int total_len;
159 volatile afs_int32 last_insert; /**< id of last partition to have a salvage node inserted */
160 volatile int len[VOLMAXPARTS+1];
161 volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
164 static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
167 * queue of all volumes currently being salvaged.
170 volatile struct rx_queue q; /**< queue of salvages in progress */
171 volatile int len; /**< length of in-progress queue */
172 pthread_cond_t queue_change_cv;
174 static struct QueueHead pendingQueue; /* volumes being salvaged */
177 * whether a partition has a salvage in progress
179 * the salvager code only permits one salvage per partition at a time
181 * the following hack tries to keep salvage parallelism high by
182 * only permitting one salvage dispatch per partition at a time
184 * unfortunately, the parallel salvager currently
185 * has a rather braindead routine that won't permit
186 * multiple salvages on the same "device". this
187 * function happens to break pretty badly on lvm, raid luns, etc.
189 * this hack isn't good enough to stop the device limiting code from
190 * crippling performance. someday that code needs to be rewritten
192 static int partition_salvaging[VOLMAXPARTS+1];
194 #define VSHASH_SIZE 64
195 #define VSHASH_MASK (VSHASH_SIZE-1)
196 #define VSHASH(vid) ((vid)&VSHASH_MASK)
198 static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/*
 * Find the salvage scheduling node for (vid, partName).
 *
 * Only the SalvageHashTable bucket selected by VSHASH(vid) is scanned.  A
 * node matches when its stored volume id equals vid and strncmp() over the
 * full width of the stored partName field reports equality.
 *
 * The visible store to *parent yields the volume group parent when the match
 * is a SALVSYNC_VOLGROUP_CLONE, and the matched node itself otherwise.
 *
 * NOTE(review): the return statements, the not-found branch and any NULL
 * guard around the *parent store are elided in this view.  Several callers
 * in this file pass NULL for parent, so a guard presumably exists -- confirm.
 */
200 static struct SalvageQueueNode *
201 LookupNode(afs_uint32 vid, char * partName,
202 struct SalvageQueueNode ** parent)
204 struct rx_queue *qp, *nqp;
205 struct SalvageQueueNode *vsp;
206 int idx = VSHASH(vid);
208 for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
/* recover the enclosing node from its embedded hash_chain link */
209 vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
210 if ((vsp->command.sop.volume == vid) &&
211 !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
/* scan ran off the end of the bucket without a match */
216 if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
222 *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
223 vsp->volgroup.parent : vsp;
/*
 * Convenience wrapper around LookupNode(): uses the volume id and partition
 * name carried in the command header qry as the lookup key and forwards the
 * parent out-parameter unchanged.
 */
232 static struct SalvageQueueNode *
233 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
234 struct SalvageQueueNode ** parent)
236 return LookupNode(qry->volume, qry->partName, parent);
/*
 * Insert node into the salvage hash table.
 *
 * The bucket is chosen by VSHASH() of the node volume id; the node is linked
 * through its hash_chain member and the bucket length counter is bumped.
 *
 * NOTE(review): the body of the already-on-a-queue check is elided in this
 * view; presumably it returns early so a node is never linked twice.
 */
240 AddNodeToHash(struct SalvageQueueNode * node)
242 int idx = VSHASH(node->command.sop.volume);
244 if (queue_IsOnQueue(&node->hash_chain)) {
248 queue_Append(&SalvageHashTable[idx], &node->hash_chain);
249 SalvageHashTable[idx].len++;
/*
 * Remove node from the salvage hash table.
 *
 * Unlinks the hash_chain member and decrements the length counter of the
 * bucket selected by VSHASH() of the node volume id.
 *
 * NOTE(review): the body of the not-on-a-queue check is elided in this view;
 * presumably it returns early so the counter is not decremented spuriously.
 */
253 DeleteNodeFromHash(struct SalvageQueueNode * node)
255 int idx = VSHASH(node->command.sop.volume);
257 if (queue_IsNotOnQueue(&node->hash_chain)) {
261 queue_Remove(&node->hash_chain);
262 SalvageHashTable[idx].len--;
/*
 * Initialise the salvsync server state and start its service thread.
 *
 * Sets up the handler lock, the per-partition salvage queues (indices 0
 * through VOLMAXPARTS inclusive), the pending queue, the partition_salvaging
 * counters and every hash bucket, then creates SALVSYNC_syncThread as a
 * detached pthread.
 *
 * NOTE(review): every pthread_* call here is the argument of assert().  If
 * assert is the standard macro and the file is ever built with NDEBUG, those
 * calls would not be executed at all -- confirm which assert is in scope.
 */
266 SALVSYNC_salvInit(void)
270 pthread_attr_t tattr;
272 /* initialize the queues */
273 Lock_Init(&SALVSYNC_handler_lock);
274 assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
275 for (i = 0; i <= VOLMAXPARTS; i++) {
276 queue_Init(&salvageQueue.part[i]);
277 salvageQueue.len[i] = 0;
279 assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
280 queue_Init(&pendingQueue);
281 salvageQueue.total_len = pendingQueue.len = 0;
/* -1 means no partition has had a node inserted yet */
282 salvageQueue.last_insert = -1;
283 memset(partition_salvaging, 0, sizeof(partition_salvaging));
285 for (i = 0; i < VSHASH_SIZE; i++) {
286 assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
287 SalvageHashTable[i].len = 0;
288 queue_Init(&SalvageHashTable[i]);
291 /* start the salvsync thread */
292 assert(pthread_attr_init(&tattr) == 0);
293 assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
294 assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
298 static fd_set SALVSYNC_readfds;
/*
 * Entry point of the salvsync server thread.
 *
 * Visible steps: resolve the endpoint address into the shared server state,
 * call SYNC_cleanupSock, ignore SIGPIPE, obtain a socket and bind it, then
 * build the read fd_set with GetHandler() and run CallHandler() whenever
 * select() reports at least one ready descriptor.
 *
 * NOTE(review): the enclosing loop, the handling of the bind result and the
 * handler setup calls are elided in this view.  The note preceding the
 * select() call mentions IOMGR_select although the visible call is plain
 * select(); the comment may be stale -- confirm.
 */
301 SALVSYNC_syncThread(void * args)
307 SYNC_server_state_t * state = &salvsync_server_state;
309 SYNC_getAddr(&state->endpoint, &state->addr);
310 SYNC_cleanupSock(state);
313 (void)signal(SIGPIPE, SIG_IGN);
316 state->fd = SYNC_getSock(&state->endpoint);
317 code = SYNC_bindSock(state);
325 GetHandler(&SALVSYNC_readfds, &maxfd);
326 /* Note: check for >= 1 below is essential since IOMGR_select
327 * doesn't have exactly same semantics as select.
329 if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
330 CallHandler(&SALVSYNC_readfds);
/*
 * Handler for the listening descriptor afd: accept one connection and
 * register SALVSYNC_com as the handler for the new descriptor.
 *
 * The peer address buffer type depends on USE_UNIX_SOCKETS.  An accept()
 * failure is logged together with errno.  When the first AddHandler() call
 * reports failure (zero), a second attempt is made inside assert().
 *
 * NOTE(review): the statements between the failed AddHandler() and the
 * asserted retry are elided in this view; something there must free a
 * handler slot for the retry to succeed -- confirm.
 */
337 SALVSYNC_newconnection(int afd)
339 #ifdef USE_UNIX_SOCKETS
340 struct sockaddr_un other;
341 #else /* USE_UNIX_SOCKETS */
342 struct sockaddr_in other;
345 junk = sizeof(other);
346 fd = accept(afd, (struct sockaddr *)&other, &junk);
348 Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
350 } else if (!AddHandler(fd, SALVSYNC_com)) {
352 assert(AddHandler(fd, SALVSYNC_com));
356 /* this function processes commands from a salvsync file descriptor (fd) */
357 static afs_int32 SALV_cnt = 0;
/*
 * Body of the per-connection command handler (the log strings below name it
 * SALVSYNC_com; its signature line is elided in this view).
 *
 * Flow visible here:
 *   - wire the generic command and response objects to a protocol buffer and
 *     to the salvsync-specific response header sres_hdr;
 *   - read one command with SYNC_getCom();
 *   - validate minimum length, protocol version, handle channel close, then
 *     require the payload to be exactly one SALVSYNC_command_hdr;
 *   - dispatch on com.hdr.command to the SALVSYNC_com_* handlers;
 *   - report current salvage and pending queue lengths and send the response.
 *
 * NOTE(review): the jumps that leave each validation branch, the case label
 * for the query command, and the body of the final CHANNEL_SHUTDOWN check are
 * elided in this view.
 */
363 SALVSYNC_response_hdr sres_hdr;
364 SALVSYNC_command scom;
365 SALVSYNC_response sres;
366 SYNC_PROTO_BUF_DECL(buf);
368 com.payload.buf = (void *)buf;
369 com.payload.len = SYNC_PROTO_MAX_LEN;
370 res.payload.buf = (void *) &sres_hdr;
371 res.payload.len = sizeof(sres_hdr);
372 res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
373 res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
/* the salvsync command header is read in place from the receive buffer */
376 scom.sop = (SALVSYNC_command_hdr *) buf;
379 sres.sop = &sres_hdr;
383 if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
384 Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);
/* a message shorter than the generic header cannot be parsed at all */
389 if (com.recv_len < sizeof(com.hdr)) {
390 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
391 res.hdr.response = SYNC_COM_ERROR;
392 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
393 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
/* NOTE(review): unlike the two length checks, this branch sets no res.hdr.reason */
397 if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
398 Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
399 res.hdr.response = SYNC_COM_ERROR;
400 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
/* channel close is checked before the exact payload length check below */
404 if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
405 res.hdr.response = SYNC_OK;
406 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
410 if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
411 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
412 res.hdr.response = SYNC_COM_ERROR;
413 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
414 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
/* echo the sequence number of the command in the response */
418 res.hdr.com_seq = com.hdr.com_seq;
421 switch (com.hdr.command) {
/* salvage requests and priority raises share one handler */
424 case SALVSYNC_SALVAGE:
425 case SALVSYNC_RAISEPRIO:
426 res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
428 case SALVSYNC_CANCEL:
429 /* cancel a salvage */
430 res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
432 case SALVSYNC_CANCELALL:
433 /* cancel all queued salvages */
434 res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
437 /* query whether a volume is done salvaging */
438 res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
440 case SALVSYNC_OP_LINK:
441 /* link a clone to its parent in the scheduler */
442 res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
445 res.hdr.response = SYNC_BAD_COMMAND;
/* every response carries the current queue depths */
449 sres_hdr.sq_len = salvageQueue.total_len;
450 sres_hdr.pq_len = pendingQueue.len;
454 SYNC_putRes(&salvsync_server_state, fd, &res);
455 if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
461 * request that a volume be salvaged.
463 * @param[in] com inbound command object
464 * @param[out] res outbound response object
466 * @return operation status
467 * @retval SYNC_OK success
468 * @retval SYNC_DENIED failed to enqueue request
469 * @retval SYNC_FAILED malformed command packet
471 * @note this is a SALVSYNC protocol rpc handler
475 * @post the volume is enqueued in the to-be-salvaged queue.
476 * if the volume was already in the salvage queue, its
477 * priority (and thus its location in the queue) are
/*
 * Handler summary (from the visible lines): verify the partition name string,
 * look up the node for the requested volume (clone) together with its volume
 * group parent (node), allocating a node when needed, fold the requested
 * priority in via HandlePrio(), and then act on the parent state: a queued
 * parent is re-prioritised in place, while a parent in the error, done or
 * unknown state has the command copied into clone and is enqueued.  The
 * response reports the parent state and priority.
 *
 * NOTE(review): the partition name is copied into the parent node with an
 * unbounded strcpy(); this relies on the earlier SYNC_verifyProtocolString()
 * check having bounded it -- confirm that helper guarantees termination
 * within the field.  Error returns and the lookup-miss condition guarding
 * AllocNode() are elided in this view.
 */
481 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
483 afs_int32 code = SYNC_OK;
484 struct SalvageQueueNode * node, * clone;
/* reject requests whose partition name fails protocol string verification */
487 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
489 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
/* clone: node for the requested volume; node: parent slot filled by the lookup */
493 clone = LookupNodeByCommand(com->sop, &node);
496 if (AllocNode(&node)) {
498 res->hdr->reason = SYNC_REASON_NOMEM;
505 HandlePrio(clone, node, com->sop->prio);
507 switch (node->state) {
508 case SALVSYNC_STATE_QUEUED:
509 UpdateCommandPrio(node);
512 case SALVSYNC_STATE_ERROR:
513 case SALVSYNC_STATE_DONE:
514 case SALVSYNC_STATE_UNKNOWN:
515 memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
516 memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
519 * make sure volgroup parent partition path is kept coherent
521 * If we ever want to support non-COW clones on a machine holding
522 * the RW site, please note that this code does not work under the
523 * conditions where someone zaps a COW clone on partition X, and
524 * subsequently creates a full clone on partition Y -- we'd need
525 * an inverse to SALVSYNC_com_Link.
526 * -- tkeiser 11/28/2007
528 strcpy(node->command.sop.partName, com->sop->partName);
530 if (AddToSalvageQueue(node)) {
543 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
544 res->sop->state = node->state;
545 res->sop->prio = node->command.sop.prio;
552 * cancel a pending salvage request.
554 * @param[in] com inbound command object
555 * @param[out] res outbound response object
557 * @return operation status
558 * @retval SYNC_OK success
559 * @retval SYNC_FAILED malformed command packet
561 * @note this is a SALVSYNC protocol rpc handler
/*
 * Handler summary (from the visible lines): verify the partition name string,
 * look up the node for the requested volume (no parent resolution), and
 * report its priority and state.  Only a node that is a volume group parent
 * and currently queued is removed from the salvage queue; the response is
 * filled in before the removal, so it carries the pre-cancel state.
 *
 * NOTE(review): the lookup-miss condition that leads to the UNKNOWN state
 * response, and the return statements, are elided in this view.
 */
566 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
568 afs_int32 code = SYNC_OK;
569 struct SalvageQueueNode * node;
571 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
573 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
577 node = LookupNodeByCommand(com->sop, NULL);
580 res->sop->state = SALVSYNC_STATE_UNKNOWN;
583 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
584 res->sop->prio = node->command.sop.prio;
585 res->sop->state = node->state;
586 if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
587 (node->state == SALVSYNC_STATE_QUEUED)) {
588 DeleteFromSalvageQueue(node);
597 * cancel all pending salvage requests.
599 * @param[in] com incoming command object
600 * @param[out] res outbound response object
602 * @return operation status
603 * @retval SYNC_OK success
605 * @note this is a SALVSYNC protocol rpc handler
/*
 * Handler summary (from the visible lines): walk every partition on
 * DiskPartitionList and remove each node found on that partition salvage
 * queue.  Neither com nor res is touched in the visible lines.
 *
 * NOTE(review): nodes are removed while the queue is being scanned; this
 * assumes queue_Scan keeps the next element in nnp so removal of np is safe
 * -- confirm against the rx_queue macros.
 */
610 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
612 struct SalvageQueueNode * np, *nnp;
613 struct DiskPartition64 * dp;
615 for (dp = DiskPartitionList ; dp ; dp = dp->next) {
616 for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
617 DeleteFromSalvageQueue(np);
625 * link a queue node for a clone to its parent volume.
627 * @param[in] com inbound command object
628 * @param[out] res outbound response object
630 * @return operation status
631 * @retval SYNC_OK success
632 * @retval SYNC_FAILED malformed command packet
633 * @retval SYNC_DENIED the request could not be completed
635 * @note this is a SALVSYNC protocol rpc handler
637 * @post the requested volume is marked as a child of another volume.
638 * thus, future salvage requests for this volume will result in the
639 * parent of the volume group being scheduled for salvage instead
/*
 * Handler summary (from the visible lines): verify the partition name string,
 * look up the clone node for the requested volume, look up the node for the
 * volume id given in the parent field on the same partition (creating and
 * hashing a new node for it when none exists), then attach the clone to the
 * parent with LinkNode().
 *
 * NOTE(review): the condition that triggers the SALVSYNC_REASON_ERROR
 * response after the clone lookup, and all return statements, are elided in
 * this view.
 */
645 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
647 afs_int32 code = SYNC_OK;
648 struct SalvageQueueNode * clone, * parent;
650 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
652 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
656 /* lookup clone's salvage scheduling node */
657 clone = LookupNodeByCommand(com->sop, NULL);
660 res->hdr->reason = SALVSYNC_REASON_ERROR;
664 /* lookup parent's salvage scheduling node */
665 parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
666 if (parent == NULL) {
667 if (AllocNode(&parent)) {
669 res->hdr->reason = SYNC_REASON_NOMEM;
/* seed the new parent from the incoming command, then override both volume ids */
672 memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
673 memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
674 parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
675 AddNodeToHash(parent);
678 if (LinkNode(parent, clone)) {
688 * query the status of a volume salvage request.
690 * @param[in] com inbound command object
691 * @param[out] res outbound response object
693 * @return operation status
694 * @retval SYNC_OK success
695 * @retval SYNC_FAILED malformed command packet
697 * @note this is a SALVSYNC protocol rpc handler
/*
 * Handler summary (from the visible lines): verify the partition name string
 * and look up the requested volume.  The direct return value of the lookup
 * is discarded; only the parent out-parameter (node) is used, so the state
 * and priority reported are those of the volume group parent as resolved by
 * LookupNode().
 *
 * NOTE(review): the lookup-miss condition that leads to the UNKNOWN state
 * response, and the return statements, are elided in this view.
 */
702 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
704 afs_int32 code = SYNC_OK;
705 struct SalvageQueueNode * node;
707 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
709 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
713 LookupNodeByCommand(com->sop, &node);
715 /* query whether a volume is done salvaging */
717 res->sop->state = SALVSYNC_STATE_UNKNOWN;
720 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
721 res->sop->state = node->state;
722 res->sop->prio = node->command.sop.prio;
730 SALVSYNC_Drop(int fd)
741 static int AcceptHandler = -1; /* handler id for accept, if turned on */
746 if (AcceptHandler == -1) {
747 assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
748 AcceptHandler = FindHandler(salvsync_server_state.fd);
755 if (AcceptHandler != -1) {
756 assert(RemoveHandler(salvsync_server_state.fd));
761 /* The multiple FD handling code. */
763 static int HandlerFD[MAXHANDLERS];
764 static void (*HandlerProc[MAXHANDLERS]) (int);
770 ObtainWriteLock(&SALVSYNC_handler_lock);
771 for (i = 0; i < MAXHANDLERS; i++) {
773 HandlerProc[i] = NULL;
775 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Invoke the registered handler for every slot whose descriptor is set in
 * fdsetp.
 *
 * The read lock is released around each handler call and re-acquired
 * afterwards, which lets a handler take the write lock (for example through
 * AddHandler or RemoveHandler) without deadlocking.
 *
 * NOTE(review): HandlerProc[i] and HandlerFD[i] are read after the lock has
 * been dropped; confirm no other thread can modify the slot in that window.
 */
779 CallHandler(fd_set * fdsetp)
782 ObtainReadLock(&SALVSYNC_handler_lock);
783 for (i = 0; i < MAXHANDLERS; i++) {
784 if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
785 ReleaseReadLock(&SALVSYNC_handler_lock);
786 (*HandlerProc[i]) (HandlerFD[i]);
787 ObtainReadLock(&SALVSYNC_handler_lock);
790 ReleaseReadLock(&SALVSYNC_handler_lock);
/*
 * Register aproc as the handler for descriptor afd.
 *
 * Under the write lock, the first slot whose HandlerFD entry is -1 is
 * claimed.  When all MAXHANDLERS slots are in use the lock is released and
 * the function reports failure; callers in this file treat a zero return as
 * failure.
 *
 * NOTE(review): the loop exit, the store of afd into HandlerFD[i] and the
 * return statements are elided in this view.
 */
794 AddHandler(int afd, void (*aproc) (int))
797 ObtainWriteLock(&SALVSYNC_handler_lock);
798 for (i = 0; i < MAXHANDLERS; i++)
799 if (HandlerFD[i] == -1)
801 if (i >= MAXHANDLERS) {
802 ReleaseWriteLock(&SALVSYNC_handler_lock);
806 HandlerProc[i] = aproc;
807 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Locate the handler slot registered for descriptor afd, holding the read
 * lock for the duration of the scan.  The lock is released on both the
 * match path and the fall-through path.
 *
 * NOTE(review): the return on a match and whatever precedes the final
 * return -1 are elided in this view.
 */
812 FindHandler(register int afd)
815 ObtainReadLock(&SALVSYNC_handler_lock);
816 for (i = 0; i < MAXHANDLERS; i++)
817 if (HandlerFD[i] == afd) {
818 ReleaseReadLock(&SALVSYNC_handler_lock);
821 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
823 return -1; /* satisfy compiler */
/*
 * Same scan as FindHandler() but with no lock calls in the visible lines;
 * the caller in this file (RemoveHandler) holds the write lock while calling
 * it.
 *
 * NOTE(review): the return on a match and whatever precedes the final
 * return -1 are elided in this view.
 */
827 FindHandler_r(register int afd)
830 for (i = 0; i < MAXHANDLERS; i++)
831 if (HandlerFD[i] == afd) {
835 return -1; /* satisfy compiler */
/*
 * Unregister the handler for descriptor afd by marking its slot free (-1)
 * under the write lock.
 *
 * NOTE(review): the result of FindHandler_r() is used directly as an array
 * index.  The visible fall-through of FindHandler_r() returns -1, which
 * would index before the start of HandlerFD; confirm that an unregistered
 * afd can never reach this store (an elided assert may be the only guard).
 */
839 RemoveHandler(register int afd)
841 ObtainWriteLock(&SALVSYNC_handler_lock);
842 HandlerFD[FindHandler_r(afd)] = -1;
843 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Build the descriptor set for select(): every registered descriptor is
 * added to fdsetp and the largest one is tracked in maxfd, all under the
 * read lock.
 *
 * NOTE(review): any clearing of fdsetp before the loop and the store of
 * maxfd through maxfdp are elided in this view.
 */
848 GetHandler(fd_set * fdsetp, int *maxfdp)
851 register int maxfd = -1;
853 ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
854 for (i = 0; i < MAXHANDLERS; i++)
855 if (HandlerFD[i] != -1) {
856 FD_SET(HandlerFD[i], fdsetp);
857 if (maxfd < HandlerFD[i])
858 maxfd = HandlerFD[i];
861 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
865 * allocate a salvage queue node.
867 * @param[out] node_out address in which to store new node pointer
869 * @return operation status
871 * @retval 1 failed to allocate node
/*
 * Allocate one SalvageQueueNode with malloc(), publish the pointer through
 * node_out, zero the whole structure and give it the default type
 * (volume group parent) and state (unknown).
 *
 * NOTE(review): the NULL check on the malloc() result and the return
 * statements are elided in this view; note that *node_out is assigned before
 * any such check could run.
 */
876 AllocNode(struct SalvageQueueNode ** node_out)
879 struct SalvageQueueNode * node;
881 *node_out = node = (struct SalvageQueueNode *)
882 malloc(sizeof(struct SalvageQueueNode));
888 memset(node, 0, sizeof(struct SalvageQueueNode));
889 node->type = SALVSYNC_VOLGROUP_PARENT;
890 node->state = SALVSYNC_STATE_UNKNOWN;
897 * link a salvage queue node to its parent.
899 * @param[in] parent pointer to queue node for parent of volume group
900 * @param[in] clone pointer to queue node for a clone
902 * @return operation status
/*
 * Attach clone to parent as a member of its volume group.
 *
 * Visible steps: refuse a parent that is not itself a volume group parent;
 * scan the children array for an existing registration of clone or for a
 * free (NULL) slot; fail when idx reaches VOLMAXTYPES; otherwise record the
 * link in both directions and retype the clone.  Afterwards the scheduling
 * state is reconciled: a queued clone is taken off the salvage queue, and
 * depending on the parent state the parent either inherits the clone
 * priority and is enqueued, or has the clone priority added to its own and
 * is re-sorted.
 *
 * NOTE(review): loop exits, return values and the line between the QUEUED
 * and SALVAGING cases of the outer switch are elided in this view, so
 * whether the QUEUED case falls through cannot be determined here.
 */
909 LinkNode(struct SalvageQueueNode * parent,
910 struct SalvageQueueNode * clone)
915 /* check for attaching a clone to a clone */
916 if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
921 /* check for pre-existing registration and openings */
922 for (idx = 0; idx < VOLMAXTYPES; idx++) {
923 if (parent->volgroup.children[idx] == clone) {
926 if (parent->volgroup.children[idx] == NULL) {
930 if (idx == VOLMAXTYPES) {
935 /* link parent and child */
936 parent->volgroup.children[idx] = clone;
937 clone->type = SALVSYNC_VOLGROUP_CLONE;
938 clone->volgroup.parent = parent;
942 switch (clone->state) {
943 case SALVSYNC_STATE_QUEUED:
944 DeleteFromSalvageQueue(clone);
946 case SALVSYNC_STATE_SALVAGING:
947 switch (parent->state) {
948 case SALVSYNC_STATE_UNKNOWN:
949 case SALVSYNC_STATE_ERROR:
950 case SALVSYNC_STATE_DONE:
951 parent->command.sop.prio = clone->command.sop.prio;
952 AddToSalvageQueue(parent);
955 case SALVSYNC_STATE_QUEUED:
956 if (clone->command.sop.prio) {
957 parent->command.sop.prio += clone->command.sop.prio;
958 UpdateCommandPrio(parent);
/*
 * Fold a newly requested priority into the scheduling nodes.
 *
 * Visible steps: a parent node that is in the error, done or unknown state
 * has its priority reset to zero; a delta is derived from new_prio relative
 * to the priority currently stored on clone; a node typed as a volume group
 * clone records new_prio directly; and the parent priority is increased by
 * the delta.
 *
 * NOTE(review): the third parameter line, the declaration of delta and the
 * else structure around the delta computation are elided in this view.
 */
976 HandlePrio(struct SalvageQueueNode * clone,
977 struct SalvageQueueNode * node,
982 switch (node->state) {
983 case SALVSYNC_STATE_ERROR:
984 case SALVSYNC_STATE_DONE:
985 case SALVSYNC_STATE_UNKNOWN:
986 node->command.sop.prio = 0;
990 if (new_prio < clone->command.sop.prio) {
991 /* strange. let's just set our delta to 1 */
994 delta = new_prio - clone->command.sop.prio;
997 if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
998 clone->command.sop.prio = new_prio;
1001 node->command.sop.prio += delta;
/*
 * Enqueue node on the salvage queue of the partition named in its command.
 *
 * Visible steps: translate the partition name to an id and range-check it
 * against 0..VOLMAXPARTS; refuse partitions that VGetPartitionById_r() does
 * not know; check whether the node is already on a queue; remember the
 * current tail; append; update the per-partition and total counters, the
 * last_insert marker, and the node partition id and state; re-sort when the
 * previous tail had a lower priority; finally wake every waiter on the
 * salvage queue condition variable.
 *
 * NOTE(review): the bodies of the three rejection checks and the return
 * values are elided in this view.  The broadcast is performed inside
 * assert(); see whether NDEBUG builds are possible for this file.
 */
1005 AddToSalvageQueue(struct SalvageQueueNode * node)
1008 struct SalvageQueueNode * last = NULL;
1010 id = volutil_GetPartitionID(node->command.sop.partName);
1011 if (id < 0 || id > VOLMAXPARTS) {
1014 if (!VGetPartitionById_r(id, 0)) {
1015 /* don't enqueue salvage requests for unmounted partitions */
1018 if (queue_IsOnQueue(node)) {
1022 if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1023 last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1025 queue_Append(&salvageQueue.part[id], node);
1026 salvageQueue.len[id]++;
1027 salvageQueue.total_len++;
1028 salvageQueue.last_insert = id;
1029 node->partition_id = id;
1030 node->state = SALVSYNC_STATE_QUEUED;
1032 /* reorder, if necessary */
1033 if (last && last->command.sop.prio < node->command.sop.prio) {
1034 UpdateCommandPrio(node);
1037 assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/*
 * Take node off the salvage queue if it is on one: decrement the counter of
 * its partition and the total, reset the node state to unknown and wake all
 * waiters on the salvage queue condition variable.
 *
 * NOTE(review): the actual unlink call is elided in this view.
 */
1042 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1044 if (queue_IsOnQueue(node)) {
1046 salvageQueue.len[node->partition_id]--;
1047 salvageQueue.total_len--;
1048 node->state = SALVSYNC_STATE_UNKNOWN;
1049 assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/*
 * Append node to the queue of in-progress salvages, mark it as salvaging and
 * wake all waiters on the pending queue condition variable.
 *
 * NOTE(review): the update of pendingQueue.len is elided in this view.
 */
1054 AddToPendingQueue(struct SalvageQueueNode * node)
1056 queue_Append(&pendingQueue, node);
1058 node->state = SALVSYNC_STATE_SALVAGING;
1059 assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/*
 * Take node off the in-progress queue if it is on one, reset its state to
 * unknown and wake all waiters on the pending queue condition variable.
 *
 * NOTE(review): the unlink call and the update of pendingQueue.len are
 * elided in this view.
 */
1063 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1065 if (queue_IsOnQueue(node)) {
1068 node->state = SALVSYNC_STATE_UNKNOWN;
1069 assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/*
 * Search the in-progress queue for a node whose volume id equals the one in
 * qry and whose partition name matches under strncmp() bounded by the size
 * of the partName field in qry.
 *
 * NOTE(review): the loop exit on a match and the return statements
 * (including the value returned when the scan reaches the end) are elided in
 * this view.
 */
1073 static struct SalvageQueueNode *
1074 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1076 struct SalvageQueueNode * np, * nnp;
1078 for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1079 if ((np->command.sop.volume == qry->volume) &&
1080 !strncmp(np->command.sop.partName, qry->partName,
1081 sizeof(qry->partName)))
1085 if (queue_IsEnd(&pendingQueue, np))
/*
 * Search the in-progress queue for the node associated with worker process
 * pid.
 *
 * NOTE(review): the match condition inside the loop and the return
 * statements are elided in this view, so the field compared against pid
 * cannot be confirmed here.
 */
1090 static struct SalvageQueueNode *
1091 LookupPendingCommandByPid(int pid)
1093 struct SalvageQueueNode * np, * nnp;
1095 for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1100 if (queue_IsEnd(&pendingQueue, np))
1106 /* raise the priority of a previously scheduled salvage */
/*
 * Reposition node inside the salvage queue of its partition after its
 * priority changed.  The node must already be on a queue (asserted).
 *
 * Visible logic: when the current head has a lower priority than node, node
 * goes to the front.  Otherwise the queue is scanned backwards starting from
 * node for the first entry whose priority is strictly greater; node is
 * placed directly after that entry, or at the front when the scan reaches
 * the end of the queue.  When the scan stops on node itself nothing is moved.
 *
 * NOTE(review): the unlink calls that must precede each re-insertion and the
 * loop exit are elided in this view.
 */
1108 UpdateCommandPrio(struct SalvageQueueNode * node)
1110 struct SalvageQueueNode *np, *nnp;
1114 assert(queue_IsOnQueue(node));
1116 prio = node->command.sop.prio;
1117 id = node->partition_id;
1118 if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1120 queue_Prepend(&salvageQueue.part[id], node);
1122 for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1123 if (np->command.sop.prio > prio)
1126 if (queue_IsEnd(&salvageQueue.part[id], np)) {
1128 queue_Prepend(&salvageQueue.part[id], node);
1129 } else if (node != np) {
1131 queue_InsertAfter(np, node);
1136 /* this will need to be rearchitected if we ever want more than one thread
1137 * to wait for new salvage nodes */
/*
 * Block until a salvage is scheduled, pick one node, move it from the
 * salvage queue to the in-progress queue and remember where to resume
 * scanning next time.
 *
 * Selection order visible below:
 *   1. if every queued salvage belongs to the partition recorded in
 *      last_insert, take the head of that partition queue;
 *   2. otherwise walk the partition list circularly, starting from the
 *      partition recorded in next_part_sched (or the list head), and take
 *      the head of the first partition that has queued work and no salvage
 *      in progress;
 *   3. otherwise walk again and take the head of the first partition with
 *      any queued work.
 *
 * The chosen node passes through DeleteFromSalvageQueue() and then
 * AddToPendingQueue(), so its state ends up as salvaging, and the in-progress
 * counter of its partition is incremented.
 *
 * NOTE(review): loop headers and bounds, the jumps between the three stages,
 * the condition guarding the dp->next dereference when updating
 * next_part_sched, and the return statement are elided in this view.
 */
1138 struct SalvageQueueNode *
1139 SALVSYNC_getWork(void)
1142 struct DiskPartition64 * dp = NULL, * fdp;
1143 static afs_int32 next_part_sched = 0;
1144 struct SalvageQueueNode *node = NULL, *np;
1149 * wait for work to be scheduled
1150 * if there are no disk partitions, just sit in this wait loop forever
1152 while (!salvageQueue.total_len || !DiskPartitionList) {
1153 VOL_CV_WAIT(&salvageQueue.cv);
1157 * short circuit for simple case where only one partition has
1158 * scheduled salvages
1160 if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1161 (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1162 node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1168 * ok, more than one partition has scheduled salvages.
1169 * now search for partitions with scheduled salvages, but no pending salvages.
1171 dp = VGetPartitionById_r(next_part_sched, 0);
1173 dp = DiskPartitionList;
1179 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1180 if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1181 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1188 * all partitions with scheduled salvages have at least one pending.
1189 * now do an exhaustive search for a scheduled salvage.
1195 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1196 if (salvageQueue.len[dp->index]) {
1197 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1202 /* we should never reach this line */
1206 assert(node != NULL);
1208 partition_salvaging[node->partition_id]++;
1209 DeleteFromSalvageQueue(node);
1210 AddToPendingQueue(node);
1213 /* update next_part_sched field */
1215 next_part_sched = dp->next->index;
1216 } else if (DiskPartitionList) {
1217 next_part_sched = DiskPartitionList->index;
1219 next_part_sched = -1;
1229 * update internal scheduler state to reflect completion of a work unit.
1231 * @param[in] node salvage queue node object pointer
1232 * @param[in] result worker process result code
1234 * @post scheduler state is updated.
/*
 * Visible steps: remove node from the in-progress queue, decrement the
 * in-progress counter of its partition when the partition id is within
 * 0..VOLMAXPARTS, set the node state to done or error depending on result,
 * and, for a volume group parent, copy the resulting state to every
 * registered child.
 *
 * NOTE(review): the condition selecting the done state is elided in this
 * view.  The error branch compares result against the exit code constant
 * SALSRV_EXIT_VOLGROUP_LINK, while the by-pid caller in this file forwards
 * its wait() status argument unchanged -- confirm whether result is meant to
 * be an exit code or a raw wait status.
 */
1239 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1244 DeleteFromPendingQueue(node);
1245 partid = node->partition_id;
1246 if (partid >=0 && partid <= VOLMAXPARTS) {
1247 partition_salvaging[partid]--;
1250 node->state = SALVSYNC_STATE_DONE;
1251 } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1252 node->state = SALVSYNC_STATE_ERROR;
1255 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1256 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1257 if (node->volgroup.children[idx]) {
1258 node->volgroup.children[idx]->state = node->state;
1265 * check whether worker child failed.
1267 * @param[in] status status bitfield return by wait()
1269 * @return boolean failure code
1270 * @retval 0 child succeeded
1271 * @retval 1 child failed
/*
 * A worker counts as failed when it dumped core, was terminated by a signal,
 * or exited with a code that is neither zero nor SALSRV_EXIT_VOLGROUP_LINK.
 * Because of short-circuit evaluation the exit code is only inspected when
 * neither of the first two tests is true.
 *
 * NOTE(review): WEXITSTATUS is applied without a preceding WIFEXITED test;
 * confirm that a status which is neither exited nor signaled (for example a
 * stopped child) cannot be passed in.
 */
1276 ChildFailed(int status)
1278 return (WCOREDUMP(status) ||
1279 WIFSIGNALED(status) ||
1280 ((WEXITSTATUS(status) != 0) &&
1281 (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1286 * notify salvsync scheduler of node completion, by child pid.
1288 * @param[in] pid pid of worker child
1289 * @param[in] status worker status bitfield from wait()
1291 * @post scheduler state is updated.
1292 * if status code is a failure, fileserver notification was attempted
1294 * @see SALVSYNC_doneWork_r
/*
 * Visible steps: find the in-progress node for worker pid, update scheduler
 * state through SALVSYNC_doneWork_r(), and when ChildFailed(status) holds,
 * snapshot the volume id of the node (slot 0) and of each registered child
 * (slots 1..VOLMAXTYPES) plus the partition name.  A second
 * ChildFailed(status) block then issues FSYNC_VolOp() with
 * FSYNC_VOL_FORCE_ERROR over the snapshot.
 *
 * NOTE(review): the NULL check on the lookup result, the locking around the
 * first block, the filter on zero volume ids, the declaration and size of
 * partName (filled by an unbounded strcpy) and the remaining FSYNC_VolOp()
 * arguments are elided in this view.
 */
1297 SALVSYNC_doneWorkByPid(int pid, int status)
1299 struct SalvageQueueNode * node;
1301 afs_uint32 volids[VOLMAXTYPES+1];
1304 memset(volids, 0, sizeof(volids));
1307 node = LookupPendingCommandByPid(pid);
1309 SALVSYNC_doneWork_r(node, status);
1311 if (ChildFailed(status)) {
1312 /* populate volume id list for later processing outside the glock */
1313 volids[0] = node->command.sop.volume;
1314 strcpy(partName, node->command.sop.partName);
1315 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1316 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1317 if (node->volgroup.children[idx]) {
1318 volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1327 * if necessary, notify fileserver of
1328 * failure to salvage volume group
1329 * [we cannot guarantee that the child made the
1330 * appropriate notifications (e.g. SIGSEGV)]
1331 * -- tkeiser 11/28/2007
1333 if (ChildFailed(status)) {
1334 for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1336 FSYNC_VolOp(volids[idx],
1338 FSYNC_VOL_FORCE_ERROR,
1346 #endif /* AFS_DEMAND_ATTACH_FS */