2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
32 #include <sys/types.h>
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
51 #include <afs/afsint.h>
53 #include <afs/errors.h>
57 #include <afs/afssyscalls.h>
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63 #include <afs/procmgmt.h>
65 #if !defined(offsetof)
69 #ifdef USE_UNIX_SOCKETS
70 #include <afs/afsutil.h>
75 /*@printflike@*/ extern void Log(const char *format, ...);
80 #define osi_Assert(e) (void)(e)
82 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
83 * move = dump+restore can run on single server */
85 /* Forward declarations */
86 static void * SALVSYNC_syncThread(void *);
87 static void SALVSYNC_newconnection(int fd);
88 static void SALVSYNC_com(int fd);
89 static void SALVSYNC_Drop(int fd);
90 static void AcceptOn(void);
91 static void AcceptOff(void);
92 static void InitHandler(void);
93 static void CallHandler(fd_set * fdsetp);
94 static int AddHandler(int afd, void (*aproc) (int));
95 static int FindHandler(register int afd);
96 static int FindHandler_r(register int afd);
97 static int RemoveHandler(register int afd);
98 static void GetHandler(fd_set * fdsetp, int *maxfdp);
102 * This lock controls access to the handler array.
104 struct Lock SALVSYNC_handler_lock;
107 #ifdef AFS_DEMAND_ATTACH_FS
109 * SALVSYNC is a feature specific to the demand attach fileserver
112 static int AllocNode(struct SalvageQueueNode ** node);
114 static int AddToSalvageQueue(struct SalvageQueueNode * node);
115 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
116 static void AddToPendingQueue(struct SalvageQueueNode * node);
117 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
118 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
119 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
120 static void UpdateCommandPrio(struct SalvageQueueNode * node);
121 static void HandlePrio(struct SalvageQueueNode * clone,
122 struct SalvageQueueNode * parent,
123 afs_uint32 new_prio);
125 static int LinkNode(struct SalvageQueueNode * parent,
126 struct SalvageQueueNode * clone);
128 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
129 struct SalvageQueueNode ** parent);
130 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
131 struct SalvageQueueNode ** parent);
132 static void AddNodeToHash(struct SalvageQueueNode * node);
133 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
135 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
136 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
137 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
138 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
139 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
144 extern pthread_mutex_t vol_salvsync_mutex;
147 * salvsync server socket handle.
149 static SYNC_server_state_t salvsync_server_state =
150 { -1, /* file descriptor */
151 SALVSYNC_ENDPOINT_DECL, /* server endpoint */
152 SALVSYNC_PROTO_VERSION, /* protocol version */
153 5, /* bind() retry limit */
154 100, /* listen() queue depth */
155 "SALVSYNC", /* protocol name string */
160 * queue of all volumes waiting to be salvaged.
162 struct SalvageQueue {
163 volatile int total_len;
164 volatile afs_int32 last_insert; /**< id of last partition to have a salvage node inserted */
165 volatile int len[VOLMAXPARTS+1];
166 volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
169 static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
172 * queue of all volumes currently being salvaged.
175 volatile struct rx_queue q; /**< queue of salvages in progress */
176 volatile int len; /**< length of in-progress queue */
177 pthread_cond_t queue_change_cv;
179 static struct QueueHead pendingQueue; /* volumes being salvaged */
182 * whether a partition has a salvage in progress
184 * the salvager code only permits one salvage per partition at a time
186 * the following hack tries to keep salvaged parallelism high by
187 * only permitting one salvage dispatch per partition at a time
189 * unfortunately, the parallel salvager currently
190 * has a rather braindead routine that won't permit
191 * multiple salvages on the same "device". this
192 * function happens to break pretty badly on lvm, raid luns, etc.
194 * this hack isn't good enough to stop the device limiting code from
195 * crippling performance. someday that code needs to be rewritten
197 static int partition_salvaging[VOLMAXPARTS+1];
199 #define VSHASH_SIZE 64
200 #define VSHASH_MASK (VSHASH_SIZE-1)
201 #define VSHASH(vid) ((vid)&VSHASH_MASK)
203 static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/*
 * LookupNode -- find the salvage queue node for (volume id, partition name)
 * in SalvageHashTable, and optionally resolve the volume group parent.
 *
 * NOTE(review): this listing elides interior lines (gaps in the embedded
 * numbering) -- loop body terminators and the not-found path are not
 * visible; code is kept byte-identical to the fragment.
 *
 * NOTE(review): forward declaration uses 'VolumeId vid' but this definition
 * uses 'afs_uint32 vid' -- confirm the two types agree on all platforms.
 */
205 static struct SalvageQueueNode *
206 LookupNode(afs_uint32 vid, char * partName,
207 	   struct SalvageQueueNode ** parent)
209     struct rx_queue *qp, *nqp;
210     struct SalvageQueueNode *vsp;
/* hash bucket index derived from the volume id (VSHASH masks to table size) */
211     int idx = VSHASH(vid);
213     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
/* recover the enclosing node from its embedded hash_chain link */
214 	vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
/* match on both volume id and (bounded) partition name comparison */
215 	if ((vsp->command.sop.volume == vid) &&
216 	    !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
/* queue_IsEnd true => scan completed without a match */
221     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
/* for clones, report the volume group parent; otherwise the node itself */
227 	*parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
228 	    vsp->volgroup.parent : vsp;
237 static struct SalvageQueueNode *
238 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
239 struct SalvageQueueNode ** parent)
241 return LookupNode(qry->volume, qry->partName, parent);
/*
 * AddNodeToHash / DeleteNodeFromHash -- maintain SalvageHashTable membership
 * for a salvage queue node, keyed on its volume id.
 *
 * NOTE(review): interior lines are elided in this listing (e.g. the bodies
 * of the early-return guards); code is kept byte-identical to the fragment.
 */
245 AddNodeToHash(struct SalvageQueueNode * node)
247     int idx = VSHASH(node->command.sop.volume);
/* guard: do nothing if the node is already hashed -- presumably returns here */
249     if (queue_IsOnQueue(&node->hash_chain)) {
253     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
254     SalvageHashTable[idx].len++;
258 DeleteNodeFromHash(struct SalvageQueueNode * node)
260     int idx = VSHASH(node->command.sop.volume);
/* guard: do nothing if the node is not hashed -- presumably returns here */
262     if (queue_IsNotOnQueue(&node->hash_chain)) {
266     queue_Remove(&node->hash_chain);
267     SalvageHashTable[idx].len--;
/*
 * SALVSYNC_salvInit -- initialize salvage scheduler state and start the
 * SALVSYNC listener thread.
 *
 * Initializes: per-partition salvage queues, the pending queue, the
 * partition_salvaging counters, the salvage hash table, and finally spawns
 * SALVSYNC_syncThread detached.
 *
 * NOTE(review): interior lines are elided in this listing (e.g. declarations
 * of i/tid and closing braces); code is kept byte-identical to the fragment.
 */
271 SALVSYNC_salvInit(void)
275     pthread_attr_t tattr;
277     /* initialize the queues */
278     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
279     for (i = 0; i <= VOLMAXPARTS; i++) {
280 	queue_Init(&salvageQueue.part[i]);
281 	salvageQueue.len[i] = 0;
283     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
284     queue_Init(&pendingQueue);
285     salvageQueue.total_len = pendingQueue.len = 0;
/* -1 marks "no partition has received an insert yet" (see SALVSYNC_getWork) */
286     salvageQueue.last_insert = -1;
287     memset(partition_salvaging, 0, sizeof(partition_salvaging));
289     for (i = 0; i < VSHASH_SIZE; i++) {
290 	assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
291 	SalvageHashTable[i].len = 0;
292 	queue_Init(&SalvageHashTable[i]);
295     /* start the salvsync thread */
296     assert(pthread_attr_init(&tattr) == 0);
/* detached: the thread is never joined */
297     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
298     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
/* fd_set reused across select() iterations; sized by FD_SETSIZE above */
302 static fd_set SALVSYNC_readfds;
/*
 * SALVSYNC_syncThread -- main loop of the salvsync listener thread.
 *
 * Sets up the server socket (address, stale-socket cleanup, bind), ignores
 * SIGPIPE so client disconnects don't kill the process, registers an rx
 * thread id, then loops select()ing over registered handler fds and
 * dispatching via CallHandler.
 *
 * NOTE(review): interior lines are elided in this listing (the enclosing
 * while loop, error handling after SYNC_bindSock, AcceptOn, etc.); code is
 * kept byte-identical to the fragment.
 */
305 SALVSYNC_syncThread(void * args)
311     SYNC_server_state_t * state = &salvsync_server_state;
313     SYNC_getAddr(&state->endpoint, &state->addr);
314     SYNC_cleanupSock(state);
/* a dead client's socket must not generate SIGPIPE on write */
317     (void)signal(SIGPIPE, SIG_IGN);
320     /* set our 'thread-id' so that the host hold table works */
321     MUTEX_ENTER(&rx_stats_mutex);	/* protects rxi_pthread_hinum */
322     tid = ++rxi_pthread_hinum;
323     MUTEX_EXIT(&rx_stats_mutex);
324     pthread_setspecific(rx_thread_id_key, (void *)tid);
325     Log("Set thread id %d for SALVSYNC_syncThread\n", tid);
327     state->fd = SYNC_getSock(&state->endpoint);
328     code = SYNC_bindSock(state);
336 	GetHandler(&SALVSYNC_readfds, &maxfd);
337 	/* Note: check for >= 1 below is essential since IOMGR_select
338 	 * doesn't have exactly same semantics as select.
340 	if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
341 	    CallHandler(&SALVSYNC_readfds);
/*
 * SALVSYNC_newconnection -- accept a new client connection on the listening
 * fd and register SALVSYNC_com as its handler.
 *
 * NOTE(review): interior lines are elided in this listing (declarations of
 * fd/junk, the #endif, and the recovery path between the failed AddHandler
 * and the assert -- presumably an AcceptOff()-style retry; confirm against
 * the full source). Code is kept byte-identical to the fragment.
 */
348 SALVSYNC_newconnection(int afd)
350 #ifdef USE_UNIX_SOCKETS
351     struct sockaddr_un other;
352 #else  /* USE_UNIX_SOCKETS */
353     struct sockaddr_in other;
356     junk = sizeof(other);
357     fd = accept(afd, (struct sockaddr *)&other, &junk);
/* accept failure is logged and (per elided code) presumably tolerated */
359 	Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
361     } else if (!AddHandler(fd, SALVSYNC_com)) {
/* second attempt must succeed; handler table is sized MAXHANDLERS */
363 	assert(AddHandler(fd, SALVSYNC_com));
367 /* this function processes commands from an salvsync file descriptor (fd) */
/* count of commands processed; used in log messages below */
368 static afs_int32 SALV_cnt = 0;
/*
 * SALVSYNC_com -- read one SALVSYNC protocol message from fd, validate it,
 * dispatch to the per-command handler, and send the response.
 *
 * Validation order: read failure -> drop connection; short message or wrong
 * protocol version -> SYNC_COM_ERROR with channel shutdown; explicit
 * channel-close command -> SYNC_OK with shutdown; wrong payload length ->
 * SYNC_COM_ERROR. Each response carries current queue lengths.
 *
 * NOTE(review): interior lines are elided in this listing (function header,
 * com/res declarations, break statements between switch cases, the dispatch
 * lock, and the drop path); code is kept byte-identical to the fragment.
 */
374     SALVSYNC_response_hdr sres_hdr;
375     SALVSYNC_command scom;
376     SALVSYNC_response sres;
377     SYNC_PROTO_BUF_DECL(buf);
/* wire buffers: command payload arrives in buf; response header in sres_hdr */
379     com.payload.buf = (void *)buf;
380     com.payload.len = SYNC_PROTO_MAX_LEN;
381     res.payload.buf = (void *) &sres_hdr;
382     res.payload.len = sizeof(sres_hdr);
383     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
384     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
/* overlay typed SALVSYNC views onto the raw buffers */
387     scom.sop = (SALVSYNC_command_hdr *) buf;
390     sres.sop = &sres_hdr;
394     if (SYNC_getCom(fd, &com)) {
395 	Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
400     if (com.recv_len < sizeof(com.hdr)) {
401 	Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
402 	res.hdr.response = SYNC_COM_ERROR;
403 	res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
404 	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
408     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
409 	Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
410 	res.hdr.response = SYNC_COM_ERROR;
411 	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
415     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
416 	res.hdr.response = SYNC_OK;
417 	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
/* all remaining commands carry exactly one SALVSYNC_command_hdr payload */
421     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
422 	Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
423 	res.hdr.response = SYNC_COM_ERROR;
424 	res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
425 	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
430     switch (com.hdr.command) {
433     case SALVSYNC_SALVAGE:
434     case SALVSYNC_RAISEPRIO:
435 	res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
437     case SALVSYNC_CANCEL:
438 	/* cancel a salvage */
439 	res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
441     case SALVSYNC_CANCELALL:
442 	/* cancel all queued salvages */
443 	res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
446 	/* query whether a volume is done salvaging */
447 	res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
449     case SALVSYNC_OP_LINK:
450 	/* link a clone to its parent in the scheduler */
451 	res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
454 	res.hdr.response = SYNC_BAD_COMMAND;
/* every response reports current scheduled/pending queue lengths */
458     sres_hdr.sq_len = salvageQueue.total_len;
459     sres_hdr.pq_len = pendingQueue.len;
463     SYNC_putRes(fd, &res);
/* shutdown flag set anywhere above means the connection is dropped here */
464     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
470  * request that a volume be salvaged.
472  * @param[in]  com  inbound command object
473  * @param[out] res  outbound response object
475  * @return operation status
476  *    @retval SYNC_OK success
477  *    @retval SYNC_DENIED failed to enqueue request
478  *    @retval SYNC_FAILED malformed command packet
480  * @note this is a SALVSYNC protocol rpc handler
484  * @post the volume is enqueued in the to-be-salvaged queue.
485  *       if the volume was already in the salvage queue, its
486  *       priority (and thus its location in the queue) are
/*
 * NOTE(review): interior lines are elided in this listing (the return-type
 * line, error-path goto/labels, the clone==NULL branch around AllocNode,
 * node initialization, and the done label); code is kept byte-identical to
 * the fragment.
 */
490 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
492     afs_int32 code = SYNC_OK;
493     struct SalvageQueueNode * node, * clone;
/* reject partition names that are not properly NUL-terminated strings */
496     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
498 	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
/* clone is the node matching the command; node resolves to the VG parent */
502     clone = LookupNodeByCommand(com->sop, &node);
505 	if (AllocNode(&node)) {
507 	    res->hdr->reason = SYNC_REASON_NOMEM;
/* apply the requested priority to both clone and parent bookkeeping */
514     HandlePrio(clone, node, com->sop->prio);
516     switch (node->state) {
517     case SALVSYNC_STATE_QUEUED:
/* already scheduled: re-sort within its partition queue */
518 	UpdateCommandPrio(node);
521     case SALVSYNC_STATE_ERROR:
522     case SALVSYNC_STATE_DONE:
523     case SALVSYNC_STATE_UNKNOWN:
/* (re)record the triggering command on the clone node */
524 	memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
525 	memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
528 	 * make sure volgroup parent partition path is kept coherent
530 	 * If we ever want to support non-COW clones on a machine holding
531 	 * the RW site, please note that this code does not work under the
532 	 * conditions where someone zaps a COW clone on partition X, and
533 	 * subsequently creates a full clone on partition Y -- we'd need
534 	 * an inverse to SALVSYNC_com_Link.
535 	 *  -- tkeiser 11/28/2007
537 	strcpy(node->command.sop.partName, com->sop->partName);
539 	if (AddToSalvageQueue(node)) {
/* response reports the (parent) node's current state and priority */
552     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
553     res->sop->state = node->state;
554     res->sop->prio = node->command.sop.prio;
561  * cancel a pending salvage request.
563  * @param[in]  com  inbound command object
564  * @param[out] res  outbound response object
566  * @return operation status
567  *    @retval SYNC_OK success
568  *    @retval SYNC_FAILED malformed command packet
570  * @note this is a SALVSYNC protocol rpc handler
/*
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, braces, the node==NULL branch header, and the function tail);
 * code is kept byte-identical to the fragment.
 */
575 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
577     afs_int32 code = SYNC_OK;
578     struct SalvageQueueNode * node;
580     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
582 	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
586     node = LookupNodeByCommand(com->sop, NULL);
/* unknown volume: report UNKNOWN state (presumably the node==NULL path) */
589 	res->sop->state = SALVSYNC_STATE_UNKNOWN;
592 	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
593 	res->sop->prio = node->command.sop.prio;
594 	res->sop->state = node->state;
/* only a queued volume-group parent can actually be dequeued here */
595 	if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
596 	    (node->state == SALVSYNC_STATE_QUEUED)) {
597 	    DeleteFromSalvageQueue(node);
606  * cancel all pending salvage requests.
608  * @param[in]  com  incoming command object
609  * @param[out] res  outbound response object
611  * @return operation status
612  *    @retval SYNC_OK success
614  * @note this is a SALVSYNC protocol rpc handler
/*
 * Walks every mounted disk partition and dequeues every scheduled salvage
 * node on that partition's queue.
 *
 * NOTE(review): the return-type line, braces, and the return statement are
 * elided in this listing; code is kept byte-identical to the fragment.
 */
619 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
621     struct SalvageQueueNode * np, *nnp;
622     struct DiskPartition * dp;
624     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
625 	for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
626 	    DeleteFromSalvageQueue(np);
634  * link a queue node for a clone to its parent volume.
636  * @param[in]  com  inbound command object
637  * @param[out] res  outbound response object
639  * @return operation status
640  *    @retval SYNC_OK success
641  *    @retval SYNC_FAILED malformed command packet
642  *    @retval SYNC_DENIED the request could not be completed
644  * @note this is a SALVSYNC protocol rpc handler
646  * @post the requested volume is marked as a child of another volume.
647  *       thus, future salvage requests for this volume will result in the
648  *       parent of the volume group being scheduled for salvage instead
/*
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, braces, error-path code around each failure branch, and the
 * function tail); code is kept byte-identical to the fragment.
 */
654 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
656     afs_int32 code = SYNC_OK;
657     struct SalvageQueueNode * clone, * parent;
659     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
661 	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
665     /* lookup clone's salvage scheduling node */
666     clone = LookupNodeByCommand(com->sop, NULL);
/* a link request for an unknown clone is an error (elided branch) */
669 	res->hdr->reason = SALVSYNC_REASON_ERROR;
673     /* lookup parent's salvage scheduling node */
674     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
675     if (parent == NULL) {
/* parent not yet known to the scheduler: create and register it */
676 	if (AllocNode(&parent)) {
678 	    res->hdr->reason = SYNC_REASON_NOMEM;
681 	memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
682 	memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
/* parent node is keyed by the parent volume id, and is its own parent */
683 	parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
684 	AddNodeToHash(parent);
687     if (LinkNode(parent, clone)) {
697  * query the status of a volume salvage request.
699  * @param[in]  com  inbound command object
700  * @param[out] res  outbound response object
702  * @return operation status
703  *    @retval SYNC_OK success
704  *    @retval SYNC_FAILED malformed command packet
706  * @note this is a SALVSYNC protocol rpc handler
/*
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, braces, the node==NULL branch header, and the function tail);
 * code is kept byte-identical to the fragment.
 */
711 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
713     afs_int32 code = SYNC_OK;
714     struct SalvageQueueNode * node;
716     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
718 	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
/* lookup resolves clones to the volume group parent node */
722     LookupNodeByCommand(com->sop, &node);
724     /* query whether a volume is done salvaging */
/* unknown volume: report UNKNOWN state (presumably the node==NULL path) */
726 	res->sop->state = SALVSYNC_STATE_UNKNOWN;
729 	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
730 	res->sop->state = node->state;
731 	res->sop->prio = node->command.sop.prio;
/*
 * SALVSYNC_Drop -- tear down a client connection (body elided in this
 * listing; only the signature is visible).
 */
739 SALVSYNC_Drop(int fd)
750 static int AcceptHandler = -1;	/* handler id for accept, if turned on */
/*
 * AcceptOn -- register the listening socket's accept handler exactly once
 * (function header elided in this listing).
 */
755     if (AcceptHandler == -1) {
756 	assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
757 	AcceptHandler = FindHandler(salvsync_server_state.fd);
/*
 * AcceptOff -- unregister the accept handler if it is registered
 * (function header and the AcceptHandler reset are elided in this listing).
 */
764     if (AcceptHandler != -1) {
765 	assert(RemoveHandler(salvsync_server_state.fd));
770 /* The multiple FD handling code. */
/* parallel arrays: fd -1 marks a free slot; proc is its dispatch callback */
772 static int HandlerFD[MAXHANDLERS];
773 static void (*HandlerProc[MAXHANDLERS]) (int);
/*
 * InitHandler -- mark every handler slot free under the write lock
 * (function header, loop-variable declaration, and the HandlerFD[i] = -1
 * line are elided in this listing).
 */
779     ObtainWriteLock(&SALVSYNC_handler_lock);
780     for (i = 0; i < MAXHANDLERS; i++) {
782 	HandlerProc[i] = NULL;
784     ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * CallHandler -- invoke the registered callback for each fd that select()
 * reported readable. The lock is dropped around each callback so handlers
 * may themselves add/remove handlers.
 */
788 CallHandler(fd_set * fdsetp)
791     ObtainReadLock(&SALVSYNC_handler_lock);
792     for (i = 0; i < MAXHANDLERS; i++) {
793 	if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
794 	    ReleaseReadLock(&SALVSYNC_handler_lock);
795 	    (*HandlerProc[i]) (HandlerFD[i]);
796 	    ObtainReadLock(&SALVSYNC_handler_lock);
799     ReleaseReadLock(&SALVSYNC_handler_lock);
/*
 * AddHandler -- claim the first free slot for (afd, aproc); returns 0 when
 * the table is full (the success return and the HandlerFD[i] = afd store
 * are elided in this listing).
 */
803 AddHandler(int afd, void (*aproc) (int))
806     ObtainWriteLock(&SALVSYNC_handler_lock);
807     for (i = 0; i < MAXHANDLERS; i++)
808 	if (HandlerFD[i] == -1)
810     if (i >= MAXHANDLERS) {
811 	ReleaseWriteLock(&SALVSYNC_handler_lock);
815     HandlerProc[i] = aproc;
816     ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * FindHandler -- locking wrapper returning the slot index for afd;
 * the "satisfy compiler" return implies the not-found case is treated as
 * impossible (assertion elided in this listing).
 */
821 FindHandler(register int afd)
824     ObtainReadLock(&SALVSYNC_handler_lock);
825     for (i = 0; i < MAXHANDLERS; i++)
826 	if (HandlerFD[i] == afd) {
827 	    ReleaseReadLock(&SALVSYNC_handler_lock);
830     ReleaseReadLock(&SALVSYNC_handler_lock);	/* just in case */
832     return -1;			/* satisfy compiler */
/*
 * FindHandler_r -- lock-free variant of FindHandler; caller must hold
 * SALVSYNC_handler_lock. Returns -1 only on the "can't happen" path.
 */
836 FindHandler_r(register int afd)
839     for (i = 0; i < MAXHANDLERS; i++)
840 	if (HandlerFD[i] == afd) {
844     return -1;			/* satisfy compiler */
848 RemoveHandler(register int afd)
850 ObtainWriteLock(&SALVSYNC_handler_lock);
851 HandlerFD[FindHandler_r(afd)] = -1;
852 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * GetHandler -- build the fd_set of all registered handler fds and compute
 * the largest fd, for use with select() in SALVSYNC_syncThread.
 *
 * NOTE(review): the store through maxfdp and closing braces are elided in
 * this listing; code is kept byte-identical to the fragment.
 */
857 GetHandler(fd_set * fdsetp, int *maxfdp)
860     register int maxfd = -1;
862     ObtainReadLock(&SALVSYNC_handler_lock);	/* just in case */
863     for (i = 0; i < MAXHANDLERS; i++)
864 	if (HandlerFD[i] != -1) {
865 	    FD_SET(HandlerFD[i], fdsetp);
866 	    if (maxfd < HandlerFD[i])
867 		maxfd = HandlerFD[i];
870     ReleaseReadLock(&SALVSYNC_handler_lock);	/* just in case */
874  * allocate a salvage queue node.
876  * @param[out] node_out  address in which to store new node pointer
878  * @return operation status
880  *    @retval 1 failed to allocate node
/*
 * NOTE(review): the malloc-failure branch and success return are elided in
 * this listing; code is kept byte-identical to the fragment.
 */
885 AllocNode(struct SalvageQueueNode ** node_out)
888     struct SalvageQueueNode * node;
890     *node_out = node = (struct SalvageQueueNode *)
891 	malloc(sizeof(struct SalvageQueueNode));
/* zero-fill, then set scheduler defaults: a parent node in UNKNOWN state */
897     memset(node, 0, sizeof(struct SalvageQueueNode));
898     node->type = SALVSYNC_VOLGROUP_PARENT;
899     node->state = SALVSYNC_STATE_UNKNOWN;
906  * link a salvage queue node to its parent.
908  * @param[in] parent  pointer to queue node for parent of volume group
909  * @param[in] clone   pointer to queue node for a clone
911  * @return operation status
/*
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, error-path code, break statements, the clone-state updates after
 * linking, and the function tail); code is kept byte-identical to the
 * fragment.
 */
918 LinkNode(struct SalvageQueueNode * parent,
919 	 struct SalvageQueueNode * clone)
924     /* check for attaching a clone to a clone */
925     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
930     /* check for pre-existing registration and openings */
931     for (idx = 0; idx < VOLMAXTYPES; idx++) {
/* already linked at this slot: nothing to do (elided branch body) */
932 	if (parent->volgroup.children[idx] == clone) {
/* first free child slot found (elided branch presumably breaks here) */
935 	if (parent->volgroup.children[idx] == NULL) {
/* no free child slot: volume group is full (elided error path) */
939     if (idx == VOLMAXTYPES) {
944     /* link parent and child */
945     parent->volgroup.children[idx] = clone;
946     clone->type = SALVSYNC_VOLGROUP_CLONE;
947     clone->volgroup.parent = parent;
/* fold the clone's scheduling state into the parent's */
951     switch (clone->state) {
952     case SALVSYNC_STATE_QUEUED:
/* clone no longer scheduled independently */
953 	DeleteFromSalvageQueue(clone);
955     case SALVSYNC_STATE_SALVAGING:
956 	switch (parent->state) {
957 	case SALVSYNC_STATE_UNKNOWN:
958 	case SALVSYNC_STATE_ERROR:
959 	case SALVSYNC_STATE_DONE:
/* parent inherits the clone's priority and takes over its salvage */
960 	    parent->command.sop.prio = clone->command.sop.prio;
961 	    AddToSalvageQueue(parent);
964 	case SALVSYNC_STATE_QUEUED:
965 	    if (clone->command.sop.prio) {
/* parent already queued: accumulate priority and re-sort */
966 		parent->command.sop.prio += clone->command.sop.prio;
967 		UpdateCommandPrio(parent);
/*
 * HandlePrio -- apply a newly requested priority to a clone node and
 * propagate the delta to the volume group parent node.
 *
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, the delta declaration, break statements, and the delta = 1
 * fallback assignment); code is kept byte-identical to the fragment.
 */
985 HandlePrio(struct SalvageQueueNode * clone,
986 	   struct SalvageQueueNode * node,
991     switch (node->state) {
992     case SALVSYNC_STATE_ERROR:
993     case SALVSYNC_STATE_DONE:
994     case SALVSYNC_STATE_UNKNOWN:
/* node is not scheduled: restart priority accounting from zero */
995 	node->command.sop.prio = 0;
999     if (new_prio < clone->command.sop.prio) {
1000 	/* strange.  let's just set our delta to 1 */
1003 	delta = new_prio - clone->command.sop.prio;
/* only clones track their own priority; parents get the summed delta */
1006     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1007 	clone->command.sop.prio = new_prio;
1010     node->command.sop.prio += delta;
/*
 * AddToSalvageQueue -- append a node to its partition's scheduled-salvage
 * queue and wake any waiters (see SALVSYNC_getWork).
 *
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, the id declaration, and the error/early returns inside each guard);
 * code is kept byte-identical to the fragment.
 */
1014 AddToSalvageQueue(struct SalvageQueueNode * node)
1017     struct SalvageQueueNode * last = NULL;
1019     id = volutil_GetPartitionID(node->command.sop.partName);
/* reject partition names that do not map to a valid partition index */
1020     if (id < 0 || id > VOLMAXPARTS) {
1023     if (!VGetPartitionById_r(id, 0)) {
1024 	/* don't enqueue salvage requests for unmounted partitions */
/* guard: node is already scheduled (elided early return) */
1027     if (queue_IsOnQueue(node)) {
/* remember the old tail so we can tell whether re-sorting is needed */
1031     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1032 	last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1034     queue_Append(&salvageQueue.part[id], node);
1035     salvageQueue.len[id]++;
1036     salvageQueue.total_len++;
1037     salvageQueue.last_insert = id;
1038     node->partition_id = id;
1039     node->state = SALVSYNC_STATE_QUEUED;
1041     /* reorder, if necessary */
1042     if (last && last->command.sop.prio < node->command.sop.prio) {
1043 	UpdateCommandPrio(node);
1046     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/*
 * DeleteFromSalvageQueue -- unlink a node from its partition's scheduled
 * queue, fix the counters, and wake waiters.
 * NOTE(review): the queue_Remove call itself appears elided in this
 * listing; code is kept byte-identical to the fragment.
 */
1051 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1053     if (queue_IsOnQueue(node)) {
1055 	salvageQueue.len[node->partition_id]--;
1056 	salvageQueue.total_len--;
1057 	node->state = SALVSYNC_STATE_UNKNOWN;
1058 	assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/*
 * AddToPendingQueue -- record that a node's salvage is now in progress
 * (the pendingQueue.len increment appears elided in this listing).
 */
1063 AddToPendingQueue(struct SalvageQueueNode * node)
1065     queue_Append(&pendingQueue, node);
1067     node->state = SALVSYNC_STATE_SALVAGING;
1068     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/*
 * DeleteFromPendingQueue -- remove a node from the in-progress queue
 * (queue_Remove and the len decrement appear elided in this listing).
 */
1072 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1074     if (queue_IsOnQueue(node)) {
1077 	node->state = SALVSYNC_STATE_UNKNOWN;
1078 	assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/*
 * LookupPendingCommand -- linear scan of the pending queue for a node
 * matching the query's (volume, partition); returns NULL when the scan
 * ran off the end (elided return statements).
 */
1082 static struct SalvageQueueNode *
1083 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1085     struct SalvageQueueNode * np, * nnp;
1087     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1088 	if ((np->command.sop.volume == qry->volume) &&
1089 	    !strncmp(np->command.sop.partName, qry->partName,
1090 		     sizeof(qry->partName)))
1094     if (queue_IsEnd(&pendingQueue, np))
/*
 * LookupPendingCommandByPid -- same scan keyed on the worker child's pid
 * (the pid comparison line appears elided in this listing).
 */
1099 static struct SalvageQueueNode *
1100 LookupPendingCommandByPid(int pid)
1102     struct SalvageQueueNode * np, * nnp;
1104     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1109     if (queue_IsEnd(&pendingQueue, np))
1115 /* raise the priority of a previously scheduled salvage */
/*
 * UpdateCommandPrio -- move a queued node forward so its partition queue
 * stays sorted by descending priority. The node must already be on a queue
 * (asserted below).
 *
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, prio/id declarations, the queue_Remove calls paired with each
 * re-insertion, and closing braces); code is kept byte-identical to the
 * fragment.
 */
1117 UpdateCommandPrio(struct SalvageQueueNode * node)
1119     struct SalvageQueueNode *np, *nnp;
1123     assert(queue_IsOnQueue(node));
1125     prio = node->command.sop.prio;
1126     id = node->partition_id;
/* fast path: node outranks the current head -> move straight to the front */
1127     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1129 	queue_Prepend(&salvageQueue.part[id], node);
/* otherwise scan backwards for the first node with strictly higher prio */
1131 	for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1132 	    if (np->command.sop.prio > prio)
1135 	if (queue_IsEnd(&salvageQueue.part[id], np)) {
1137 	    queue_Prepend(&salvageQueue.part[id], node);
1138 	} else if (node != np) {
1140 	    queue_InsertAfter(np, node);
1145 /* this will need to be rearchitected if we ever want more than one thread
1146  * to wait for new salvage nodes */
/*
 * SALVSYNC_getWork -- block until a salvage is scheduled, pick the next
 * node to run, move it from the scheduled queue to the pending queue, and
 * bump the partition's in-progress counter.
 *
 * Selection policy visible below: (1) if exactly one partition holds all
 * scheduled work, take its head; (2) otherwise prefer a partition with
 * scheduled work but no salvage in progress; (3) otherwise take any
 * partition with scheduled work. next_part_sched round-robins the start
 * point across calls.
 *
 * NOTE(review): interior lines are elided in this listing (braces, goto
 * labels between the phases, the loop headers' first clauses, and the
 * function tail/return); code is kept byte-identical to the fragment.
 */
1147 struct SalvageQueueNode *
1148 SALVSYNC_getWork(void)
1151     struct DiskPartition * dp = NULL, * fdp;
/* static: remembers where the round-robin scan should resume next call */
1152     static afs_int32 next_part_sched = 0;
1153     struct SalvageQueueNode *node = NULL, *np;
1158      * wait for work to be scheduled
1159      * if there are no disk partitions, just sit in this wait loop forever
1161     while (!salvageQueue.total_len || !DiskPartitionList) {
1162 	VOL_CV_WAIT(&salvageQueue.cv);
1166      * short circuit for simple case where only one partition has
1167      * scheduled salvages
1169     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1170 	(salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1171 	node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1177      * ok, more than one partition has scheduled salvages.
1178      * now search for partitions with scheduled salvages, but no pending salvages.
1180     dp = VGetPartitionById_r(next_part_sched, 0);
/* fall back to the head of the partition list if the id is stale */
1182 	dp = DiskPartitionList;
1188 	  dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1189 	if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1190 	    node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1197      * all partitions with scheduled salvages have at least one pending.
1198      * now do an exhaustive search for a scheduled salvage.
1204 	  dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1205 	if (salvageQueue.len[dp->index]) {
1206 	    node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1211     /* we should never reach this line */
1215     assert(node != NULL);
/* hand off: scheduled -> pending, and mark the partition busy */
1217     partition_salvaging[node->partition_id]++;
1218     DeleteFromSalvageQueue(node);
1219     AddToPendingQueue(node);
1222     /* update next_part_sched field */
1224 	next_part_sched = dp->next->index;
1225     } else if (DiskPartitionList) {
1226 	next_part_sched = DiskPartitionList->index;
1228 	next_part_sched = -1;
1238  * update internal scheduler state to reflect completion of a work unit.
1240  * @param[in] node    salvage queue node object pointer
1241  * @param[in] result  worker process result code
1243  * @post scheduler state is updated.
/*
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, partid/idx declarations, the success-result test whose body sets
 * DONE, and closing braces); code is kept byte-identical to the fragment.
 */
1248 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1253     DeleteFromPendingQueue(node);
1254     partid = node->partition_id;
/* release the per-partition "salvage in progress" slot */
1255     if (partid >=0 && partid <= VOLMAXPARTS) {
1256 	partition_salvaging[partid]--;
1259 	node->state = SALVSYNC_STATE_DONE;
/* SALSRV_EXIT_VOLGROUP_LINK is not an error: it signals a VG relink */
1260     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1261 	node->state = SALVSYNC_STATE_ERROR;
/* a parent's final state is propagated to all linked clone nodes */
1264     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1265 	for (idx = 0; idx < VOLMAXTYPES; idx++) {
1266 	    if (node->volgroup.children[idx]) {
1267 		node->volgroup.children[idx]->state = node->state;
1274 * check whether worker child failed.
1276 * @param[in] status status bitfield return by wait()
1278 * @return boolean failure code
1279 * @retval 0 child succeeded
1280 * @retval 1 child failed
1285 ChildFailed(int status)
1287 return (WCOREDUMP(status) ||
1288 WIFSIGNALED(status) ||
1289 ((WEXITSTATUS(status) != 0) &&
1290 (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1295  * notify salvsync scheduler of node completion, by child pid.
1297  * @param[in] pid     pid of worker child
1298  * @param[in] status  worker status bitfield from wait()
1300  * @post scheduler state is updated.
1301  *       if status code is a failure, fileserver notification was attempted
1303  * @see SALVSYNC_doneWork_r
/*
 * NOTE(review): interior lines are elided in this listing (return-type
 * line, idx/partName declarations, lock acquisition/release around the
 * glock-protected section, the node==NULL branch, and the FSYNC_VolOp
 * argument list tail); code is kept byte-identical to the fragment.
 */
1306 SALVSYNC_doneWorkByPid(int pid, int status)
1308     struct SalvageQueueNode * node;
/* slot 0 holds the parent volume id; 1..VOLMAXTYPES hold linked clones */
1310     afs_uint32 volids[VOLMAXTYPES+1];
1313     memset(volids, 0, sizeof(volids));
1316     node = LookupPendingCommandByPid(pid);
1318 	SALVSYNC_doneWork_r(node, status);
1320 	if (ChildFailed(status)) {
1321 	    /* populate volume id list for later processing outside the glock */
1322 	    volids[0] = node->command.sop.volume;
1323 	    strcpy(partName, node->command.sop.partName);
1324 	    if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1325 		for (idx = 0; idx < VOLMAXTYPES; idx++) {
1326 		    if (node->volgroup.children[idx]) {
1327 			volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1336      * if necessary, notify fileserver of
1337      * failure to salvage volume group
1338      * [we cannot guarantee that the child made the
1339      * appropriate notifications (e.g. SIGSEGV)]
1340      *  -- tkeiser 11/28/2007
/* second ChildFailed check runs outside the glock using the copied ids */
1342     if (ChildFailed(status)) {
1343 	for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1345 		FSYNC_VolOp(volids[idx],
1347 			    FSYNC_VOL_FORCE_ERROR,
1355 #endif /* AFS_DEMAND_ATTACH_FS */